3 # © Copyright 2021-2022, Scott Gasch
5 """Helpers for unittests.
9 When you import this we automatically wrap unittest.main()
10 with a call to bootstrap.initialize so that we getLogger
11 config, commandline args, logging control, etc... this works
12 fine but it's a little hacky so caveat emptor.
27 from abc import ABC, abstractmethod
28 from typing import Any, Callable, Dict, List, Literal, Optional
30 import sqlalchemy as sa
# Module-level logger for this file.
logger = logging.getLogger(__name__)
# Register this module's commandline flags with the project config system.
cfg = config.add_commandline_args(f'Logging ({__file__})', 'Args related to function decorators')
# NOTE(review): the cfg.add_argument(...) call wrappers around the flag
# fragments below appear to have been lost (bad merge or extraction) --
# only the flag names, some keyword args, and the help strings survive.
# Reconstruct the surrounding cfg.add_argument( ... ) calls (action/type/
# default values are not recoverable from this view) before running.
    '--unittests_ignore_perf',
    help='Ignore unittest perf regression in @check_method_for_perf_regressions',
    '--unittests_num_perf_samples',
    help='The count of perf timing samples we need to see before blocking slow runs on perf grounds',
    '--unittests_drop_perf_traces',
    help='The identifier (i.e. file!test_fixture) for which we should drop all perf data',
    '--unittests_persistance_strategy',
    choices=['FILE', 'DATABASE'],
    help='Should we persist perf data in a file or db?',
    '--unittests_perfdb_filename',
    default=f'{os.environ["HOME"]}/.python_unittest_performance_db',
    help='File in which to store perf data (iff --unittests_persistance_strategy is FILE)',
    '--unittests_perfdb_spec',
    default='mariadb+pymysql://python_unittest:<PASSWORD>@db.house:3306/python_unittest_performance',
    help='Db connection spec for perf data (iff --unittest_persistance_strategy is DATABASE)',

# >>> This is the hacky business, FYI. <<<
# Wrap unittest.main with bootstrap.initialize so that importing this
# module gives test programs config parsing, logging setup, etc. for free.
unittest.main = bootstrap.initialize(unittest.main)
class PerfRegressionDataPersister(ABC):
    """A base class for a signature dealing with persisting perf
    regression data: per-method lists of historical runtimes keyed by a
    canonical method identifier string.

    Subclasses implement storage in a file or a database.
    """

    @abstractmethod
    def load_performance_data(self, method_id: str) -> Dict[str, List[float]]:
        """Load and return historical runtime data for method_id."""
        ...

    @abstractmethod
    def save_performance_data(self, method_id: str, data: Dict[str, List[float]]):
        """Persist the given runtime data."""
        ...

    @abstractmethod
    def delete_performance_data(self, method_id: str):
        """Delete persisted runtime data for method_id."""
        ...
class FileBasedPerfRegressionDataPersister(PerfRegressionDataPersister):
    """A perf regression data persister that uses a local pickle file."""

    def __init__(self, filename: str):
        """
        Args:
            filename: the pickle file in which all perf data lives
        """
        super().__init__()
        self.filename = filename
        # Trace ids queued (via delete_performance_data) to be dropped
        # the next time save_performance_data runs.
        self.traces_to_delete: List[str] = []

    def load_performance_data(self, method_id: str) -> Dict[str, List[float]]:
        """Unpickle and return the whole perf database (method_id unused;
        the file holds data for every method)."""
        with open(self.filename, 'rb') as f:
            return pickle.load(f)

    def save_performance_data(self, method_id: str, data: Dict[str, List[float]]):
        """Persist data, first clearing any traces queued for deletion.

        Fix: the deletion loop had no body in the previous version (a
        syntax error); each queued trace's samples are now emptied.
        """
        for trace in self.traces_to_delete:
            if trace in data:
                data[trace] = []
        with open(self.filename, 'wb') as f:
            pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)

    def delete_performance_data(self, method_id: str):
        """Queue method_id's perf traces for deletion on the next save."""
        self.traces_to_delete.append(method_id)
