# © Copyright 2021-2022, Scott Gasch
-"""Defines three executors: a thread executor for doing work using a
-threadpool, a process executor for doing work in other processes on
-the same machine and a remote executor for farming out work to other
-machines.
-
-Also defines DefaultExecutors which is a container for references to
-global executors / worker pools with automatic shutdown semantics."""
+"""
+This module defines a :class:`BaseExecutor` interface and three
+implementations:
+
+ - :class:`ThreadExecutor`
+ - :class:`ProcessExecutor`
+ - :class:`RemoteExecutor`
+
+The :class:`ThreadExecutor` is used to dispatch work to background
+threads in the same Python process for parallelized work. Of course,
+until the Global Interpreter Lock (GIL) bottleneck is resolved, this
+is not terribly useful for compute-bound code. But it's good for
+work that is mostly I/O bound.
+
+The :class:`ProcessExecutor` is used to dispatch work to other
+processes on the same machine and is more useful for compute-bound
+workloads.
+
+The :class:`RemoteExecutor` is used in conjunection with `ssh`,
+the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
+to dispatch work to a set of remote worker machines on your
+network. You can configure this pool via a JSON configuration file,
+an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
+
+Finally, this file defines a :class:`DefaultExecutors` pool that
+contains a pre-created and ready instance of each of the three
+executors discussed. It has the added benefit of being automatically
+cleaned up at process termination time.
+
+See instructions in :mod:`pyutils.parallelize.parallelize` for
+setting up and using the framework.
+"""
from __future__ import annotations
from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
-import numpy
from overrides import overrides
import pyutils.typez.histogram as hist
-from pyutils import argparse_utils, config, persistent, string_utils
+from pyutils import argparse_utils, config, math_utils, persistent, string_utils
from pyutils.ansi import bg, fg, reset, underline
from pyutils.decorator_utils import singleton
from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
help='Path of the remote worker records file (JSON)',
default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
)
+parser.add_argument(
+ '--remote_worker_helper_path',
+ type=str,
+ metavar='PATH_TO_REMOTE_WORKER_PY',
+ help='Path to remote_worker.py on remote machines',
+ default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
+)
SSH = '/usr/bin/ssh -oForwardX11=no'
"""
def __init__(self, *, title=''):
+ """
+ Args:
+ title: the name of this executor.
+ """
self.title = title
self.histogram = hist.SimpleHistogram(
hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
@abstractmethod
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+ """Submit work for the executor to do.
+
+ Args:
+ function: the Callable to be executed.
+ *args: the arguments to function
+ **kwargs: the arguments to function
+
+ Returns:
+ A concurrent :class:`Future` representing the result of the
+ work.
+ """
pass
@abstractmethod
def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+ """Shutdown the executor.
+
+ Args:
+ wait: wait for the shutdown to complete before returning?
+ quiet: keep it quiet, please.
+ """
pass
def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
otherwise. Note: this should only be called by the launcher
process.
+ Args:
+ quiet: keep it quiet, please.
+
+ Returns:
+ True if the executor could be shut down because it has no
+ pending work, False otherwise.
"""
if self.task_count == 0:
self.shutdown(wait=True, quiet=quiet)
worker, it should only be called by the launcher process /
thread / machine.
+ Args:
+ delta: the delta value by which to adjust task count.
"""
self.task_count += delta
logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
worker, it should only be called by the launcher process /
thread / machine.
+ Returns:
+ The executor's current task count.
"""
return self.task_count
class ThreadExecutor(BaseExecutor):
- """A threadpool executor. This executor uses python threads to
+ """A threadpool executor. This executor uses Python threads to
schedule tasks. Note that, at least as of python3.10, because of
the global lock in the interpreter itself, these do not
parallelize very well so this class is useful mostly for non-CPU
"""
def __init__(self, max_workers: Optional[int] = None):
+ """
+ Args:
+ max_workers: maximum number of threads to create in the pool.
+ """
super().__init__()
workers = None
if max_workers is not None:
# This is run on a different thread; do not adjust task count here.
@staticmethod
- def run_local_bundle(fun, *args, **kwargs):
+ def _run_local_bundle(fun, *args, **kwargs):
logger.debug("Running local bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
return result
newargs.append(arg)
start = time.time()
result = self._thread_pool_executor.submit(
- ThreadExecutor.run_local_bundle, *newargs, **kwargs
+ ThreadExecutor._run_local_bundle, *newargs, **kwargs
)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
"""
def __init__(self, max_workers=None):
+ """
+ Args:
+ max_workers: the max number of worker processes to create.
+ """
super().__init__()
workers = None
if max_workers is not None:
# This is run in another process; do not adjust task count here.
@staticmethod
- def run_cloud_pickle(pickle):
+ def _run_cloud_pickle(pickle):
fun, args, kwargs = cloudpickle.loads(pickle)
logger.debug("Running pickled bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
start = time.time()
self.adjust_task_count(+1)
pickle = _make_cloud_pickle(function, *args, **kwargs)
- result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
+ result = self._process_executor.submit(
+ ProcessExecutor._run_cloud_pickle, pickle
+ )
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
"""
def __init__(self, total_worker_count: int) -> None:
- """C'tor.
-
+ """
Args:
total_worker_count: number of workers in the pool
-
"""
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
+ RemoteWorkerRecord, math_utils.NumericPopulation
] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
- self.finished_bundle_timings: List[float] = []
+ self.finished_bundle_timings: math_utils.NumericPopulation = (
+ math_utils.NumericPopulation()
+ )
self.last_periodic_dump: Optional[float] = None
self.total_bundles_submitted: int = 0
start = self.start_per_bundle[uuid]
assert start is not None
bundle_latency = ts - start
- x = self.finished_bundle_timings_per_worker.get(worker, [])
- x.append(bundle_latency)
+ x = self.finished_bundle_timings_per_worker.get(
+ worker, math_utils.NumericPopulation()
+ )
+ x.add_number(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
- self.finished_bundle_timings.append(bundle_latency)
+ self.finished_bundle_timings.add_number(bundle_latency)
def record_processing_began(self, uuid: str):
"""Record when work on a bundle begins."""
ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
qall = None
if len(self.finished_bundle_timings) > 1:
- qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
+ qall_median = self.finished_bundle_timings.get_median()
+ qall_p95 = self.finished_bundle_timings.get_percentile(95)
ret += (
- f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
+ f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
f'✅={total_finished}/{self.total_bundles_submitted}, '
f'💻n={total_in_flight}/{self.worker_count}\n'
)
for worker in self.known_workers:
ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
- timings = self.finished_bundle_timings_per_worker.get(worker, [])
+ timings = self.finished_bundle_timings_per_worker.get(
+ worker, math_utils.NumericPopulation()
+ )
count = len(timings)
- qworker = None
+ qworker_median = None
+ qworker_p95 = None
if count > 1:
- qworker = numpy.quantile(timings, [0.5, 0.95])
- ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
+ qworker_median = timings.get_median()
+ qworker_p95 = timings.get_percentile(95)
+ ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
else:
ret += '\n'
if count > 0:
ret += f' {details} setting up / copying data...'
sec = 0.0
- if qworker is not None:
- if sec > qworker[1]:
+ if qworker_p95 is not None:
+ if sec > qworker_p95:
ret += f'{bg("red")}>💻p95{reset()} '
if details is not None:
details.slower_than_local_p95 = True
executed. Each bundle is assigned a remote worker based on some policy
heuristics. Once assigned to a remote worker, a local subprocess is
created. It copies the pickled code to the remote machine via ssh/scp
- and then starts up work on the remote machine again using ssh. When
- the work is complete it copies the results back to the local machine.
+ and then starts up work on the remote machine again using ssh to invoke
+ the :file:`remote_worker.py` (`--remote_worker_helper_path`). When
+ the work is complete, the local subprocess copies the results back to
+ the local machine via ssh/scp.
So there is essentially one "controller" machine (which may also be
in the remote executor pool and therefore do task work in addition to
computationally expensive tasks such as jobs that will execute
for ~30 seconds or longer.
+ Instructions for how to set this up are provided in
+ :class:`pyutils.parallelize.parallelize`.
+
See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
"""
workers: List[RemoteWorkerRecord],
policy: RemoteWorkerSelectionPolicy,
) -> None:
- """C'tor.
-
+ """
Args:
workers: A list of remote workers we can call on to do tasks.
policy: A policy for selecting remote workers for tasks.
# Kick off the work. Note that if this fails we let
# _wait_for_process deal with it.
self.status.record_processing_began(uuid)
+ helper_path = config.config['remote_worker_helper_path']
cmd = (
f'{SSH} {bundle.username}@{bundle.machine} '
- f'"source py39-venv/bin/activate &&'
- f' /home/scott/lib/python_modules/remote_worker.py'
- f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
+ f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
logger.debug(
'%s: Executing %s in the background to kick off work...', bundle, cmd