#!/usr/bin/env python3

from __future__ import annotations

from abc import ABC, abstractmethod
import concurrent.futures as fut
from collections import defaultdict
from dataclasses import dataclass
import logging
import numpy
import os
import platform
import random
import subprocess
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set
import warnings

import cloudpickle  # type: ignore
from overrides import overrides

from ansi import bg, fg, underline, reset
import argparse_utils
import config
from decorator_utils import singleton
from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
from thread_utils import background_thread

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    metavar='#THREADS',
    help='Number of threads in the default threadpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    metavar='#PROCESSES',
    help='Number of processes in the default processpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    metavar='#FAILURES',
    help='Maximum number of failures before giving up on a bundle',
)

SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp'


def make_cloud_pickle(fun, *args, **kwargs):
    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
    return cloudpickle.dumps((fun, args, kwargs))


class BaseExecutor(ABC):
    def __init__(self, *, title=''):
        self.title = title
        self.task_count = 0
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None:
        pass

    def adjust_task_count(self, delta: int) -> None:
        self.task_count += delta
        logger.debug(f'Executor current task count is {self.task_count}')


class ThreadExecutor(BaseExecutor):
    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )

    def run_local_bundle(self, fun, *args, **kwargs):
        logger.debug(f"Running local bundle at {fun.__name__}")
        start = time.time()
        result = fun(*args, **kwargs)
        end = time.time()
        self.adjust_task_count(-1)
        duration = end - start
        logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
        self.histogram.add_item(duration)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        return self._thread_pool_executor.submit(
            self.run_local_bundle, *newargs, **kwargs
        )

    @overrides
    def shutdown(self, wait=True) -> None:
        logger.debug(f'Shutting down threadpool executor {self.title}')
        print(self.histogram)
        self._thread_pool_executor.shutdown(wait)
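

# Illustrative sketch (added commentary; nothing else in this module uses it):
# both ProcessExecutor and RemoteExecutor ship work around as the cloudpickle
# blob produced by make_cloud_pickle() above.  Unpacking a bundle is simply
# the inverse of that call, roughly as shown here.
def _example_unpack_and_run(blob: bytes) -> Any:
    # blob is assumed to have come from make_cloud_pickle(fun, *args, **kwargs)
    fun, args, kwargs = cloudpickle.loads(blob)
    return fun(*args, **kwargs)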


class ProcessExecutor(BaseExecutor):
    def __init__(self, max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )

    def run_cloud_pickle(self, pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug(f"Running pickled bundle at {fun.__name__}")
        result = fun(*args, **kwargs)
        self.adjust_task_count(-1)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        start = time.time()
        self.adjust_task_count(+1)
        pickle = make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(self.run_cloud_pickle, pickle)
        result.add_done_callback(
            lambda _: self.histogram.add_item(time.time() - start)
        )
        return result

    @overrides
    def shutdown(self, wait=True) -> None:
        logger.debug(f'Shutting down processpool executor {self.title}')
        self._process_executor.shutdown(wait)
        print(self.histogram)

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state


class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""

    pass


@dataclass
class RemoteWorkerRecord:
    username: str
    machine: str
    weight: int
    count: int

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    pickled_code: bytes
    uuid: str
    fname: str
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    hostname: str
    code_file: str
    result_file: str
    pid: int
    start_ts: float
    end_ts: float
    slower_than_local_p95: bool
    slower_than_global_p95: bool
    src_bundle: BundleDetails
    is_cancelled: threading.Event
    was_cancelled: bool
    backup_bundles: Optional[List[BundleDetails]]
    failure_count: int

    def __repr__(self):
        uuid = self.uuid
        if uuid[-9:-2] == '_backup':
            uuid = uuid[:-9]
            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
        else:
            suffix = uuid[-6:]

        colorz = [
            fg('violet red'),
            fg('red'),
            fg('orange'),
            fg('peach orange'),
            fg('yellow'),
            fg('marigold yellow'),
            fg('green yellow'),
            fg('tea green'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('tropical blue'),
            fg('lavender purple'),
            fg('medium purple'),
        ]
        c = colorz[int(uuid[-2:], 16) % len(colorz)]
        fname = self.fname if self.fname is not None else 'nofname'
        machine = self.machine if self.machine is not None else 'nomachine'
        return f'{c}{suffix}/{fname}/{machine}{reset()}'


class RemoteExecutorStatus:
    def __init__(self, total_worker_count: int) -> None:
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, float] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord, List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modification using self.  Also used
        # as a memory fence for modifications to bundle.
        self.lock: threading.Lock = threading.Lock()

    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(
        self, worker: RemoteWorkerRecord, uuid: str
    ) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            bundle_latency = ts - self.start_per_bundle[uuid]
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def record_processing_began(self, uuid: str):
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f'       {details} setting up / copying data...'
                        sec = 0.0

                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                            if details is not None:
                                details.slower_than_local_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_local_p95 = False

                    if qall is not None:
                        if sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                details.slower_than_global_p95 = True
                        else:
                            details.slower_than_global_p95 = False

                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    def register_worker_pool(self, workers):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    @overrides
    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        for worker in self.workers:
            for x in range(0, worker.count):
                for y in range(0, worker.weight):
                    grabbag.append(worker)

        for _ in range(0, 5):
            random.shuffle(grabbag)
            worker = grabbag[0]
            if worker.machine != machine_to_avoid or _ > 2:
                if worker.count > 0:
                    worker.count -= 1
                    logger.debug(f'Selected worker {worker}')
                    return worker
        msg = 'Unexpectedly could not find a worker, retrying...'
        logger.warning(msg)
        return None


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def __init__(self) -> None:
        self.index = 0

    @overrides
    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    @overrides
    def acquire_worker(
        self, machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        x = self.index
        while True:
            worker = self.workers[x]
            if worker.count > 0:
                worker.count -= 1
                x += 1
                if x >= len(self.workers):
                    x = 0
                self.index = x
                logger.debug(f'Selected worker {worker}')
                return worker
            x += 1
            if x >= len(self.workers):
                x = 0
            if x == self.index:
                msg = 'Unexpectedly could not find a worker, retrying...'
                logger.warning(msg)
                return None


class RemoteExecutor(BaseExecutor):
    def __init__(
        self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy
    ) -> None:
        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug(
            f'Creating {self.worker_count} local threads, one per remote worker.'
        )
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self.run_periodic_heartbeat()

    @background_thread
    def run_periodic_heartbeat(self, stop_event) -> None:
        while not stop_event.is_set():
            time.sleep(5.0)
            logger.debug('Running periodic heartbeat code...')
            self.heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def heartbeat(self) -> None:
        with self.status.lock:
            # Dump regular progress report
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self.maybe_schedule_backup_bundles()

    def maybe_schedule_backup_bundles(self):
        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_done > 2
            and num_idle_workers > 1
            and (self.last_backup is None or (now - self.last_backup > 6.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            try:
                assert self.backup_lock.locked()
                bundle_to_backup = None
                best_score = None
                for (
                    worker,
                    bundle_uuids,
                ) in self.status.in_flight_bundles_by_worker.items():

                    # Prefer to schedule backups of bundles running on
                    # slower machines.
                    base_score = 0
                    for record in self.workers:
                        if worker.machine == record.machine:
                            base_score = float(record.weight)
                            base_score = 1.0 / base_score
                            base_score *= 200.0
                            base_score = int(base_score)
                            break

                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                            bundle is not None
                            and bundle.src_bundle is None
                            and bundle.backup_bundles is not None
                        ):
                            score = base_score

                            # Schedule backups of bundles running
                            # longer; especially those that are
                            # unexpectedly slow.
                            start_ts = self.status.start_per_bundle[uuid]
                            if start_ts is not None:
                                runtime = now - start_ts
                                score += runtime
                                logger.debug(
                                    f'score[{bundle}] => {score} # latency boost'
                                )

                                if bundle.slower_than_local_p95:
                                    score += runtime / 2
                                    logger.debug(
                                        f'score[{bundle}] => {score} # >worker p95'
                                    )

                                if bundle.slower_than_global_p95:
                                    score += runtime / 4
                                    logger.debug(
                                        f'score[{bundle}] => {score} # >global p95'
                                    )

                            # Prefer backups of bundles that don't
                            # have backups already.
                            backup_count = len(bundle.backup_bundles)
                            if backup_count == 0:
                                score *= 2
                            elif backup_count == 1:
                                score /= 2
                            elif backup_count == 2:
                                score /= 8
                            else:
                                score = 0
                            logger.debug(
                                f'score[{bundle}] => {score} # {backup_count} dup backup factor'
                            )

                            if score != 0 and (
                                best_score is None or score > best_score
                            ):
                                bundle_to_backup = bundle
                                assert bundle is not None
                                assert bundle.backup_bundles is not None
                                assert bundle.src_bundle is None
                                best_score = score

                # Note: this is all still happening on the heartbeat
                # runner thread.  That's ok because
                # schedule_backup_for_bundle uses the executor to
                # submit the bundle again which will cause it to be
                # picked up by a worker thread and allow this thread
                # to return to run future heartbeats.
                if bundle_to_backup is not None:
                    self.last_backup = now
                    logger.info(
                        f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
                    )
                    self.schedule_backup_for_bundle(bundle_to_backup)
            finally:
                self.backup_lock.release()

    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(
        self, machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
        self, machine_to_avoid: str = None
    ) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        worker = bundle.worker
        assert worker is not None
        logger.debug(f'Released worker {worker}')
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        with self.cv:
            worker.count += 1
            self.cv.notify()
        self.adjust_task_count(-1)

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False

    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""
        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        assert worker

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'{bundle}: Running bundle on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self.check_if_cancelled(bundle):
            try:
                return self.process_work_result(bundle)
            except Exception as e:
                logger.warning(
                    f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
                )
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started.  Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over.  Reschedule the whole
                    # thing.
                    logger.exception(e)
                    logger.error(
                        f'{bundle}: We are the original owner thread and yet there are '
                        + 'no results for this bundle.  This is unexpected and bad.'
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # Expected(?).  We're a backup and our bundle is
                    # cancelled before we even got started.  Something
                    # went bad in process_work_result (I actually don't
                    # see what?) but probably not worth worrying
                    # about.  Let the original thread worry about
                    # either finding the results or complaining about
                    # it.
                    return None

        # Send input code / data to worker machine if it's not local.
        if hostname not in machine:
            try:
                cmd = (
                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                )
                start_ts = time.time()
                logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
            except Exception as e:
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We tried to copy the code to the worker and it failed...
                    # And we're the original bundle.  We have to retry.
                    logger.exception(e)
                    logger.error(
                        f"{bundle}: Failed to send instructions to the worker machine?! "
                        + "This is not expected; we\'re the original bundle so this shouldn\'t "
                        + "be a race condition.  Attempting an emergency retry..."
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code file before we could copy it.  No biggie.
                    msg = f'{bundle}: Failed to send instructions to the worker machine... '
                    msg += 'We\'re a backup and this may be caused by the original (or some '
                    msg += 'other backup) already finishing this work.  Ignoring this.'
                    logger.warning(msg)
                    return None

        # Kick off the work.  Note that if this fails we let
        # wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"source py38-venv/bin/activate &&'
            f' /home/scott/lib/python_modules/remote_worker.py'
            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug(
            f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
        )
        return self.wait_for_process(p, bundle, 0)

    def wait_for_process(
        self, p: subprocess.Popen, bundle: BundleDetails, depth: int
    ) -> Any:
        machine = bundle.machine
        pid = p.pid
        if depth > 3:
            logger.error(
                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
            )
            p.terminate()
            self.release_worker(bundle)
            return self.emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self.check_if_cancelled(bundle):
                    logger.info(
                        f'{bundle}: looks like another worker finished bundle...'
                    )
                    break
            else:
                logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # and we're bailing out.
        try:
            ret = self.process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, cleanup after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine.  If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception as e:
            logger.exception(e)
            logger.error(f'{bundle}: Something unexpected just happened...')
            if p is not None:
                msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
                logger.warning(msg)
                return self.wait_for_process(p, bundle, depth + 1)
            else:
                self.release_worker(bundle)
                return self.emergency_retry_nasty_bundle(bundle)

    def process_work_result(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
                    )

                    # If either of these throw they are handled in
                    # wait_for_process.
                    attempts = 0
                    while True:
                        try:
                            run_silently(cmd)
                        except Exception as e:
                            attempts += 1
                            if attempts >= 3:
                                raise e
                        else:
                            break

                    run_silently(
                        f'{SSH} {username}@{machine}'
                        f' "/bin/rm -f {code_file} {result_file}"'
                    )
                    logger.debug(
                        f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
                    )
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.
        if is_original:
            logger.debug(f"{bundle}: Unpickling {result_file}.")
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                logger.exception(e)
                msg = f'Failed to load {result_file}... this is bad news.'
                logger.critical(msg)
                self.release_worker(bundle)

                # Re-raise the exception; the code in wait_for_process may
                # decide to emergency_retry_nasty_bundle here.
                raise Exception(e)

            logger.debug(f'Removing local (master) {code_file} and {result_file}.')
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
                    )
                    backup.is_cancelled.set()

        # This is a backup job and, by now, we have already fetched
        # the bundle results.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                logger.debug(
                    f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
                )
                bundle.src_bundle.is_cancelled.set()
        self.release_worker(bundle, was_cancelled=was_cancelled)
        return result

    def create_original_bundle(self, pickle, fname: str):
        from string_utils import generate_uuid

        uuid = generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            fname=fname,
            worker=None,
            username=None,
            machine=None,
            hostname=platform.node(),
            code_file=code_file,
            result_file=result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
            failure_count=0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'{bundle}: Created an original bundle')
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            fname=src_bundle.fname,
            worker=None,
            username=None,
            machine=None,
            hostname=src_bundle.hostname,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
            failure_count=0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'{backup_bundle}: Created a backup bundle')
        return backup_bundle

    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.

    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        if is_original:
            retry_limit = 3
        else:
            retry_limit = 2

        if bundle.failure_count > retry_limit:
            logger.error(
                f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries'
                )
            else:
                logger.error(
                    f'{bundle}: At least it\'s only a backup; better luck with the others.'
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            warnings.warn(msg)
            return self.launch(bundle, avoid_last_machine)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self.launch, bundle)

    @overrides
    def shutdown(self, wait=True) -> None:
        logger.debug(f'Shutting down RemoteExecutor {self.title}')
        self.heartbeat_stop_event.set()
        self.heartbeat_thread.join()
        self._helper_executor.shutdown(wait)
        print(self.histogram)


@singleton
class DefaultExecutors(object):
    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        logger.debug(f'RUN> ping -c 1 {host}')
        try:
            x = cmd_with_timeout(
                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
            )
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            pool: List[RemoteWorkerRecord] = []
            if self.ping('cheetah.house'):
                logger.info('Found cheetah.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='cheetah.house',
                        weight=25,
                        count=6,
                    ),
                )
            if self.ping('meerkat.cabin'):
                logger.info('Found meerkat.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='meerkat.cabin',
                        weight=12,
                        count=2,
                    ),
                )
            if self.ping('wannabe.house'):
                logger.info('Found wannabe.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='wannabe.house',
                        weight=30,
                        count=10,
                    ),
                )
            if self.ping('puma.cabin'):
                logger.info('Found puma.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='puma.cabin',
                        weight=25,
                        count=6,
                    ),
                )
            if self.ping('backup.house'):
                logger.info('Found backup.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='backup.house',
                        weight=7,
                        count=2,
                    ),
                )

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info(f'Reducing workload for {record.machine}.')
                    record.count = 1

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown()
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown()
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown()
            self.remote_executor = None
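

# ---------------------------------------------------------------------------
# Minimal usage sketch (added for illustration; not part of the library's
# public surface).  It drives the local executors with an explicit
# max_workers so nothing here depends on command line flags having been
# parsed into config.config.  The remote executor is deliberately not
# exercised because it requires reachable worker machines.
if __name__ == '__main__':

    def _square(x: int) -> int:
        return x * x

    # Thread pool: submit() wraps the callable in run_local_bundle so that
    # per-bundle latency lands in the histogram printed at shutdown.
    tpool = ThreadExecutor(max_workers=2)
    futures = [tpool.submit(_square, i) for i in range(5)]
    print([f.result() for f in futures])  # -> [0, 1, 4, 9, 16]
    tpool.shutdown()

    # Process pool: the callable and its arguments travel to the worker
    # process as a cloudpickle blob and are unpacked by run_cloud_pickle.
    ppool = ProcessExecutor(max_workers=2)
    futures = [ppool.submit(_square, i) for i in range(5)]
    print([f.result() for f in futures])  # -> [0, 1, 4, 9, 16]
    ppool.shutdown()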