More work to improve documentation generated by sphinx. Also fixes
[pyutils.git] / src / pyutils / parallelize / executors.py
index fd70e327b75b81a25e6e20a470de3c36a296d125..fe00b5427c0876b04fba223241c2072ae6a1de4f 100644 (file)
@@ -3,13 +3,38 @@
 
 # © Copyright 2021-2022, Scott Gasch
 
-"""Defines three executors: a thread executor for doing work using a
-threadpool, a process executor for doing work in other processes on
-the same machine and a remote executor for farming out work to other
-machines.
-
-Also defines DefaultExecutors which is a container for references to
-global executors / worker pools with automatic shutdown semantics."""
+"""
+This module defines a :class:`BaseExecutor` interface and three
+implementations:
+
+    - :class:`ThreadExecutor`
+    - :class:`ProcessExecutor`
+    - :class:`RemoteExecutor`
+
+The :class:`ThreadExecutor` is used to dispatch work to background
+threads in the same Python process for parallelized work.  Of course,
+until the Global Interpreter Lock (GIL) bottleneck is resolved, this
+is not terribly useful for compute-bound code.  But it's good for
+work that is mostly I/O bound.
+
+The :class:`ProcessExecutor` is used to dispatch work to other
+processes on the same machine and is more useful for compute-bound
+workloads.
+
+The :class:`RemoteExecutor` is used in conjunection with `ssh`,
+the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
+to dispatch work to a set of remote worker machines on your
+network.  You can configure this pool via a JSON configuration file,
+an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
+
+Finally, this file defines a :class:`DefaultExecutors` pool that
+contains a pre-created and ready instance of each of the three
+executors discussed.  It has the added benefit of being automatically
+cleaned up at process termination time.
+
+See instructions in :mod:`pyutils.parallelize.parallelize` for
+setting up and using the framework.
+"""
 
 from __future__ import annotations
 
@@ -76,6 +101,13 @@ parser.add_argument(
     help='Path of the remote worker records file (JSON)',
     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
 )
+parser.add_argument(
+    '--remote_worker_helper_path',
+    type=str,
+    metavar='PATH_TO_REMOTE_WORKER_PY',
+    help='Path to remote_worker.py on remote machines',
+    default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
+)
 
 
 SSH = '/usr/bin/ssh -oForwardX11=no'
@@ -95,6 +127,10 @@ class BaseExecutor(ABC):
     """
 
     def __init__(self, *, title=''):
+        """
+        Args:
+            title: the name of this executor.
+        """
         self.title = title
         self.histogram = hist.SimpleHistogram(
             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
@@ -103,10 +139,27 @@ class BaseExecutor(ABC):
 
     @abstractmethod
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        """Submit work for the executor to do.
+
+        Args:
+            function: the Callable to be executed.
+            *args: the arguments to function
+            **kwargs: the arguments to function
+
+        Returns:
+            A concurrent :class:`Future` representing the result of the
+            work.
+        """
         pass
 
     @abstractmethod
     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        """Shutdown the executor.
+
+        Args:
+            wait: wait for the shutdown to complete before returning?
+            quiet: keep it quiet, please.
+        """
         pass
 
     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
@@ -115,6 +168,12 @@ class BaseExecutor(ABC):
         otherwise.  Note: this should only be called by the launcher
         process.
 
+        Args:
+            quiet: keep it quiet, please.
+
+        Returns:
+            True if the executor could be shut down because it has no
+            pending work, False otherwise.
         """
         if self.task_count == 0:
             self.shutdown(wait=True, quiet=quiet)
@@ -126,6 +185,8 @@ class BaseExecutor(ABC):
         worker, it should only be called by the launcher process /
         thread / machine.
 
+        Args:
+            delta: the delta value by which to adjust task count.
         """
         self.task_count += delta
         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
@@ -135,12 +196,14 @@ class BaseExecutor(ABC):
         worker, it should only be called by the launcher process /
         thread / machine.
 
+        Returns:
+            The executor's current task count.
         """
         return self.task_count
 
 
 class ThreadExecutor(BaseExecutor):
-    """A threadpool executor.  This executor uses python threads to
+    """A threadpool executor.  This executor uses Python threads to
     schedule tasks.  Note that, at least as of python3.10, because of
     the global lock in the interpreter itself, these do not
     parallelize very well so this class is useful mostly for non-CPU