class DatabasePerfRegressionDataPersister(PerfRegressionDataPersister):
    """A perf regression data persister that uses a database backend."""

    def __init__(self, dbspec: str):
        """
        Args:
            dbspec: a SQLAlchemy connection string for the perf database
        """
        super().__init__()
        # Fix: dbspec was never stored before being read below.
        self.dbspec = dbspec
        self.engine = sa.create_engine(self.dbspec)
        self.conn = self.engine.connect()

    def load_performance_data(self, method_id: str) -> Dict[str, List[float]]:
        """Fetch all recorded runtimes for method_id from the database."""
        # NOTE(review): interpolating method_id into SQL is injection-prone.
        # It is an internally-generated function identifier here, but bound
        # parameters (sa.text with :params) would still be safer.
        results = self.conn.execute(
            sa.text(f'SELECT * FROM runtimes_by_function WHERE function = "{method_id}";')
        )
        ret: Dict[str, List[float]] = {method_id: []}
        for result in results.all():
            ret[method_id].append(result['runtime'])
        # Fix: the loaded dict was built but never returned.
        return ret

    def save_performance_data(self, method_id: str, data: Dict[str, List[float]]):
        """Replace the persisted runtimes for method_id with those in data."""
        self.delete_performance_data(method_id)
        for mid, perf_data in data.items():
            sql = 'INSERT INTO runtimes_by_function (function, runtime) VALUES '
            for perf in perf_data:
                self.conn.execute(sql + f'("{mid}", {perf});')

    def delete_performance_data(self, method_id: str):
        """Remove all persisted runtimes for method_id."""
        sql = f'DELETE FROM runtimes_by_function WHERE function = "{method_id}"'
        self.conn.execute(sql)
def check_method_for_perf_regressions(func: Callable) -> Callable:
    """Decorator that watches a unittest method's runtime and fails the
    test if it has become unacceptably slow.

    This is meant to be used on a method in a class that subclasses
    unittest.TestCase.  When thus decorated it will time the execution
    of the code in the method, compare it with a database of
    historical performance, and fail the test with a perf-related
    message if it has become too slow.
    """

    @functools.wraps(func)
    def wrapper_perf_monitor(*args, **kwargs):
        # Bail early if perf checking was disabled on the commandline.
        if config.config['unittests_ignore_perf']:
            return func(*args, **kwargs)

        # Pick a persistence helper based on commandline flags.
        if config.config['unittests_persistance_strategy'] == 'FILE':
            filename = config.config['unittests_perfdb_filename']
            helper = FileBasedPerfRegressionDataPersister(filename)
        elif config.config['unittests_persistance_strategy'] == 'DATABASE':
            dbspec = config.config['unittests_perfdb_spec']
            dbspec = dbspec.replace('<PASSWORD>', scott_secrets.MARIADB_UNITTEST_PERF_PASSWORD)
            helper = DatabasePerfRegressionDataPersister(dbspec)
        else:
            # Fix: the raise was unguarded (no else:) and would have fired
            # on every call, even with a valid strategy.
            raise Exception('Unknown/unexpected --unittests_persistance_strategy value')

        func_id = function_utils.function_identifier(func)
        func_name = func.__name__
        logger.debug('Watching %s\'s performance...', func_name)
        logger.debug('Canonical function identifier = "%s"', func_id)

        # Perf history is best-effort: if it can't be loaded, start fresh
        # rather than failing the test run.  (Fix: the try: wrapper around
        # the load was missing.)
        try:
            perfdb = helper.load_performance_data(func_id)
        except Exception as e:
            logger.exception(e)
            msg = 'Unable to load perfdb; skipping it...'
            logger.warning(msg)
            perfdb = {}

        # cmdline arg to forget perf traces for function
        drop_id = config.config['unittests_drop_perf_traces']
        if drop_id is not None:
            helper.delete_performance_data(drop_id)

        # Run the wrapped test paying attention to latency.
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time

        # See if it was unexpectedly slow.
        hist = perfdb.get(func_id, [])
        if len(hist) < config.config['unittests_num_perf_samples']:
            # Not enough samples yet; just accumulate.
            hist.append(run_time)
            logger.debug('Still establishing a perf baseline for %s', func_name)
        else:
            stdev = statistics.stdev(hist)
            logger.debug('For %s, performance stdev=%.2f', func_name, stdev)
            slowest = max(hist)
            logger.debug('For %s, slowest perf on record is %.2fs', func_name, slowest)
            # A run 4+ stdevs beyond the slowest on record is a regression.
            limit = slowest + stdev * 4
            logger.debug('For %s, max acceptable runtime is %.2fs', func_name, limit)
            logger.debug('For %s, actual observed runtime was %.2fs', func_name, run_time)
            if run_time > limit:
                msg = f'''{func_id} performance has regressed unacceptably.
{slowest:f}s is the slowest runtime on record in {len(hist)} perf samples.
It just ran in {run_time:f}s which is 4+ stdevs slower than the slowest.
Here is the current, full db perf timing distribution:

{hist}'''
                logger.error(msg)
                slf = args[0]  # Peek at the wrapped function's self ref.
                slf.fail(msg)  # ...to fail the testcase.
            else:
                hist.append(run_time)

        # Don't spam the database with samples; just pick a random
        # sample from what we have and store that back.
        n = min(config.config['unittests_num_perf_samples'], len(hist))
        hist = random.sample(hist, n)
        perfdb[func_id] = hist
        helper.save_performance_data(func_id, perfdb)
        # Fix: the wrapped method's return value was computed but swallowed.
        return value

    return wrapper_perf_monitor
