#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
-from __future__ import annotations
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine and a remote executor for farming out work to other
+machines.
-from abc import ABC, abstractmethod
+Also defines DefaultExecutors which is a container for references to
+global executors / worker pools with automatic shutdown semantics."""
+
+from __future__ import annotations
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
def make_cloud_pickle(fun, *args, **kwargs):
- logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
+ logger.debug("Making cloudpickled bundle at %s", fun.__name__)
return cloudpickle.dumps((fun, args, kwargs))
class BaseExecutor(ABC):
+ """The base executor interface definition."""
+
def __init__(self, *, title=''):
self.title = title
self.histogram = hist.SimpleHistogram(
pass
@abstractmethod
- def shutdown(self, wait: bool = True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
pass
- def shutdown_if_idle(self) -> bool:
+ def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
"""Shutdown the executor and return True if the executor is idle
(i.e. there are no pending or active tasks). Return False
otherwise. Note: this should only be called by the launcher
"""
if self.task_count == 0:
- self.shutdown()
+ self.shutdown(wait=True, quiet=quiet)
return True
return False
"""
self.task_count += delta
- logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
+ logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
def get_task_count(self) -> int:
"""Change the task count. Note: do not call this method from a
class ThreadExecutor(BaseExecutor):
+ """A threadpool executor instance."""
+
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
workers = None
workers = max_workers
elif 'executors_threadpool_size' in config.config:
workers = config.config['executors_threadpool_size']
- logger.debug(f'Creating threadpool executor with {workers} workers')
+ logger.debug('Creating threadpool executor with %d workers', workers)
self._thread_pool_executor = fut.ThreadPoolExecutor(
max_workers=workers, thread_name_prefix="thread_executor_helper"
)
self.already_shutdown = False
# This is run on a different thread; do not adjust task count here.
- def run_local_bundle(self, fun, *args, **kwargs):
- logger.debug(f"Running local bundle at {fun.__name__}")
+ @staticmethod
+ def run_local_bundle(fun, *args, **kwargs):
+ logger.debug("Running local bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
return result
newargs.append(arg)
start = time.time()
result = self._thread_pool_executor.submit(
- self.run_local_bundle, *newargs, **kwargs
+ ThreadExecutor.run_local_bundle, *newargs, **kwargs
)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
- def shutdown(self, wait=True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
if not self.already_shutdown:
- logger.debug(f'Shutting down threadpool executor {self.title}')
- print(self.histogram)
+ logger.debug('Shutting down threadpool executor %s', self.title)
self._thread_pool_executor.shutdown(wait)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
class ProcessExecutor(BaseExecutor):
+ """A processpool executor."""
+
def __init__(self, max_workers=None):
super().__init__()
workers = None
workers = max_workers
elif 'executors_processpool_size' in config.config:
workers = config.config['executors_processpool_size']
- logger.debug(f'Creating processpool executor with {workers} workers.')
+ logger.debug('Creating processpool executor with %d workers.', workers)
self._process_executor = fut.ProcessPoolExecutor(
max_workers=workers,
)
self.already_shutdown = False
# This is run in another process; do not adjust task count here.
- def run_cloud_pickle(self, pickle):
+ @staticmethod
+ def run_cloud_pickle(pickle):
fun, args, kwargs = cloudpickle.loads(pickle)
- logger.debug(f"Running pickled bundle at {fun.__name__}")
+ logger.debug("Running pickled bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
return result
start = time.time()
self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
- result = self._process_executor.submit(self.run_cloud_pickle, pickle)
+ result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
- def shutdown(self, wait=True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
if not self.already_shutdown:
- logger.debug(f'Shutting down processpool executor {self.title}')
+ logger.debug('Shutting down processpool executor %s', self.title)
self._process_executor.shutdown(wait)
- print(self.histogram)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
def __getstate__(self):
@dataclass
class RemoteWorkerRecord:
+ """A record of info about a remote worker."""
+
username: str
machine: str
weight: int
@dataclass
class BundleDetails:
+ """All info necessary to define some unit of work that needs to be
+ done, where it is being run, its state, whether it is an original
+ bundle of a backup bundle, how many times it has failed, etc...
+
+ """
+
pickled_code: bytes
uuid: str
fname: str
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
- src_bundle: BundleDetails
+ src_bundle: Optional[BundleDetails]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
class RemoteExecutorStatus:
+ """A status 'scoreboard' for a remote executor."""
+
def __init__(self, total_worker_count: int) -> None:
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
- self.start_per_bundle: Dict[str, float] = defaultdict(float)
+ self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
- self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
- ] = {}
+ self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
- def record_acquire_worker_already_locked(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
- x = self.finished_bundle_timings_per_worker.get(worker, list())
+ start = self.start_per_bundle[uuid]
+ assert start is not None
+ bundle_latency = ts - start
+ x = self.finished_bundle_timings_per_worker.get(worker, [])
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
self.finished_bundle_timings.append(bundle_latency)
class RemoteWorkerSelectionPolicy(ABC):
- def register_worker_pool(self, workers):
+ """A policy for selecting a remote worker base class."""
+
+ def __init__(self):
+ self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+ def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
self.workers = workers
@abstractmethod
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+ """A remote worker selector that uses weighted RNG."""
+
@overrides
def is_worker_available(self) -> bool:
- for worker in self.workers:
- if worker.count > 0:
- return True
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ return True
return False
@overrides
def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
grabbag = []
- for worker in self.workers:
- if worker.machine != machine_to_avoid:
- if worker.count > 0:
- for _ in range(worker.count * worker.weight):
- grabbag.append(worker)
+ if self.workers:
+ for worker in self.workers:
+ if worker.machine != machine_to_avoid:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
if len(grabbag) == 0:
- logger.debug(
- f'There are no available workers that avoid {machine_to_avoid}...'
- )
- for worker in self.workers:
- if worker.count > 0:
- for _ in range(worker.count * worker.weight):
- grabbag.append(worker)
+ logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
if len(grabbag) == 0:
logger.warning('There are no available workers?!')
worker = random.sample(grabbag, 1)[0]
assert worker.count > 0
worker.count -= 1
- logger.debug(f'Chose worker {worker}')
+ logger.debug('Selected worker %s', worker)
return worker
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+ """A remote worker selector that just round robins."""
+
def __init__(self) -> None:
+ super().__init__()
self.index = 0
@overrides
def is_worker_available(self) -> bool:
- for worker in self.workers:
- if worker.count > 0:
- return True
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ return True
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
- x = self.index
- while True:
- worker = self.workers[x]
- if worker.count > 0:
- worker.count -= 1
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+ if self.workers:
+ x = self.index
+ while True:
+ worker = self.workers[x]
+ if worker.count > 0:
+ worker.count -= 1
+ x += 1
+ if x >= len(self.workers):
+ x = 0
+ self.index = x
+ logger.debug('Selected worker %s', worker)
+ return worker
x += 1
if x >= len(self.workers):
x = 0
- self.index = x
- logger.debug(f'Selected worker {worker}')
- return worker
- x += 1
- if x >= len(self.workers):
- x = 0
- if x == self.index:
- msg = 'Unexpectedly could not find a worker, retrying...'
- logger.warning(msg)
- return None
+ if x == self.index:
+ logger.warning('Unexpectedly could not find a worker, retrying...')
+ return None
+ return None
class RemoteExecutor(BaseExecutor):
+ """A remote work executor."""
+
def __init__(
self,
workers: List[RemoteWorkerRecord],
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
- logger.debug(
- f'Creating {self.worker_count} local threads, one per remote worker.'
- )
+ logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
if start_ts is not None:
runtime = now - start_ts
score += runtime
- logger.debug(
- f'score[{bundle}] => {score} # latency boost'
- )
+ logger.debug('score[%s] => %.1f # latency boost', bundle, score)
if bundle.slower_than_local_p95:
score += runtime / 2
- logger.debug(
- f'score[{bundle}] => {score} # >worker p95'
- )
+ logger.debug('score[%s] => %.1f # >worker p95', bundle, score)
if bundle.slower_than_global_p95:
score += runtime / 4
- logger.debug(
- f'score[{bundle}] => {score} # >global p95'
- )
+ logger.debug('score[%s] => %.1f # >global p95', bundle, score)
# Prefer backups of bundles that don't
# have backups already.
else:
score = 0
logger.debug(
- f'score[{bundle}] => {score} # {backup_count} dup backup factor'
+ 'score[%s] => %.1f # {backup_count} dup backup factor',
+ bundle,
+ score,
)
- if score != 0 and (
- best_score is None or score > best_score
- ):
+ if score != 0 and (best_score is None or score > best_score):
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
if bundle_to_backup is not None:
self.last_backup = now
logger.info(
- f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
+ '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+ bundle_to_backup,
+ best_score,
)
self.schedule_backup_for_bundle(bundle_to_backup)
finally:
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(
- self, machine_to_avoid: str = None
- ) -> RemoteWorkerRecord:
+ def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
with self.cv:
while not self.is_worker_available():
self.cv.wait()
def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
worker = bundle.worker
assert worker is not None
- logger.debug(f'Released worker {worker}')
+ logger.debug('Released worker %s', worker)
self.status.record_release_worker(
worker,
bundle.uuid,
def check_if_cancelled(self, bundle: BundleDetails) -> bool:
with self.status.lock:
if bundle.is_cancelled.wait(timeout=0.0):
- logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
+ logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
bundle.was_cancelled = True
return True
return False
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ assert worker is not None
# Ok, found a worker.
bundle.worker = worker
machine = bundle.machine = worker.machine
username = bundle.username = worker.username
self.status.record_acquire_worker(worker, uuid)
- logger.debug(f'{bundle}: Running bundle on {worker}...')
+ logger.debug('%s: Running bundle on %s...', bundle, worker)
# Before we do any work, make sure the bundle is still viable.
# It may have been some time between when it was submitted and
try:
return self.process_work_result(bundle)
except Exception as e:
- logger.warning(
- f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
- )
+ logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
# thing.
logger.exception(e)
logger.error(
- f'{bundle}: We are the original owner thread and yet there are '
- + 'no results for this bundle. This is unexpected and bad.'
+ '%s: We are the original owner thread and yet there are '
+ 'no results for this bundle. This is unexpected and bad.',
+ bundle,
)
return self.emergency_retry_nasty_bundle(bundle)
else:
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = (
- f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
- )
+ cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
- logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
+ logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
run_silently(cmd)
xfer_latency = time.time() - start_ts
- logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
+ logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
except Exception as e:
self.release_worker(bundle)
if is_original:
# And we're the original bundle. We have to retry.
logger.exception(e)
logger.error(
- f"{bundle}: Failed to send instructions to the worker machine?! "
- + "This is not expected; we\'re the original bundle so this shouldn\'t "
- + "be a race condition. Attempting an emergency retry..."
+ "%s: Failed to send instructions to the worker machine?! "
+ "This is not expected; we\'re the original bundle so this shouldn\'t "
+ "be a race condition. Attempting an emergency retry...",
+ bundle,
)
return self.emergency_retry_nasty_bundle(bundle)
else:
# There's a race condition where someone else
# already finished the work and removed the source
# code file before we could copy it. No biggie.
- msg = f'{bundle}: Failed to send instructions to the worker machine... '
- msg += 'We\'re a backup and this may be caused by the original (or some '
- msg += 'other backup) already finishing this work. Ignoring this.'
- logger.warning(msg)
+ logger.warning(
+ '%s: Failed to send instructions to the worker machine... '
+ 'We\'re a backup and this may be caused by the original (or '
+ 'some other backup) already finishing this work. Ignoring.',
+ bundle,
+ )
return None
# Kick off the work. Note that if this fails we let
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
- logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
+ logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- logger.debug(
- f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
- )
+ logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
- self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+ self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
+ assert p is not None
pid = p.pid
if depth > 3:
logger.error(
- f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+ "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
)
p.terminate()
self.release_worker(bundle)
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
- logger.info(
- f'{bundle}: looks like another worker finished bundle...'
- )
+ logger.info('%s: looks like another worker finished bundle...', bundle)
break
else:
- logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
+ logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
p = None
break
# Otherwise, time for an emergency reschedule.
except Exception as e:
logger.exception(e)
- logger.error(f'{bundle}: Something unexpected just happened...')
+ logger.error('%s: Something unexpected just happened...', bundle)
if p is not None:
- msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
- logger.warning(msg)
+ logger.warning(
+ "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
+ )
return self.wait_for_process(p, bundle, depth + 1)
else:
self.release_worker(bundle)
if bundle.hostname not in bundle.machine:
cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
logger.info(
- f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
+ "%s: Fetching results back from %s@%s via %s",
+ bundle,
+ username,
+ machine,
+ cmd,
)
# If either of these throw they are handled in
except Exception as e:
attempts += 1
if attempts >= 3:
- raise (e)
+ raise e
else:
break
run_silently(
- f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"'
- )
- logger.debug(
- f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+ f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
)
+ logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
# if one of the backups finished first; it still must read the
# result from disk.
if is_original:
- logger.debug(f"{bundle}: Unpickling {result_file}.")
+ logger.debug("%s: Unpickling %s.", bundle, result_file)
try:
with open(result_file, 'rb') as rb:
serialized = rb.read()
result = cloudpickle.loads(serialized)
except Exception as e:
logger.exception(e)
- msg = f'Failed to load {result_file}... this is bad news.'
- logger.critical(msg)
+ logger.error('Failed to load %s... this is bad news.', result_file)
self.release_worker(bundle)
# Re-raise the exception; the code in wait_for_process may
# decide to emergency_retry_nasty_bundle here.
- raise Exception(e)
- logger.debug(f'Removing local (master) {code_file} and {result_file}.')
- os.remove(f'{result_file}')
- os.remove(f'{code_file}')
+ raise e
+ logger.debug('Removing local (master) %s and %s.', code_file, result_file)
+ os.remove(result_file)
+ os.remove(code_file)
# Notify any backups that the original is done so they
# should stop ASAP. Do this whether or not we
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
logger.debug(
- f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
+ '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
)
backup.is_cancelled.set()
# Tell the original to stop if we finished first.
if not was_cancelled:
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle is not None
logger.debug(
- f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+ '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
)
- bundle.src_bundle.is_cancelled.set()
+ orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
- logger.debug(f'Writing pickled code to {code_file}')
- with open(f'{code_file}', 'wb') as wb:
+ logger.debug('Writing pickled code to %s', code_file)
+ with open(code_file, 'wb') as wb:
wb.write(pickle)
bundle = BundleDetails(
failure_count=0,
)
self.status.record_bundle_details(bundle)
- logger.debug(f'{bundle}: Created an original bundle')
+ logger.debug('%s: Created an original bundle', bundle)
return bundle
def create_backup_bundle(self, src_bundle: BundleDetails):
)
src_bundle.backup_bundles.append(backup_bundle)
self.status.record_bundle_details_already_locked(backup_bundle)
- logger.debug(f'{backup_bundle}: Created a backup bundle')
+ logger.debug('%s: Created a backup bundle', backup_bundle)
return backup_bundle
def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
assert src_bundle is not None
backup_bundle = self.create_backup_bundle(src_bundle)
logger.debug(
- f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+ '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
)
self._helper_executor.submit(self.launch, backup_bundle)
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+ def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
if bundle.failure_count > retry_limit:
logger.error(
- f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+ '%s: Tried this bundle too many times already (%dx); giving up.',
+ bundle,
+ retry_limit,
)
if is_original:
raise RemoteExecutorException(
- f'{bundle}: This bundle can\'t be completed despite several backups and retries'
+ f'{bundle}: This bundle can\'t be completed despite several backups and retries',
)
else:
logger.error(
- f'{bundle}: At least it\'s only a backup; better luck with the others.'
+ '%s: At least it\'s only a backup; better luck with the others.', bundle
)
return None
else:
return self._helper_executor.submit(self.launch, bundle)
@overrides
- def shutdown(self, wait=True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
if not self.already_shutdown:
- logging.debug(f'Shutting down RemoteExecutor {self.title}')
+ logging.debug('Shutting down RemoteExecutor %s', self.title)
self.heartbeat_stop_event.set()
self.heartbeat_thread.join()
self._helper_executor.shutdown(wait)
- print(self.histogram)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
@singleton
class DefaultExecutors(object):
+ """A container for a default thread, process and remote executor.
+ These are not created until needed and we take care to clean up
+ before process exit.
+
+ """
+
def __init__(self):
self.thread_executor: Optional[ThreadExecutor] = None
self.process_executor: Optional[ProcessExecutor] = None
self.remote_executor: Optional[RemoteExecutor] = None
- def ping(self, host) -> bool:
- logger.debug(f'RUN> ping -c 1 {host}')
+ @staticmethod
+ def ping(host) -> bool:
+ logger.debug('RUN> ping -c 1 %s', host)
try:
- x = cmd_with_timeout(
- f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
- )
+ x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
return x == 0
except Exception:
return False
RemoteWorkerRecord(
username='scott',
machine='cheetah.house',
- weight=30,
- count=6,
+ weight=24,
+ count=5,
),
)
if self.ping('meerkat.cabin'):
RemoteWorkerRecord(
username='scott',
machine='wannabe.house',
- weight=25,
- count=10,
+ weight=14,
+ count=2,
),
)
if self.ping('puma.cabin'):
RemoteWorkerRecord(
username='scott',
machine='puma.cabin',
- weight=30,
- count=6,
+ weight=24,
+ count=5,
),
)
if self.ping('backup.house'):
RemoteWorkerRecord(
username='scott',
machine='backup.house',
- weight=8,
+ weight=9,
count=2,
),
)
# The controller machine has a lot to do; go easy on it.
for record in pool:
if record.machine == platform.node() and record.count > 1:
- logger.info(f'Reducing workload for {record.machine}.')
+ logger.info('Reducing workload for %s.', record.machine)
record.count = 1
policy = WeightedRandomRemoteWorkerSelectionPolicy()
def shutdown(self) -> None:
if self.thread_executor is not None:
- self.thread_executor.shutdown()
+ self.thread_executor.shutdown(wait=True, quiet=True)
self.thread_executor = None
if self.process_executor is not None:
- self.process_executor.shutdown()
+ self.process_executor.shutdown(wait=True, quiet=True)
self.process_executor = None
if self.remote_executor is not None:
- self.remote_executor.shutdown()
+ self.remote_executor.shutdown(wait=True, quiet=True)
self.remote_executor = None