#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+# © Copyright 2021-2022, Scott Gasch
+
"""Defines three executors: a thread executor for doing work using a
threadpool, a process executor for doing work in other processes on
the same machine and a remote executor for farming out work to other
machines. Also defines :class:`DefaultExecutors`, a container for
references to global executors / worker pools with automatic shutdown
semantics."""
from __future__ import annotations
-
import concurrent.futures as fut
import logging
import os
import argparse_utils
import config
import histogram as hist
+import string_utils
from ansi import bg, fg, reset, underline
from decorator_utils import singleton
from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
SCP = '/usr/bin/scp -C'
-def make_cloud_pickle(fun, *args, **kwargs):
+def _make_cloud_pickle(fun, *args, **kwargs):
+ """Internal helper to create cloud pickles."""
logger.debug("Making cloudpickled bundle at %s", fun.__name__)
return cloudpickle.dumps((fun, args, kwargs))
class BaseExecutor(ABC):
- """The base executor interface definition."""
+ """The base executor interface definition. The interface for
+ :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
+ :class:`ThreadExecutor`.
+ """
def __init__(self, *, title=''):
self.title = title
class ThreadExecutor(BaseExecutor):
- """A threadpool executor instance."""
+ """A threadpool executor. This executor uses python threads to
+ schedule tasks. Note that, at least as of python3.10, because of
+ the global interpreter lock (GIL), python threads do not parallelize
+ CPU-bound work very well so this class is useful mostly for I/O-bound
+ or otherwise non-CPU intensive tasks.
+
+ See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
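+
+ A minimal usage sketch (``fetch_url`` and ``urls`` are hypothetical
+ placeholders; any callable works)::
+
+     executor = ThreadExecutor(max_workers=10)
+     futures = [executor.submit(fetch_url, url) for url in urls]
+     for future in futures:
+         print(future.result())
+     executor.shutdown()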
+ """
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
workers = max_workers
elif 'executors_threadpool_size' in config.config:
workers = config.config['executors_threadpool_size']
- logger.debug('Creating threadpool executor with %d workers', workers)
+ if workers is not None:
+ logger.debug('Creating threadpool executor with %d workers', workers)
+ else:
+ logger.debug('Creating a default sized threadpool executor')
self._thread_pool_executor = fut.ThreadPoolExecutor(
max_workers=workers, thread_name_prefix="thread_executor_helper"
)
class ProcessExecutor(BaseExecutor):
- """A processpool executor."""
+ """An executor which runs tasks in child processes.
+
+ See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
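+
+ A rough sketch (``count_primes`` is a hypothetical CPU-bound
+ function; arguments and results must be picklable to cross the
+ process boundary)::
+
+     executor = ProcessExecutor(max_workers=4)
+     future = executor.submit(count_primes, 10_000_000)
+     print(future.result())
+     executor.shutdown()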
+ """
def __init__(self, max_workers=None):
super().__init__()
workers = max_workers
elif 'executors_processpool_size' in config.config:
workers = config.config['executors_processpool_size']
- logger.debug('Creating processpool executor with %d workers.', workers)
+ if workers is not None:
+ logger.debug('Creating processpool executor with %d workers.', workers)
+ else:
+ logger.debug('Creating a default sized processpool executor')
self._process_executor = fut.ProcessPoolExecutor(
max_workers=workers,
)
raise Exception('Submitted work after shutdown.')
start = time.time()
self.adjust_task_count(+1)
- pickle = make_cloud_pickle(function, *args, **kwargs)
+ pickle = _make_cloud_pickle(function, *args, **kwargs)
result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
"""A record of info about a remote worker."""
username: str
+ """Username we can ssh into on this machine to run work."""
+
machine: str
+ """Machine address / name."""
+
weight: int
+ """Relative probability for the weighted policy to select this
+ machine for scheduling work."""
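+ # e.g. given two workers with weights 24 and 14, the weighted random
+ # policy would (presumably) pick the first about 24/(24+14) ~= 63% of
+ # the time.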
+
count: int
+ """If this machine is selected, what is the maximum number of task
+ that it can handle?"""
def __hash__(self):
return hash((self.username, self.machine))
"""All info necessary to define some unit of work that needs to be
done, where it is being run, its state, whether it is an original
bundle or a backup bundle, how many times it has failed, etc...
-
"""
pickled_code: bytes
+ """The code to run, cloud pickled"""
+
uuid: str
- fname: str
+ """A unique identifier"""
+
+ function_name: str
+ """The name of the function we pickled"""
+
worker: Optional[RemoteWorkerRecord]
+ """The remote worker running this bundle or None if none (yet)"""
+
username: Optional[str]
+ """The remote username running this bundle or None if none (yet)"""
+
machine: Optional[str]
+ """The remote machine running this bundle or None if none (yet)"""
+
hostname: str
+ """The controller machine"""
+
code_file: str
+ """A unique filename to hold the work to be done"""
+
result_file: str
+ """Where the results should be placed / read from"""
+
pid: int
+ """The process id of the local subprocess watching the ssh connection
+ to the remote machine"""
+
start_ts: float
+ """Starting time"""
+
end_ts: float
+ """Ending time"""
+
slower_than_local_p95: bool
+ """Currently slower then 95% of other bundles on remote host"""
+
slower_than_global_p95: bool
+ """Currently slower than 95% of other bundles globally"""
+
src_bundle: Optional[BundleDetails]
+ """If this is a backup bundle, this points to the original bundle
+ that it's backing up. None otherwise."""
+
is_cancelled: threading.Event
+ """An event that can be signaled to indicate this bundle is cancelled.
+ This is set when another copy (backup or original) of this work has
+ completed successfully elsewhere."""
+
was_cancelled: bool
+ """True if this bundle was cancelled, False if it finished normally"""
+
backup_bundles: Optional[List[BundleDetails]]
+ """If we've created backups of this bundle, this is the list of them"""
+
failure_count: int
+ """How many times has this bundle failed already?"""
def __repr__(self):
uuid = self.uuid
else:
suffix = uuid[-6:]
+ # We colorize the uuid based on some bits from it to make them
+ # stand out in the logging and help a reader correlate log messages
+ # related to the same bundle.
colorz = [
fg('violet red'),
fg('red'),
fg('medium purple'),
]
c = colorz[int(uuid[-2:], 16) % len(colorz)]
- fname = self.fname if self.fname is not None else 'nofname'
+ function_name = self.function_name if self.function_name is not None else 'nofname'
machine = self.machine if self.machine is not None else 'nomachine'
- return f'{c}{suffix}/{fname}/{machine}{reset()}'
+ return f'{c}{suffix}/{function_name}/{machine}{reset()}'
class RemoteExecutorStatus:
- """A status 'scoreboard' for a remote executor."""
+ """A status 'scoreboard' for a remote executor tracking various
+ metrics and able to render a periodic dump of global state.
+ """
def __init__(self, total_worker_count: int) -> None:
+ """C'tor.
+
+ Args:
+ total_worker_count: number of workers in the pool
+
+ """
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
self.lock: threading.Lock = threading.Lock()
def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+ """Record that bundle with uuid is assigned to a particular worker.
+
+ Args:
+ worker: the record of the worker to which uuid is assigned
+ uuid: the uuid of a bundle that has been assigned to a worker
+ """
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+ """Same as above but an entry point that doesn't acquire the lock
+ for codepaths where it's already held."""
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
self.in_flight_bundles_by_worker[worker] = x
def record_bundle_details(self, details: BundleDetails) -> None:
+ """Register the details about a bundle of work."""
with self.lock:
self.record_bundle_details_already_locked(details)
def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
+ """Same as above but for codepaths that already hold the lock."""
assert self.lock.locked()
self.bundle_details_by_uuid[details.uuid] = details
uuid: str,
was_cancelled: bool,
) -> None:
+ """Record that a bundle has released a worker."""
with self.lock:
self.record_release_worker_already_locked(worker, uuid, was_cancelled)
uuid: str,
was_cancelled: bool,
) -> None:
+ """Same as above but for codepaths that already hold the lock."""
assert self.lock.locked()
ts = time.time()
self.end_per_bundle[uuid] = ts
self.finished_bundle_timings.append(bundle_latency)
def record_processing_began(self, uuid: str):
+ """Record when work on a bundle begins."""
with self.lock:
self.start_per_bundle[uuid] = time.time()
def total_in_flight(self) -> int:
+ """How many bundles are in flight currently?"""
assert self.lock.locked()
total_in_flight = 0
for worker in self.known_workers:
return total_in_flight
def total_idle(self) -> int:
+ """How many idle workers are there currently?"""
assert self.lock.locked()
return self.worker_count - self.total_in_flight()
class RemoteExecutor(BaseExecutor):
- """A remote work executor."""
+ """An executor that uses processes on remote machines to do work. This
+ works by creating "bundles" of work with pickled code in each to be
+ executed. Each bundle is assigned a remote worker based on some policy
+ heuristics. Once assigned to a remote worker, a local subprocess is
+ created. It copies the pickled code to the remote machine via ssh/scp
+ and then starts up work on the remote machine again using ssh. When
+ the work is complete it copies the results back to the local machine.
+
+ So there is essentially one "controller" machine (which may also be
+ in the remote executor pool and therefore do task work in addition to
+ controlling) and N worker machines. This code runs on the controller
+ whereas on the worker machines we invoke pickled user code via a
+ shim in :file:`remote_worker.py`.
+
+ Some redundancy and safety provisions are made; e.g. slower than
+ expected tasks have redundant backups created, and if a task fails
+ repeatedly we consider it poisoned and give up on it.
+
+ .. warning::
+
+ The network overhead / latency of copying work from the
+ controller machine to the remote workers is relatively high.
+ This executor probably only makes sense to use with
+ computationally expensive tasks such as jobs that will execute
+ for ~30 seconds or longer.
+
+ See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
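+
+ A rough construction / usage sketch (hostnames, usernames and
+ ``some_slow_function`` are hypothetical)::
+
+     workers = [
+         RemoteWorkerRecord(username='me', machine='host1', weight=2, count=4),
+         RemoteWorkerRecord(username='me', machine='host2', weight=1, count=2),
+     ]
+     policy = WeightedRandomRemoteWorkerSelectionPolicy()
+     policy.register_worker_pool(workers)
+     executor = RemoteExecutor(workers, policy)
+     future = executor.submit(some_slow_function, arg)
+     print(future.result())
+     executor.shutdown()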
+ """
def __init__(
self,
workers: List[RemoteWorkerRecord],
policy: RemoteWorkerSelectionPolicy,
) -> None:
+ """C'tor.
+
+ Args:
+ workers: A list of remote workers we can call on to do tasks.
+ policy: A policy for selecting remote workers for tasks.
+ """
+
super().__init__()
self.workers = workers
self.policy = policy
(
self.heartbeat_thread,
self.heartbeat_stop_event,
- ) = self.run_periodic_heartbeat()
+ ) = self._run_periodic_heartbeat()
self.already_shutdown = False
@background_thread
- def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+ def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+ """
+ We create a background thread to invoke :meth:`_heartbeat` regularly
+ while we are scheduling work. It does some accounting such as
+ looking for slow bundles to tag for backup creation, checking for
+ unexpected failures, and printing a fancy message on stdout.
+ """
while not stop_event.is_set():
time.sleep(5.0)
logger.debug('Running periodic heartbeat code...')
- self.heartbeat()
+ self._heartbeat()
logger.debug('Periodic heartbeat thread shutting down.')
- def heartbeat(self) -> None:
+ def _heartbeat(self) -> None:
# Note: this is invoked on a background thread, not an
# executor thread. Be careful what you do with it b/c it
# needs to get back and dump status again periodically.
# Look for bundles to reschedule via executor.submit
if config.config['executors_schedule_remote_backups']:
- self.maybe_schedule_backup_bundles()
+ self._maybe_schedule_backup_bundles()
+
+ def _maybe_schedule_backup_bundles(self):
+ """Maybe schedule backup bundles if we see a very slow bundle."""
- def maybe_schedule_backup_bundles(self):
assert self.status.lock.locked()
num_done = len(self.status.finished_bundle_timings)
num_idle_workers = self.worker_count - self.task_count
now = time.time()
if (
- num_done > 2
- and num_idle_workers > 1
+ num_done >= 2
+ and num_idle_workers > 0
and (self.last_backup is None or (now - self.last_backup > 9.0))
and self.backup_lock.acquire(blocking=False)
):
# Note: this is all still happening on the heartbeat
# runner thread. That's ok because
- # schedule_backup_for_bundle uses the executor to
+ # _schedule_backup_for_bundle uses the executor to
# submit the bundle again which will cause it to be
# picked up by a worker thread and allow this thread
# to return to run future heartbeats.
if bundle_to_backup is not None:
self.last_backup = now
logger.info(
- '=====> SCHEDULING BACKUP %s (score=%.1f}) <=====',
+ '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
bundle_to_backup,
best_score,
)
- self.schedule_backup_for_bundle(bundle_to_backup)
+ self._schedule_backup_for_bundle(bundle_to_backup)
finally:
self.backup_lock.release()
- def is_worker_available(self) -> bool:
+ def _is_worker_available(self) -> bool:
+ """Is there a worker available currently?"""
return self.policy.is_worker_available()
- def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+ def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+ """Try to acquire a worker."""
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+ def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+ """Find a worker or block until one becomes available."""
with self.cv:
- while not self.is_worker_available():
+ while not self._is_worker_available():
self.cv.wait()
- worker = self.acquire_worker(machine_to_avoid)
+ worker = self._acquire_worker(machine_to_avoid)
if worker is not None:
return worker
msg = "We should never reach this point in the code"
logger.critical(msg)
raise Exception(msg)
- def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+ def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+ """Release a previously acquired worker."""
worker = bundle.worker
assert worker is not None
logger.debug('Released worker %s', worker)
self.cv.notify()
self.adjust_task_count(-1)
- def check_if_cancelled(self, bundle: BundleDetails) -> bool:
+ def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
+ """See if a particular bundle is cancelled. Do not block."""
with self.status.lock:
if bundle.is_cancelled.wait(timeout=0.0):
logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
return True
return False
- def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
+ def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
"""Find a worker for bundle or block until one is available."""
+
self.adjust_task_count(+1)
uuid = bundle.uuid
hostname = bundle.hostname
avoid_machine = bundle.src_bundle.machine
worker = None
while worker is None:
- worker = self.find_available_worker_or_block(avoid_machine)
+ worker = self._find_available_worker_or_block(avoid_machine)
assert worker is not None
# Ok, found a worker.
# It may have been some time between when it was submitted and
# now due to lack of worker availability and someone else may
# have already finished it.
- if self.check_if_cancelled(bundle):
+ if self._check_if_cancelled(bundle):
try:
- return self.process_work_result(bundle)
+ return self._process_work_result(bundle)
except Exception as e:
logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
- self.release_worker(bundle)
+ self._release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
# bundle. For it to have been cancelled, a backup
'no results for this bundle. This is unexpected and bad.',
bundle,
)
- return self.emergency_retry_nasty_bundle(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
else:
- # Expected(?). We're a backup and our bundle is
- # cancelled before we even got started. Something
- # went bad in process_work_result (I acutually don't
- # see what?) but probably not worth worrying
- # about. Let the original thread worry about
- # either finding the results or complaining about
- # it.
+ # We're a backup and our bundle is cancelled
+ # before we even got started. Do nothing and let
+ # the original bundle's thread worry about either
+ # finding the results or complaining about it.
return None
# Send input code / data to worker machine if it's not local.
xfer_latency = time.time() - start_ts
logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
except Exception as e:
- self.release_worker(bundle)
+ self._release_worker(bundle)
if is_original:
- # Weird. We tried to copy the code to the worker and it failed...
- # And we're the original bundle. We have to retry.
+ # Weird. We tried to copy the code to the worker
+ # and it failed... And we're the original bundle.
+ # We have to retry.
logger.exception(e)
logger.error(
"%s: Failed to send instructions to the worker machine?! "
"be a race condition. Attempting an emergency retry...",
bundle,
)
- return self.emergency_retry_nasty_bundle(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
else:
# This is actually expected; we're a backup.
# There's a race condition where someone else
# already finished the work and removed the source
- # code file before we could copy it. No biggie.
+ # code_file before we could copy it. Ignore.
logger.warning(
'%s: Failed to send instructions to the worker machine... '
'We\'re a backup and this may be caused by the original (or '
return None
# Kick off the work. Note that if this fails we let
- # wait_for_process deal with it.
+ # _wait_for_process deal with it.
self.status.record_processing_began(uuid)
cmd = (
f'{SSH} {bundle.username}@{bundle.machine} '
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
- return self.wait_for_process(p, bundle, 0)
+ return self._wait_for_process(p, bundle, 0)
- def wait_for_process(
+ def _wait_for_process(
self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
+ """At this point we've copied the bundle's pickled code to the remote
+ worker and started an ssh process that should be invoking the
+ remote worker to have it execute the user's code. See how
+ that's going and wait for it to complete or fail. Note that
+ this code is recursive: there are codepaths where we decide to
+ stop waiting for an ssh process (because another backup seems
+ to have finished) but then fail to fetch or parse the results
+ from that backup and thus call ourselves to continue waiting
+ on an active ssh process. This is the purpose of the depth
+ argument: to curtail potential infinite recursion by giving up
+ eventually.
+
+ Args:
+ p: the Popen record of the ssh job
+ bundle: the bundle of work being executed remotely
+ depth: how many retries we've made so far. Starts at zero.
+
+ """
+
machine = bundle.machine
assert p is not None
- pid = p.pid
+ pid = p.pid # pid of the ssh process
if depth > 3:
logger.error(
"I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
)
p.terminate()
- self.release_worker(bundle)
- return self.emergency_retry_nasty_bundle(bundle)
+ self._release_worker(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
# Spin until either the ssh job we scheduled finishes the
# bundle or some backup worker signals that they finished it
try:
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
- if self.check_if_cancelled(bundle):
+ if self._check_if_cancelled(bundle):
logger.info('%s: looks like another worker finished bundle...', bundle)
break
else:
# If we get here we believe the bundle is done; either the ssh
# subprocess finished (hopefully successfully) or we noticed
# that some other worker seems to have completed the bundle
- # and we're bailing out.
+ # before us and we're bailing out.
try:
- ret = self.process_work_result(bundle)
+ ret = self._process_work_result(bundle)
if ret is not None and p is not None:
p.terminate()
return ret
logger.warning(
"%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
)
- return self.wait_for_process(p, bundle, depth + 1)
+ return self._wait_for_process(p, bundle, depth + 1)
else:
- self.release_worker(bundle)
- return self.emergency_retry_nasty_bundle(bundle)
+ self._release_worker(bundle)
+ return self._emergency_retry_nasty_bundle(bundle)
+
+ def _process_work_result(self, bundle: BundleDetails) -> Any:
+ """A bundle seems to be completed. Check on the results."""
- def process_work_result(self, bundle: BundleDetails) -> Any:
with self.status.lock:
is_original = bundle.src_bundle is None
was_cancelled = bundle.was_cancelled
)
# If either of these throw they are handled in
- # wait_for_process.
+ # _wait_for_process.
attempts = 0
while True:
try:
else:
break
+ # Clean up remote /tmp files.
run_silently(
f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
)
# original is also the only job that may delete result_file
# from disk. Note that the original may have been cancelled
# if one of the backups finished first; it still must read the
- # result from disk.
+ # result from disk. It still does that here with is_cancelled
+ # set.
if is_original:
logger.debug("%s: Unpickling %s.", bundle, result_file)
try:
except Exception as e:
logger.exception(e)
logger.error('Failed to load %s... this is bad news.', result_file)
- self.release_worker(bundle)
+ self._release_worker(bundle)
- # Re-raise the exception; the code in wait_for_process may
- # decide to emergency_retry_nasty_bundle here.
+ # Re-raise the exception; the code in _wait_for_process may
+ # decide to _emergency_retry_nasty_bundle here.
raise e
logger.debug('Removing local (master) %s and %s.', code_file, result_file)
os.remove(result_file)
'%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
)
orig_bundle.is_cancelled.set()
- self.release_worker(bundle, was_cancelled=was_cancelled)
+ self._release_worker(bundle, was_cancelled=was_cancelled)
return result
- def create_original_bundle(self, pickle, fname: str):
- from string_utils import generate_uuid
+ def _create_original_bundle(self, pickle, function_name: str):
+ """Creates a bundle that is not a backup of any other bundle but
+ rather represents a user task.
+ """
- uuid = generate_uuid(omit_dashes=True)
+ uuid = string_utils.generate_uuid(omit_dashes=True)
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
bundle = BundleDetails(
pickled_code=pickle,
uuid=uuid,
- fname=fname,
+ function_name=function_name,
worker=None,
username=None,
machine=None,
logger.debug('%s: Created an original bundle', bundle)
return bundle
- def create_backup_bundle(self, src_bundle: BundleDetails):
+ def _create_backup_bundle(self, src_bundle: BundleDetails):
+ """Creates a bundle that is a backup of another bundle that is
+ running too slowly."""
+
+ assert self.status.lock.locked()
assert src_bundle.backup_bundles is not None
n = len(src_bundle.backup_bundles)
uuid = src_bundle.uuid + f'_backup#{n}'
backup_bundle = BundleDetails(
pickled_code=src_bundle.pickled_code,
uuid=uuid,
- fname=src_bundle.fname,
+ function_name=src_bundle.function_name,
worker=None,
username=None,
machine=None,
logger.debug('%s: Created a backup bundle', backup_bundle)
return backup_bundle
- def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+ def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+ """Schedule a backup of src_bundle."""
+
assert self.status.lock.locked()
assert src_bundle is not None
- backup_bundle = self.create_backup_bundle(src_bundle)
+ backup_bundle = self._create_backup_bundle(src_bundle)
logger.debug(
- '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
+ '%s/%s: Scheduling backup for execution...',
+ backup_bundle.uuid,
+ backup_bundle.function_name,
)
- self._helper_executor.submit(self.launch, backup_bundle)
+ self._helper_executor.submit(self._launch, backup_bundle)
# Results from backups don't matter; if they finish first
# they will move the result_file to this machine and let
- # the original pick them up and unpickle them.
+ # the original pick them up and unpickle them (and return
+ # a result).
+
+ def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
+ """Something unexpectedly failed with bundle. Either retry it
+ from the beginning or throw in the towel and give up on it."""
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
logger.warning(msg)
warnings.warn(msg)
- return self.launch(bundle, avoid_last_machine)
+ return self._launch(bundle, avoid_last_machine)
@overrides
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+ """Submit work to be done. This is the user entry point of this
+ class."""
if self.already_shutdown:
raise Exception('Submitted work after shutdown.')
- pickle = make_cloud_pickle(function, *args, **kwargs)
- bundle = self.create_original_bundle(pickle, function.__name__)
+ pickle = _make_cloud_pickle(function, *args, **kwargs)
+ bundle = self._create_original_bundle(pickle, function.__name__)
self.total_bundles_submitted += 1
- return self._helper_executor.submit(self.launch, bundle)
+ return self._helper_executor.submit(self._launch, bundle)
@overrides
def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+ """Shutdown the executor."""
if not self.already_shutdown:
logging.debug('Shutting down RemoteExecutor %s', self.title)
self.heartbeat_stop_event.set()
class DefaultExecutors(object):
"""A container for a default thread, process and remote executor.
These are not created until needed and we take care to clean up
- before process exit.
+ automatically before process exit for the caller's convenience.
+ Instead of creating your own executor, consider using the one
+ from this pool. e.g.::
+
+ @par.parallelize(method=par.Method.PROCESS)
+ def do_work(
+ solutions: List[Work],
+ shard_num: int,
+ ...
+ ):
+ <do the work>
+
+ def start_do_work(all_work: List[Work]):
+ shards = []
+ results = []
+ logger.debug('Sharding work into groups of 10.')
+ for subset in list_utils.shard(all_work, 10):
+ shards.append([x for x in subset])
+
+ logger.debug('Kicking off helper pool.')
+ try:
+ for n, shard in enumerate(shards):
+ results.append(
+ do_work(
+ shard, n, shared_cache.get_name(), max_letter_pop_per_word
+ )
+ )
+ smart_future.wait_all(results)
+ finally:
+ # Note: if you forget to do this it will clean itself up
+ # during program termination including tearing down any
+ # active ssh connections.
+ executors.DefaultExecutors().process_pool().shutdown()
"""
def __init__(self):
self.remote_executor: Optional[RemoteExecutor] = None
@staticmethod
- def ping(host) -> bool:
+ def _ping(host) -> bool:
logger.debug('RUN> ping -c 1 %s', host)
try:
x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
if self.remote_executor is None:
logger.info('Looking for some helper machines...')
pool: List[RemoteWorkerRecord] = []
- if self.ping('cheetah.house'):
+ if self._ping('cheetah.house'):
logger.info('Found cheetah.house')
pool.append(
RemoteWorkerRecord(
username='scott',
machine='cheetah.house',
weight=24,
- count=6,
+ count=5,
),
)
- if self.ping('meerkat.cabin'):
+ if self._ping('meerkat.cabin'):
logger.info('Found meerkat.cabin')
pool.append(
RemoteWorkerRecord(
count=2,
),
)
- if self.ping('wannabe.house'):
+ if self._ping('wannabe.house'):
logger.info('Found wannabe.house')
pool.append(
RemoteWorkerRecord(
username='scott',
machine='wannabe.house',
weight=14,
- count=8,
+ count=2,
),
)
- if self.ping('puma.cabin'):
+ if self._ping('puma.cabin'):
logger.info('Found puma.cabin')
pool.append(
RemoteWorkerRecord(
username='scott',
machine='puma.cabin',
weight=24,
- count=6,
+ count=5,
),
)
- if self.ping('backup.house'):
+ if self._ping('backup.house'):
logger.info('Found backup.house')
pool.append(
RemoteWorkerRecord(
for record in pool:
if record.machine == platform.node() and record.count > 1:
logger.info('Reducing workload for %s.', record.machine)
- record.count = 1
+ record.count = max(int(record.count / 2), 1)
policy = WeightedRandomRemoteWorkerSelectionPolicy()
policy.register_worker_pool(pool)