def check_all_methods_for_perf_regressions(prefix='test_'):
    """Decorate unittests with this to pay attention to the perf of the
    testcode and flag perf regressions.  e.g.

        import unittest_utils as uu

        @uu.check_all_methods_for_perf_regressions()
        class TestMyClass(unittest.TestCase):

            def test_some_part_of_my_class(self):
                ...

    Args:
        prefix: only methods whose names start with this prefix get wrapped.

    Returns:
        A class decorator that wraps matching methods with
        check_method_for_perf_regressions.
    """

    def decorate_the_testcase(cls):
        # Only unittest.TestCase subclasses are modified; anything else
        # passes through untouched.
        if issubclass(cls, unittest.TestCase):
            for name, m in inspect.getmembers(cls, inspect.isfunction):
                if name.startswith(prefix):
                    setattr(cls, name, check_method_for_perf_regressions(m))
                    logger.debug('Wrapping %s:%s.', cls.__name__, name)
        # Fix: the class was never returned, so decorating a class would
        # have replaced it with None.
        return cls

    return decorate_the_testcase
class RecordStdout(contextlib.AbstractContextManager):
    """Record what is emitted to stdout.

    >>> with RecordStdout() as record:
    ...     print("This is a test!")
    >>> print({record().readline()})
    {'This is a test!\\n'}
    """

    def __init__(self) -> None:
        super().__init__()
        # Spooled temp file that captures everything written to stdout.
        self.destination = tempfile.SpooledTemporaryFile(mode='r+')
        self.recorder: Optional[contextlib.redirect_stdout] = None

    def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
        self.recorder = contextlib.redirect_stdout(self.destination)
        assert self.recorder is not None
        self.recorder.__enter__()
        # Hand back a thunk so callers can fetch the capture buffer.
        return lambda: self.destination

    def __exit__(self, *args) -> Literal[False]:
        assert self.recorder is not None
        self.recorder.__exit__(*args)
        self.destination.seek(0)  # rewind so callers can read the capture
        # Fix: the annotation promises Literal[False] but the method fell
        # off the end returning None; never suppress exceptions.
        return False
class RecordStderr(contextlib.AbstractContextManager):
    """Record what is emitted to stderr.

    >>> with RecordStderr() as record:
    ...     print("This is a test!", file=sys.stderr)
    >>> print({record().readline()})
    {'This is a test!\\n'}
    """

    def __init__(self) -> None:
        super().__init__()
        # Spooled temp file that captures everything written to stderr.
        self.destination = tempfile.SpooledTemporaryFile(mode='r+')
        self.recorder: Optional[contextlib.redirect_stdout[Any]] = None

    def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
        self.recorder = contextlib.redirect_stderr(self.destination)  # type: ignore
        assert self.recorder is not None
        self.recorder.__enter__()
        # Hand back a thunk so callers can fetch the capture buffer.
        return lambda: self.destination

    def __exit__(self, *args) -> Literal[False]:
        assert self.recorder is not None
        self.recorder.__exit__(*args)
        self.destination.seek(0)  # rewind so callers can read the capture
        # Fix: the annotation promises Literal[False] but the method fell
        # off the end returning None; never suppress exceptions.
        return False
class RecordMultipleStreams(contextlib.AbstractContextManager):
    """Record the output to more than one stream.

    Monkeypatches each given stream's .write method so all output lands
    in a single spooled capture buffer for the duration of the context.
    """

    def __init__(self, *files) -> None:
        """
        Args:
            files: the stream objects whose .write output to capture
        """
        super().__init__()
        self.files = [*files]
        self.destination = tempfile.SpooledTemporaryFile(mode='r+')
        # Original bound .write methods, saved so they can be restored.
        self.saved_writes: List[Callable[..., Any]] = []

    def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
        # Fix: the loop header was missing; 'f' was an unbound name.
        for f in self.files:
            self.saved_writes.append(f.write)
            f.write = self.destination.write
        return lambda: self.destination

    def __exit__(self, *args) -> Literal[False]:
        # Restore each stream's original write method pairwise so the
        # right method goes back onto the right stream (a LIFO pop()
        # would cross-assign them).
        for f, write in zip(self.files, self.saved_writes):
            f.write = write
        self.saved_writes.clear()
        self.destination.seek(0)  # rewind so callers can read the capture
        return False  # never suppress exceptions
356 if __name__ == '__main__':