@@ -150,6 +213,10 @@ class ThreadExecutor(BaseExecutor):
     """
 
     def __init__(self, max_workers: Optional[int] = None):
+        """
+        Args:
+            max_workers: maximum number of threads to create in the pool.
+        """
         super().__init__()
         workers = None
         if max_workers is not None:
@@ -167,7 +234,7 @@ class ThreadExecutor(BaseExecutor):
 
     # This is run on a different thread; do not adjust task count here.
     @staticmethod
-    def run_local_bundle(fun, *args, **kwargs):
+    def _run_local_bundle(fun, *args, **kwargs):
         logger.debug("Running local bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
         return result
@@ -183,7 +250,7 @@ class ThreadExecutor(BaseExecutor):
             newargs.append(arg)
         start = time.time()
         result = self._thread_pool_executor.submit(
-            ThreadExecutor.run_local_bundle, *newargs, **kwargs
+            ThreadExecutor._run_local_bundle, *newargs, **kwargs
         )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
@@ -206,6 +273,10 @@ class ProcessExecutor(BaseExecutor):
     """
 
     def __init__(self, max_workers=None):
+        """
+        Args:
+            max_workers: the max number of worker processes to create.
+        """
         super().__init__()
         workers = None
         if max_workers is not None:
@@ -223,7 +294,7 @@ class ProcessExecutor(BaseExecutor):
 
     # This is run in another process; do not adjust task count here.
     @staticmethod
-    def run_cloud_pickle(pickle):
+    def _run_cloud_pickle(pickle):
         fun, args, kwargs = cloudpickle.loads(pickle)
         logger.debug("Running pickled bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
@@ -236,7 +307,9 @@ class ProcessExecutor(BaseExecutor):
         start = time.time()
         self.adjust_task_count(+1)
         pickle = _make_cloud_pickle(function, *args, **kwargs)
-        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
+        result = self._process_executor.submit(
+            ProcessExecutor._run_cloud_pickle, pickle
+        )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
@@ -395,11 +468,9 @@ class RemoteExecutorStatus:
     """
 
     def __init__(self, total_worker_count: int) -> None:
-        """C'tor.
-
+        """
         Args:
             total_worker_count: number of workers in the pool
-
         """
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
@@ -688,8 +759,10 @@ class RemoteExecutor(BaseExecutor):
     executed.  Each bundle is assigned a remote worker based on some policy
     heuristics.  Once assigned to a remote worker, a local subprocess is
     created.  It copies the pickled code to the remote machine via ssh/scp
-    and then starts up work on the remote machine again using ssh.  When
-    the work is complete it copies the results back to the local machine.
+    and then starts up work on the remote machine again using ssh to invoke
+    the :file:`remote_worker.py` (`--remote_worker_helper_path`).  When
+    the work is complete, the local subprocess copies the results back to
+    the local machine via ssh/scp.
 
     So there is essentially one "controller" machine (which may also be
     in the remote executor pool and therefore do task work in addition to
@@ -709,6 +782,9 @@ class RemoteExecutor(BaseExecutor):
         computationally expensive tasks such as jobs that will execute
         for ~30 seconds or longer.
 
+    Instructions for how to set this up are provided in
+    :class:`pyutils.parallelize.parallelize`.
+
     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
     """
 
@@ -717,8 +793,7 @@ class RemoteExecutor(BaseExecutor):
         workers: List[RemoteWorkerRecord],
         policy: RemoteWorkerSelectionPolicy,
     ) -> None:
-        """C'tor.
-
+        """
         Args:
             workers: A list of remote workers we can call on to do tasks.
             policy: A policy for selecting remote workers for tasks.
@@ -1040,11 +1115,10 @@ class RemoteExecutor(BaseExecutor):
         # Kick off the work.  Note that if this fails we let
         # _wait_for_process deal with it.
         self.status.record_processing_began(uuid)
+        helper_path = config.config['remote_worker_helper_path']
         cmd = (
             f'{SSH} {bundle.username}@{bundle.machine} '
-            f'"source py39-venv/bin/activate &&'
-            f' /home/scott/lib/python_modules/remote_worker.py'
-            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
+            f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
         )
         logger.debug(
             '%s: Executing %s in the background to kick off work...', bundle, cmd