#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
-from __future__ import annotations
+# © Copyright 2021-2022, Scott Gasch
-from abc import ABC, abstractmethod
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine, and a remote executor for farming out work to other
+machines.
+
+Also defines DefaultExecutors which is a container for references to
+global executors / worker pools with automatic shutdown semantics.
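+
+A typical entry point is the :class:`DefaultExecutors` singleton; here
+is a minimal sketch (``do_work`` is a stand-in for your own function)::
+
+    import executors
+
+    executor = executors.DefaultExecutors().process_pool()
+    future = executor.submit(do_work, 1, 2, 3)
+    print(future.result())
+    executors.DefaultExecutors().shutdown()
+"""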
+
+from __future__ import annotations
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
+import string_utils
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_exitcode, cmd_in_background, run_silently
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
SCP = '/usr/bin/scp -C'
-def make_cloud_pickle(fun, *args, **kwargs):
- logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
+def _make_cloud_pickle(fun, *args, **kwargs):
+ """Internal helper to create cloud pickles."""
+ logger.debug("Making cloudpickled bundle at %s", fun.__name__)
return cloudpickle.dumps((fun, args, kwargs))
class BaseExecutor(ABC):
+ """The base executor interface definition. The interface for
+ :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
+ :class:`ThreadExecutor`.
+ """
+
def __init__(self, *, title=''):
self.title = title
self.histogram = hist.SimpleHistogram(
pass
@abstractmethod
- def shutdown(self, wait: bool = True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
pass
- def shutdown_if_idle(self) -> bool:
+ def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
"""Shutdown the executor and return True if the executor is idle
(i.e. there are no pending or active tasks). Return False
otherwise. Note: this should only be called by the launcher
"""
if self.task_count == 0:
- self.shutdown()
+ self.shutdown(wait=True, quiet=quiet)
return True
return False
"""
self.task_count += delta
- logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
+ logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
def get_task_count(self) -> int:
"""Change the task count. Note: do not call this method from a
class ThreadExecutor(BaseExecutor):
+ """A threadpool executor. This executor uses python threads to
+ schedule tasks. Note that, at least as of python3.10, because of
+ the global lock in the interpreter itself, these do not
+    parallelize very well, so this class is useful mostly for
+    non-CPU-intensive tasks.
+
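+    A minimal usage sketch::
+
+        executor = ThreadExecutor(max_workers=4)
+        future = executor.submit(sum, [1, 2, 3])
+        assert future.result() == 6
+        executor.shutdown(wait=True, quiet=True)
+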
+ See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
+ """
+
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
workers = None
workers = max_workers
elif 'executors_threadpool_size' in config.config:
workers = config.config['executors_threadpool_size']
- logger.debug(f'Creating threadpool executor with {workers} workers')
+ if workers is not None:
+ logger.debug('Creating threadpool executor with %d workers', workers)
+ else:
+ logger.debug('Creating a default sized threadpool executor')
self._thread_pool_executor = fut.ThreadPoolExecutor(
max_workers=workers, thread_name_prefix="thread_executor_helper"
)
self.already_shutdown = False
# This is run on a different thread; do not adjust task count here.
- def run_local_bundle(self, fun, *args, **kwargs):
- logger.debug(f"Running local bundle at {fun.__name__}")
+ @staticmethod
+ def run_local_bundle(fun, *args, **kwargs):
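+        """Runs fun(*args, **kwargs); this is what the threadpool's
+        worker threads invoke for each submitted bundle."""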
+ logger.debug("Running local bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
return result
newargs.append(arg)
start = time.time()
result = self._thread_pool_executor.submit(
- self.run_local_bundle, *newargs, **kwargs
+ ThreadExecutor.run_local_bundle, *newargs, **kwargs
)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
- def shutdown(self, wait=True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
if not self.already_shutdown:
- logger.debug(f'Shutting down threadpool executor {self.title}')
- print(self.histogram)
+ logger.debug('Shutting down threadpool executor %s', self.title)
self._thread_pool_executor.shutdown(wait)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
class ProcessExecutor(BaseExecutor):
+ """An executor which runs tasks in child processes.
+
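+    A minimal usage sketch; because submitted work is cloudpickled,
+    even lambdas and other locally-defined callables should be usable
+    here (unlike with a bare :class:`concurrent.futures.ProcessPoolExecutor`)::
+
+        executor = ProcessExecutor(max_workers=2)
+        future = executor.submit(lambda x: x * x, 7)
+        assert future.result() == 49
+        executor.shutdown(wait=True, quiet=True)
+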
+ See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
+ """
+
def __init__(self, max_workers=None):
super().__init__()
workers = None
workers = max_workers
elif 'executors_processpool_size' in config.config:
workers = config.config['executors_processpool_size']
- logger.debug(f'Creating processpool executor with {workers} workers.')
+ if workers is not None:
+ logger.debug('Creating processpool executor with %d workers.', workers)
+ else:
+ logger.debug('Creating a default sized processpool executor')
self._process_executor = fut.ProcessPoolExecutor(
max_workers=workers,
)
self.already_shutdown = False
# This is run in another process; do not adjust task count here.
- def run_cloud_pickle(self, pickle):
+ @staticmethod
+ def run_cloud_pickle(pickle):
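+        """Unpickles the (fun, args, kwargs) tuple and invokes it; this
+        runs in the child process."""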
fun, args, kwargs = cloudpickle.loads(pickle)
- logger.debug(f"Running pickled bundle at {fun.__name__}")
+ logger.debug("Running pickled bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
return result
raise Exception('Submitted work after shutdown.')
start = time.time()
self.adjust_task_count(+1)
- pickle = make_cloud_pickle(function, *args, **kwargs)
- result = self._process_executor.submit(self.run_cloud_pickle, pickle)
+ pickle = _make_cloud_pickle(function, *args, **kwargs)
+ result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
- def shutdown(self, wait=True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
if not self.already_shutdown:
- logger.debug(f'Shutting down processpool executor {self.title}')
+ logger.debug('Shutting down processpool executor %s', self.title)
self._process_executor.shutdown(wait)
- print(self.histogram)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
def __getstate__(self):
@dataclass
class RemoteWorkerRecord:
+ """A record of info about a remote worker."""
+
username: str
+ """Username we can ssh into on this machine to run work."""
+
machine: str
+ """Machine address / name."""
+
weight: int
+ """Relative probability for the weighted policy to select this
+ machine for scheduling work."""
+
count: int
+ """If this machine is selected, what is the maximum number of task
+ that it can handle?"""
def __hash__(self):
return hash((self.username, self.machine))
@dataclass
class BundleDetails:
+ """All info necessary to define some unit of work that needs to be
+ done, where it is being run, its state, whether it is an original
+    bundle or a backup bundle, how many times it has failed, etc...
+ """
+
pickled_code: bytes
+ """The code to run, cloud pickled"""
+
uuid: str
- fname: str
+ """A unique identifier"""
+
+ function_name: str
+ """The name of the function we pickled"""
+
worker: Optional[RemoteWorkerRecord]
+ """The remote worker running this bundle or None if none (yet)"""
+
username: Optional[str]
+ """The remote username running this bundle or None if none (yet)"""
+
machine: Optional[str]
+ """The remote machine running this bundle or None if none (yet)"""
+
hostname: str
+ """The controller machine"""
+
code_file: str
+ """A unique filename to hold the work to be done"""
+
result_file: str
+ """Where the results should be placed / read from"""
+
pid: int
+ """The process id of the local subprocess watching the ssh connection
+ to the remote machine"""
+
start_ts: float
+ """Starting time"""
+
end_ts: float
+ """Ending time"""
+
slower_than_local_p95: bool
+ """Currently slower then 95% of other bundles on remote host"""
+
slower_than_global_p95: bool
- src_bundle: BundleDetails
+ """Currently slower than 95% of other bundles globally"""
+
+ src_bundle: Optional[BundleDetails]
+ """If this is a backup bundle, this points to the original bundle
+ that it's backing up. None otherwise."""
+
is_cancelled: threading.Event
+ """An event that can be signaled to indicate this bundle is cancelled.
+ This is set when another copy (backup or original) of this work has
+ completed successfully elsewhere."""
+
was_cancelled: bool
+ """True if this bundle was cancelled, False if it finished normally"""
+
backup_bundles: Optional[List[BundleDetails]]
+ """If we've created backups of this bundle, this is the list of them"""
+
failure_count: int
+ """How many times has this bundle failed already?"""
def __repr__(self):
uuid = self.uuid
else:
suffix = uuid[-6:]
+ # We colorize the uuid based on some bits from it to make them
+ # stand out in the logging and help a reader correlate log messages
+ # related to the same bundle.
colorz = [
fg('violet red'),
fg('red'),
fg('medium purple'),
]
c = colorz[int(uuid[-2:], 16) % len(colorz)]
- fname = self.fname if self.fname is not None else 'nofname'
+ function_name = self.function_name if self.function_name is not None else 'nofname'
machine = self.machine if self.machine is not None else 'nomachine'
- return f'{c}{suffix}/{fname}/{machine}{reset()}'
+ return f'{c}{suffix}/{function_name}/{machine}{reset()}'
class RemoteExecutorStatus:
+ """A status 'scoreboard' for a remote executor tracking various
+ metrics and able to render a periodic dump of global state.
+ """
+
def __init__(self, total_worker_count: int) -> None:
+ """C'tor.
+
+ Args:
+ total_worker_count: number of workers in the pool
+
+ """
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
- self.start_per_bundle: Dict[str, float] = defaultdict(float)
+ self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
- self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
- ] = {}
+ self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.lock: threading.Lock = threading.Lock()
def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+ """Record that bundle with uuid is assigned to a particular worker.
+
+ Args:
+ worker: the record of the worker to which uuid is assigned
+ uuid: the uuid of a bundle that has been assigned to a worker
+ """
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
- def record_acquire_worker_already_locked(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+ """Same as above but an entry point that doesn't acquire the lock
+ for codepaths where it's already held."""
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
self.in_flight_bundles_by_worker[worker] = x
def record_bundle_details(self, details: BundleDetails) -> None:
+ """Register the details about a bundle of work."""
with self.lock:
self.record_bundle_details_already_locked(details)
def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
+ """Same as above but for codepaths that already hold the lock."""
assert self.lock.locked()
self.bundle_details_by_uuid[details.uuid] = details
uuid: str,
was_cancelled: bool,
) -> None:
+ """Record that a bundle has released a worker."""
with self.lock:
self.record_release_worker_already_locked(worker, uuid, was_cancelled)
uuid: str,
was_cancelled: bool,
) -> None:
+ """Same as above but for codepaths that already hold the lock."""
assert self.lock.locked()
ts = time.time()
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
- x = self.finished_bundle_timings_per_worker.get(worker, list())
+ start = self.start_per_bundle[uuid]
+ assert start is not None
+ bundle_latency = ts - start
+ x = self.finished_bundle_timings_per_worker.get(worker, [])
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
self.finished_bundle_timings.append(bundle_latency)
def record_processing_began(self, uuid: str):
+ """Record when work on a bundle begins."""
with self.lock:
self.start_per_bundle[uuid] = time.time()
def total_in_flight(self) -> int:
+ """How many bundles are in flight currently?"""
assert self.lock.locked()
total_in_flight = 0
for worker in self.known_workers:
return total_in_flight
def total_idle(self) -> int:
+ """How many idle workers are there currently?"""
assert self.lock.locked()
return self.worker_count - self.total_in_flight()
class RemoteWorkerSelectionPolicy(ABC):
- def register_worker_pool(self, workers):
+ """A policy for selecting a remote worker base class."""
+
+ def __init__(self):
+ self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+ def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
self.workers = workers
@abstractmethod
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+ """A remote worker selector that uses weighted RNG."""
+
@overrides
def is_worker_available(self) -> bool:
- for worker in self.workers:
- if worker.count > 0:
- return True
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ return True
return False
@overrides
def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
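+        # Build a "grabbag" in which each eligible worker appears
+        # (count * weight) times, then sample uniformly from it; this
+        # implements the weighted random selection.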
grabbag = []
- for worker in self.workers:
- if worker.machine != machine_to_avoid:
- if worker.count > 0:
- for _ in range(worker.count * worker.weight):
- grabbag.append(worker)
+ if self.workers:
+ for worker in self.workers:
+ if worker.machine != machine_to_avoid:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
if len(grabbag) == 0:
- logger.debug(
- f'There are no available workers that avoid {machine_to_avoid}...'
- )
- for worker in self.workers:
- if worker.count > 0:
- for _ in range(worker.count * worker.weight):
- grabbag.append(worker)
+ logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
if len(grabbag) == 0:
logger.warning('There are no available workers?!')
worker = random.sample(grabbag, 1)[0]
assert worker.count > 0
worker.count -= 1
- logger.debug(f'Chose worker {worker}')
+ logger.debug('Selected worker %s', worker)
return worker
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+ """A remote worker selector that just round robins."""
+
def __init__(self) -> None:
+ super().__init__()
self.index = 0
@overrides
def is_worker_available(self) -> bool:
- for worker in self.workers:
- if worker.count > 0:
- return True
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ return True
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
- x = self.index
- while True:
- worker = self.workers[x]
- if worker.count > 0:
- worker.count -= 1
+    def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
+ if self.workers:
+ x = self.index
+ while True:
+ worker = self.workers[x]
+ if worker.count > 0:
+ worker.count -= 1
+ x += 1
+ if x >= len(self.workers):
+ x = 0
+ self.index = x
+ logger.debug('Selected worker %s', worker)
+ return worker
x += 1
if x >= len(self.workers):
x = 0
- self.index = x
- logger.debug(f'Selected worker {worker}')
- return worker
- x += 1
- if x >= len(self.workers):
- x = 0
- if x == self.index:
- msg = 'Unexpectedly could not find a worker, retrying...'
- logger.warning(msg)
- return None
+ if x == self.index:
+ logger.warning('Unexpectedly could not find a worker, retrying...')
+ return None
+ return None
class RemoteExecutor(BaseExecutor):
+ """An executor that uses processes on remote machines to do work. This
+ works by creating "bundles" of work with pickled code in each to be
+ executed. Each bundle is assigned a remote worker based on some policy
+ heuristics. Once assigned to a remote worker, a local subprocess is
+ created. It copies the pickled code to the remote machine via ssh/scp
+and then starts the work on the remote machine, again via ssh. When
+ the work is complete it copies the results back to the local machine.
+
+ So there is essentially one "controller" machine (which may also be
+ in the remote executor pool and therefore do task work in addition to
+ controlling) and N worker machines. This code runs on the controller
+ whereas on the worker machines we invoke pickled user code via a
+ shim in :file:`remote_worker.py`.
+
+    Some redundancy and safety provisions are made; e.g. redundant
+    backups are created for slower-than-expected tasks, and if a task
+    fails repeatedly we consider it poisoned and give up on it.
+
+ .. warning::
+
+ The network overhead / latency of copying work from the
+ controller machine to the remote workers is relatively high.
+ This executor probably only makes sense to use with
+ computationally expensive tasks such as jobs that will execute
+ for ~30 seconds or longer.
+
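+    A hypothetical setup sketch (the hostnames and the submitted
+    function are illustrative, not real)::
+
+        workers = [
+            RemoteWorkerRecord(
+                username='me', machine='worker1.example', weight=2, count=4
+            ),
+            RemoteWorkerRecord(
+                username='me', machine='worker2.example', weight=1, count=2
+            ),
+        ]
+        policy = WeightedRandomRemoteWorkerSelectionPolicy()
+        executor = RemoteExecutor(workers, policy)
+        future = executor.submit(my_expensive_function, some_arg)
+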
+ See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
+ """
+
def __init__(
self,
workers: List[RemoteWorkerRecord],
policy: RemoteWorkerSelectionPolicy,
) -> None:
+ """C'tor.
+
+ Args:
+ workers: A list of remote workers we can call on to do tasks.
+ policy: A policy for selecting remote workers for tasks.
+ """
+
super().__init__()
self.workers = workers
self.policy = policy
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
- logger.debug(
- f'Creating {self.worker_count} local threads, one per remote worker.'
- )
+ logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
(
self.heartbeat_thread,
self.heartbeat_stop_event,
- ) = self.run_periodic_heartbeat()
+ ) = self._run_periodic_heartbeat()
self.already_shutdown = False
@background_thread
- def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+ def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+ """
+ We create a background thread to invoke :meth:`_heartbeat` regularly
+ while we are scheduling work. It does some accounting such as
+ looking for slow bundles to tag for backup creation, checking for
+ unexpected failures, and printing a fancy message on stdout.
+ """
while not stop_event.is_set():
time.sleep(5.0)
logger.debug('Running periodic heartbeat code...')
- self.heartbeat()
+ self._heartbeat()
logger.debug('Periodic heartbeat thread shutting down.')
- def heartbeat(self) -> None:
+ def _heartbeat(self) -> None:
# Note: this is invoked on a background thread, not an
# executor thread. Be careful what you do with it b/c it
# needs to get back and dump status again periodically.
# Look for bundles to reschedule via executor.submit
if config.config['executors_schedule_remote_backups']:
- self.maybe_schedule_backup_bundles()
+ self._maybe_schedule_backup_bundles()
+
-    def maybe_schedule_backup_bundles(self):
+    def _maybe_schedule_backup_bundles(self):
+        """Maybe schedule backup bundles if we see a very slow bundle."""
assert self.status.lock.locked()
num_done = len(self.status.finished_bundle_timings)
num_idle_workers = self.worker_count - self.task_count
now = time.time()
if (
- num_done > 2
- and num_idle_workers > 1
+ num_done >= 2
+ and num_idle_workers > 0
and (self.last_backup is None or (now - self.last_backup > 9.0))
and self.backup_lock.acquire(blocking=False)
):
if start_ts is not None:
runtime = now - start_ts
score += runtime
- logger.debug(
- f'score[{bundle}] => {score} # latency boost'
- )
+ logger.debug('score[%s] => %.1f # latency boost', bundle, score)
if bundle.slower_than_local_p95:
score += runtime / 2
- logger.debug(
- f'score[{bundle}] => {score} # >worker p95'
- )
+ logger.debug('score[%s] => %.1f # >worker p95', bundle, score)
if bundle.slower_than_global_p95:
score += runtime / 4
- logger.debug(
- f'score[{bundle}] => {score} # >global p95'
- )
+ logger.debug('score[%s] => %.1f # >global p95', bundle, score)
# Prefer backups of bundles that don't
# have backups already.
else:
score = 0
logger.debug(
- f'score[{bundle}] => {score} # {backup_count} dup backup factor'
+                            'score[%s] => %.1f # %d dup backup factor',
+                            bundle,
+                            score,
+                            backup_count,
)
- if score != 0 and (
- best_score is None or score > best_score
- ):
+ if score != 0 and (best_score is None or score > best_score):
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
# Note: this is all still happening on the heartbeat
# runner thread. That's ok because
- # schedule_backup_for_bundle uses the executor to
+ # _schedule_backup_for_bundle uses the executor to
# submit the bundle again which will cause it to be
# picked up by a worker thread and allow this thread
# to return to run future heartbeats.
if bundle_to_backup is not None:
self.last_backup = now
logger.info(
- f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
+ '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+ bundle_to_backup,
+ best_score,
)
- self.schedule_backup_for_bundle(bundle_to_backup)
+ self._schedule_backup_for_bundle(bundle_to_backup)
finally:
self.backup_lock.release()
- def is_worker_available(self) -> bool:
+ def _is_worker_available(self) -> bool:
+ """Is there a worker available currently?"""
return self.policy.is_worker_available()
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+    def _acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
+ """Try to acquire a worker."""
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(
- self, machine_to_avoid: str = None
- ) -> RemoteWorkerRecord:
+    def _find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
+ """Find a worker or block until one becomes available."""
with self.cv:
- while not self.is_worker_available():
+ while not self._is_worker_available():
self.cv.wait()
- worker = self.acquire_worker(machine_to_avoid)
+ worker = self._acquire_worker(machine_to_avoid)
if worker is not None:
return worker
msg = "We should never reach this point in the code"
logger.critical(msg)
raise Exception(msg)
- def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+ def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+ """Release a previously acquired worker."""
worker = bundle.worker
assert worker is not None
- logger.debug(f'Released worker {worker}')
+ logger.debug('Released worker %s', worker)
self.status.record_release_worker(
worker,
bundle.uuid,
self.cv.notify()
self.adjust_task_count(-1)
- def check_if_cancelled(self, bundle: BundleDetails) -> bool:
+ def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
+ """See if a particular bundle is cancelled. Do not block."""
with self.status.lock:
if bundle.is_cancelled.wait(timeout=0.0):
- logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
+ logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
bundle.was_cancelled = True
return True
return False
- def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
+ def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
"""Find a worker for bundle or block until one is available."""
+
self.adjust_task_count(+1)
uuid = bundle.uuid
hostname = bundle.hostname
avoid_machine = bundle.src_bundle.machine
worker = None
while worker is None:
- worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ worker = self._find_available_worker_or_block(avoid_machine)
+ assert worker is not None
# Ok, found a worker.
bundle.worker = worker
machine = bundle.machine = worker.machine
username = bundle.username = worker.username
self.status.record_acquire_worker(worker, uuid)
- logger.debug(f'{bundle}: Running bundle on {worker}...')
+ logger.debug('%s: Running bundle on %s...', bundle, worker)
# Before we do any work, make sure the bundle is still viable.
# It may have been some time between when it was submitted and
# now due to lack of worker availability and someone else may
# have already finished it.
- if self.check_if_cancelled(bundle):
+ if self._check_if_cancelled(bundle):
try:
- return self.process_work_result(bundle)
+ return self._process_work_result(bundle)
except Exception as e:
- logger.warning(
- f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
- )
- self.release_worker(bundle)
+ logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
+ self._release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
# bundle. For it to have been cancelled, a backup
# thing.
logger.exception(e)
logger.error(
- f'{bundle}: We are the original owner thread and yet there are '
- + 'no results for this bundle. This is unexpected and bad.'
+ '%s: We are the original owner thread and yet there are '
+ 'no results for this bundle. This is unexpected and bad.',
+ bundle,
)
- return self.emergency_retry_nasty_bundle(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
else:
- # Expected(?). We're a backup and our bundle is
- # cancelled before we even got started. Something
- # went bad in process_work_result (I acutually don't
- # see what?) but probably not worth worrying
- # about. Let the original thread worry about
- # either finding the results or complaining about
- # it.
+ # We're a backup and our bundle is cancelled
+ # before we even got started. Do nothing and let
+ # the original bundle's thread worry about either
+ # finding the results or complaining about it.
return None
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = (
- f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
- )
+ cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
- logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
+ logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
run_silently(cmd)
xfer_latency = time.time() - start_ts
- logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
+ logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
except Exception as e:
- self.release_worker(bundle)
+ self._release_worker(bundle)
if is_original:
- # Weird. We tried to copy the code to the worker and it failed...
- # And we're the original bundle. We have to retry.
+ # Weird. We tried to copy the code to the worker
+ # and it failed... And we're the original bundle.
+ # We have to retry.
logger.exception(e)
logger.error(
- f"{bundle}: Failed to send instructions to the worker machine?! "
- + "This is not expected; we\'re the original bundle so this shouldn\'t "
- + "be a race condition. Attempting an emergency retry..."
+ "%s: Failed to send instructions to the worker machine?! "
+ "This is not expected; we\'re the original bundle so this shouldn\'t "
+ "be a race condition. Attempting an emergency retry...",
+ bundle,
)
- return self.emergency_retry_nasty_bundle(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
else:
# This is actually expected; we're a backup.
# There's a race condition where someone else
# already finished the work and removed the source
- # code file before we could copy it. No biggie.
- msg = f'{bundle}: Failed to send instructions to the worker machine... '
- msg += 'We\'re a backup and this may be caused by the original (or some '
- msg += 'other backup) already finishing this work. Ignoring this.'
- logger.warning(msg)
+ # code_file before we could copy it. Ignore.
+ logger.warning(
+ '%s: Failed to send instructions to the worker machine... '
+ 'We\'re a backup and this may be caused by the original (or '
+ 'some other backup) already finishing this work. Ignoring.',
+ bundle,
+ )
return None
# Kick off the work. Note that if this fails we let
- # wait_for_process deal with it.
+ # _wait_for_process deal with it.
self.status.record_processing_began(uuid)
cmd = (
f'{SSH} {bundle.username}@{bundle.machine} '
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
- logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
+ logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- logger.debug(
- f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
- )
- return self.wait_for_process(p, bundle, 0)
+ logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
+ return self._wait_for_process(p, bundle, 0)
- def wait_for_process(
- self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+ def _wait_for_process(
+ self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
+ """At this point we've copied the bundle's pickled code to the remote
+ worker and started an ssh process that should be invoking the
+ remote worker to have it execute the user's code. See how
+ that's going and wait for it to complete or fail. Note that
+ this code is recursive: there are codepaths where we decide to
+ stop waiting for an ssh process (because another backup seems
+ to have finished) but then fail to fetch or parse the results
+ from that backup and thus call ourselves to continue waiting
+ on an active ssh process. This is the purpose of the depth
+ argument: to curtail potential infinite recursion by giving up
+ eventually.
+
+ Args:
+ p: the Popen record of the ssh job
+ bundle: the bundle of work being executed remotely
+ depth: how many retries we've made so far. Starts at zero.
+
+ """
+
machine = bundle.machine
- pid = p.pid
+ assert p is not None
+ pid = p.pid # pid of the ssh process
if depth > 3:
logger.error(
- f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+ "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
)
p.terminate()
- self.release_worker(bundle)
- return self.emergency_retry_nasty_bundle(bundle)
+ self._release_worker(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
# Spin until either the ssh job we scheduled finishes the
# bundle or some backup worker signals that they finished it
try:
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
- if self.check_if_cancelled(bundle):
- logger.info(
- f'{bundle}: looks like another worker finished bundle...'
- )
+ if self._check_if_cancelled(bundle):
+ logger.info('%s: looks like another worker finished bundle...', bundle)
break
else:
- logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
+ logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
p = None
break
# If we get here we believe the bundle is done; either the ssh
# subprocess finished (hopefully successfully) or we noticed
# that some other worker seems to have completed the bundle
- # and we're bailing out.
+ # before us and we're bailing out.
try:
- ret = self.process_work_result(bundle)
+ ret = self._process_work_result(bundle)
if ret is not None and p is not None:
p.terminate()
return ret
# Otherwise, time for an emergency reschedule.
except Exception as e:
logger.exception(e)
- logger.error(f'{bundle}: Something unexpected just happened...')
+ logger.error('%s: Something unexpected just happened...', bundle)
if p is not None:
- msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
- logger.warning(msg)
- return self.wait_for_process(p, bundle, depth + 1)
+ logger.warning(
+ "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
+ )
+ return self._wait_for_process(p, bundle, depth + 1)
else:
- self.release_worker(bundle)
- return self.emergency_retry_nasty_bundle(bundle)
+ self._release_worker(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
+
-    def process_work_result(self, bundle: BundleDetails) -> Any:
+    def _process_work_result(self, bundle: BundleDetails) -> Any:
+        """A bundle seems to be completed. Check on the results."""
with self.status.lock:
is_original = bundle.src_bundle is None
was_cancelled = bundle.was_cancelled
if bundle.hostname not in bundle.machine:
cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
logger.info(
- f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
+ "%s: Fetching results back from %s@%s via %s",
+ bundle,
+ username,
+ machine,
+ cmd,
)
# If either of these throw they are handled in
- # wait_for_process.
+ # _wait_for_process.
attempts = 0
while True:
try:
except Exception as e:
attempts += 1
if attempts >= 3:
- raise (e)
+ raise e
else:
break
+ # Cleanup remote /tmp files.
run_silently(
- f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"'
- )
- logger.debug(
- f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+            f'{SSH} {username}@{machine} "/bin/rm -f {code_file} {result_file}"'
)
+ logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
# original is also the only job that may delete result_file
# from disk. Note that the original may have been cancelled
# if one of the backups finished first; it still must read the
- # result from disk.
+ # result from disk. It still does that here with is_cancelled
+ # set.
if is_original:
- logger.debug(f"{bundle}: Unpickling {result_file}.")
+ logger.debug("%s: Unpickling %s.", bundle, result_file)
try:
with open(result_file, 'rb') as rb:
serialized = rb.read()
result = cloudpickle.loads(serialized)
except Exception as e:
logger.exception(e)
- msg = f'Failed to load {result_file}... this is bad news.'
- logger.critical(msg)
- self.release_worker(bundle)
+ logger.error('Failed to load %s... this is bad news.', result_file)
+ self._release_worker(bundle)
- # Re-raise the exception; the code in wait_for_process may
- # decide to emergency_retry_nasty_bundle here.
- raise Exception(e)
- logger.debug(f'Removing local (master) {code_file} and {result_file}.')
- os.remove(f'{result_file}')
- os.remove(f'{code_file}')
+ # Re-raise the exception; the code in _wait_for_process may
+ # decide to _emergency_retry_nasty_bundle here.
+ raise e
+ logger.debug('Removing local (master) %s and %s.', code_file, result_file)
+ os.remove(result_file)
+ os.remove(code_file)
# Notify any backups that the original is done so they
# should stop ASAP. Do this whether or not we
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
logger.debug(
- f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
+ '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
)
backup.is_cancelled.set()
# Tell the original to stop if we finished first.
if not was_cancelled:
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle is not None
logger.debug(
- f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+ '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
)
- bundle.src_bundle.is_cancelled.set()
- self.release_worker(bundle, was_cancelled=was_cancelled)
+ orig_bundle.is_cancelled.set()
+ self._release_worker(bundle, was_cancelled=was_cancelled)
return result
- def create_original_bundle(self, pickle, fname: str):
- from string_utils import generate_uuid
+ def _create_original_bundle(self, pickle, function_name: str):
+ """Creates a bundle that is not a backup of any other bundle but
+ rather represents a user task.
+ """
- uuid = generate_uuid(omit_dashes=True)
+ uuid = string_utils.generate_uuid(omit_dashes=True)
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
- logger.debug(f'Writing pickled code to {code_file}')
- with open(f'{code_file}', 'wb') as wb:
+ logger.debug('Writing pickled code to %s', code_file)
+ with open(code_file, 'wb') as wb:
wb.write(pickle)
bundle = BundleDetails(
pickled_code=pickle,
uuid=uuid,
- fname=fname,
+ function_name=function_name,
worker=None,
username=None,
machine=None,
failure_count=0,
)
self.status.record_bundle_details(bundle)
- logger.debug(f'{bundle}: Created an original bundle')
+ logger.debug('%s: Created an original bundle', bundle)
return bundle
- def create_backup_bundle(self, src_bundle: BundleDetails):
+ def _create_backup_bundle(self, src_bundle: BundleDetails):
+ """Creates a bundle that is a backup of another bundle that is
+ running too slowly."""
+
+ assert self.status.lock.locked()
assert src_bundle.backup_bundles is not None
n = len(src_bundle.backup_bundles)
uuid = src_bundle.uuid + f'_backup#{n}'
backup_bundle = BundleDetails(
pickled_code=src_bundle.pickled_code,
uuid=uuid,
- fname=src_bundle.fname,
+ function_name=src_bundle.function_name,
worker=None,
username=None,
machine=None,
)
src_bundle.backup_bundles.append(backup_bundle)
self.status.record_bundle_details_already_locked(backup_bundle)
- logger.debug(f'{backup_bundle}: Created a backup bundle')
+ logger.debug('%s: Created a backup bundle', backup_bundle)
return backup_bundle
- def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+ def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+ """Schedule a backup of src_bundle."""
+
assert self.status.lock.locked()
assert src_bundle is not None
- backup_bundle = self.create_backup_bundle(src_bundle)
+ backup_bundle = self._create_backup_bundle(src_bundle)
logger.debug(
- f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+ '%s/%s: Scheduling backup for execution...',
+ backup_bundle.uuid,
+ backup_bundle.function_name,
)
- self._helper_executor.submit(self.launch, backup_bundle)
+ self._helper_executor.submit(self._launch, backup_bundle)
# Results from backups don't matter; if they finish first
# they will move the result_file to this machine and let
- # the original pick them up and unpickle them.
+ # the original pick them up and unpickle them (and return
+ # a result).
+
-    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+    def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
+        """Something unexpectedly failed with bundle. Either retry it
+        from the beginning or throw in the towel and give up on it."""
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
if bundle.failure_count > retry_limit:
logger.error(
- f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+ '%s: Tried this bundle too many times already (%dx); giving up.',
+ bundle,
+ retry_limit,
)
if is_original:
raise RemoteExecutorException(
- f'{bundle}: This bundle can\'t be completed despite several backups and retries'
+ f'{bundle}: This bundle can\'t be completed despite several backups and retries',
)
else:
logger.error(
- f'{bundle}: At least it\'s only a backup; better luck with the others.'
+ '%s: At least it\'s only a backup; better luck with the others.', bundle
)
return None
else:
-        msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
+        msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
logger.warning(msg)
warnings.warn(msg)
- return self.launch(bundle, avoid_last_machine)
+ return self._launch(bundle, avoid_last_machine)
@overrides
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+ """Submit work to be done. This is the user entry point of this
+ class."""
if self.already_shutdown:
raise Exception('Submitted work after shutdown.')
- pickle = make_cloud_pickle(function, *args, **kwargs)
- bundle = self.create_original_bundle(pickle, function.__name__)
+ pickle = _make_cloud_pickle(function, *args, **kwargs)
+ bundle = self._create_original_bundle(pickle, function.__name__)
self.total_bundles_submitted += 1
- return self._helper_executor.submit(self.launch, bundle)
+ return self._helper_executor.submit(self._launch, bundle)
@overrides
- def shutdown(self, wait=True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+ """Shutdown the executor."""
if not self.already_shutdown:
- logging.debug(f'Shutting down RemoteExecutor {self.title}')
+ logging.debug('Shutting down RemoteExecutor %s', self.title)
self.heartbeat_stop_event.set()
self.heartbeat_thread.join()
self._helper_executor.shutdown(wait)
- print(self.histogram)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
@singleton
class DefaultExecutors(object):
+ """A container for a default thread, process and remote executor.
+    These are not created until needed and we take care to clean them
+    up automatically before process exit, for the caller's convenience.
+ Instead of creating your own executor, consider using the one
+ from this pool. e.g.::
+
+ @par.parallelize(method=par.Method.PROCESS)
+ def do_work(
+ solutions: List[Work],
+ shard_num: int,
+ ...
+ ):
+ <do the work>
+
+
+ def start_do_work(all_work: List[Work]):
+ shards = []
+ logger.debug('Sharding work into groups of 10.')
+ for subset in list_utils.shard(all_work, 10):
+ shards.append([x for x in subset])
+
+ logger.debug('Kicking off helper pool.')
+        results = []
+        try:
+ for n, shard in enumerate(shards):
+ results.append(
+ do_work(
+ shard, n, shared_cache.get_name(), max_letter_pop_per_word
+ )
+ )
+ smart_future.wait_all(results)
+ finally:
+ # Note: if you forget to do this it will clean itself up
+ # during program termination including tearing down any
+ # active ssh connections.
+ executors.DefaultExecutors().process_pool().shutdown()
+ """
+
def __init__(self):
self.thread_executor: Optional[ThreadExecutor] = None
self.process_executor: Optional[ProcessExecutor] = None
self.remote_executor: Optional[RemoteExecutor] = None
- def ping(self, host) -> bool:
- logger.debug(f'RUN> ping -c 1 {host}')
+ @staticmethod
+ def _ping(host) -> bool:
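+        """Send a single ping to host with a ~1s timeout; return True
+        if it responds, False otherwise."""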
+ logger.debug('RUN> ping -c 1 %s', host)
try:
- x = cmd_with_timeout(
- f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
- )
+ x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
return x == 0
except Exception:
return False
if self.remote_executor is None:
logger.info('Looking for some helper machines...')
pool: List[RemoteWorkerRecord] = []
- if self.ping('cheetah.house'):
+ if self._ping('cheetah.house'):
logger.info('Found cheetah.house')
pool.append(
RemoteWorkerRecord(
username='scott',
machine='cheetah.house',
- weight=30,
- count=6,
+ weight=24,
+ count=5,
),
)
- if self.ping('meerkat.cabin'):
+ if self._ping('meerkat.cabin'):
logger.info('Found meerkat.cabin')
pool.append(
RemoteWorkerRecord(
count=2,
),
)
- if self.ping('wannabe.house'):
+ if self._ping('wannabe.house'):
logger.info('Found wannabe.house')
pool.append(
RemoteWorkerRecord(
username='scott',
machine='wannabe.house',
- weight=25,
- count=10,
+ weight=14,
+ count=2,
),
)
- if self.ping('puma.cabin'):
+ if self._ping('puma.cabin'):
logger.info('Found puma.cabin')
pool.append(
RemoteWorkerRecord(
username='scott',
machine='puma.cabin',
- weight=30,
- count=6,
+ weight=24,
+ count=5,
),
)
- if self.ping('backup.house'):
+ if self._ping('backup.house'):
logger.info('Found backup.house')
pool.append(
RemoteWorkerRecord(
username='scott',
machine='backup.house',
- weight=8,
+ weight=9,
count=2,
),
)
# The controller machine has a lot to do; go easy on it.
for record in pool:
if record.machine == platform.node() and record.count > 1:
- logger.info(f'Reducing workload for {record.machine}.')
- record.count = 1
+ logger.info('Reducing workload for %s.', record.machine)
+ record.count = max(int(record.count / 2), 1)
policy = WeightedRandomRemoteWorkerSelectionPolicy()
policy.register_worker_pool(pool)
def shutdown(self) -> None:
if self.thread_executor is not None:
- self.thread_executor.shutdown()
+ self.thread_executor.shutdown(wait=True, quiet=True)
self.thread_executor = None
if self.process_executor is not None:
- self.process_executor.shutdown()
+ self.process_executor.shutdown(wait=True, quiet=True)
self.process_executor = None
if self.remote_executor is not None:
- self.remote_executor.shutdown()
+ self.remote_executor.shutdown(wait=True, quiet=True)
self.remote_executor = None