3 """Helpers for unittests. Note that when you import this we
4 automatically wrap unittest.main() with a call to bootstrap.initialize
5 so that we getLogger config, commandline args, logging control,
6 etc... this works fine but it's a little hacky so caveat emptor.
18 from typing import Callable
logger = logging.getLogger(__name__)
cfg = config.add_commandline_args(
    f'Unittest Utils ({__file__})',
    'Args related to unittest helpers')
cfg.add_argument(
    '--unittests_ignore_perf',
    action='store_true',
    default=False,
    help='Ignore unittest perf regression in @check_method_for_perf_regressions',
)
cfg.add_argument(
    '--unittests_num_perf_samples',
    type=int,
    default=50,  # assumed default; tune to taste
    help='The count of perf timing samples we need to see before blocking slow runs on perf grounds',
)
cfg.add_argument(
    '--unittests_drop_perf_traces',
    type=str,
    default=None,
    help='The identifier (i.e. file!test_fixture) for which we should drop all perf data',
)

# >>> This is the hacky business, FYI. <<<
unittest.main = bootstrap.initialize(unittest.main)
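
# Because of the wrap above, a test module picks up bootstrap's initialization
# (logger config, commandline args, logging control, etc...) simply by
# importing this module and calling unittest.main() as usual.  A minimal
# sketch, assuming this module is importable as unittest_utils (hypothetical
# module and test names):
#
#     import unittest
#     import unittest_utils as uu  # side effect: wraps unittest.main
#
#     class TrivialTest(unittest.TestCase):
#         def test_trivial(self):
#             self.assertEqual(1 + 1, 2)
#
#     if __name__ == '__main__':
#         unittest.main()  # now runs under bootstrap.initialize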

# Location of the pickled perf database: a dict mapping a test's canonical
# identifier (filepath!function) to a list of historical runtimes in seconds.
_db = '/home/scott/.python_unittest_performance_db'


def check_method_for_perf_regressions(func: Callable) -> Callable:
    """This decorator is meant to be used on a method of a class that
    subclasses unittest.TestCase.  When thus decorated it will time the
    execution of the code in the method, compare it with a database of
    historical performance, and fail the test with a perf-related
    message if it has become too slow.
    """
    def load_known_test_performance_characteristics():
        with open(_db, 'rb') as f:
            return pickle.load(f)

    def save_known_test_performance_characteristics(perfdb):
        with open(_db, 'wb') as f:
            pickle.dump(perfdb, f, pickle.HIGHEST_PROTOCOL)

    @functools.wraps(func)
    def wrapper_perf_monitor(*args, **kwargs):
        try:
            perfdb = load_known_test_performance_characteristics()
        except Exception as e:
            logger.warning(f'Unable to load perfdb from {_db}: {e}')
            perfdb = {}

        # This is a unique identifier for a test: filepath!function
        logger.debug(f'Watching {func.__name__}\'s performance...')
        func_id = f'{func.__globals__["__file__"]}!{func.__name__}'
        logger.debug(f'Canonical function identifier = {func_id}')

        # cmdline arg to forget perf traces for a function
        drop_id = config.config['unittests_drop_perf_traces']
        if drop_id is not None:
            perfdb.pop(drop_id, None)

        # Run the wrapped test paying attention to latency.
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time
        logger.debug(f'{func.__name__} executed in {run_time:f}s.')

        # Check the db; see if this run was unexpectedly slow.
        hist = perfdb.get(func_id, [])
        if len(hist) < config.config['unittests_num_perf_samples']:
            hist.append(run_time)
            logger.debug(
                f'Still establishing a perf baseline for {func.__name__}'
            )
        else:
            slowest = max(hist)
            stdev = statistics.stdev(hist)
            limit = slowest + stdev * 3
            logger.debug(
                f'Max acceptable performance for {func.__name__} is {limit:f}s'
            )
            if (
                run_time > limit
                and not config.config['unittests_ignore_perf']
            ):
                msg = f'''{func_id} performance has regressed unacceptably.
{slowest:f}s is the slowest record in {len(hist)} db perf samples.
It just ran in {run_time:f}s which is >3 stdevs slower than the slowest sample.
Here is the current, full db perf timing distribution:

'''
                for x in hist:
                    msg += f'{x:f}\n'
                logger.error(msg)
                # args[0] is the TestCase instance (self); fail the test.
                args[0].fail(msg)
            else:
                hist.append(run_time)

        # Keep at most unittests_num_perf_samples timing samples per test.
        n = min(config.config['unittests_num_perf_samples'], len(hist))
        hist = random.sample(hist, n)
        perfdb[func_id] = hist
        save_known_test_performance_characteristics(perfdb)
        return value

    return wrapper_perf_monitor
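
# A minimal usage sketch, assuming this module is imported as uu (as in the
# docstrings below); the test class and method names are hypothetical:
#
#     import unittest
#     import unittest_utils as uu
#
#     class WidgetTest(unittest.TestCase):
#         @uu.check_method_for_perf_regressions
#         def test_widget_speed(self):
#             self.assertEqual(sum(range(1000)), 499500)
#
# Pass --unittests_ignore_perf to keep a slow run from failing the test, and
# --unittests_drop_perf_traces <file!test> to forget a test's perf history.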


def check_all_methods_for_perf_regressions(prefix='test_'):
    """Decorate a unittest.TestCase subclass so that every method whose
    name starts with `prefix` is wrapped with
    @check_method_for_perf_regressions.
    """
    def decorate_the_testcase(cls):
        if issubclass(cls, unittest.TestCase):
            for name, m in inspect.getmembers(cls, inspect.isfunction):
                if name.startswith(prefix):
                    setattr(cls, name, check_method_for_perf_regressions(m))
                    logger.debug(f'Wrapping {cls.__name__}:{name}.')
        return cls
    return decorate_the_testcase
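
# The same idea applied to a whole fixture: every method whose name starts
# with the given prefix ('test_' by default) gets wrapped.  A sketch with
# hypothetical names, assuming this module is imported as uu:
#
#     import unittest
#     import unittest_utils as uu
#
#     @uu.check_all_methods_for_perf_regressions()
#     class WholeFixtureTest(unittest.TestCase):
#         def test_alpha(self):
#             self.assertTrue(True)
#
#         def test_beta(self):
#             self.assertFalse(False)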


class RecordStdout(object):
    """Record what is printed to stdout.  e.g.

        with uu.RecordStdout() as record:
            print("This is a test!")
        print(record().readline())
    """

    def __init__(self) -> None:
        self.destination = tempfile.SpooledTemporaryFile(mode='r+')
        self.recorder = None

    def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
        self.recorder = contextlib.redirect_stdout(self.destination)
        self.recorder.__enter__()
        return lambda: self.destination

    def __exit__(self, *args) -> bool:
        self.recorder.__exit__(*args)
        self.destination.seek(0)
        return False


class RecordStderr(object):
    """Record what is printed to stderr.  e.g.

        with uu.RecordStderr() as record:
            print("This is a test!", file=sys.stderr)
        print(record().readline())
    """

    def __init__(self) -> None:
        self.destination = tempfile.SpooledTemporaryFile(mode='r+')
        self.recorder = None

    def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
        self.recorder = contextlib.redirect_stderr(self.destination)
        self.recorder.__enter__()
        return lambda: self.destination

    def __exit__(self, *args) -> bool:
        self.recorder.__exit__(*args)
        self.destination.seek(0)
        return False


class RecordMultipleStreams(object):
    """Record the output of multiple streams into one buffer.  e.g.

        with uu.RecordMultipleStreams(sys.stderr, sys.stdout) as record:
            print("This is a test!")
            print("This is one too.", file=sys.stderr)
        print(record().readlines())
    """

    def __init__(self, *files) -> None:
        self.files = [*files]
        self.destination = tempfile.SpooledTemporaryFile(mode='r+')
        self.saved_writes = []

    def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
        for f in self.files:
            self.saved_writes.append(f.write)
            f.write = self.destination.write
        return lambda: self.destination

    def __exit__(self, *args) -> bool:
        for f in self.files:
            f.write = self.saved_writes.pop(0)
        self.destination.seek(0)
        return False
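
# A usage sketch combining the recorders with a test, assuming this module is
# imported as uu (hypothetical test names):
#
#     import sys
#     import unittest
#     import unittest_utils as uu
#
#     class OutputTest(unittest.TestCase):
#         def test_captures_both_streams(self):
#             with uu.RecordMultipleStreams(sys.stdout, sys.stderr) as record:
#                 print('to stdout')
#                 print('to stderr', file=sys.stderr)
#             contents = record().read()
#             self.assertIn('to stdout', contents)
#             self.assertIn('to stderr', contents)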