X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=unittest_utils.py;h=28b577e2086af4ff20647d05cd9be24761839d6d;hb=532df2c5b57c7517dfb3dddd8c1358fbadf8baf3;hp=f229df75e8b88825d66ca227d7e907d3dc725e1a;hpb=713a609bd19d491de03debf8a4a6ddf2540b13dc;p=python_utils.git diff --git a/unittest_utils.py b/unittest_utils.py index f229df7..28b577e 100644 --- a/unittest_utils.py +++ b/unittest_utils.py @@ -1,10 +1,12 @@ #!/usr/bin/env python3 +# © Copyright 2021-2022, Scott Gasch + """Helpers for unittests. Note that when you import this we - automatically wrap unittest.main() with a call to - bootstrap.initialize so that we getLogger config, commandline args, - logging control, etc... this works fine but it's a little hacky so - caveat emptor. +automatically wrap unittest.main() with a call to bootstrap.initialize +so that we getLogger config, commandline args, logging control, +etc... this works fine but it's a little hacky so caveat emptor. + """ import contextlib @@ -20,7 +22,7 @@ import time import unittest import warnings from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Literal, Optional import sqlalchemy as sa @@ -76,6 +78,9 @@ unittest.main = bootstrap.initialize(unittest.main) class PerfRegressionDataPersister(ABC): + """A base class for a signature dealing with persisting perf + regression data.""" + def __init__(self): pass @@ -93,7 +98,10 @@ class PerfRegressionDataPersister(ABC): class FileBasedPerfRegressionDataPersister(PerfRegressionDataPersister): + """A perf regression data persister that uses files.""" + def __init__(self, filename: str): + super().__init__() self.filename = filename self.traces_to_delete: List[str] = [] @@ -114,7 +122,10 @@ class FileBasedPerfRegressionDataPersister(PerfRegressionDataPersister): class DatabasePerfRegressionDataPersister(PerfRegressionDataPersister): + """A perf regression data persister that uses a database backend.""" + def __init__(self, dbspec: str): + super().__init__() self.dbspec = dbspec self.engine = sa.create_engine(self.dbspec) self.conn = self.engine.connect() @@ -131,10 +142,10 @@ class DatabasePerfRegressionDataPersister(PerfRegressionDataPersister): def save_performance_data(self, method_id: str, data: Dict[str, List[float]]): self.delete_performance_data(method_id) - for (method_id, perf_data) in data.items(): + for (mid, perf_data) in data.items(): sql = 'INSERT INTO runtimes_by_function (function, runtime) VALUES ' for perf in perf_data: - self.conn.execute(sql + f'("{method_id}", {perf});') + self.conn.execute(sql + f'("{mid}", {perf});') def delete_performance_data(self, method_id: str): sql = f'DELETE FROM runtimes_by_function WHERE function = "{method_id}"' @@ -168,8 +179,8 @@ def check_method_for_perf_regressions(func: Callable) -> Callable: func_id = function_utils.function_identifier(func) func_name = func.__name__ - logger.debug(f'Watching {func_name}\'s performance...') - logger.debug(f'Canonical function identifier = {func_id}') + logger.debug('Watching %s\'s performance...', func_name) + logger.debug('Canonical function identifier = "%s"', func_id) try: perfdb = helper.load_performance_data(func_id) @@ -195,15 +206,15 @@ def check_method_for_perf_regressions(func: Callable) -> Callable: hist = perfdb.get(func_id, []) if len(hist) < config.config['unittests_num_perf_samples']: hist.append(run_time) - logger.debug(f'Still establishing a perf baseline for {func_name}') + logger.debug('Still establishing a perf baseline for %s', func_name) else: stdev = statistics.stdev(hist) - logger.debug(f'For {func_name}, performance stdev={stdev}') + logger.debug('For %s, performance stdev=%.2f', func_name, stdev) slowest = hist[-1] - logger.debug(f'For {func_name}, slowest perf on record is {slowest:f}s') + logger.debug('For %s, slowest perf on record is %.2fs', func_name, slowest) limit = slowest + stdev * 4 - logger.debug(f'For {func_name}, max acceptable runtime is {limit:f}s') - logger.debug(f'For {func_name}, actual observed runtime was {run_time:f}s') + logger.debug('For %s, max acceptable runtime is %.2fs', func_name, limit) + logger.debug('For %s, actual observed runtime was %.2fs', func_name, run_time) if run_time > limit: msg = f'''{func_id} performance has regressed unacceptably. {slowest:f}s is the slowest runtime on record in {len(hist)} perf samples. @@ -250,20 +261,13 @@ def check_all_methods_for_perf_regressions(prefix='test_'): for name, m in inspect.getmembers(cls, inspect.isfunction): if name.startswith(prefix): setattr(cls, name, check_method_for_perf_regressions(m)) - logger.debug(f'Wrapping {cls.__name__}:{name}.') + logger.debug('Wrapping %s:%s.', cls.__name__, name) return cls return decorate_the_testcase -def breakpoint(): - """Hard code a breakpoint somewhere; drop into pdb.""" - import pdb - - pdb.set_trace() - - -class RecordStdout(object): +class RecordStdout(contextlib.AbstractContextManager): """ Record what is emitted to stdout. @@ -275,6 +279,7 @@ class RecordStdout(object): """ def __init__(self) -> None: + super().__init__() self.destination = tempfile.SpooledTemporaryFile(mode='r+') self.recorder: Optional[contextlib.redirect_stdout] = None @@ -284,14 +289,14 @@ class RecordStdout(object): self.recorder.__enter__() return lambda: self.destination - def __exit__(self, *args) -> Optional[bool]: + def __exit__(self, *args) -> Literal[False]: assert self.recorder is not None self.recorder.__exit__(*args) self.destination.seek(0) - return None + return False -class RecordStderr(object): +class RecordStderr(contextlib.AbstractContextManager): """ Record what is emitted to stderr. @@ -304,6 +309,7 @@ class RecordStderr(object): """ def __init__(self) -> None: + super().__init__() self.destination = tempfile.SpooledTemporaryFile(mode='r+') self.recorder: Optional[contextlib.redirect_stdout[Any]] = None @@ -313,19 +319,20 @@ class RecordStderr(object): self.recorder.__enter__() return lambda: self.destination - def __exit__(self, *args) -> Optional[bool]: + def __exit__(self, *args) -> Literal[False]: assert self.recorder is not None self.recorder.__exit__(*args) self.destination.seek(0) - return None + return False -class RecordMultipleStreams(object): +class RecordMultipleStreams(contextlib.AbstractContextManager): """ Record the output to more than one stream. """ def __init__(self, *files) -> None: + super().__init__() self.files = [*files] self.destination = tempfile.SpooledTemporaryFile(mode='r+') self.saved_writes: List[Callable[..., Any]] = [] @@ -336,11 +343,11 @@ class RecordMultipleStreams(object): f.write = self.destination.write return lambda: self.destination - def __exit__(self, *args) -> Optional[bool]: + def __exit__(self, *args) -> Literal[False]: for f in self.files: f.write = self.saved_writes.pop() self.destination.seek(0) - return None + return False if __name__ == '__main__':