More documentation improvements.
[pyutils.git] / src / pyutils / parallelize / executors.py
index 7bd44ca1ddf5d90662f5c5dcfd55c65642423c40..23fd6eb96c3a9a3198691c705c455902d982fce8 100644 (file)
@@ -3,13 +3,38 @@
 
 # © Copyright 2021-2022, Scott Gasch
 
-"""Defines three executors: a thread executor for doing work using a
-threadpool, a process executor for doing work in other processes on
-the same machine and a remote executor for farming out work to other
-machines.
-
-Also defines DefaultExecutors which is a container for references to
-global executors / worker pools with automatic shutdown semantics."""
+"""
+This module defines a :class:`BaseExecutor` interface and three
+implementations:
+
+    - :class:`ThreadExecutor`
+    - :class:`ProcessExecutor`
+    - :class:`RemoteExecutor`
+
+The :class:`ThreadExecutor` is used to dispatch work to background
+threads in the same Python process for parallelized work.  Of course,
+until the Global Interpreter Lock (GIL) bottleneck is resolved, this
+is not terribly useful for compute-bound code.  But it's good for
+work that is mostly I/O bound.
+
+The :class:`ProcessExecutor` is used to dispatch work to other
+processes on the same machine and is more useful for compute-bound
+workloads.
+
+The :class:`RemoteExecutor` is used in conjunection with `ssh`,
+the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
+to dispatch work to a set of remote worker machines on your
+network.  You can configure this pool via a JSON configuration file,
+an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
+
+Finally, this file defines a :class:`DefaultExecutors` pool that
+contains a pre-created and ready instance of each of the three
+executors discussed.  It has the added benefit of being automatically
+cleaned up at process termination time.
+
+See instructions in :mod:`pyutils.parallelize.parallelize` for
+setting up and using the framework.
+"""
 
 from __future__ import annotations
 
@@ -28,11 +53,10 @@ from dataclasses import dataclass, fields
 from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
-import numpy
 from overrides import overrides
 
 import pyutils.typez.histogram as hist
-from pyutils import argparse_utils, config, persistent, string_utils
+from pyutils import argparse_utils, config, math_utils, persistent, string_utils
 from pyutils.ansi import bg, fg, reset, underline
 from pyutils.decorator_utils import singleton
 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
@@ -77,6 +101,13 @@ parser.add_argument(
     help='Path of the remote worker records file (JSON)',
     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
 )
+parser.add_argument(
+    '--remote_worker_helper_path',
+    type=str,
+    metavar='PATH_TO_REMOTE_WORKER_PY',
+    help='Path to remote_worker.py on remote machines',
+    default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
+)
 
 
 SSH = '/usr/bin/ssh -oForwardX11=no'
@@ -96,6 +127,10 @@ class BaseExecutor(ABC):
     """
 
     def __init__(self, *, title=''):
+        """
+        Args:
+            title: the name of this executor.
+        """
         self.title = title
         self.histogram = hist.SimpleHistogram(
             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
@@ -104,10 +139,27 @@ class BaseExecutor(ABC):
 
     @abstractmethod
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        """Submit work for the executor to do.
+
+        Args:
+            function: the Callable to be executed.
+            *args: the arguments to function
+            **kwargs: the arguments to function
+
+        Returns:
+            A concurrent :class:`Future` representing the result of the
+            work.
+        """
         pass
 
     @abstractmethod
     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        """Shutdown the executor.
+
+        Args:
+            wait: wait for the shutdown to complete before returning?
+            quiet: keep it quiet, please.
+        """
         pass
 
     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
@@ -116,6 +168,12 @@ class BaseExecutor(ABC):
         otherwise.  Note: this should only be called by the launcher
         process.
 
+        Args:
+            quiet: keep it quiet, please.
+
+        Returns:
+            True if the executor could be shut down because it has no
+            pending work, False otherwise.
         """
         if self.task_count == 0:
             self.shutdown(wait=True, quiet=quiet)
@@ -127,6 +185,8 @@ class BaseExecutor(ABC):
         worker, it should only be called by the launcher process /
         thread / machine.
 
+        Args:
+            delta: the delta value by which to adjust task count.
         """
         self.task_count += delta
         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
@@ -136,12 +196,14 @@ class BaseExecutor(ABC):
         worker, it should only be called by the launcher process /
         thread / machine.
 
+        Returns:
+            The executor's current task count.
         """
         return self.task_count
 
 
 class ThreadExecutor(BaseExecutor):
-    """A threadpool executor.  This executor uses python threads to
+    """A threadpool executor.  This executor uses Python threads to
     schedule tasks.  Note that, at least as of python3.10, because of
     the global lock in the interpreter itself, these do not
     parallelize very well so this class is useful mostly for non-CPU
