if self._check_if_cancelled(bundle):
try:
return self._process_work_result(bundle)
- except Exception as e:
+ except Exception:
logger.warning(
'%s: bundle says it\'s cancelled upfront but no results?!', bundle
)
# it is done but we can't find the results it
# should have copied over. Reschedule the whole
# thing.
- logger.exception(e)
- logger.error(
+ logger.exception(
'%s: We are the original owner thread and yet there are '
- 'no results for this bundle. This is unexpected and bad.',
+ 'no results for this bundle. This is unexpected and bad. '
+ 'Attempting an emergency retry...',
bundle,
)
return self._emergency_retry_nasty_bundle(bundle)
logger.debug(
"%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
)
- except Exception as e:
+ except Exception:
self._release_worker(bundle)
if is_original:
# Weird. We tried to copy the code to the worker
# and it failed... And we're the original bundle.
# We have to retry.
- logger.exception(e)
- logger.error(
+ logger.exception(
"%s: Failed to send instructions to the worker machine?! "
"This is not expected; we\'re the original bundle so this shouldn\'t "
"be a race condition. Attempting an emergency retry...",
- # unpickle the results we got from the remove machine. If we
+ # unpickle the results we got from the remote machine. If we
# still have an active ssh subprocess, keep waiting on it.
# Otherwise, time for an emergency reschedule.
- except Exception as e:
- logger.exception(e)
- logger.error('%s: Something unexpected just happened...', bundle)
+ except Exception:
+ logger.exception('%s: Something unexpected just happened...', bundle)
if p is not None:
logger.warning(
"%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
serialized = rb.read()
result = cloudpickle.loads(serialized)
except Exception as e:
- logger.exception(e)
- logger.error('Failed to load %s... this is bad news.', result_file)
+ logger.exception('Failed to load %s... this is bad news.', result_file)
self._release_worker(bundle)
# Re-raise the exception; the code in _wait_for_process may
"Helper to run pickled code remotely and return results",
)
cfg.add_argument(
- '--code_file',
+ "--code_file",
type=str,
required=True,
- metavar='FILENAME',
- help='The location of the bundle of code to execute.',
+ metavar="FILENAME",
+ help="The location of the bundle of code to execute.",
)
cfg.add_argument(
- '--result_file',
+ "--result_file",
type=str,
required=True,
- metavar='FILENAME',
- help='The location where we should write the computation results.',
+ metavar="FILENAME",
+ help="The location where we should write the computation results.",
)
cfg.add_argument(
- '--watch_for_cancel',
+ "--watch_for_cancel",
action=argparse_utils.ActionNoYes,
default=True,
- help='Should we watch for the cancellation of our parent ssh process?',
+ help="Should we watch for the cancellation of our parent ssh process?",
)
@background_thread
def _watch_for_cancel(terminate_event: threading.Event) -> None:
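+ """Watch our process ancestry for an ssh process; if none is found, assume the launching ssh session is gone and terminate this worker."""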
- logger.debug('Starting up background thread...')
+ logger.debug("Starting up background thread...")
p = psutil.Process(os.getpid())
while True:
saw_sshd = False
for ancestor in ancestors:
name = ancestor.name()
pid = ancestor.pid
- logger.debug('Ancestor process %s (pid=%d)', name, pid)
- if 'ssh' in name.lower():
+ logger.debug("Ancestor process %s (pid=%d)", name, pid)
+ if "ssh" in name.lower():
saw_sshd = True
break
if not saw_sshd:
logger.error(
- 'Did not see sshd in our ancestors list?! Committing suicide.'
+ "Did not see sshd in our ancestors list?! Committing suicide."
)
- os.system('pstree')
+ os.system("pstree")
os.kill(os.getpid(), signal.SIGTERM)
time.sleep(5.0)
os.kill(os.getpid(), signal.SIGKILL)
def main() -> None:
"""Remote worker entry point."""
- in_file = config.config['code_file']
+ in_file = config.config["code_file"]
assert in_file and type(in_file) == str
- out_file = config.config['result_file']
+ out_file = config.config["result_file"]
assert out_file and type(out_file) == str
thread = None
stop_event = None
- if config.config['watch_for_cancel']:
+ if config.config["watch_for_cancel"]:
thread, stop_event = _watch_for_cancel()
- logger.debug('Reading %s.', in_file)
+ logger.debug("Reading %s.", in_file)
try:
- with open(in_file, 'rb') as rb:
+ with open(in_file, "rb") as rb:
serialized = rb.read()
- except Exception as e:
- logger.exception(e)
- logger.critical('Problem reading %s. Aborting.', in_file)
+ except Exception:
+ logger.exception("Problem reading %s; aborting.", in_file)
_cleanup_and_exit(thread, stop_event, 1)
- logger.debug('Deserializing %s', in_file)
+ logger.debug("Deserializing %s", in_file)
try:
fun, args, kwargs = cloudpickle.loads(serialized)
- except Exception as e:
- logger.exception(e)
- logger.critical('Problem deserializing %s. Aborting.', in_file)
+ except Exception:
+ logger.exception("Problem deserializing %s. Aborting.", in_file)
_cleanup_and_exit(thread, stop_event, 2)
- logger.debug('Invoking user-defined code...')
+ logger.debug("Invoking user-defined code...")
with Timer() as t:
ret = fun(*args, **kwargs)
- logger.debug('User code took %.1fs', t())
+ logger.debug("User code took %.1fs", t())
- logger.debug('Serializing results')
+ logger.debug("Serializing results")
try:
serialized = cloudpickle.dumps(ret)
- except Exception as e:
- logger.exception(e)
- logger.critical('Could not serialize result (%s). Aborting.', type(ret))
+ except Exception:
+ logger.exception("Could not serialize result (%s). Aborting.", type(ret))
_cleanup_and_exit(thread, stop_event, 3)
- logger.debug('Writing %s', out_file)
+ logger.debug("Writing %s", out_file)
try:
- with open(out_file, 'wb') as wb:
+ with open(out_file, "wb") as wb:
wb.write(serialized)
- except Exception as e:
- logger.exception(e)
- logger.critical('Error writing %s. Aborting.', out_file)
+ except Exception:
+ logger.exception("Error writing %s. Aborting.", out_file)
_cleanup_and_exit(thread, stop_event, 4)
_cleanup_and_exit(thread, stop_event, 0)
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
logger = logging.getLogger(__name__)
cfg = config.add_commandline_args(
- f'Logging ({__file__})', 'Args related to function decorators'
+ f"Logging ({__file__})", "Args related to function decorators"
)
cfg.add_argument(
- '--unittests_ignore_perf',
- action='store_true',
+ "--unittests_ignore_perf",
+ action="store_true",
default=False,
- help='Ignore unittest perf regression in @check_method_for_perf_regressions',
+ help="Ignore unittest perf regression in @check_method_for_perf_regressions",
)
cfg.add_argument(
- '--unittests_num_perf_samples',
+ "--unittests_num_perf_samples",
type=int,
default=50,
- help='The count of perf timing samples we need to see before blocking slow runs on perf grounds',
+ help="The count of perf timing samples we need to see before blocking slow runs on perf grounds",
)
cfg.add_argument(
- '--unittests_drop_perf_traces',
+ "--unittests_drop_perf_traces",
type=str,
nargs=1,
default=None,
- help='The identifier (i.e. file!test_fixture) for which we should drop all perf data',
+ help="The identifier (i.e. file!test_fixture) for which we should drop all perf data",
)
cfg.add_argument(
- '--unittests_persistance_strategy',
- choices=['FILE', 'DATABASE'],
- default='FILE',
- help='Should we persist perf data in a file or db?',
+ "--unittests_persistance_strategy",
+ choices=["FILE", "DATABASE"],
+ default="FILE",
+ help="Should we persist perf data in a file or db?",
)
cfg.add_argument(
- '--unittests_perfdb_filename',
+ "--unittests_perfdb_filename",
type=str,
- metavar='FILENAME',
+ metavar="FILENAME",
default=f'{os.environ["HOME"]}/.python_unittest_performance_db',
- help='File in which to store perf data (iff --unittests_persistance_strategy is FILE)',
+ help="File in which to store perf data (iff --unittests_persistance_strategy is FILE)",
)
cfg.add_argument(
- '--unittests_perfdb_spec',
+ "--unittests_perfdb_spec",
type=str,
- metavar='DBSPEC',
- default='mariadb+pymysql://python_unittest:<PASSWORD>@db.house:3306/python_unittest_performance',
- help='Db connection spec for perf data (iff --unittest_persistance_strategy is DATABASE)',
+ metavar="DBSPEC",
+ default="mariadb+pymysql://python_unittest:<PASSWORD>@db.house:3306/python_unittest_performance",
+ help="Db connection spec for perf data (iff --unittest_persistance_strategy is DATABASE)",
)
unittest.main = bootstrap.initialize(unittest.main)
self.traces_to_delete: List[str] = []
def load_performance_data(self, method_id: str) -> Dict[str, List[float]]:
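+ """Load the perf database (identifier -> list of timing samples) from self.filename."""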
- with open(self.filename, 'rb') as f:
+ with open(self.filename, "rb") as f:
return pickle.load(f)
def save_performance_data(self, method_id: str, data: Dict[str, List[float]]):
if trace in data:
data[trace] = []
- with open(self.filename, 'wb') as f:
+ with open(self.filename, "wb") as f:
pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
def delete_performance_data(self, method_id: str):
@functools.wraps(func)
def wrapper_perf_monitor(*args, **kwargs):
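+ """Run the wrapped test, record its runtime, and, once a baseline exists, fail it if it ran more than four stdevs slower than the slowest sample on record."""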
- if config.config['unittests_ignore_perf']:
+ if config.config["unittests_ignore_perf"]:
return func(*args, **kwargs)
- if config.config['unittests_persistance_strategy'] == 'FILE':
- filename = config.config['unittests_perfdb_filename']
+ if config.config["unittests_persistance_strategy"] == "FILE":
+ filename = config.config["unittests_perfdb_filename"]
helper = FileBasedPerfRegressionDataPersister(filename)
- elif config.config['unittests_persistance_strategy'] == 'DATABASE':
+ elif config.config["unittests_persistance_strategy"] == "DATABASE":
raise NotImplementedError(
- 'Persisting to a database is not implemented in this version'
+ "Persisting to a database is not implemented in this version"
)
else:
- raise Exception('Unknown/unexpected --unittests_persistance_strategy value')
+ raise Exception("Unknown/unexpected --unittests_persistance_strategy value")
func_id = function_utils.function_identifier(func)
func_name = func.__name__
- logger.debug('Watching %s\'s performance...', func_name)
+ logger.debug("Watching %s's performance...", func_name)
logger.debug('Canonical function identifier = "%s"', func_id)
try:
perfdb = helper.load_performance_data(func_id)
- except Exception as e:
- logger.exception(e)
- msg = 'Unable to load perfdb; skipping it...'
- logger.warning(msg)
+ except Exception:
+ msg = "Unable to load perfdb; skipping it..."
+ logger.exception(msg)
warnings.warn(msg)
perfdb = {}
# cmdline arg to forget perf traces for function
- drop_id = config.config['unittests_drop_perf_traces']
+ drop_id = config.config["unittests_drop_perf_traces"]
if drop_id is not None:
helper.delete_performance_data(drop_id)
# See if it was unexpectedly slow.
hist = perfdb.get(func_id, [])
- if len(hist) < config.config['unittests_num_perf_samples']:
+ if len(hist) < config.config["unittests_num_perf_samples"]:
hist.append(run_time)
- logger.debug('Still establishing a perf baseline for %s', func_name)
+ logger.debug("Still establishing a perf baseline for %s", func_name)
else:
stdev = statistics.stdev(hist)
- logger.debug('For %s, performance stdev=%.2f', func_name, stdev)
+ logger.debug("For %s, performance stdev=%.2f", func_name, stdev)
slowest = hist[-1]
- logger.debug('For %s, slowest perf on record is %.2fs', func_name, slowest)
+ logger.debug("For %s, slowest perf on record is %.2fs", func_name, slowest)
limit = slowest + stdev * 4
- logger.debug('For %s, max acceptable runtime is %.2fs', func_name, limit)
+ logger.debug("For %s, max acceptable runtime is %.2fs", func_name, limit)
logger.debug(
- 'For %s, actual observed runtime was %.2fs', func_name, run_time
+ "For %s, actual observed runtime was %.2fs", func_name, run_time
)
if run_time > limit:
- msg = f'''{func_id} performance has regressed unacceptably.
+ msg = f"""{func_id} performance has regressed unacceptably.
{slowest:f}s is the slowest runtime on record in {len(hist)} perf samples.
It just ran in {run_time:f}s which is 4+ stdevs slower than the slowest.
Here is the current, full db perf timing distribution:
-'''
+"""
for x in hist:
- msg += f'{x:f}\n'
+ msg += f"{x:f}\n"
logger.error(msg)
slf = args[0] # Peek at the wrapped function's self ref.
slf.fail(msg) # ...to fail the testcase.
# Don't spam the database with samples; just pick a random
# sample from what we have and store that back.
- n = min(config.config['unittests_num_perf_samples'], len(hist))
+ n = min(config.config["unittests_num_perf_samples"], len(hist))
hist = random.sample(hist, n)
hist.sort()
perfdb[func_id] = hist
return wrapper_perf_monitor
-def check_all_methods_for_perf_regressions(prefix='test_'):
+def check_all_methods_for_perf_regressions(prefix="test_"):
"""This decorator is meant to apply to classes that subclass from
- :class:`unittest.TestCase` and, when applied, has the affect of
+ :class:`unittest.TestCase` and, when applied, has the effect of
decorating each method that matches the `prefix` given with the
for name, m in inspect.getmembers(cls, inspect.isfunction):
if name.startswith(prefix):
setattr(cls, name, check_method_for_perf_regressions(m))
- logger.debug('Wrapping %s:%s.', cls.__name__, name)
+ logger.debug("Wrapping %s:%s.", cls.__name__, name)
return cls
return decorate_the_testcase
def __init__(self) -> None:
super().__init__()
- self.destination = tempfile.SpooledTemporaryFile(mode='r+')
+ self.destination = tempfile.SpooledTemporaryFile(mode="r+")
self.recorder: Optional[contextlib.redirect_stdout] = None
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
def __init__(self) -> None:
super().__init__()
- self.destination = tempfile.SpooledTemporaryFile(mode='r+')
+ self.destination = tempfile.SpooledTemporaryFile(mode="r+")
self.recorder: Optional[contextlib.redirect_stdout[Any]] = None
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
def __init__(self, *files) -> None:
super().__init__()
self.files = [*files]
- self.destination = tempfile.SpooledTemporaryFile(mode='r+')
+ self.destination = tempfile.SpooledTemporaryFile(mode="r+")
self.saved_writes: List[Callable[..., Any]] = []
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
return False
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()