import argparse_utils
import config
import histogram as hist
+import string_utils
from ansi import bg, fg, reset, underline
from decorator_utils import singleton
from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
num_idle_workers = self.worker_count - self.task_count
now = time.time()
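+ # Only consider a speculative backup when at least two bundles
+ # are already done, at least one worker is idle, no backup was
+ # started within the last ~9 seconds, and the backup lock can be
+ # taken without blocking.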
if (
- num_done > 2
- and num_idle_workers > 1
+ num_done >= 2
+ and num_idle_workers > 0
and (self.last_backup is None or (now - self.last_backup > 9.0))
and self.backup_lock.acquire(blocking=False)
):
def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
"""Find a worker for bundle or block until one is available."""
+
self.adjust_task_count(+1)
uuid = bundle.uuid
hostname = bundle.hostname
)
return self.emergency_retry_nasty_bundle(bundle)
else:
- # Expected(?). We're a backup and our bundle is
- # cancelled before we even got started. Something
- # went bad in process_work_result (I acutually don't
- # see what?) but probably not worth worrying
- # about. Let the original thread worry about
- # either finding the results or complaining about
- # it.
+ # We're a backup and our bundle is cancelled
+ # before we even got started. Do nothing and let
+ # the original bundle's thread worry about either
+ # finding the results or complaining about it.
return None
# Send input code / data to worker machine if it's not local.
except Exception as e:
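+ # Copying the code over to the worker failed; release the worker
+ # before deciding whether to retry or give up.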
self.release_worker(bundle)
if is_original:
- # Weird. We tried to copy the code to the worker and it failed...
- # And we're the original bundle. We have to retry.
+ # Weird. We tried to copy the code to the worker
+ # and it failed... And we're the original bundle.
+ # We have to retry.
logger.exception(e)
logger.error(
"%s: Failed to send instructions to the worker machine?! "
# This is actually expected; we're a backup.
# There's a race condition where someone else
# already finished the work and removed the source
- # code file before we could copy it. No biggie.
+ # code_file before we could copy it. Ignore.
logger.warning(
'%s: Failed to send instructions to the worker machine... '
'We\'re a backup and this may be caused by the original (or '
else:
break
+ # Clean up remote /tmp files.
run_silently(
f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
)
# original is also the only job that may delete result_file
# from disk. Note that the original may have been cancelled
# if one of the backups finished first; it still must read the
- # result from disk.
+ # result from disk. It does that here even with is_cancelled
+ # set.
if is_original:
logger.debug("%s: Unpickling %s.", bundle, result_file)
try:
return result
def create_original_bundle(self, pickle, fname: str):
- from string_utils import generate_uuid
-
- uuid = generate_uuid(omit_dashes=True)
+ uuid = string_utils.generate_uuid(omit_dashes=True)
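+ # Per-bundle temp files: the pickled code is staged in
+ # /tmp/<uuid>.code.bin and the worker's pickled result is
+ # expected back in /tmp/<uuid>.result.bin.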
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
return bundle
def create_backup_bundle(self, src_bundle: BundleDetails):
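+ # The caller must already hold the status lock; we are about to
+ # read (and presumably append to) src_bundle's list of backups.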
+ assert self.status.lock.locked()
assert src_bundle.backup_bundles is not None
n = len(src_bundle.backup_bundles)
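+ # Backups reuse the original's uuid plus a _backup#<n> suffix so
+ # they can be traced back to their source bundle.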
uuid = src_bundle.uuid + f'_backup#{n}'
# Results from backups don't matter; if they finish first
# they will move the result_file to this machine and let
- # the original pick them up and unpickle them.
+ # the original pick them up and unpickle them (and return
+ # a result).
def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
is_original = bundle.src_bundle is None
for record in pool:
if record.machine == platform.node() and record.count > 1:
logger.info('Reducing workload for %s.', record.machine)
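+ # Rather than dropping this machine straight to a single worker,
+ # halve its advertised count, but always leave at least one.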
- record.count = 1
+ record.count = max(int(record.count / 2), 1)
policy = WeightedRandomRemoteWorkerSelectionPolicy()
policy.register_worker_pool(pool)
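+ # Assumption: the weighted random policy favors machines whose
+ # records carry a higher (possibly just reduced) count.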