action=argparse_utils.ActionNoYes,
help='Should we schedule duplicative backup work if a remote bundle is slow',
)
# Retry budget for remote bundles: launch() only re-launches a bundle whose
# post-launch work raised while bundle.failure_count is below this value.
+parser.add_argument(
+ '--executors_max_bundle_failures',
+ type=int,
+ default=3,
+ metavar='#FAILURES',
+ help='Maximum number of failures before giving up on a bundle',
+)
-rsync = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
-ssh = 'ssh -oForwardX11=no'
-
-
-hist = histogram.SimpleHistogram(
# Command prefixes for the shell commands built later in this file: RSYNC
# copies a bundle's code file to the worker and fetches the result file back;
# SSH runs remote_worker.py on the worker and removes its temp files afterwards.
+RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
+SSH = 'ssh -oForwardX11=no'
+HIST = histogram.SimpleHistogram(
histogram.SimpleHistogram.n_evenly_spaced_buckets(
int(0), int(500), 25
)
end = time.time()
duration = end - start
logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
- hist.add_item(duration)
+ HIST.add_item(duration)
return result
def shutdown(self,
wait = True) -> None:
# Print the module-level duration histogram to stdout, then shut down the
# wrapped thread pool; `wait` is passed straight through to its shutdown().
logger.debug("Shutting down threadpool executor.")
- print(hist)
+ print(HIST)
self._thread_pool_executor.shutdown(wait)
def shutdown(self, wait=True) -> None:
# Print the module-level duration histogram to stdout, then shut down the
# wrapped process executor; `wait` is passed straight through to its shutdown().
logger.debug('Shutting down processpool executor')
- print(hist)
+ print(HIST)
self._process_executor.shutdown(wait)
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
+ failure_count: int
class RemoteExecutorStatus:
return False
def launch(self, bundle: BundleDetails) -> Any:
- # Find a worker for bundle or block until one is available.
+ """Find a worker for bundle or block until one is available."""
uuid = bundle.uuid
hostname = bundle.hostname
avoid_machine = None
+
+ # Try not to schedule a backup on the same host as the original.
if bundle.src_bundle is not None:
avoid_machine = bundle.src_bundle.machine
worker = None
self.status.record_acquire_worker(worker, uuid)
logger.debug(f'Running bundle {uuid} on {worker}...')
- # Before we do work, make sure it's still viable.
+ # Before we do any work, make sure the bundle is still viable.
if self.check_if_cancelled(bundle):
- return self.post_launch_work(bundle)
+            # Remember the failure count so we can tell below whether
+            # post_launch_work() already recorded this failure itself.
+            failures_before = bundle.failure_count
+            try:
+                return self.post_launch_work(bundle)
+            except Exception as e:
+                logger.exception(e)
+                logger.info(f"Bundle {uuid} seems to have failed?!")
+                # post_launch_work() only bumps failure_count for the
+                # failures it detects itself (missing or unreadable result
+                # file).  Count any other exception here as well;
+                # otherwise the retry below never exhausts its budget and
+                # launch() recurses without bound.
+                if bundle.failure_count == failures_before:
+                    bundle.failure_count += 1
+                if bundle.failure_count < config.config['executors_max_bundle_failures']:
+                    return self.launch(bundle)
+                logger.info(f"Bundle {uuid} is poison, giving up on it.")
+                return None
# Send input to machine if it's not local.
if hostname not in machine:
- cmd = f'{rsync} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
- logger.debug(f"Copying work to {worker} via {cmd}")
+ cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
+ logger.info(f"Copying work to {worker} via {cmd}")
exec_utils.run_silently(cmd)
- # Before we do more work, make sure it's still viable.
- if self.check_if_cancelled(bundle):
- return self.post_launch_work(bundle)
-
- # Fucking Apple has a python3 binary in /usr/sbin that is not
- # the one we want and is protected by the OS so make sure that
- # /usr/local/bin is early in the path.
- cmd = (f'{ssh} {bundle.username}@{bundle.machine} '
- f'"export PATH=/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/home/scott/bin:/home/scott/.local/bin; /home/scott/lib/python_modules/remote_worker.py'
+ # Launch remote_worker.py on the worker over ssh (after sourcing remote-execution/bin/activate).
+ cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
+ f'"source remote-execution/bin/activate &&'
+ f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
p = exec_utils.cmd_in_background(cmd, silent=True)
bundle.pid = pid = p.pid
- logger.debug(f"Running {cmd} in the background as process {pid}")
+ logger.info(f"Running {cmd} in the background as process {pid}")
while True:
try:
f"{pid}/{bundle.uuid} has finished its work normally."
)
break
- return self.post_launch_work(bundle)
+
+        # Remember the failure count so we can tell below whether
+        # post_launch_work() already recorded this failure itself.
+        failures_before = bundle.failure_count
+        try:
+            return self.post_launch_work(bundle)
+        except Exception as e:
+            logger.exception(e)
+            logger.info(f"Bundle {uuid} seems to have failed?!")
+            # post_launch_work() only bumps failure_count for the failures
+            # it detects itself (missing or unreadable result file).  Count
+            # any other exception here as well; otherwise the retry below
+            # never exhausts its budget and launch() recurses without bound.
+            if bundle.failure_count == failures_before:
+                bundle.failure_count += 1
+            if bundle.failure_count < config.config['executors_max_bundle_failures']:
+                return self.launch(bundle)
+            logger.info(f"Bundle {uuid} is poison, giving up on it.")
+            return None
def post_launch_work(self, bundle: BundleDetails) -> Any:
with self.status.lock:
if not was_cancelled:
assert bundle.machine is not None
if bundle.hostname not in bundle.machine:
- cmd = f'{rsync} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
- logger.debug(
+ cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
+ logger.info(
f"Fetching results from {username}@{machine} via {cmd}"
)
try:
exec_utils.run_silently(cmd)
except subprocess.CalledProcessError:
pass
- exec_utils.run_silently(f'{ssh} {username}@{machine}'
+ exec_utils.run_silently(f'{SSH} {username}@{machine}'
f' "/bin/rm -f {code_file} {result_file}"')
bundle.end_ts = time.time()
assert bundle.worker is not None
)
if not was_cancelled:
dur = bundle.end_ts - bundle.start_ts
- hist.add_item(dur)
+ HIST.add_item(dur)
+
+ # Original or not, the results should be back on the local
+ # machine. Are they?
+ if not os.path.exists(result_file):
+ msg = f'{result_file} unexpectedly missing, wtf?!'
+ logger.critical(msg)
+ bundle.failure_count += 1
+ self.release_worker(bundle.worker)
+ raise Exception(msg)
# Only the original worker should unpickle the file contents
# though since it's the only one whose result matters.
if is_original:
logger.debug(f"Unpickling {result_file}.")
- with open(f'{result_file}', 'rb') as rb:
- serialized = rb.read()
- result = cloudpickle.loads(serialized)
+            # NOTE(review): unpickling runs whatever the result file
+            # encodes; this assumes the remote workers are trusted.
+            try:
+                with open(f'{result_file}', 'rb') as rb:
+                    serialized = rb.read()
+                result = cloudpickle.loads(serialized)
+            except Exception as e:
+                msg = f'Failed to load {result_file}'
+                logger.critical(msg)
+                # Record the failure and hand the worker back before
+                # propagating, so the caller's retry logic can proceed.
+                bundle.failure_count += 1
+                self.release_worker(bundle.worker)
+                # Raise the summary message and chain the root cause so
+                # the traceback logged by the caller shows both.
+                raise Exception(msg) from e
os.remove(f'{result_file}')
os.remove(f'{code_file}')
is_cancelled = threading.Event(),
was_cancelled = False,
backup_bundles = [],
+ failure_count = 0,
)
self.status.record_bundle_details(bundle)
logger.debug(f'Created original bundle {uuid}')
is_cancelled = threading.Event(),
was_cancelled = False,
backup_bundles = None, # backup backups not allowed
+ failure_count = 0,
)
src_bundle.backup_bundles.append(backup_bundle)
self.status.record_bundle_details_already_locked(backup_bundle)
def shutdown(self, wait=True) -> None:
# Shut down the helper executor first (`wait` is passed straight through),
# then print the module-level duration histogram to stdout.
self._helper_executor.shutdown(wait)
- print(hist)
+ print(HIST)
@singleton
RemoteWorkerRecord(
username = 'scott',
machine = 'cheetah.house',
- weight = 10,
- count = 6,
+ weight = 12,
+ count = 4,
),
)
if self.ping('video.house'):
RemoteWorkerRecord(
username = 'scott',
machine = 'video.house',
- weight = 2,
+ weight = 1,
count = 4,
),
)
RemoteWorkerRecord(
username = 'scott',
machine = 'backup.house',
- weight = 3,
- count = 2,
+ weight = 1,
+ count = 4,
),
)
if self.ping('puma.cabin'):
RemoteWorkerRecord(
username = 'scott',
machine = 'puma.cabin',
- weight = 10,
- count = 6,
+ weight = 12,
+ count = 4,
),
)
policy = WeightedRandomRemoteWorkerSelectionPolicy()