#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# © Copyright 2021-2022, Scott Gasch

"""Defines three executors: a thread executor for doing work using a
threadpool, a process executor for doing work in other processes on
the same machine, and a remote executor for farming out work to other
machines.

Also defines DefaultExecutors which is a container for references to
global executors / worker pools with automatic shutdown semantics."""

from __future__ import annotations

import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
import numpy
from overrides import overrides

import argparse_utils
import config
import histogram as hist
import string_utils
from ansi import bg, fg, reset, underline
from decorator_utils import singleton
from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    metavar='#THREADS',
    help='Number of threads in the default threadpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    metavar='#PROCESSES',
    help='Number of processes in the default processpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    metavar='#FAILURES',
    help='Maximum number of failures before giving up on a bundle',
)

SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'


def make_cloud_pickle(fun, *args, **kwargs):
    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
    return cloudpickle.dumps((fun, args, kwargs))


class BaseExecutor(ABC):
    """The base executor interface definition."""

    def __init__(self, *, title=''):
        self.title = title
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        pass

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shutdown the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks).  Return False
        otherwise.  Note: this should only be called by the launcher
        process.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count.  Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread / machine.
        """
        self.task_count += delta
        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)

    def get_task_count(self) -> int:
        """Return the current task count.  Note: do not call this method
        from a worker, it should only be called by the launcher process /
        thread / machine.
        """
        return self.task_count


class ThreadExecutor(BaseExecutor):
    """A threadpool executor instance."""

    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug('Creating threadpool executor with %d workers', workers)
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False

    # This is run on a different thread; do not adjust task count here.
    @staticmethod
    def run_local_bundle(fun, *args, **kwargs):
        logger.debug("Running local bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        start = time.time()
        result = self._thread_pool_executor.submit(
            ThreadExecutor.run_local_bundle, *newargs, **kwargs
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down threadpool executor %s', self.title)
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True


class ProcessExecutor(BaseExecutor):
    """A processpool executor."""

    def __init__(self, max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug('Creating processpool executor with %d workers.', workers)
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False

    # This is run in another process; do not adjust task count here.
    @staticmethod
    def run_cloud_pickle(pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug("Running pickled bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        self.adjust_task_count(+1)
        pickle = make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down processpool executor %s', self.title)
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state
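

# Illustrative usage sketch (not part of the library; the values below are
# invented for the example).  Both executors expose the same
# concurrent.futures-style submit() API returning a Future; the process pool
# round-trips the callable and its arguments through cloudpickle, so lambdas
# and closures that the stdlib pickler rejects should work there too.
#
#     texecutor = ThreadExecutor(max_workers=4)
#     tfuture = texecutor.submit(lambda x: x * x, 12)
#     assert tfuture.result() == 144
#     texecutor.shutdown(wait=True, quiet=True)
#
#     pexecutor = ProcessExecutor(max_workers=2)
#     pfutures = [pexecutor.submit(lambda x: x + 1, n) for n in range(10)]
#     assert sum(f.result() for f in pfutures) == 55
#     pexecutor.shutdown(wait=True, quiet=True)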


class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""

    pass


@dataclass
class RemoteWorkerRecord:
    """A record of info about a remote worker."""

    username: str
    machine: str
    weight: int
    count: int

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    """All info necessary to define some unit of work that needs to be
    done, where it is being run, its state, whether it is an original
    bundle or a backup bundle, how many times it has failed, etc...
    """

    pickled_code: bytes
    uuid: str
    fname: str
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    hostname: str
    code_file: str
    result_file: str
    pid: int
    start_ts: float
    end_ts: float
    slower_than_local_p95: bool
    slower_than_global_p95: bool
    src_bundle: Optional[BundleDetails]
    is_cancelled: threading.Event
    was_cancelled: bool
    backup_bundles: Optional[List[BundleDetails]]
    failure_count: int

    def __repr__(self):
        uuid = self.uuid
        if uuid[-9:-2] == '_backup':
            uuid = uuid[:-9]
            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
        else:
            suffix = uuid[-6:]

        colorz = [
            fg('violet red'),
            fg('red'),
            fg('orange'),
            fg('peach orange'),
            fg('yellow'),
            fg('marigold yellow'),
            fg('green yellow'),
            fg('tea green'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('tropical blue'),
            fg('lavender purple'),
            fg('medium purple'),
        ]
        c = colorz[int(uuid[-2:], 16) % len(colorz)]
        fname = self.fname if self.fname is not None else 'nofname'
        machine = self.machine if self.machine is not None else 'nomachine'
        return f'{c}{suffix}/{fname}/{machine}{reset()}'
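

# Illustrative sketch (hypothetical hosts, not part of the library): a
# RemoteWorkerRecord describes one ssh-reachable helper machine.  Its count
# is how many bundles the machine may run concurrently and its weight biases
# the weighted-random selection policy below toward that machine:
#
#     fast_box = RemoteWorkerRecord(username='scott', machine='fast.example', weight=24, count=5)
#     slow_box = RemoteWorkerRecord(username='scott', machine='slow.example', weight=6, count=2)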


class RemoteExecutorStatus:
    """A status 'scoreboard' for a remote executor."""

    def __init__(self, total_worker_count: int) -> None:
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modification using self.  Also used as a
        # memory fence for modifications to bundle.
        self.lock: threading.Lock = threading.Lock()

    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            x = self.finished_bundle_timings_per_worker.get(worker, [])
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def record_processing_began(self, uuid: str):
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f' ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f' ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f' {details} setting up / copying data...'
                        sec = 0.0

                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                            if details is not None:
                                details.slower_than_local_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_local_p95 = False

                    if qall is not None:
                        if sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                details.slower_than_global_p95 = True
                        else:
                            details.slower_than_global_p95 = False

                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    """A policy for selecting a remote worker base class."""

    def __init__(self):
        self.workers: Optional[List[RemoteWorkerRecord]] = None

    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that uses weighted RNG."""

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        if self.workers:
            for worker in self.workers:
                if worker.machine != machine_to_avoid:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.debug('There are no available workers that avoid %s', machine_to_avoid)
            if self.workers:
                for worker in self.workers:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.warning('There are no available workers?!')
            return None

        worker = random.sample(grabbag, 1)[0]
        assert worker.count > 0
        worker.count -= 1
        logger.debug('Selected worker %s', worker)
        return worker


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that just round robins."""

    def __init__(self) -> None:
        super().__init__()
        self.index = 0

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
        if self.workers:
            x = self.index
            while True:
                worker = self.workers[x]
                if worker.count > 0:
                    worker.count -= 1
                    x += 1
                    if x >= len(self.workers):
                        x = 0
                    self.index = x
                    logger.debug('Selected worker %s', worker)
                    return worker
                x += 1
                if x >= len(self.workers):
                    x = 0
                if x == self.index:
                    logger.warning('Unexpectedly could not find a worker, retrying...')
                    return None
        return None
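

# Illustrative sketch (reusing the hypothetical fast_box / slow_box records
# above; not part of the library): a selection policy is handed the worker
# pool once via register_worker_pool() and then hands out capacity from it.
# With the weighted policy, fast_box contributes 24 * 5 = 120 grab-bag
# entries versus 6 * 2 = 12 for slow_box, so it is roughly ten times as
# likely to be picked while both still have capacity.
#
#     policy = WeightedRandomRemoteWorkerSelectionPolicy()
#     policy.register_worker_pool([fast_box, slow_box])
#     if policy.is_worker_available():
#         worker = policy.acquire_worker(machine_to_avoid='slow.example')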


class RemoteExecutor(BaseExecutor):
    """A remote work executor."""

    def __init__(
        self,
        workers: List[RemoteWorkerRecord],
        policy: RemoteWorkerSelectionPolicy,
    ) -> None:
        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self.run_periodic_heartbeat()
        self.already_shutdown = False

    @background_thread
    def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
        while not stop_event.is_set():
            time.sleep(5.0)
            logger.debug('Running periodic heartbeat code...')
            self.heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def heartbeat(self) -> None:
        # Note: this is invoked on a background thread, not an
        # executor thread.  Be careful what you do with it b/c it
        # needs to get back and dump status again periodically.
        with self.status.lock:
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self.maybe_schedule_backup_bundles()

    def maybe_schedule_backup_bundles(self):
        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_done >= 2
            and num_idle_workers > 0
            and (self.last_backup is None or (now - self.last_backup > 9.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            try:
                assert self.backup_lock.locked()
                bundle_to_backup = None
                best_score = None
                for (
                    worker,
                    bundle_uuids,
                ) in self.status.in_flight_bundles_by_worker.items():

                    # Prefer to schedule backups of bundles running on
                    # slower machines.
                    base_score = 0
                    for record in self.workers:
                        if worker.machine == record.machine:
                            base_score = float(record.weight)
                            base_score = 1.0 / base_score
                            base_score *= 200.0
                            base_score = int(base_score)
                            break

                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                            bundle is not None
                            and bundle.src_bundle is None
                            and bundle.backup_bundles is not None
                        ):
                            score = base_score

                            # Schedule backups of bundles running
                            # longer; especially those that are
                            # unexpectedly slow.
                            start_ts = self.status.start_per_bundle[uuid]
                            if start_ts is not None:
                                runtime = now - start_ts
                                score += runtime
                                logger.debug('score[%s] => %.1f # latency boost', bundle, score)

                                if bundle.slower_than_local_p95:
                                    score += runtime / 2
                                    logger.debug('score[%s] => %.1f # >worker p95', bundle, score)

                                if bundle.slower_than_global_p95:
                                    score += runtime / 4
                                    logger.debug('score[%s] => %.1f # >global p95', bundle, score)

                            # Prefer backups of bundles that don't
                            # have backups already.
                            backup_count = len(bundle.backup_bundles)
                            if backup_count == 0:
                                score *= 2
                            elif backup_count == 1:
                                score /= 2
                            elif backup_count == 2:
                                score /= 8
                            else:
                                score = 0
                            logger.debug(
                                'score[%s] => %.1f # %d dup backup factor',
                                bundle,
                                score,
                                backup_count,
                            )

                            if score != 0 and (best_score is None or score > best_score):
                                bundle_to_backup = bundle
                                assert bundle is not None
                                assert bundle.backup_bundles is not None
                                assert bundle.src_bundle is None
                                best_score = score

                # Note: this is all still happening on the heartbeat
                # runner thread.  That's ok because
                # schedule_backup_for_bundle uses the executor to
                # submit the bundle again which will cause it to be
                # picked up by a worker thread and allow this thread
                # to return to run future heartbeats.
                if bundle_to_backup is not None:
                    self.last_backup = now
                    logger.info(
                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
                        bundle_to_backup,
                        best_score,
                    )
                    self.schedule_backup_for_bundle(bundle_to_backup)
            finally:
                self.backup_lock.release()
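
    # Worked example of the scoring heuristic above (informal, with made-up
    # numbers): a bundle that has run for 60s on a weight=10 machine starts
    # at base_score = int(200 / 10) = 20, gets +60 for runtime, +30 more if
    # it is slower than that worker's p95, and +15 more if it is slower than
    # the global p95 (total 125).  With no existing backups the score then
    # doubles to 250; with one existing backup it would instead be halved.
    # The highest-scoring eligible bundle gets the backup.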

    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        worker = bundle.worker
        assert worker is not None
        logger.debug('Released worker %s', worker)
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        with self.cv:
            worker.count += 1
            self.cv.notify()
        self.adjust_task_count(-1)

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                bundle.was_cancelled = True
                return True
        return False

    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""
        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug('%s: Running bundle on %s...', bundle, worker)

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self.check_if_cancelled(bundle):
            try:
                return self.process_work_result(bundle)
            except Exception as e:
                logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started.  Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over.  Reschedule the whole
                    # thing.
                    logger.exception(e)
                    logger.error(
                        '%s: We are the original owner thread and yet there are '
                        'no results for this bundle.  This is unexpected and bad.',
                        bundle,
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # We're a backup and our bundle is cancelled
                    # before we even got started.  Do nothing and let
                    # the original bundle's thread worry about either
                    # finding the results or complaining about it.
                    return None

        # Send input code / data to worker machine if it's not local.
        if hostname not in machine:
            try:
                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                start_ts = time.time()
                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
            except Exception as e:
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We tried to copy the code to the worker
                    # and it failed...  And we're the original bundle.
                    # We have to retry.
                    logger.exception(e)
                    logger.error(
                        "%s: Failed to send instructions to the worker machine?! "
                        "This is not expected; we're the original bundle so this shouldn't "
                        "be a race condition.  Attempting an emergency retry...",
                        bundle,
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code_file before we could copy it.  Ignore.
                    logger.warning(
                        '%s: Failed to send instructions to the worker machine... '
                        'We\'re a backup and this may be caused by the original (or '
                        'some other backup) already finishing this work.  Ignoring.',
                        bundle,
                    )
                    return None

        # Kick off the work.  Note that if this fails we let
        # wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"source py38-venv/bin/activate &&'
            f' /home/scott/lib/python_modules/remote_worker.py'
            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
        return self.wait_for_process(p, bundle, 0)
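
    # The remote half of this protocol lives outside this file.  Presumably
    # remote_worker.py (whose path is hardcoded in launch() above) does
    # roughly the following -- a hypothetical sketch based on the bundle
    # format defined here, not the actual script:
    #
    #     with open(code_file, 'rb') as rb:
    #         fun, args, kwargs = cloudpickle.loads(rb.read())
    #     result = fun(*args, **kwargs)
    #     with open(result_file, 'wb') as wb:
    #         wb.write(cloudpickle.dumps(result))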

    def wait_for_process(
        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
    ) -> Any:
        machine = bundle.machine
        assert p is not None
        pid = p.pid
        if depth > 3:
            logger.error(
                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
            )
            p.terminate()
            self.release_worker(bundle)
            return self.emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self.check_if_cancelled(bundle):
                    logger.info('%s: looks like another worker finished bundle...', bundle)
                    break
            else:
                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # and we're bailing out.
        try:
            ret = self.process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, clean up after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine.  If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception as e:
            logger.exception(e)
            logger.error('%s: Something unexpected just happened...', bundle)
            if p is not None:
                logger.warning(
                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
                )
                return self.wait_for_process(p, bundle, depth + 1)
            else:
                self.release_worker(bundle)
                return self.emergency_retry_nasty_bundle(bundle)

    def process_work_result(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        "%s: Fetching results back from %s@%s via %s",
                        bundle,
                        username,
                        machine,
                        cmd,
                    )

                    # If either of these throw they are handled in
                    # wait_for_process.
                    attempts = 0
                    while True:
                        try:
                            run_silently(cmd)
                        except Exception as e:
                            attempts += 1
                            if attempts >= 3:
                                raise e
                        else:
                            break

                    # Cleanup remote /tmp files.
                    run_silently(
                        f'{SSH} {username}@{machine}'
                        f' "/bin/rm -f {code_file} {result_file}"'
                    )
                    logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.  It still does that here with is_cancelled
        # set.
        if is_original:
            logger.debug("%s: Unpickling %s.", bundle, result_file)
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                logger.exception(e)
                logger.error('Failed to load %s... this is bad news.', result_file)
                self.release_worker(bundle)

                # Re-raise the exception; the code in wait_for_process may
                # decide to emergency_retry_nasty_bundle here.
                raise e
            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
            os.remove(result_file)
            os.remove(code_file)

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
                    )
                    backup.is_cancelled.set()

        # This is a backup job and, by now, we have already fetched
        # the bundle results.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                orig_bundle = bundle.src_bundle
                assert orig_bundle is not None
                logger.debug(
                    '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
                )
                orig_bundle.is_cancelled.set()
        self.release_worker(bundle, was_cancelled=was_cancelled)
        return result

    def create_original_bundle(self, pickle, fname: str):
        uuid = string_utils.generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'
        logger.debug('Writing pickled code to %s', code_file)
        with open(code_file, 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            fname=fname,
            worker=None,
            username=None,
            machine=None,
            hostname=platform.node(),
            code_file=code_file,
            result_file=result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
            failure_count=0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug('%s: Created an original bundle', bundle)
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert self.status.lock.locked()
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            fname=src_bundle.fname,
            worker=None,
            username=None,
            machine=None,
            hostname=src_bundle.hostname,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
            failure_count=0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug('%s: Created a backup bundle', backup_bundle)
        return backup_bundle

    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them (and return
        # a result).

    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        if is_original:
            retry_limit = 3
        else:
            retry_limit = 2

        if bundle.failure_count > retry_limit:
            logger.error(
                '%s: Tried this bundle too many times already (%dx); giving up.',
                bundle,
                retry_limit,
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                )
            else:
                logger.error(
                    '%s: At least it\'s only a backup; better luck with the others.', bundle
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            warnings.warn(msg)
            return self.launch(bundle, avoid_last_machine)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self.launch, bundle)

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down RemoteExecutor %s', self.title)
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True


@singleton
class DefaultExecutors(object):
    """A container for a default thread, process and remote executor.
    These are not created until needed and we take care to clean up
    before process exit.
    """

    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def ping(host) -> bool:
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            pool: List[RemoteWorkerRecord] = []
            if self.ping('cheetah.house'):
                logger.info('Found cheetah.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='cheetah.house',
                        weight=24,
                        count=5,
                    ),
                )
            if self.ping('meerkat.cabin'):
                logger.info('Found meerkat.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='meerkat.cabin',
                        weight=12,
                        count=2,
                    ),
                )
            if self.ping('wannabe.house'):
                logger.info('Found wannabe.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='wannabe.house',
                        weight=14,
                        count=2,
                    ),
                )
            if self.ping('puma.cabin'):
                logger.info('Found puma.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='puma.cabin',
                        weight=24,
                        count=5,
                    ),
                )
            if self.ping('backup.house'):
                logger.info('Found backup.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='backup.house',
                        weight=9,
                        count=2,
                    ),
                )

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info('Reducing workload for %s.', record.machine)
                    record.count = max(int(record.count / 2), 1)

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None