From: Scott Gasch Date: Tue, 22 Mar 2022 17:38:16 +0000 (-0700) Subject: Loosen backup policy and cleanup code a little. X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=3cd90acd08d5a95f5fb95b79c8bfddebbe099291;p=python_utils.git Loosen backup policy and cleanup code a little. --- diff --git a/executors.py b/executors.py index e1949fb..fcdd3e7 100644 --- a/executors.py +++ b/executors.py @@ -31,6 +31,7 @@ from overrides import overrides import argparse_utils import config import histogram as hist +import string_utils from ansi import bg, fg, reset, underline from decorator_utils import singleton from exec_utils import cmd_in_background, cmd_with_timeout, run_silently @@ -619,8 +620,8 @@ class RemoteExecutor(BaseExecutor): num_idle_workers = self.worker_count - self.task_count now = time.time() if ( - num_done > 2 - and num_idle_workers > 1 + num_done >= 2 + and num_idle_workers > 0 and (self.last_backup is None or (now - self.last_backup > 9.0)) and self.backup_lock.acquire(blocking=False) ): @@ -753,6 +754,7 @@ class RemoteExecutor(BaseExecutor): def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any: """Find a worker for bundle or block until one is available.""" + self.adjust_task_count(+1) uuid = bundle.uuid hostname = bundle.hostname @@ -800,13 +802,10 @@ class RemoteExecutor(BaseExecutor): ) return self.emergency_retry_nasty_bundle(bundle) else: - # Expected(?). We're a backup and our bundle is - # cancelled before we even got started. Something - # went bad in process_work_result (I acutually don't - # see what?) but probably not worth worrying - # about. Let the original thread worry about - # either finding the results or complaining about - # it. + # We're a backup and our bundle is cancelled + # before we even got started. Do nothing and let + # the original bundle's thread worry about either + # finding the results or complaining about it. return None # Send input code / data to worker machine if it's not local. @@ -821,8 +820,9 @@ class RemoteExecutor(BaseExecutor): except Exception as e: self.release_worker(bundle) if is_original: - # Weird. We tried to copy the code to the worker and it failed... - # And we're the original bundle. We have to retry. + # Weird. We tried to copy the code to the worker + # and it failed... And we're the original bundle. + # We have to retry. logger.exception(e) logger.error( "%s: Failed to send instructions to the worker machine?! " @@ -835,7 +835,7 @@ class RemoteExecutor(BaseExecutor): # This is actually expected; we're a backup. # There's a race condition where someone else # already finished the work and removed the source - # code file before we could copy it. No biggie. + # code_file before we could copy it. Ignore. logger.warning( '%s: Failed to send instructions to the worker machine... ' 'We\'re a backup and this may be caused by the original (or ' @@ -953,6 +953,7 @@ class RemoteExecutor(BaseExecutor): else: break + # Cleanup remote /tmp files. run_silently( f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"' ) @@ -965,7 +966,8 @@ class RemoteExecutor(BaseExecutor): # original is also the only job that may delete result_file # from disk. Note that the original may have been cancelled # if one of the backups finished first; it still must read the - # result from disk. + # result from disk. It still does that here with is_cancelled + # set. if is_original: logger.debug("%s: Unpickling %s.", bundle, result_file) try: @@ -1015,9 +1017,7 @@ class RemoteExecutor(BaseExecutor): return result def create_original_bundle(self, pickle, fname: str): - from string_utils import generate_uuid - - uuid = generate_uuid(omit_dashes=True) + uuid = string_utils.generate_uuid(omit_dashes=True) code_file = f'/tmp/{uuid}.code.bin' result_file = f'/tmp/{uuid}.result.bin' @@ -1051,6 +1051,7 @@ class RemoteExecutor(BaseExecutor): return bundle def create_backup_bundle(self, src_bundle: BundleDetails): + assert self.status.lock.locked() assert src_bundle.backup_bundles is not None n = len(src_bundle.backup_bundles) uuid = src_bundle.uuid + f'_backup#{n}' @@ -1092,7 +1093,8 @@ class RemoteExecutor(BaseExecutor): # Results from backups don't matter; if they finish first # they will move the result_file to this machine and let - # the original pick them up and unpickle them. + # the original pick them up and unpickle them (and return + # a result). def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]: is_original = bundle.src_bundle is None @@ -1239,7 +1241,7 @@ class DefaultExecutors(object): for record in pool: if record.machine == platform.node() and record.count > 1: logger.info('Reducing workload for %s.', record.machine) - record.count = 1 + record.count = max(int(record.count / 2), 1) policy = WeightedRandomRemoteWorkerSelectionPolicy() policy.register_worker_pool(pool)