@@ -151,6 +213,10 @@ class ThreadExecutor(BaseExecutor):
     """
 
     def __init__(self, max_workers: Optional[int] = None):
+        """
+        Args:
+            max_workers: maximum number of threads to create in the pool.
+        """
         super().__init__()
         workers = None
         if max_workers is not None:
@@ -168,7 +234,7 @@ class ThreadExecutor(BaseExecutor):
 
     # This is run on a different thread; do not adjust task count here.
     @staticmethod
-    def run_local_bundle(fun, *args, **kwargs):
+    def _run_local_bundle(fun, *args, **kwargs):
         logger.debug("Running local bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
         return result
@@ -184,7 +250,7 @@ class ThreadExecutor(BaseExecutor):
             newargs.append(arg)
         start = time.time()
         result = self._thread_pool_executor.submit(
-            ThreadExecutor.run_local_bundle, *newargs, **kwargs
+            ThreadExecutor._run_local_bundle, *newargs, **kwargs
         )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
@@ -207,6 +273,10 @@ class ProcessExecutor(BaseExecutor):
     """
 
     def __init__(self, max_workers=None):
+        """
+        Args:
+            max_workers: the max number of worker processes to create.
+        """
         super().__init__()
         workers = None
         if max_workers is not None:
@@ -224,7 +294,7 @@ class ProcessExecutor(BaseExecutor):
 
     # This is run in another process; do not adjust task count here.
     @staticmethod
-    def run_cloud_pickle(pickle):
+    def _run_cloud_pickle(pickle):
         fun, args, kwargs = cloudpickle.loads(pickle)
         logger.debug("Running pickled bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
@@ -237,7 +307,9 @@ class ProcessExecutor(BaseExecutor):
         start = time.time()
         self.adjust_task_count(+1)
         pickle = _make_cloud_pickle(function, *args, **kwargs)
-        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
+        result = self._process_executor.submit(
+            ProcessExecutor._run_cloud_pickle, pickle
+        )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
@@ -396,11 +468,9 @@ class RemoteExecutorStatus:
     """
 
     def __init__(self, total_worker_count: int) -> None:
-        """C'tor.
-
+        """
         Args:
             total_worker_count: number of workers in the pool
-
         """
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
@@ -408,11 +478,13 @@ class RemoteExecutorStatus:
         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
         self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord, List[float]
+            RemoteWorkerRecord, math_utils.NumericPopulation
         ] = {}
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
-        self.finished_bundle_timings: List[float] = []
+        self.finished_bundle_timings: math_utils.NumericPopulation = (
+            math_utils.NumericPopulation()
+        )
         self.last_periodic_dump: Optional[float] = None
         self.total_bundles_submitted: int = 0
 
@@ -477,10 +549,12 @@ class RemoteExecutorStatus:
             start = self.start_per_bundle[uuid]
             assert start is not None
             bundle_latency = ts - start
