#!/usr/bin/env python3

from __future__ import annotations

from abc import ABC, abstractmethod
import concurrent.futures as fut
from collections import defaultdict
from dataclasses import dataclass
import logging
import numpy
import os
import platform
import random
import subprocess
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore

from ansi import bg, fg, underline, reset
import argparse_utils
import config
from exec_utils import run_silently, cmd_in_background
from decorator_utils import singleton
import histogram as hist

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})",
    "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    metavar='#THREADS',
    help='Number of threads in the default threadpool, leave unset for default',
    default=None
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    metavar='#PROCESSES',
    help='Number of processes in the default processpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    metavar='#FAILURES',
    help='Maximum number of failures before giving up on a bundle',
)

RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
SSH = 'ssh -oForwardX11=no'


def make_cloud_pickle(fun, *args, **kwargs):
    logger.info(f"Making cloudpickled bundle at {fun.__name__}")
    return cloudpickle.dumps((fun, args, kwargs))


class BaseExecutor(ABC):
    def __init__(self, *, title=''):
        self.title = title
        self.task_count = 0
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(
                int(0), int(500), 50
            )
        )

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None:
        pass

    def adjust_task_count(self, delta: int) -> None:
        self.task_count += delta
        logger.debug(f'Executor current task count is {self.task_count}')


class ThreadExecutor(BaseExecutor):
    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers,
            thread_name_prefix="thread_executor_helper"
        )

    def run_local_bundle(self, fun, *args, **kwargs):
        logger.debug(f"Running local bundle at {fun.__name__}")
        start = time.time()
        result = fun(*args, **kwargs)
        end = time.time()
        self.adjust_task_count(-1)
        duration = end - start
        logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
        self.histogram.add_item(duration)
        return result

    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        return self._thread_pool_executor.submit(
            self.run_local_bundle, *newargs, **kwargs)

    def shutdown(self, wait=True) -> None:
        logger.debug(f'Shutting down threadpool executor {self.title}')
        print(self.histogram)
        self._thread_pool_executor.shutdown(wait)
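

# Example (illustrative sketch, not executed at import time): running a
# function on a ThreadExecutor and collecting the result via the returned
# concurrent.futures.Future.
#
#   executor = ThreadExecutor(max_workers=4)
#   future = executor.submit(sum, [1, 2, 3])
#   assert future.result() == 6
#   executor.shutdown(wait=True)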


class ProcessExecutor(BaseExecutor):
    def __init__(self, max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )

    def run_cloud_pickle(self, pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug(f"Running pickled bundle at {fun.__name__}")
        result = fun(*args, **kwargs)
        self.adjust_task_count(-1)
        return result

    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        start = time.time()
        self.adjust_task_count(+1)
        pickle = make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(
            self.run_cloud_pickle,
            pickle
        )
        result.add_done_callback(
            lambda _: self.histogram.add_item(
                time.time() - start
            )
        )
        return result

    def shutdown(self, wait=True) -> None:
        logger.debug(f'Shutting down processpool executor {self.title}')
        self._process_executor.shutdown(wait)
        print(self.histogram)

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state
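

# Example (illustrative sketch): running a CPU-bound callable on a
# ProcessExecutor.  Because the work is shipped to the child process as a
# cloudpickle, closures and lambdas that the stock pickle module rejects are
# fine here.
#
#   executor = ProcessExecutor(max_workers=2)
#   future = executor.submit(lambda x: x * x, 12)
#   assert future.result() == 144
#   executor.shutdown()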


@dataclass
class RemoteWorkerRecord:
    username: str
    machine: str
    weight: int
    count: int

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    pickled_code: bytes
    uuid: str
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    hostname: str
    code_file: str
    result_file: str
    pid: int
    start_ts: float
    end_ts: float
    too_slow: bool
    super_slow: bool
    src_bundle: BundleDetails
    is_cancelled: threading.Event
    was_cancelled: bool
    backup_bundles: Optional[List[BundleDetails]]
    failure_count: int


class RemoteExecutorStatus:
    def __init__(self, total_worker_count: int) -> None:
        self.worker_count = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_per_bundle: Dict[str, float] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord,
            List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[
            RemoteWorkerRecord,
            Set[str]
        ] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted = 0

        # Protects reads and modification using self.  Also used
        # as a memory fence for modifications to bundle.
        self.lock = threading.Lock()

    def record_acquire_worker(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(
                worker,
                uuid
            )

    def record_acquire_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = time.time()
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(
            self,
            details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(
            self,
            details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str,
            was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            bundle_latency = ts - self.start_per_bundle[uuid]
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()
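
    # Note: __repr__ below summarizes finished-bundle latencies with
    # numpy.quantile, e.g. (illustrative values):
    #
    #   numpy.quantile([1.0, 2.0, 3.0, 4.0, 5.0], [0.5, 0.95])
    #   => array([3. , 4.8])
    #
    # i.e. the overall p50 / p95 figures shown in the periodic status dump.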

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f' ✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f' ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f' ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(
                        bundle_uuid,
                        None
                    )
                    pid = str(details.pid) if details is not None else "TBD"
                    sec = ts - self.start_per_bundle[bundle_uuid]
                    ret += f' (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                        elif sec > qworker[0]:
                            ret += f'{fg("red")}>💻p50{reset()} '
                    if qall is not None:
                        if sec > qall[1] * 1.5:
                            ret += f'{bg("red")}!!!{reset()}'
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for another backup')
                                details.super_slow = True
                        elif sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for a backup')
                                details.too_slow = True
                        elif sec > qall[0]:
                            ret += f'{fg("red")}>∀p50{reset()}'
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if (
                self.last_periodic_dump is None
                or ts - self.last_periodic_dump > 5.0
        ):
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    def register_worker_pool(self, workers):
        random.seed()
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(
            self,
            machine_to_avoid=None
    ) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid=None
    ) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        for worker in self.workers:
            for x in range(0, worker.count):
                for y in range(0, worker.weight):
                    grabbag.append(worker)

        for _ in range(0, 5):
            random.shuffle(grabbag)
            worker = grabbag[0]
            if worker.machine != machine_to_avoid or _ > 2:
                if worker.count > 0:
                    worker.count -= 1
                    logger.debug(f'Selected worker {worker}')
                    return worker
        logger.warning("Couldn't find a worker; go fish.")
        return None


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def __init__(self) -> None:
        self.index = 0

    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        x = self.index
        while True:
            worker = self.workers[x]
            if worker.count > 0:
                worker.count -= 1
                x += 1
                if x >= len(self.workers):
                    x = 0
                self.index = x
                logger.debug(f'Selected worker {worker}')
                return worker
            x += 1
            if x >= len(self.workers):
                x = 0
            if x == self.index:
                logger.warning("Couldn't find a worker; go fish.")
                return None
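

# Example (illustrative sketch): wiring a selection policy to a small worker
# pool.  weight biases how often a machine is picked; count bounds how many
# bundles it may run concurrently.
#
#   pool = [
#       RemoteWorkerRecord(username='scott', machine='cheetah.house',
#                          weight=12, count=4),
#       RemoteWorkerRecord(username='scott', machine='video.house',
#                          weight=1, count=4),
#   ]
#   policy = WeightedRandomRemoteWorkerSelectionPolicy()
#   policy.register_worker_pool(pool)
#   worker = policy.acquire_worker()   # cheetah.house is ~12x more likely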


class RemoteExecutor(BaseExecutor):
    def __init__(self,
                 workers: List[RemoteWorkerRecord],
                 policy: RemoteWorkerSelectionPolicy) -> None:
        super().__init__()
        self.workers = workers
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise Exception(msg)
        self.policy = policy
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        logger.debug(
            f'Creating remote processpool with {self.worker_count} remote endpoints.'
        )

    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(
            self,
            machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
            self,
            machine_to_avoid: str = None
    ) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, worker: RemoteWorkerRecord) -> None:
        logger.debug(f'Released worker {worker}')
        with self.cv:
            worker.count += 1
            self.cv.notify()

    def heartbeat(self) -> None:
        with self.status.lock:
            # Regular progress report
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule
            if len(self.status.finished_bundle_timings) > 7:
                for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                                bundle is not None
                                and bundle.too_slow
                                and bundle.src_bundle is None
                                and config.config['executors_schedule_remote_backups']
                        ):
                            self.consider_backup_for_bundle(bundle)

    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
        assert self.status.lock.locked()
        if (
                bundle.too_slow
                and len(bundle.backup_bundles) == 0    # one backup per
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return
        elif (
                bundle.super_slow
                and len(bundle.backup_bundles) < 2    # two backups in dire situations
                and self.status.total_idle() > 4
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False
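
    # Example (illustrative; the uuid and host are hypothetical): the copy /
    # execute / fetch commands built by launch() and post_launch_work() for a
    # remote bundle look roughly like:
    #
    #   rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z \
    #       /tmp/<uuid>.code.bin scott@cheetah.house:/tmp/<uuid>.code.bin
    #   ssh -oForwardX11=no scott@cheetah.house \
    #       "source remote-execution/bin/activate && \
    #        /home/scott/lib/python_modules/remote_worker.py \
    #        --code_file /tmp/<uuid>.code.bin --result_file /tmp/<uuid>.result.bin"
    #   rsync ... scott@cheetah.house:/tmp/<uuid>.result.bin /tmp/<uuid>.result.bin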

    def launch(self, bundle: BundleDetails) -> Any:
        """Find a worker for bundle or block until one is available."""
        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = None

        # Try not to schedule a backup on the same host as the original.
        if bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'Running bundle {uuid} on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        if self.check_if_cancelled(bundle):
            try:
                return self.post_launch_work(bundle)
            except Exception as e:
                logger.exception(e)
                logger.info(f"Bundle {uuid} seems to have failed?!")
                if bundle.failure_count < config.config['executors_max_bundle_failures']:
                    return self.launch(bundle)
                else:
                    logger.info(f"Bundle {uuid} is poison, giving up on it.")
                    return None

        # Send input to machine if it's not local.
        if hostname not in machine:
            cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
            logger.info(f"Copying work to {worker} via {cmd}")
            run_silently(cmd)

        # Do it.
        cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
               f'"source remote-execution/bin/activate &&'
               f' /home/scott/lib/python_modules/remote_worker.py'
               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = pid = p.pid
        logger.info(f"Running {cmd} in the background as process {pid}")

        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                self.heartbeat()

                # Both source and backup bundles can be cancelled by
                # the other depending on which finishes first.
                if self.check_if_cancelled(bundle):
                    p.terminate()
                    break
            else:
                logger.debug(
                    f"{pid}/{bundle.uuid} has finished its work normally."
                )
                break

        try:
            return self.post_launch_work(bundle)
        except Exception as e:
            logger.exception(e)
            logger.info(f"Bundle {uuid} seems to have failed?!")
            if bundle.failure_count < config.config['executors_max_bundle_failures']:
                return self.launch(bundle)
            logger.info(f"Bundle {uuid} is poison, giving up on it.")
            return None

    def post_launch_work(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"Fetching results from {username}@{machine} via {cmd}"
                    )
                    try:
                        run_silently(cmd)
                    except subprocess.CalledProcessError:
                        logger.critical(f'Failed to copy {username}@{machine}:{result_file}!')
                    run_silently(f'{SSH} {username}@{machine}'
                                 f' "/bin/rm -f {code_file} {result_file}"')
            dur = bundle.end_ts - bundle.start_ts
            self.histogram.add_item(dur)
            assert bundle.worker is not None
            self.status.record_release_worker_already_locked(
                bundle.worker,
                bundle.uuid,
                was_cancelled
            )

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.
        if is_original:
            logger.debug(f"Unpickling {result_file}.")
            try:
                with open(f'{result_file}', 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                msg = f'Failed to load {result_file}'
                logger.critical(msg)
                bundle.failure_count += 1
                self.release_worker(bundle.worker)
                raise Exception(e)
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        f'Notifying backup {backup.uuid} that it is cancelled'
                    )
                    backup.is_cancelled.set()

        # This is a backup job.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                logger.debug(
                    f'Notifying original {bundle.src_bundle.uuid} that it is cancelled'
                )
                bundle.src_bundle.is_cancelled.set()

        assert bundle.worker is not None
        self.release_worker(bundle.worker)
        self.adjust_task_count(-1)
        return result

    def create_original_bundle(self, pickle):
        from string_utils import generate_uuid
        uuid = generate_uuid(as_hex=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'
        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)
        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            worker=None,
            username=None,
            machine=None,
            hostname=platform.node(),
            code_file=code_file,
            result_file=result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            too_slow=False,
            super_slow=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
            failure_count=0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'Created original bundle {uuid}')
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'
        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            worker=None,
            username=None,
            machine=None,
            hostname=src_bundle.hostname,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            too_slow=False,
            super_slow=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,    # backup backups not allowed
            failure_count=0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'Created backup bundle {uuid}')
        return backup_bundle

    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        assert self.status.lock.locked()
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'Scheduling backup bundle {backup_bundle.uuid} for execution'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.
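
    # Life cycle of a slow bundle: the periodic status dump flags in-flight
    # bundles whose runtime exceeds the overall p95 latency as too_slow (or
    # super_slow past 1.5x p95).  heartbeat() then asks
    # consider_backup_for_bundle() to schedule at most one backup for a
    # too_slow original, or up to two when it is super_slow and more than
    # four workers are idle.  Whichever copy finishes first cancels the other
    # via is_cancelled; only the original ever unpickles (and deletes) the
    # result file.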
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self.launch, bundle)

    def shutdown(self, wait=True) -> None:
        self._helper_executor.shutdown(wait)
        logger.debug(f'Shutting down RemoteExecutor {self.title}')
        print(self.histogram)


@singleton
class DefaultExecutors(object):
    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        command = ['ping', '-c', '1', host]
        return subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        ) == 0

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            pool: List[RemoteWorkerRecord] = []
            if self.ping('cheetah.house'):
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='cheetah.house',
                        weight=12,
                        count=4,
                    ),
                )
            if self.ping('video.house'):
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='video.house',
                        weight=1,
                        count=4,
                    ),
                )
            if self.ping('wannabe.house'):
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='wannabe.house',
                        weight=2,
                        count=4,
                    ),
                )
            if self.ping('meerkat.cabin'):
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='meerkat.cabin',
                        weight=6,
                        count=2,
                    ),
                )
            if self.ping('backup.house'):
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='backup.house',
                        weight=1,
                        count=4,
                    ),
                )
            if self.ping('puma.cabin'):
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='puma.cabin',
                        weight=12,
                        count=4,
                    ),
                )
            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor
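

# Example (illustrative sketch; assumes config.config has been populated by
# the program's normal startup):
#
#   executors = DefaultExecutors()
#   future = executors.thread_pool().submit(sum, [1, 2, 3])
#   assert future.result() == 6
#
# remote_pool() only adds the site-specific hosts above that answer a ping,
# then hands the resulting pool to a WeightedRandomRemoteWorkerSelectionPolicy.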