#!/usr/bin/env python3
-from numbers import Number
-from typing import Callable
+from typing import Callable, SupportsFloat
import constants
self.from_canonical_f = from_canonical
self.unit = unit
- def to_canonical(self, n: Number) -> Number:
+ def to_canonical(self, n: SupportsFloat) -> SupportsFloat:
return self.to_canonical_f(n)
- def from_canonical(self, n: Number) -> Number:
+ def from_canonical(self, n: SupportsFloat) -> SupportsFloat:
return self.from_canonical_f(n)
def unit_suffix(self) -> str:
}
-def convert(magnitude: Number, from_thing: str, to_thing: str) -> float:
+def convert(magnitude: SupportsFloat, from_thing: str, to_thing: str) -> float:
src = conversion_catalog.get(from_thing, None)
dst = conversion_catalog.get(to_thing, None)
if src is None or dst is None:
return _convert(magnitude, src, dst)
-def _convert(magnitude: Number, from_unit: Converter, to_unit: Converter) -> float:
+def _convert(
+ magnitude: SupportsFloat, from_unit: Converter, to_unit: Converter
+) -> float:
canonical = from_unit.to_canonical(magnitude)
converted = to_unit.from_canonical(canonical)
return float(converted)
import hashlib
import logging
import os
-from typing import Any, Optional
+from typing import Any, Dict, Optional, Set
logger = logging.getLogger(__name__)
if not file_utils.does_directory_exist(directory):
raise ValueError(directory)
self.directory = directory
- self.md5_by_filename = {}
- self.mtime_by_filename = {}
+ self.md5_by_filename: Dict[str, str] = {}
+ self.mtime_by_filename: Dict[str, float] = {}
self._update()
def _update(self):
assert file_utils.does_file_exist(filename)
if mtime is None:
mtime = file_utils.get_file_raw_mtime(filename)
+ assert mtime
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
"""
def __init__(self, directory: str):
- self.all_md5s = set()
+ self.all_md5s: Set[str] = set()
super().__init__(directory)
def _update_file(self, filename: str, mtime: Optional[float] = None):
assert file_utils.does_file_exist(filename)
if mtime is None:
mtime = file_utils.get_file_raw_mtime(filename)
+ assert mtime
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
self.mtime_by_filename[filename] = mtime
self.md5_by_filename[filename] = md5
self.all_md5s.add(md5)
- def apply(self, item: Any) -> bool:
+ def apply(self, item: Any, ignored_filename: str = None) -> bool:
+ assert not ignored_filename
self._update()
mem_hash = hashlib.md5()
mem_hash.update(item)
return lst
-def population_counts(lst: List[Any]) -> Counter:
+def population_counts(lst: Sequence[Any]) -> Counter:
"""
Return a population count mapping for the list (i.e. the keys are
list items and the values are the number of occurrances of that
class LockFileContents:
pid: int
commandline: str
- expiration_timestamp: float
+ expiration_timestamp: Optional[float]
class LockFile(object):
# Has the lock expiration expired?
if contents.expiration_timestamp is not None:
now = datetime.datetime.now().timestamp()
- if now > contents.expiration_datetime:
+ if now > contents.expiration_timestamp:
msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'
logger.warning(msg)
self.release()
import threading
import sys
import time
+from typing import Optional
import cloudpickle # type: ignore
import psutil # type: ignore
import argparse_utils
import bootstrap
import config
+from stopwatch import Timer
from thread_utils import background_thread
time.sleep(1.0)
+def cleanup_and_exit(
+ thread: Optional[threading.Thread],
+ stop_thread: Optional[threading.Event],
+ exit_code: int,
+) -> None:
+ if stop_thread is not None:
+ stop_thread.set()
+ assert thread is not None
+ thread.join()
+ sys.exit(exit_code)
+
+
@bootstrap.initialize
def main() -> None:
in_file = config.config['code_file']
out_file = config.config['result_file']
+ thread = None
stop_thread = None
if config.config['watch_for_cancel']:
(thread, stop_thread) = watch_for_cancel()
except Exception as e:
logger.exception(e)
logger.critical(f'Problem reading {in_file}. Aborting.')
- stop_thread.set()
- sys.exit(-1)
+ cleanup_and_exit(thread, stop_thread, 1)
logger.debug(f'Deserializing {in_file}.')
try:
except Exception as e:
logger.exception(e)
logger.critical(f'Problem deserializing {in_file}. Aborting.')
- stop_thread.set()
- sys.exit(-1)
+ cleanup_and_exit(thread, stop_thread, 2)
logger.debug('Invoking user code...')
- start = time.time()
- ret = fun(*args, **kwargs)
- end = time.time()
- logger.debug(f'User code took {end - start:.1f}s')
+ with Timer() as t:
+ ret = fun(*args, **kwargs)
+ logger.debug(f'User code took {t():.1f}s')
logger.debug('Serializing results')
try:
except Exception as e:
logger.exception(e)
logger.critical(f'Could not serialize result ({type(ret)}). Aborting.')
- stop_thread.set()
- sys.exit(-1)
+ cleanup_and_exit(thread, stop_thread, 3)
logger.debug(f'Writing {out_file}.')
try:
except Exception as e:
logger.exception(e)
logger.critical(f'Error writing {out_file}. Aborting.')
- stop_thread.set()
- sys.exit(-1)
-
- if stop_thread is not None:
- stop_thread.set()
- thread.join()
+ cleanup_and_exit(thread, stop_thread, 4)
+ cleanup_and_exit(thread, stop_thread, 0)
if __name__ == '__main__':
e.g.
- with timer.Timer() as t:
+ with stopwatch.Timer() as t:
do_the_thing()
walltime = t()
import statistics
import time
import tempfile
-from typing import Callable, Dict, List
+from typing import Any, Callable, Dict, List, Optional
import unittest
import warnings
pass
@abstractmethod
- def load_performance_data(self) -> Dict[str, List[float]]:
+ def load_performance_data(self, method_id: str) -> Dict[str, List[float]]:
pass
@abstractmethod
class FileBasedPerfRegressionDataPersister(PerfRegressionDataPersister):
def __init__(self, filename: str):
self.filename = filename
- self.traces_to_delete = []
+ self.traces_to_delete: List[str] = []
def load_performance_data(self, method_id: str) -> Dict[str, List[float]]:
with open(self.filename, 'rb') as f:
f'SELECT * FROM runtimes_by_function WHERE function = "{method_id}";'
)
)
- ret = {method_id: []}
+ ret: Dict[str, List[float]] = {method_id: []}
for result in results.all():
ret[method_id].append(result['runtime'])
results.close()
def __init__(self) -> None:
self.destination = tempfile.SpooledTemporaryFile(mode='r+')
- self.recorder = None
+ self.recorder: Optional[contextlib.redirect_stdout] = None
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
self.recorder = contextlib.redirect_stdout(self.destination)
+ assert self.recorder
self.recorder.__enter__()
return lambda: self.destination
- def __exit__(self, *args) -> bool:
+ def __exit__(self, *args) -> Optional[bool]:
+ assert self.recorder
self.recorder.__exit__(*args)
self.destination.seek(0)
return None
def __init__(self) -> None:
self.destination = tempfile.SpooledTemporaryFile(mode='r+')
- self.recorder = None
+ self.recorder: Optional[contextlib.redirect_stdout[Any]] = None
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
- self.recorder = contextlib.redirect_stderr(self.destination)
+ self.recorder = contextlib.redirect_stderr(self.destination) # type: ignore
+ assert self.recorder
self.recorder.__enter__()
return lambda: self.destination
- def __exit__(self, *args) -> bool:
+ def __exit__(self, *args) -> Optional[bool]:
+ assert self.recorder
self.recorder.__exit__(*args)
self.destination.seek(0)
return None
def __init__(self, *files) -> None:
self.files = [*files]
self.destination = tempfile.SpooledTemporaryFile(mode='r+')
- self.saved_writes = []
+ self.saved_writes: List[Callable[..., Any]] = []
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
for f in self.files:
f.write = self.destination.write
return lambda: self.destination
- def __exit__(self, *args) -> bool:
+ def __exit__(self, *args) -> Optional[bool]:
for f in self.files:
f.write = self.saved_writes.pop()
self.destination.seek(0)
+ return None
if __name__ == '__main__':
unless you want to populate the same exact files.
"""
- words_by_sigs = {}
+ words_by_sigs: Dict[int, str] = {}
seen = set()
with open(dictfile, "r") as f:
for word in f:
if self.someone_is_home is None:
raise Exception("Too Soon!")
if self.someone_is_home:
+ assert self.someone_home_since
return (True, self.someone_home_since)
else:
+ assert self.everyone_gone_since
return (False, self.everyone_gone_since)