#!/usr/bin/env python3 from __future__ import annotations from abc import ABC, abstractmethod import concurrent.futures as fut from collections import defaultdict from dataclasses import dataclass import logging import numpy import os import platform import random import subprocess import threading import time from typing import Any, Callable, Dict, List, Optional, Set import cloudpickle # type: ignore from overrides import overrides from ansi import bg, fg, underline, reset import argparse_utils import config from exec_utils import run_silently, cmd_in_background from decorator_utils import singleton import histogram as hist logger = logging.getLogger(__name__) parser = config.add_commandline_args( f"Executors ({__file__})", "Args related to processing executors." ) parser.add_argument( '--executors_threadpool_size', type=int, metavar='#THREADS', help='Number of threads in the default threadpool, leave unset for default', default=None ) parser.add_argument( '--executors_processpool_size', type=int, metavar='#PROCESSES', help='Number of processes in the default processpool, leave unset for default', default=None, ) parser.add_argument( '--executors_schedule_remote_backups', default=True, action=argparse_utils.ActionNoYes, help='Should we schedule duplicative backup work if a remote bundle is slow', ) parser.add_argument( '--executors_max_bundle_failures', type=int, default=3, metavar='#FAILURES', help='Maximum number of failures before giving up on a bundle', ) RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z' SSH = 'ssh -oForwardX11=no' def make_cloud_pickle(fun, *args, **kwargs): logger.debug(f"Making cloudpickled bundle at {fun.__name__}") return cloudpickle.dumps((fun, args, kwargs)) class BaseExecutor(ABC): def __init__(self, *, title=''): self.title = title self.task_count = 0 self.histogram = hist.SimpleHistogram( hist.SimpleHistogram.n_evenly_spaced_buckets( int(0), int(500), 50 ) ) @abstractmethod def submit(self, function: Callable, *args, **kwargs) -> fut.Future: pass @abstractmethod def shutdown(self, wait: bool = True) -> None: pass def adjust_task_count(self, delta: int) -> None: self.task_count += delta logger.debug(f'Executor current task count is {self.task_count}') class ThreadExecutor(BaseExecutor): def __init__(self, max_workers: Optional[int] = None): super().__init__() workers = None if max_workers is not None: workers = max_workers elif 'executors_threadpool_size' in config.config: workers = config.config['executors_threadpool_size'] logger.debug(f'Creating threadpool executor with {workers} workers') self._thread_pool_executor = fut.ThreadPoolExecutor( max_workers=workers, thread_name_prefix="thread_executor_helper" ) def run_local_bundle(self, fun, *args, **kwargs): logger.debug(f"Running local bundle at {fun.__name__}") start = time.time() result = fun(*args, **kwargs) end = time.time() self.adjust_task_count(-1) duration = end - start logger.debug(f"{fun.__name__} finished; used {duration:.1f}s") self.histogram.add_item(duration) return result @overrides def submit(self, function: Callable, *args, **kwargs) -> fut.Future: self.adjust_task_count(+1) newargs = [] newargs.append(function) for arg in args: newargs.append(arg) return self._thread_pool_executor.submit( self.run_local_bundle, *newargs, **kwargs) @overrides def shutdown(self, wait = True) -> None: logger.debug(f'Shutting down threadpool executor {self.title}') print(self.histogram) self._thread_pool_executor.shutdown(wait) class ProcessExecutor(BaseExecutor): def __init__(self, max_workers=None): super().__init__() workers = None if max_workers is not None: workers = max_workers elif 'executors_processpool_size' in config.config: workers = config.config['executors_processpool_size'] logger.debug(f'Creating processpool executor with {workers} workers.') self._process_executor = fut.ProcessPoolExecutor( max_workers=workers, ) def run_cloud_pickle(self, pickle): fun, args, kwargs = cloudpickle.loads(pickle) logger.debug(f"Running pickled bundle at {fun.__name__}") result = fun(*args, **kwargs) self.adjust_task_count(-1) return result @overrides def submit(self, function: Callable, *args, **kwargs) -> fut.Future: start = time.time() self.adjust_task_count(+1) pickle = make_cloud_pickle(function, *args, **kwargs) result = self._process_executor.submit( self.run_cloud_pickle, pickle ) result.add_done_callback( lambda _: self.histogram.add_item( time.time() - start ) ) return result @overrides def shutdown(self, wait=True) -> None: logger.debug(f'Shutting down processpool executor {self.title}') self._process_executor.shutdown(wait) print(self.histogram) def __getstate__(self): state = self.__dict__.copy() state['_process_executor'] = None return state @dataclass class RemoteWorkerRecord: username: str machine: str weight: int count: int def __hash__(self): return hash((self.username, self.machine)) def __repr__(self): return f'{self.username}@{self.machine}' @dataclass class BundleDetails: pickled_code: bytes uuid: str fname: str worker: Optional[RemoteWorkerRecord] username: Optional[str] machine: Optional[str] hostname: str code_file: str result_file: str pid: int start_ts: float end_ts: float too_slow: bool super_slow: bool src_bundle: BundleDetails is_cancelled: threading.Event was_cancelled: bool backup_bundles: Optional[List[BundleDetails]] failure_count: int class RemoteExecutorStatus: def __init__(self, total_worker_count: int) -> None: self.worker_count = total_worker_count self.known_workers: Set[RemoteWorkerRecord] = set() self.start_per_bundle: Dict[str, float] = defaultdict(float) self.end_per_bundle: Dict[str, float] = defaultdict(float) self.finished_bundle_timings_per_worker: Dict[ RemoteWorkerRecord, List[float] ] = {} self.in_flight_bundles_by_worker: Dict[ RemoteWorkerRecord, Set[str] ] = {} self.bundle_details_by_uuid: Dict[str, BundleDetails] = {} self.finished_bundle_timings: List[float] = [] self.last_periodic_dump: Optional[float] = None self.total_bundles_submitted = 0 # Protects reads and modification using self. Also used # as a memory fence for modifications to bundle. self.lock = threading.Lock() def record_acquire_worker( self, worker: RemoteWorkerRecord, uuid: str ) -> None: with self.lock: self.record_acquire_worker_already_locked( worker, uuid ) def record_acquire_worker_already_locked( self, worker: RemoteWorkerRecord, uuid: str ) -> None: assert self.lock.locked() self.known_workers.add(worker) self.start_per_bundle[uuid] = None x = self.in_flight_bundles_by_worker.get(worker, set()) x.add(uuid) self.in_flight_bundles_by_worker[worker] = x def record_bundle_details( self, details: BundleDetails) -> None: with self.lock: self.record_bundle_details_already_locked(details) def record_bundle_details_already_locked( self, details: BundleDetails) -> None: assert self.lock.locked() self.bundle_details_by_uuid[details.uuid] = details def record_release_worker( self, worker: RemoteWorkerRecord, uuid: str, was_cancelled: bool, ) -> None: with self.lock: self.record_release_worker_already_locked( worker, uuid, was_cancelled ) def record_release_worker_already_locked( self, worker: RemoteWorkerRecord, uuid: str, was_cancelled: bool, ) -> None: assert self.lock.locked() ts = time.time() self.end_per_bundle[uuid] = ts self.in_flight_bundles_by_worker[worker].remove(uuid) if not was_cancelled: bundle_latency = ts - self.start_per_bundle[uuid] x = self.finished_bundle_timings_per_worker.get(worker, list()) x.append(bundle_latency) self.finished_bundle_timings_per_worker[worker] = x self.finished_bundle_timings.append(bundle_latency) def record_processing_began(self, uuid: str): with self.lock: self.start_per_bundle[uuid] = time.time() def total_in_flight(self) -> int: assert self.lock.locked() total_in_flight = 0 for worker in self.known_workers: total_in_flight += len(self.in_flight_bundles_by_worker[worker]) return total_in_flight def total_idle(self) -> int: assert self.lock.locked() return self.worker_count - self.total_in_flight() def __repr__(self): assert self.lock.locked() ts = time.time() total_finished = len(self.finished_bundle_timings) total_in_flight = self.total_in_flight() ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: ' qall = None if len(self.finished_bundle_timings) > 1: qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95]) ret += ( f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, ' f'✅={total_finished}/{self.total_bundles_submitted}, ' f'💻n={total_in_flight}/{self.worker_count}\n' ) else: ret += ( f' ✅={total_finished}/{self.total_bundles_submitted}, ' f'💻n={total_in_flight}/{self.worker_count}\n' ) for worker in self.known_workers: ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: ' timings = self.finished_bundle_timings_per_worker.get(worker, []) count = len(timings) qworker = None if count > 1: qworker = numpy.quantile(timings, [0.5, 0.95]) ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n' else: ret += '\n' if count > 0: ret += f' ...finished {count} total bundle(s) so far\n' in_flight = len(self.in_flight_bundles_by_worker[worker]) if in_flight > 0: ret += f' ...{in_flight} bundles currently in flight:\n' for bundle_uuid in self.in_flight_bundles_by_worker[worker]: details = self.bundle_details_by_uuid.get( bundle_uuid, None ) pid = str(details.pid) if details is not None else "TBD" if self.start_per_bundle[bundle_uuid] is not None: sec = ts - self.start_per_bundle[bundle_uuid] ret += f' (pid={pid}): {bundle_uuid} for {sec:.1f}s so far ' else: ret += f' {bundle_uuid} setting up / copying data...' sec = 0.0 if qworker is not None: if sec > qworker[1]: ret += f'{bg("red")}>💻p95{reset()} ' elif sec > qworker[0]: ret += f'{fg("red")}>💻p50{reset()} ' if qall is not None: if sec > qall[1] * 1.5: ret += f'{bg("red")}!!!{reset()}' if details is not None: logger.debug(f'Flagging {details.uuid} for another backup') details.super_slow = True elif sec > qall[1]: ret += f'{bg("red")}>∀p95{reset()} ' if details is not None: logger.debug(f'Flagging {details.uuid} for a backup') details.too_slow = True elif sec > qall[0]: ret += f'{fg("red")}>∀p50{reset()}' ret += '\n' return ret def periodic_dump(self, total_bundles_submitted: int) -> None: assert self.lock.locked() self.total_bundles_submitted = total_bundles_submitted ts = time.time() if ( self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0 ): print(self) self.last_periodic_dump = ts class RemoteWorkerSelectionPolicy(ABC): def register_worker_pool(self, workers): random.seed() self.workers = workers @abstractmethod def is_worker_available(self) -> bool: pass @abstractmethod def acquire_worker( self, machine_to_avoid = None ) -> Optional[RemoteWorkerRecord]: pass class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): def is_worker_available(self) -> bool: for worker in self.workers: if worker.count > 0: return True return False def acquire_worker( self, machine_to_avoid = None ) -> Optional[RemoteWorkerRecord]: grabbag = [] for worker in self.workers: for x in range(0, worker.count): for y in range(0, worker.weight): grabbag.append(worker) for _ in range(0, 5): random.shuffle(grabbag) worker = grabbag[0] if worker.machine != machine_to_avoid or _ > 2: if worker.count > 0: worker.count -= 1 logger.debug(f'Selected worker {worker}') return worker logger.warning("Couldn't find a worker; go fish.") return None class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): def __init__(self) -> None: self.index = 0 def is_worker_available(self) -> bool: for worker in self.workers: if worker.count > 0: return True return False def acquire_worker( self, machine_to_avoid: str = None ) -> Optional[RemoteWorkerRecord]: x = self.index while True: worker = self.workers[x] if worker.count > 0: worker.count -= 1 x += 1 if x >= len(self.workers): x = 0 self.index = x logger.debug(f'Selected worker {worker}') return worker x += 1 if x >= len(self.workers): x = 0 if x == self.index: logger.warning("Couldn't find a worker; go fish.") return None class RemoteExecutor(BaseExecutor): def __init__(self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy) -> None: super().__init__() self.workers = workers self.policy = policy self.worker_count = 0 for worker in self.workers: self.worker_count += worker.count if self.worker_count <= 0: msg = f"We need somewhere to schedule work; count was {self.worker_count}" logger.critical(msg) raise Exception(msg) self.policy.register_worker_pool(self.workers) self.cv = threading.Condition() logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.') self._helper_executor = fut.ThreadPoolExecutor( thread_name_prefix="remote_executor_helper", max_workers=self.worker_count, ) self.status = RemoteExecutorStatus(self.worker_count) self.total_bundles_submitted = 0 logger.debug( f'Creating remote processpool with {self.worker_count} remote worker threads.' ) def is_worker_available(self) -> bool: return self.policy.is_worker_available() def acquire_worker( self, machine_to_avoid: str = None ) -> Optional[RemoteWorkerRecord]: return self.policy.acquire_worker(machine_to_avoid) def find_available_worker_or_block( self, machine_to_avoid: str = None ) -> RemoteWorkerRecord: with self.cv: while not self.is_worker_available(): self.cv.wait() worker = self.acquire_worker(machine_to_avoid) if worker is not None: return worker msg = "We should never reach this point in the code" logger.critical(msg) raise Exception(msg) def release_worker(self, worker: RemoteWorkerRecord) -> None: logger.debug(f'Released worker {worker}') with self.cv: worker.count += 1 self.cv.notify() def heartbeat(self) -> None: with self.status.lock: # Regular progress report self.status.periodic_dump(self.total_bundles_submitted) # Look for bundles to reschedule num_done = len(self.status.finished_bundle_timings) if num_done > 7 or (num_done > 5 and self.is_worker_available()): for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items(): for uuid in bundle_uuids: bundle = self.status.bundle_details_by_uuid.get(uuid, None) if ( bundle is not None and bundle.too_slow and bundle.src_bundle is None and config.config['executors_schedule_remote_backups'] ): self.consider_backup_for_bundle(bundle) def consider_backup_for_bundle(self, bundle: BundleDetails) -> None: assert self.status.lock.locked() if ( bundle.too_slow and len(bundle.backup_bundles) == 0 # one backup per ): msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***" logger.debug(msg) self.schedule_backup_for_bundle(bundle) return elif ( bundle.super_slow and len(bundle.backup_bundles) < 2 # two backups in dire situations and self.status.total_idle() > 4 ): msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***" logger.debug(msg) self.schedule_backup_for_bundle(bundle) return def check_if_cancelled(self, bundle: BundleDetails) -> bool: with self.status.lock: if bundle.is_cancelled.wait(timeout=0.0): logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.') bundle.was_cancelled = True return True return False def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any: """Find a worker for bundle or block until one is available.""" self.adjust_task_count(+1) uuid = bundle.uuid hostname = bundle.hostname avoid_machine = override_avoid_machine is_original = bundle.src_bundle is None # Try not to schedule a backup on the same host as the original. if avoid_machine is None and bundle.src_bundle is not None: avoid_machine = bundle.src_bundle.machine worker = None while worker is None: worker = self.find_available_worker_or_block(avoid_machine) # Ok, found a worker. bundle.worker = worker machine = bundle.machine = worker.machine username = bundle.username = worker.username fname = bundle.fname self.status.record_acquire_worker(worker, uuid) logger.debug(f'{uuid}/{fname}: Running bundle on {worker}...') # Before we do any work, make sure the bundle is still viable. if self.check_if_cancelled(bundle): try: return self.post_launch_work(bundle) except Exception as e: logger.exception(e) logger.error( f'{uuid}/{fname}: bundle says it\'s cancelled upfront but no results?!' ) assert bundle.worker is not None self.status.record_release_worker( bundle.worker, bundle.uuid, True, ) self.release_worker(bundle.worker) self.adjust_task_count(-1) if is_original: # Weird. We are the original owner of this # bundle. For it to have been cancelled, a backup # must have already started and completed before # we even for started. Moreover, the backup says # it is done but we can't find the results it # should have copied over. Reschedule the whole # thing. return self.emergency_retry_nasty_bundle(bundle) else: # Expected(?). We're a backup and our bundle is # cancelled before we even got started. Something # went bad in post_launch_work (I acutually don't # see what?) but probably not worth worrying # about. return None # Send input code / data to worker machine if it's not local. if hostname not in machine: try: cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}' start_ts = time.time() logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.") run_silently(cmd) xfer_latency = time.time() - start_ts logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency}s.") except Exception as e: logger.exception(e) logger.error( f'{uuid}/{fname}: failed to send instructions to worker machine?!?' ) assert bundle.worker is not None self.status.record_release_worker( bundle.worker, bundle.uuid, True, ) self.release_worker(bundle.worker) self.adjust_task_count(-1) if is_original: # Weird. We tried to copy the code to the worker and it failed... # And we're the original bundle. We have to retry. return self.emergency_retry_nasty_bundle(bundle) else: # This is actually expected; we're a backup. # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. return None # Kick off the work. Note that if this fails we let # wait_for_process deal with it. self.status.record_processing_began(uuid) cmd = (f'{SSH} {bundle.username}@{bundle.machine} ' f'"source py39-venv/bin/activate &&' f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') logger.debug(f'{uuid}/{fname}: Executing {cmd} in the background to kick off work...') p = cmd_in_background(cmd, silent=True) bundle.pid = pid = p.pid logger.debug(f'{uuid}/{fname}: Local ssh process pid={pid}; remote worker is {machine}.') return self.wait_for_process(p, bundle, 0) def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any: uuid = bundle.uuid machine = bundle.machine fname = bundle.fname pid = p.pid if depth > 3: logger.error( f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}." ) p.terminate() self.status.record_release_worker( bundle.worker, bundle.uuid, True, ) self.release_worker(bundle.worker) self.adjust_task_count(-1) return self.emergency_retry_nasty_bundle(bundle) # Spin until either the ssh job we scheduled finishes the # bundle or some backup worker signals that they finished it # before we could. while True: try: p.wait(timeout=0.25) except subprocess.TimeoutExpired: self.heartbeat() if self.check_if_cancelled(bundle): logger.info( f'{uuid}/{fname}: another worker finished bundle, checking it out...' ) break else: logger.info( f"{uuid}/{fname}: pid {pid} ({machine}) our ssh finished, checking it out..." ) p = None break # If we get here we believe the bundle is done; either the ssh # subprocess finished (hopefully successfully) or we noticed # that some other worker seems to have completed the bundle # and we're bailing out. try: ret = self.post_launch_work(bundle) if ret is not None and p is not None: p.terminate() return ret # Something went wrong; e.g. we could not copy the results # back, cleanup after ourselves on the remote machine, or # unpickle the results we got from the remove machine. If we # still have an active ssh subprocess, keep waiting on it. # Otherwise, time for an emergency reschedule. except Exception as e: logger.exception(e) logger.error(f'{uuid}/{fname}: Something unexpected just happened...') if p is not None: logger.warning( f"{uuid}/{fname}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." ) return self.wait_for_process(p, bundle, depth + 1) else: self.status.record_release_worker( bundle.worker, bundle.uuid, True, ) self.release_worker(bundle.worker) self.adjust_task_count(-1) return self.emergency_retry_nasty_bundle(bundle) def post_launch_work(self, bundle: BundleDetails) -> Any: with self.status.lock: is_original = bundle.src_bundle is None was_cancelled = bundle.was_cancelled username = bundle.username machine = bundle.machine result_file = bundle.result_file code_file = bundle.code_file fname = bundle.fname uuid = bundle.uuid # Whether original or backup, if we finished first we must # fetch the results if the computation happened on a # remote machine. bundle.end_ts = time.time() if not was_cancelled: assert bundle.machine is not None if bundle.hostname not in bundle.machine: cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null' logger.info( f"{uuid}/{fname}: Fetching results from {username}@{machine} via {cmd}" ) # If either of these throw they are handled in # wait_for_process. run_silently(cmd) run_silently(f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"') dur = bundle.end_ts - bundle.start_ts self.histogram.add_item(dur) # Only the original worker should unpickle the file contents # though since it's the only one whose result matters. The # original is also the only job that may delete result_file # from disk. Note that the original may have been cancelled # if one of the backups finished first; it still must read the # result from disk. if is_original: logger.debug(f"{uuid}/{fname}: Unpickling {result_file}.") try: with open(f'{result_file}', 'rb') as rb: serialized = rb.read() result = cloudpickle.loads(serialized) except Exception as e: msg = f'Failed to load {result_file}, this is bad news.' logger.critical(msg) self.status.record_release_worker( bundle.worker, bundle.uuid, True, ) self.release_worker(bundle.worker) # Re-raise the exception; the code in wait_for_process may # decide to emergency_retry_nasty_bundle here. raise Exception(e) logger.debug( f'Removing local (master) {code_file} and {result_file}.' ) os.remove(f'{result_file}') os.remove(f'{code_file}') # Notify any backups that the original is done so they # should stop ASAP. Do this whether or not we # finished first since there could be more than one # backup. if bundle.backup_bundles is not None: for backup in bundle.backup_bundles: logger.debug( f'{uuid}/{fname}: Notifying backup {backup.uuid} that it\'s cancelled' ) backup.is_cancelled.set() # This is a backup job and, by now, we have already fetched # the bundle results. else: # Backup results don't matter, they just need to leave the # result file in the right place for their originals to # read/unpickle later. result = None # Tell the original to stop if we finished first. if not was_cancelled: logger.debug( f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} we beat them to it.' ) bundle.src_bundle.is_cancelled.set() assert bundle.worker is not None self.status.record_release_worker( bundle.worker, bundle.uuid, was_cancelled, ) self.release_worker(bundle.worker) self.adjust_task_count(-1) return result def create_original_bundle(self, pickle, fname: str): from string_utils import generate_uuid uuid = generate_uuid(as_hex=True) code_file = f'/tmp/{uuid}.code.bin' result_file = f'/tmp/{uuid}.result.bin' logger.debug(f'Writing pickled code to {code_file}') with open(f'{code_file}', 'wb') as wb: wb.write(pickle) bundle = BundleDetails( pickled_code = pickle, uuid = uuid, fname = fname, worker = None, username = None, machine = None, hostname = platform.node(), code_file = code_file, result_file = result_file, pid = 0, start_ts = time.time(), end_ts = 0.0, too_slow = False, super_slow = False, src_bundle = None, is_cancelled = threading.Event(), was_cancelled = False, backup_bundles = [], failure_count = 0, ) self.status.record_bundle_details(bundle) logger.debug(f'{uuid}/{fname}: Created original bundle') return bundle def create_backup_bundle(self, src_bundle: BundleDetails): assert src_bundle.backup_bundles is not None n = len(src_bundle.backup_bundles) uuid = src_bundle.uuid + f'_backup#{n}' backup_bundle = BundleDetails( pickled_code = src_bundle.pickled_code, uuid = uuid, fname = src_bundle.fname, worker = None, username = None, machine = None, hostname = src_bundle.hostname, code_file = src_bundle.code_file, result_file = src_bundle.result_file, pid = 0, start_ts = time.time(), end_ts = 0.0, too_slow = False, super_slow = False, src_bundle = src_bundle, is_cancelled = threading.Event(), was_cancelled = False, backup_bundles = None, # backup backups not allowed failure_count = 0, ) src_bundle.backup_bundles.append(backup_bundle) self.status.record_bundle_details_already_locked(backup_bundle) logger.debug(f'{uuid}/{src_bundle.fname}: Created backup bundle') return backup_bundle def schedule_backup_for_bundle(self, src_bundle: BundleDetails): assert self.status.lock.locked() backup_bundle = self.create_backup_bundle(src_bundle) logger.debug( f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...' ) self._helper_executor.submit(self.launch, backup_bundle) # Results from backups don't matter; if they finish first # they will move the result_file to this machine and let # the original pick them up and unpickle them. def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future: uuid = bundle.uuid is_original = bundle.src_bundle is None bundle.worker = None avoid_last_machine = bundle.machine bundle.machine = None bundle.username = None bundle.failure_count += 1 if is_original: retry_limit = 3 else: retry_limit = 2 if bundle.failure_count > retry_limit: logger.error( f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.' ) if is_original: logger.critical( f'{uuid}: This is the original of the bundle; results will be incomplete.' ) else: logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.') return None else: logger.warning( f'>>> Emergency rescheduling {uuid} because of unexected errors (wtf?!) <<<' ) return self.launch(bundle, avoid_last_machine) @overrides def submit(self, function: Callable, *args, **kwargs) -> fut.Future: pickle = make_cloud_pickle(function, *args, **kwargs) bundle = self.create_original_bundle(pickle, function.__name__) self.total_bundles_submitted += 1 return self._helper_executor.submit(self.launch, bundle) @overrides def shutdown(self, wait=True) -> None: self._helper_executor.shutdown(wait) logging.debug(f'Shutting down RemoteExecutor {self.title}') print(self.histogram) @singleton class DefaultExecutors(object): def __init__(self): self.thread_executor: Optional[ThreadExecutor] = None self.process_executor: Optional[ProcessExecutor] = None self.remote_executor: Optional[RemoteExecutor] = None def ping(self, host) -> bool: logger.debug(f'RUN> ping -c 1 {host}') command = ['ping', '-c', '1', host] return subprocess.call( command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) == 0 def thread_pool(self) -> ThreadExecutor: if self.thread_executor is None: self.thread_executor = ThreadExecutor() return self.thread_executor def process_pool(self) -> ProcessExecutor: if self.process_executor is None: self.process_executor = ProcessExecutor() return self.process_executor def remote_pool(self) -> RemoteExecutor: if self.remote_executor is None: logger.info('Looking for some helper machines...') pool: List[RemoteWorkerRecord] = [] if self.ping('cheetah.house'): logger.info('Found cheetah.house') pool.append( RemoteWorkerRecord( username = 'scott', machine = 'cheetah.house', weight = 12, count = 4, ), ) if self.ping('video.house'): logger.info('Found video.house') pool.append( RemoteWorkerRecord( username = 'scott', machine = 'video.house', weight = 1, count = 4, ), ) if self.ping('wannabe.house'): logger.info('Found wannabe.house') pool.append( RemoteWorkerRecord( username = 'scott', machine = 'wannabe.house', weight = 2, count = 4, ), ) if self.ping('meerkat.cabin'): logger.info('Found meerkat.cabin') pool.append( RemoteWorkerRecord( username = 'scott', machine = 'meerkat.cabin', weight = 5, count = 2, ), ) if self.ping('backup.house'): logger.info('Found backup.house') pool.append( RemoteWorkerRecord( username = 'scott', machine = 'backup.house', weight = 1, count = 4, ), ) if self.ping('kiosk.house'): logger.info('Found kiosk.house') pool.append( RemoteWorkerRecord( username = 'pi', machine = 'kiosk.house', weight = 1, count = 2, ), ) if self.ping('puma.cabin'): logger.info('Found puma.cabin') pool.append( RemoteWorkerRecord( username = 'scott', machine = 'puma.cabin', weight = 12, count = 4, ), ) # The controller machine has a lot to do; go easy on it. for record in pool: if record.machine == platform.node() and record.count > 1: logger.info(f'Reducing workload for {record.machine}.') record.count = 1 policy = WeightedRandomRemoteWorkerSelectionPolicy() policy.register_worker_pool(pool) self.remote_executor = RemoteExecutor(pool, policy) return self.remote_executor