-            x = self.finished_bundle_timings_per_worker.get(worker, [])
-            x.append(bundle_latency)
+            x = self.finished_bundle_timings_per_worker.get(
+                worker, math_utils.NumericPopulation()
+            )
+            x.add_number(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
-            self.finished_bundle_timings.append(bundle_latency)
+            self.finished_bundle_timings.add_number(bundle_latency)
 
     def record_processing_began(self, uuid: str):
         """Record when work on a bundle begins."""
@@ -508,9 +582,10 @@ class RemoteExecutorStatus:
         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
         qall = None
         if len(self.finished_bundle_timings) > 1:
-            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
+            qall_median = self.finished_bundle_timings.get_median()
+            qall_p95 = self.finished_bundle_timings.get_percentile(95)
             ret += (
-                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
+                f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
                 f'✅={total_finished}/{self.total_bundles_submitted}, '
                 f'💻n={total_in_flight}/{self.worker_count}\n'
             )
@@ -523,12 +598,16 @@ class RemoteExecutorStatus:
 
         for worker in self.known_workers:
             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
-            timings = self.finished_bundle_timings_per_worker.get(worker, [])
+            timings = self.finished_bundle_timings_per_worker.get(
+                worker, math_utils.NumericPopulation()
+            )
             count = len(timings)
-            qworker = None
+            qworker_median = None
+            qworker_p95 = None
             if count > 1:
-                qworker = numpy.quantile(timings, [0.5, 0.95])
-                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
+                qworker_median = timings.get_median()
+                qworker_p95 = timings.get_percentile(95)
+                ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
             else:
                 ret += '\n'
             if count > 0:
@@ -546,8 +625,8 @@ class RemoteExecutorStatus:
                         ret += f'       {details} setting up / copying data...'
                         sec = 0.0
 
-                    if qworker is not None:
-                        if sec > qworker[1]:
+                    if qworker_p95 is not None:
+                        if sec > qworker_p95:
                             ret += f'{bg("red")}>💻p95{reset()} '
                             if details is not None:
                                 details.slower_than_local_p95 = True
@@ -575,7 +654,7 @@ class RemoteExecutorStatus:
 
 
 class RemoteWorkerSelectionPolicy(ABC):
-    """A policy for selecting a remote worker base class."""
+    """An interface definition of a policy for selecting a remote worker."""
 
     def __init__(self):
         self.workers: Optional[List[RemoteWorkerRecord]] = None
@@ -675,33 +754,78 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
 
 
 class RemoteExecutor(BaseExecutor):
-    """An executor that uses processes on remote machines to do work.  This
-    works by creating "bundles" of work with pickled code in each to be
-    executed.  Each bundle is assigned a remote worker based on some policy
-    heuristics.  Once assigned to a remote worker, a local subprocess is
-    created.  It copies the pickled code to the remote machine via ssh/scp
-    and then starts up work on the remote machine again using ssh.  When
-    the work is complete it copies the results back to the local machine.
-
-    So there is essentially one "controller" machine (which may also be
-    in the remote executor pool and therefore do task work in addition to
-    controlling) and N worker machines.  This code runs on the controller
-    whereas on the worker machines we invoke pickled user code via a
-    shim in :file:`remote_worker.py`.
-
-    Some redundancy and safety provisions are made; e.g. slower than
-    expected tasks have redundant backups created and if a task fails
-    repeatedly we consider it poisoned and give up on it.
+    """An executor that uses processes on remote machines to do work.
+    To do so, it requires that a pool of remote workers to be properly
+    configured.  See instructions in
+    :class:`pyutils.parallelize.parallelize`.
+
+    Each machine in a worker pool has a *weight* and a *count*.  A
+    *weight* captures the relative speed of a processor on that worker
+    and a *count* captures the number of synchronous tasks the worker
+    can accept (i.e. the number of cpus on the machine).
+
+    To dispatch work to a remote machine, this class pickles the code
+    to be executed remotely using `cloudpickle`.  For that to work,
+    the remote machine should be running the same version of Python as
+    this machine, ideally in a virtual environment with the same
+    import libraries installed.  Differences in operating system
+    and/or processor architecture don't seem to matter for most code,
+    though.
+
+    .. warning::
+
+        Mismatches in Python version or in the version numbers of
+        third-party libraries between machines can cause problems
+        when trying to unpickle and run code remotely.
+
+    Work to be dispatched is represented in this code by creating a
+    "bundle".  Each bundle is assigned to a remote worker based on
+    heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
+    general, it attempts to load all workers in the pool and maximize
+    throughput.  Once assigned to a remote worker, pickled code is
+    copied to that worker via `scp` and a remote command is issued via
+    `ssh` to execute a :file:`remote_worker.py` process on the remote
+    machine.  This process unpickles the code, runs it, and produces a
+    result which is then copied back to the local machine (again via
+    `scp`) where it can be processed by local code.
+
+    You can and probably must override the path of
+    :file:`remote_worker.py` on your pool machines using the
+    `--remote_worker_helper_path` commandline argument (or by just
+    changing the default in code, see above in this file's code).
+
+    During remote work execution, this local machine acts as a
+    controller dispatching all work to the network, copying pickled
+    tasks out, and copying results back in.  It may also be a worker
+    in the pool but do not underestimate the cost of being a
+    controller -- it takes some cpu and a lot of network bandwidth.
+    The work dispatcher logic attempts to detect when a controller is
+    also a worker and reduce its load.
+
+    Some redundancy and safety provisions are made when scheduling
+    tasks to the worker pool; e.g. slower than expected tasks have
+    redundant backups tasks created, especially if there are otherwise
+    idle workers.  If a task fails repeatedly, the dispatcher consider
+    it poisoned and give up on it.
 
     .. warning::
 
-        The network overhead / latency of copying work from the
-        controller machine to the remote workers is relatively high.
         This executor probably only makes sense to use with
         computationally expensive tasks such as jobs that will execute
         for ~30 seconds or longer.
 
+        The network overhead and latency of copying work from the
+        controller (local) machine to the remote workers and copying
+        results back again is relatively high.  Especially at startup,
+        the network can become a bottleneck.  Future versions of this
+        code may attempt to split the responsibility of being a
+        controller (distributing work to pool machines).
+
+    Instructions for how to set this up are provided in
+    :class:`pyutils.parallelize.parallelize`.
+
     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
+
     """
 
     def __init__(
@@ -709,8 +833,7 @@ class RemoteExecutor(BaseExecutor):
         workers: List[RemoteWorkerRecord],
         policy: RemoteWorkerSelectionPolicy,
     ) -> None:
-        """C'tor.
-
+        """
         Args:
             workers: A list of remote workers we can call on to do tasks.
             policy: A policy for selecting remote workers for tasks.
@@ -1032,11 +1155,10 @@ class RemoteExecutor(BaseExecutor):
         # Kick off the work.  Note that if this fails we let
         # _wait_for_process deal with it.
         self.status.record_processing_began(uuid)
+        helper_path = config.config['remote_worker_helper_path']
         cmd = (
             f'{SSH} {bundle.username}@{bundle.machine} '
-            f'"source py39-venv/bin/activate &&'
-            f' /home/scott/lib/python_modules/remote_worker.py'
-            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
+            f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
         )
         logger.debug(
             '%s: Executing %s in the background to kick off work...', bundle, cmd