#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine, and a remote executor for farming out work to other
+machines.
+
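+A rough usage sketch (the submit()/shutdown() signatures are assumed from
+the executor classes defined below):
+
+ executor = ThreadExecutor(max_workers=4)
+ future = executor.submit(sum, [1, 2, 3])
+ assert future.result() == 6
+ executor.shutdown(wait=True, quiet=True)
+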
+Also defines DefaultExecutors, which is a container for references to
+global executors / worker pools with automatic shutdown semantics."""
+
from __future__ import annotations
import concurrent.futures as fut
def make_cloud_pickle(fun, *args, **kwargs):
- logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
+ logger.debug("Making cloudpickled bundle at %s", fun.__name__)
return cloudpickle.dumps((fun, args, kwargs))
class BaseExecutor(ABC):
+ """The base executor interface definition."""
+
def __init__(self, *, title=''):
self.title = title
self.histogram = hist.SimpleHistogram(
"""
self.task_count += delta
- logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
+ logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
def get_task_count(self) -> int:
"""Change the task count. Note: do not call this method from a
class ThreadExecutor(BaseExecutor):
+ """A threadpool executor instance."""
+
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
workers = None
workers = max_workers
elif 'executors_threadpool_size' in config.config:
workers = config.config['executors_threadpool_size']
- logger.debug(f'Creating threadpool executor with {workers} workers')
+ logger.debug('Creating threadpool executor with %s workers', workers)
self._thread_pool_executor = fut.ThreadPoolExecutor(
max_workers=workers, thread_name_prefix="thread_executor_helper"
)
self.already_shutdown = False
# This is run on a different thread; do not adjust task count here.
- def run_local_bundle(self, fun, *args, **kwargs):
- logger.debug(f"Running local bundle at {fun.__name__}")
+ @staticmethod
+ def run_local_bundle(fun, *args, **kwargs):
+ logger.debug("Running local bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
return result
for arg in args:
newargs.append(arg)
start = time.time()
- result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
+ result = self._thread_pool_executor.submit(
+ ThreadExecutor.run_local_bundle, *newargs, **kwargs
+ )
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
if not self.already_shutdown:
- logger.debug(f'Shutting down threadpool executor {self.title}')
+ logger.debug('Shutting down threadpool executor %s', self.title)
self._thread_pool_executor.shutdown(wait)
if not quiet:
print(self.histogram.__repr__(label_formatter='%ds'))
class ProcessExecutor(BaseExecutor):
+ """A processpool executor."""
+
def __init__(self, max_workers=None):
super().__init__()
workers = None
workers = max_workers
elif 'executors_processpool_size' in config.config:
workers = config.config['executors_processpool_size']
- logger.debug(f'Creating processpool executor with {workers} workers.')
+ logger.debug('Creating processpool executor with %s workers.', workers)
self._process_executor = fut.ProcessPoolExecutor(
max_workers=workers,
)
self.already_shutdown = False
# This is run in another process; do not adjust task count here.
- def run_cloud_pickle(self, pickle):
+ @staticmethod
+ def run_cloud_pickle(pickle):
fun, args, kwargs = cloudpickle.loads(pickle)
- logger.debug(f"Running pickled bundle at {fun.__name__}")
+ logger.debug("Running pickled bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
return result
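+
+ # Round-trip sketch, pairing run_cloud_pickle() with make_cloud_pickle()
+ # above (math.sqrt is just an illustrative, importable callable):
+ #
+ #   import math
+ #   pickled = make_cloud_pickle(math.sqrt, 16)
+ #   ProcessExecutor.run_cloud_pickle(pickled)  # -> 4.0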
start = time.time()
self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
- result = self._process_executor.submit(self.run_cloud_pickle, pickle)
+ result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
if not self.already_shutdown:
- logger.debug(f'Shutting down processpool executor {self.title}')
+ logger.debug('Shutting down processpool executor %s', self.title)
self._process_executor.shutdown(wait)
if not quiet:
print(self.histogram.__repr__(label_formatter='%ds'))
@dataclass
class RemoteWorkerRecord:
+ """A record of info about a remote worker."""
+
username: str
machine: str
weight: int
@dataclass
class BundleDetails:
+ """All info necessary to define some unit of work that needs to be
+ done, where it is being run, its state, whether it is an original
+ bundle or a backup bundle, how many times it has failed, etc.
+
+ """
+
pickled_code: bytes
uuid: str
fname: str
class RemoteExecutorStatus:
+ """A status 'scoreboard' for a remote executor."""
+
def __init__(self, total_worker_count: int) -> None:
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
start = self.start_per_bundle[uuid]
assert start is not None
bundle_latency = ts - start
- x = self.finished_bundle_timings_per_worker.get(worker, list())
+ x = self.finished_bundle_timings_per_worker.get(worker, [])
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
self.finished_bundle_timings.append(bundle_latency)
class RemoteWorkerSelectionPolicy(ABC):
- def register_worker_pool(self, workers):
+ """A policy for selecting a remote worker base class."""
+
+ def __init__(self):
+ self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+ def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
self.workers = workers
@abstractmethod
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+ """A remote worker selector that uses weighted RNG."""
+
@overrides
def is_worker_available(self) -> bool:
- for worker in self.workers:
- if worker.count > 0:
- return True
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ return True
return False
@overrides
def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
grabbag = []
- for worker in self.workers:
- if worker.machine != machine_to_avoid:
- if worker.count > 0:
- for _ in range(worker.count * worker.weight):
- grabbag.append(worker)
+ if self.workers:
+ for worker in self.workers:
+ if worker.machine != machine_to_avoid:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
if len(grabbag) == 0:
- logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
- for worker in self.workers:
- if worker.count > 0:
- for _ in range(worker.count * worker.weight):
- grabbag.append(worker)
+ logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
if len(grabbag) == 0:
logger.warning('There are no available workers?!')
worker = random.sample(grabbag, 1)[0]
assert worker.count > 0
worker.count -= 1
- logger.debug(f'Chose worker {worker}')
+ logger.debug('Selected worker %s', worker)
return worker
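+
+ # Weighting sketch (assumed semantics): a RemoteWorkerRecord with weight=2
+ # and count=3 contributes 3 * 2 = 6 grab bag entries, so it is twice as
+ # likely to be drawn as an equally loaded worker with weight=1.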
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+ """A remote worker selector that just round robins."""
+
def __init__(self) -> None:
+ super().__init__()
self.index = 0
@overrides
def is_worker_available(self) -> bool:
- for worker in self.workers:
- if worker.count > 0:
- return True
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ return True
return False
@overrides
def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
- x = self.index
- while True:
- worker = self.workers[x]
- if worker.count > 0:
- worker.count -= 1
+ if self.workers:
+ x = self.index
+ while True:
+ worker = self.workers[x]
+ if worker.count > 0:
+ worker.count -= 1
+ x += 1
+ if x >= len(self.workers):
+ x = 0
+ self.index = x
+ logger.debug('Selected worker %s', worker)
+ return worker
x += 1
if x >= len(self.workers):
x = 0
- self.index = x
- logger.debug(f'Selected worker {worker}')
- return worker
- x += 1
- if x >= len(self.workers):
- x = 0
- if x == self.index:
- msg = 'Unexpectedly could not find a worker, retrying...'
- logger.warning(msg)
- return None
+ if x == self.index:
+ logger.warning('Unexpectedly could not find a worker, retrying...')
+ return None
+ return None
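+
+ # Rotation sketch (assumed semantics): given workers a and b, each with
+ # count=2, successive acquire_worker() calls yield a, b, a, b and then
+ # None (with a warning) once every worker's count is exhausted.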
class RemoteExecutor(BaseExecutor):
+ """A remote work executor."""
+
def __init__(
self,
workers: List[RemoteWorkerRecord],
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
- logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
+ logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
if start_ts is not None:
runtime = now - start_ts
score += runtime
- logger.debug(f'score[{bundle}] => {score} # latency boost')
+ logger.debug('score[%s] => %.1f # latency boost', bundle, score)
if bundle.slower_than_local_p95:
score += runtime / 2
- logger.debug(f'score[{bundle}] => {score} # >worker p95')
+ logger.debug('score[%s] => %.1f # >worker p95', bundle, score)
if bundle.slower_than_global_p95:
score += runtime / 4
- logger.debug(f'score[{bundle}] => {score} # >global p95')
+ logger.debug('score[%s] => %.1f # >global p95', bundle, score)
# Prefer backups of bundles that don't
# have backups already.
else:
score = 0
logger.debug(
- f'score[{bundle}] => {score} # {backup_count} dup backup factor'
+ 'score[%s] => %.1f # %d dup backup factor',
+ bundle,
+ score,
+ backup_count,
)
if score != 0 and (best_score is None or score > best_score):
if bundle_to_backup is not None:
self.last_backup = now
logger.info(
- f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
+ '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+ bundle_to_backup,
+ best_score,
)
self.schedule_backup_for_bundle(bundle_to_backup)
finally:
def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
worker = bundle.worker
assert worker is not None
- logger.debug(f'Released worker {worker}')
+ logger.debug('Released worker %s', worker)
self.status.record_release_worker(
worker,
bundle.uuid,
def check_if_cancelled(self, bundle: BundleDetails) -> bool:
with self.status.lock:
if bundle.is_cancelled.wait(timeout=0.0):
- logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
+ logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
bundle.was_cancelled = True
return True
return False
machine = bundle.machine = worker.machine
username = bundle.username = worker.username
self.status.record_acquire_worker(worker, uuid)
- logger.debug(f'{bundle}: Running bundle on {worker}...')
+ logger.debug('%s: Running bundle on %s...', bundle, worker)
# Before we do any work, make sure the bundle is still viable.
# It may have been some time between when it was submitted and
try:
return self.process_work_result(bundle)
except Exception as e:
- logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
+ logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
# thing.
logger.exception(e)
logger.error(
- f'{bundle}: We are the original owner thread and yet there are '
- + 'no results for this bundle. This is unexpected and bad.'
+ '%s: We are the original owner thread and yet there are '
+ 'no results for this bundle. This is unexpected and bad.',
+ bundle,
)
return self.emergency_retry_nasty_bundle(bundle)
else:
try:
cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
- logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
+ logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
run_silently(cmd)
xfer_latency = time.time() - start_ts
- logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
+ logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
except Exception as e:
self.release_worker(bundle)
if is_original:
# And we're the original bundle. We have to retry.
logger.exception(e)
logger.error(
- f"{bundle}: Failed to send instructions to the worker machine?! "
- + "This is not expected; we\'re the original bundle so this shouldn\'t "
- + "be a race condition. Attempting an emergency retry..."
+ "%s: Failed to send instructions to the worker machine?! "
+ "This is not expected; we\'re the original bundle so this shouldn\'t "
+ "be a race condition. Attempting an emergency retry...",
+ bundle,
)
return self.emergency_retry_nasty_bundle(bundle)
else:
# There's a race condition where someone else
# already finished the work and removed the source
# code file before we could copy it. No biggie.
- msg = f'{bundle}: Failed to send instructions to the worker machine... '
- msg += 'We\'re a backup and this may be caused by the original (or some '
- msg += 'other backup) already finishing this work. Ignoring this.'
- logger.warning(msg)
+ logger.warning(
+ '%s: Failed to send instructions to the worker machine... '
+ 'We\'re a backup and this may be caused by the original (or '
+ 'some other backup) already finishing this work. Ignoring.',
+ bundle,
+ )
return None
# Kick off the work. Note that if this fails we let
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
- logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
+ logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
+ logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
pid = p.pid
if depth > 3:
logger.error(
- f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+ "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
)
p.terminate()
self.release_worker(bundle)
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
- logger.info(f'{bundle}: looks like another worker finished bundle...')
+ logger.info('%s: looks like another worker finished bundle...', bundle)
break
else:
- logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
+ logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
p = None
break
# Otherwise, time for an emergency reschedule.
except Exception as e:
logger.exception(e)
- logger.error(f'{bundle}: Something unexpected just happened...')
+ logger.error('%s: Something unexpected just happened...', bundle)
if p is not None:
- msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
- logger.warning(msg)
+ logger.warning(
+ "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
+ )
return self.wait_for_process(p, bundle, depth + 1)
else:
self.release_worker(bundle)
if bundle.hostname not in bundle.machine:
cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
logger.info(
- f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
+ "%s: Fetching results back from %s@%s via %s",
+ bundle,
+ username,
+ machine,
+ cmd,
)
# If either of these throw they are handled in
except Exception as e:
attempts += 1
if attempts >= 3:
- raise (e)
+ raise e
else:
break
run_silently(
f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
)
- logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
+ logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
# if one of the backups finished first; it still must read the
# result from disk.
if is_original:
- logger.debug(f"{bundle}: Unpickling {result_file}.")
+ logger.debug("%s: Unpickling %s.", bundle, result_file)
try:
with open(result_file, 'rb') as rb:
serialized = rb.read()
result = cloudpickle.loads(serialized)
except Exception as e:
logger.exception(e)
- msg = f'Failed to load {result_file}... this is bad news.'
- logger.critical(msg)
+ logger.error('Failed to load %s... this is bad news.', result_file)
self.release_worker(bundle)
# Re-raise the exception; the code in wait_for_process may
# decide to emergency_retry_nasty_bundle here.
- raise Exception(e)
- logger.debug(f'Removing local (master) {code_file} and {result_file}.')
- os.remove(f'{result_file}')
- os.remove(f'{code_file}')
+ raise e
+ logger.debug('Removing local (master) %s and %s.', code_file, result_file)
+ os.remove(result_file)
+ os.remove(code_file)
# Notify any backups that the original is done so they
# should stop ASAP. Do this whether or not we
# backup.
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
- logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
+ logger.debug(
+ '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
+ )
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
if not was_cancelled:
orig_bundle = bundle.src_bundle
assert orig_bundle is not None
- logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
+ logger.debug(
+ '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
+ )
orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
- logger.debug(f'Writing pickled code to {code_file}')
- with open(f'{code_file}', 'wb') as wb:
+ logger.debug('Writing pickled code to %s', code_file)
+ with open(code_file, 'wb') as wb:
wb.write(pickle)
bundle = BundleDetails(
failure_count=0,
)
self.status.record_bundle_details(bundle)
- logger.debug(f'{bundle}: Created an original bundle')
+ logger.debug('%s: Created an original bundle', bundle)
return bundle
def create_backup_bundle(self, src_bundle: BundleDetails):
)
src_bundle.backup_bundles.append(backup_bundle)
self.status.record_bundle_details_already_locked(backup_bundle)
- logger.debug(f'{backup_bundle}: Created a backup bundle')
+ logger.debug('%s: Created a backup bundle', backup_bundle)
return backup_bundle
def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
assert src_bundle is not None
backup_bundle = self.create_backup_bundle(src_bundle)
logger.debug(
- f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+ '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
)
self._helper_executor.submit(self.launch, backup_bundle)
if bundle.failure_count > retry_limit:
logger.error(
- f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+ '%s: Tried this bundle too many times already (%dx); giving up.',
+ bundle,
+ retry_limit,
)
if is_original:
raise RemoteExecutorException(
- f'{bundle}: This bundle can\'t be completed despite several backups and retries'
+ f'{bundle}: This bundle can\'t be completed despite several backups and retries',
)
else:
logger.error(
- f'{bundle}: At least it\'s only a backup; better luck with the others.'
+ '%s: At least it\'s only a backup; better luck with the others.', bundle
)
return None
else:
@overrides
def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
if not self.already_shutdown:
- logging.debug(f'Shutting down RemoteExecutor {self.title}')
+ logger.debug('Shutting down RemoteExecutor %s', self.title)
self.heartbeat_stop_event.set()
self.heartbeat_thread.join()
self._helper_executor.shutdown(wait)
@singleton
class DefaultExecutors(object):
+ """A container for a default thread, process and remote executor.
+ These are not created until needed and we take care to clean up
+ before process exit.
+
+ """
+
def __init__(self):
self.thread_executor: Optional[ThreadExecutor] = None
self.process_executor: Optional[ProcessExecutor] = None
self.remote_executor: Optional[RemoteExecutor] = None
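+
+ # Lazy-construction sketch; the accessor name is an assumption, only the
+ # attributes above are visible in this hunk:
+ #
+ #   def thread_pool(self) -> ThreadExecutor:
+ #       if self.thread_executor is None:
+ #           self.thread_executor = ThreadExecutor()
+ #       return self.thread_executor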
- def ping(self, host) -> bool:
- logger.debug(f'RUN> ping -c 1 {host}')
+ @staticmethod
+ def ping(host) -> bool:
+ logger.debug('RUN> ping -c 1 %s', host)
try:
x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
return x == 0
# The controller machine has a lot to do; go easy on it.
for record in pool:
if record.machine == platform.node() and record.count > 1:
- logger.info(f'Reducing workload for {record.machine}.')
+ logger.info('Reducing workload for %s.', record.machine)
record.count = 1
policy = WeightedRandomRemoteWorkerSelectionPolicy()