Reduce the doctest lease duration...
[python_utils.git] / executors.py
index 60bd166bb23ba63b227179c404d281681b35cad9..6485afa054689c3b668adb1e0708b7f2d29ed8b9 100644
--- a/executors.py
+++ b/executors.py
@@ -1,6 +1,8 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
+# © Copyright 2021-2022, Scott Gasch
+
 """Defines three executors: a thread executor for doing work using a
 threadpool, a process executor for doing work in other processes on
 the same machine and a remote executor for farming out work to other
@@ -10,7 +12,6 @@ Also defines DefaultExecutors which is a container for references to
 global executors / worker pools with automatic shutdown semantics."""
 
 from __future__ import annotations
-
 import concurrent.futures as fut
 import logging
 import os
@@ -32,9 +33,10 @@ from overrides import overrides
 import argparse_utils
 import config
 import histogram as hist
+import string_utils
 from ansi import bg, fg, reset, underline
 from decorator_utils import singleton
-from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
+from exec_utils import cmd_exitcode, cmd_in_background, run_silently
 from thread_utils import background_thread
 
 logger = logging.getLogger(__name__)
@@ -74,13 +76,17 @@ SSH = '/usr/bin/ssh -oForwardX11=no'
 SCP = '/usr/bin/scp -C'
 
 
-def make_cloud_pickle(fun, *args, **kwargs):
+def _make_cloud_pickle(fun, *args, **kwargs):
+    """Internal helper to create cloud pickles."""
     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
     return cloudpickle.dumps((fun, args, kwargs))
 
 
 class BaseExecutor(ABC):
-    """The base executor interface definition."""
+    """The base executor interface definition: the common interface
+    implemented by :class:`ProcessExecutor`, :class:`RemoteExecutor`,
+    and :class:`ThreadExecutor`.
+    """
 
     def __init__(self, *, title=''):
         self.title = title
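
A rough sketch (not part of this change) of how such a cloudpickled bundle round-trips; the worker-side shim in remote_worker.py does something along these lines with the contents of a bundle's code_file:

    import cloudpickle

    blob = _make_cloud_pickle(sum, [1, 2, 3])    # controller side
    fun, args, kwargs = cloudpickle.loads(blob)  # worker side
    print(fun(*args, **kwargs))                  # prints 6
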
@@ -128,7 +134,14 @@ class BaseExecutor(ABC):
 
 
 class ThreadExecutor(BaseExecutor):
-    """A threadpool executor instance."""
+    """A threadpool executor.  This executor uses python threads to
+    schedule tasks.  Note that, at least as of python3.10, because of
+    the global interpreter lock (GIL), these threads do not
+    parallelize CPU-bound work very well, so this class is useful
+    mostly for non-CPU-intensive (e.g. I/O-bound) tasks.
+
+    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
+    """
 
     def __init__(self, max_workers: Optional[int] = None):
         super().__init__()
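
Given the GIL caveat above, a hedged usage sketch (not part of this change) of ThreadExecutor for I/O-bound fan-out; submit() and shutdown() are the BaseExecutor interface defined in this file:

    import urllib.request

    from executors import ThreadExecutor

    def fetch_length(url: str) -> int:
        with urllib.request.urlopen(url) as conn:
            return len(conn.read())

    urls = ['http://example.com', 'http://example.org']
    executor = ThreadExecutor(max_workers=4)
    futures = [executor.submit(fetch_length, url) for url in urls]
    print([f.result() for f in futures])
    executor.shutdown()
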
@@ -137,7 +150,10 @@ class ThreadExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_threadpool_size' in config.config:
             workers = config.config['executors_threadpool_size']
-        logger.debug('Creating threadpool executor with %d workers', workers)
+        if workers is not None:
+            logger.debug('Creating threadpool executor with %d workers', workers)
+        else:
+            logger.debug('Creating a default sized threadpool executor')
         self._thread_pool_executor = fut.ThreadPoolExecutor(
             max_workers=workers, thread_name_prefix="thread_executor_helper"
         )
@@ -178,7 +194,10 @@ class ThreadExecutor(BaseExecutor):
 
 
 class ProcessExecutor(BaseExecutor):
-    """A processpool executor."""
+    """An executor which runs tasks in child processes.
+
+    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
+    """
 
     def __init__(self, max_workers=None):
         super().__init__()
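
Similarly, a hedged sketch (not part of this change) of ProcessExecutor for CPU-bound work; whatever is submitted is shipped to the child process as a cloud pickle, so the function and its arguments must be picklable:

    from executors import ProcessExecutor

    def sum_of_squares(n: int) -> int:
        return sum(i * i for i in range(n))

    executor = ProcessExecutor(max_workers=2)
    future = executor.submit(sum_of_squares, 10_000_000)
    print(future.result())
    executor.shutdown()
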
@@ -187,7 +206,10 @@ class ProcessExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_processpool_size' in config.config:
             workers = config.config['executors_processpool_size']
-        logger.debug('Creating processpool executor with %d workers.', workers)
+        if workers is not None:
+            logger.debug('Creating processpool executor with %d workers.', workers)
+        else:
+            logger.debug('Creating a default sized processpool executor')
         self._process_executor = fut.ProcessPoolExecutor(
             max_workers=workers,
         )
@@ -207,7 +229,7 @@ class ProcessExecutor(BaseExecutor):
             raise Exception('Submitted work after shutdown.')
         start = time.time()
         self.adjust_task_count(+1)
-        pickle = make_cloud_pickle(function, *args, **kwargs)
+        pickle = _make_cloud_pickle(function, *args, **kwargs)
         result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
@@ -239,9 +261,18 @@ class RemoteWorkerRecord:
     """A record of info about a remote worker."""
 
     username: str
+    """Username we can ssh into on this machine to run work."""
+
     machine: str
+    """Machine address / name."""
+
     weight: int
+    """Relative probability for the weighted policy to select this
+    machine for scheduling work."""
+
     count: int
+    """If this machine is selected, what is the maximum number of tasks
+    that it can handle?"""
 
     def __hash__(self):
         return hash((self.username, self.machine))
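
For reference, a hypothetical record using the fields documented above; the values are the ones used for one of the machines in DefaultExecutors at the bottom of this file:

    worker = RemoteWorkerRecord(
        username='scott',
        machine='cheetah.house',
        weight=24,  # relative probability of being picked by the weighted policy
        count=5,    # maximum number of concurrent tasks on this machine
    )
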
@@ -255,28 +286,68 @@ class BundleDetails:
     """All info necessary to define some unit of work that needs to be
     done, where it is being run, its state, whether it is an original
     bundle of a backup bundle, how many times it has failed, etc...
-
     """
 
     pickled_code: bytes
+    """The code to run, cloud pickled"""
+
     uuid: str
-    fname: str
+    """A unique identifier"""
+
+    function_name: str
+    """The name of the function we pickled"""
+
     worker: Optional[RemoteWorkerRecord]
+    """The remote worker running this bundle or None if none (yet)"""
+
     username: Optional[str]
+    """The remote username running this bundle or None if none (yet)"""
+
     machine: Optional[str]
+    """The remote machine running this bundle or None if none (yet)"""
+
     hostname: str
+    """The controller machine"""
+
     code_file: str
+    """A unique filename to hold the work to be done"""
+
     result_file: str
+    """Where the results should be placed / read from"""
+
     pid: int
+    """The process id of the local subprocess watching the ssh connection
+    to the remote machine"""
+
     start_ts: float
+    """Starting time"""
+
     end_ts: float
+    """Ending time"""
+
     slower_than_local_p95: bool
+    """Currently slower than 95% of other bundles on the remote host"""
+
     slower_than_global_p95: bool
+    """Currently slower than 95% of other bundles globally"""
+
     src_bundle: Optional[BundleDetails]
+    """If this is a backup bundle, this points to the original bundle
+    that it's backing up.  None otherwise."""
+
     is_cancelled: threading.Event
+    """An event that can be signaled to indicate this bundle is cancelled.
+    This is set when another copy (backup or original) of this work has
+    completed successfully elsewhere."""
+
     was_cancelled: bool
+    """True if this bundle was cancelled, False if it finished normally"""
+
     backup_bundles: Optional[List[BundleDetails]]
+    """If we've created backups of this bundle, this is the list of them"""
+
     failure_count: int
+    """How many times has this bundle failed already?"""
 
     def __repr__(self):
         uuid = self.uuid
@@ -286,6 +357,9 @@ class BundleDetails:
         else:
             suffix = uuid[-6:]
 
+        # We colorize the uuid based on some bits from it to make it
+        # stand out in the logging and help a reader correlate log
+        # messages related to the same bundle.
         colorz = [
             fg('violet red'),
             fg('red'),
@@ -302,15 +376,23 @@ class BundleDetails:
             fg('medium purple'),
         ]
         c = colorz[int(uuid[-2:], 16) % len(colorz)]
-        fname = self.fname if self.fname is not None else 'nofname'
+        function_name = self.function_name if self.function_name is not None else 'nofname'
         machine = self.machine if self.machine is not None else 'nomachine'
-        return f'{c}{suffix}/{fname}/{machine}{reset()}'
+        return f'{c}{suffix}/{function_name}/{machine}{reset()}'
 
 
 class RemoteExecutorStatus:
-    """A status 'scoreboard' for a remote executor."""
+    """A status 'scoreboard' for a remote executor tracking various
+    metrics and able to render a periodic dump of global state.
+    """
 
     def __init__(self, total_worker_count: int) -> None:
+        """C'tor.
+
+        Args:
+            total_worker_count: number of workers in the pool
+
+        """
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
         self.start_time: float = time.time()
@@ -328,10 +410,18 @@ class RemoteExecutorStatus:
         self.lock: threading.Lock = threading.Lock()
 
     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+        """Record that bundle with uuid is assigned to a particular worker.
+
+        Args:
+            worker: the record of the worker to which uuid is assigned
+            uuid: the uuid of a bundle that has been assigned to a worker
+        """
         with self.lock:
             self.record_acquire_worker_already_locked(worker, uuid)
 
     def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+        """Same as above but an entry point that doesn't acquire the lock
+        for codepaths where it's already held."""
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
@@ -340,10 +430,12 @@ class RemoteExecutorStatus:
         self.in_flight_bundles_by_worker[worker] = x
 
     def record_bundle_details(self, details: BundleDetails) -> None:
+        """Register the details about a bundle of work."""
         with self.lock:
             self.record_bundle_details_already_locked(details)
 
     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
+        """Same as above but for codepaths that already hold the lock."""
         assert self.lock.locked()
         self.bundle_details_by_uuid[details.uuid] = details
 
@@ -353,6 +445,7 @@ class RemoteExecutorStatus:
         uuid: str,
         was_cancelled: bool,
     ) -> None:
+        """Record that a bundle has released a worker."""
         with self.lock:
             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
 
@@ -362,6 +455,7 @@ class RemoteExecutorStatus:
         uuid: str,
         was_cancelled: bool,
     ) -> None:
+        """Same as above but for codepaths that already hold the lock."""
         assert self.lock.locked()
         ts = time.time()
         self.end_per_bundle[uuid] = ts
@@ -376,10 +470,12 @@ class RemoteExecutorStatus:
             self.finished_bundle_timings.append(bundle_latency)
 
     def record_processing_began(self, uuid: str):
+        """Record when work on a bundle begins."""
         with self.lock:
             self.start_per_bundle[uuid] = time.time()
 
     def total_in_flight(self) -> int:
+        """How many bundles are in flight currently?"""
         assert self.lock.locked()
         total_in_flight = 0
         for worker in self.known_workers:
@@ -387,6 +483,7 @@ class RemoteExecutorStatus:
         return total_in_flight
 
     def total_idle(self) -> int:
+        """How many idle workers are there currently?"""
         assert self.lock.locked()
         return self.worker_count - self.total_in_flight()
 
@@ -561,13 +658,47 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
 
 
 class RemoteExecutor(BaseExecutor):
-    """A remote work executor."""
+    """An executor that uses processes on remote machines to do work.  This
+    works by creating "bundles" of work with pickled code in each to be
+    executed.  Each bundle is assigned a remote worker based on some policy
+    heuristics.  Once assigned to a remote worker, a local subprocess is
+    created.  It copies the pickled code to the remote machine via ssh/scp
+    and then starts up work on the remote machine again using ssh.  When
+    the work is complete it copies the results back to the local machine.
+
+    So there is essentially one "controller" machine (which may also be
+    in the remote executor pool and therefore do task work in addition to
+    controlling) and N worker machines.  This code runs on the controller
+    whereas on the worker machines we invoke pickled user code via a
+    shim in :file:`remote_worker.py`.
+
+    Some redundancy and safety provisions are made; e.g. slower than
+    expected tasks have redundant backups created and if a task fails
+    repeatedly we consider it poisoned and give up on it.
+
+    .. warning::
+
+        The network overhead / latency of copying work from the
+        controller machine to the remote workers is relatively high.
+        This executor probably only makes sense to use with
+        computationally expensive tasks such as jobs that will execute
+        for ~30 seconds or longer.
+
+    See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
+    """
 
     def __init__(
         self,
         workers: List[RemoteWorkerRecord],
         policy: RemoteWorkerSelectionPolicy,
     ) -> None:
+        """C'tor.
+
+        Args:
+            workers: A list of remote workers we can call on to do tasks.
+            policy: A policy for selecting remote workers for tasks.
+        """
+
         super().__init__()
         self.workers = workers
         self.policy = policy
@@ -592,18 +723,24 @@ class RemoteExecutor(BaseExecutor):
         (
             self.heartbeat_thread,
             self.heartbeat_stop_event,
-        ) = self.run_periodic_heartbeat()
+        ) = self._run_periodic_heartbeat()
         self.already_shutdown = False
 
     @background_thread
-    def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+    def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+        """
+        We create a background thread to invoke :meth:`_heartbeat` regularly
+        while we are scheduling work.  It does some accounting such as
+        looking for slow bundles to tag for backup creation, checking for
+        unexpected failures, and printing a fancy message on stdout.
+        """
         while not stop_event.is_set():
             time.sleep(5.0)
             logger.debug('Running periodic heartbeat code...')
-            self.heartbeat()
+            self._heartbeat()
         logger.debug('Periodic heartbeat thread shutting down.')
 
-    def heartbeat(self) -> None:
+    def _heartbeat(self) -> None:
         # Note: this is invoked on a background thread, not an
         # executor thread.  Be careful what you do with it b/c it
         # needs to get back and dump status again periodically.
@@ -612,16 +749,18 @@ class RemoteExecutor(BaseExecutor):
 
             # Look for bundles to reschedule via executor.submit
             if config.config['executors_schedule_remote_backups']:
-                self.maybe_schedule_backup_bundles()
+                self._maybe_schedule_backup_bundles()
+
+    def _maybe_schedule_backup_bundles(self):
+        """Maybe schedule backup bundles if we see a very slow bundle."""
 
 
-    def maybe_schedule_backup_bundles(self):
         assert self.status.lock.locked()
         num_done = len(self.status.finished_bundle_timings)
         num_idle_workers = self.worker_count - self.task_count
         now = time.time()
         if (
-            num_done > 2
-            and num_idle_workers > 1
+            num_done >= 2
+            and num_idle_workers > 0
             and (self.last_backup is None or (now - self.last_backup > 9.0))
             and self.backup_lock.acquire(blocking=False)
         ):
@@ -698,7 +837,7 @@ class RemoteExecutor(BaseExecutor):
 
                 # Note: this is all still happening on the heartbeat
                 # runner thread.  That's ok because
-                # schedule_backup_for_bundle uses the executor to
+                # _schedule_backup_for_bundle uses the executor to
                 # submit the bundle again which will cause it to be
                 # picked up by a worker thread and allow this thread
                 # to return to run future heartbeats.
@@ -709,28 +848,32 @@ class RemoteExecutor(BaseExecutor):
                         bundle_to_backup,
                         best_score,
                     )
-                    self.schedule_backup_for_bundle(bundle_to_backup)
+                    self._schedule_backup_for_bundle(bundle_to_backup)
             finally:
                 self.backup_lock.release()
 
-    def is_worker_available(self) -> bool:
+    def _is_worker_available(self) -> bool:
+        """Is there a worker available currently?"""
         return self.policy.is_worker_available()
 
-    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+    def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+        """Try to acquire a worker."""
         return self.policy.acquire_worker(machine_to_avoid)
 
-    def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+    def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+        """Find a worker or block until one becomes available."""
         with self.cv:
-            while not self.is_worker_available():
+            while not self._is_worker_available():
                 self.cv.wait()
-            worker = self.acquire_worker(machine_to_avoid)
+            worker = self._acquire_worker(machine_to_avoid)
             if worker is not None:
                 return worker
         msg = "We should never reach this point in the code"
         logger.critical(msg)
         raise Exception(msg)
 
-    def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+    def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+        """Release a previously acquired worker."""
         worker = bundle.worker
         assert worker is not None
         logger.debug('Released worker %s', worker)
@@ -744,7 +887,8 @@ class RemoteExecutor(BaseExecutor):
             self.cv.notify()
         self.adjust_task_count(-1)
 
-    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
+    def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
+        """See if a particular bundle is cancelled.  Do not block."""
         with self.status.lock:
             if bundle.is_cancelled.wait(timeout=0.0):
                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
@@ -752,8 +896,9 @@ class RemoteExecutor(BaseExecutor):
                 return True
         return False
 
-    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
+    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
         """Find a worker for bundle or block until one is available."""
+
         self.adjust_task_count(+1)
         uuid = bundle.uuid
         hostname = bundle.hostname
@@ -765,7 +910,7 @@ class RemoteExecutor(BaseExecutor):
             avoid_machine = bundle.src_bundle.machine
         worker = None
         while worker is None:
-            worker = self.find_available_worker_or_block(avoid_machine)
+            worker = self._find_available_worker_or_block(avoid_machine)
         assert worker is not None
 
         # Ok, found a worker.
@@ -779,12 +924,12 @@ class RemoteExecutor(BaseExecutor):
         # It may have been some time between when it was submitted and
         # now due to lack of worker availability and someone else may
         # have already finished it.
-        if self.check_if_cancelled(bundle):
+        if self._check_if_cancelled(bundle):
             try:
-                return self.process_work_result(bundle)
+                return self._process_work_result(bundle)
             except Exception as e:
                 logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
-                self.release_worker(bundle)
+                self._release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
                     # bundle.  For it to have been cancelled, a backup
@@ -799,15 +944,12 @@ class RemoteExecutor(BaseExecutor):
                         'no results for this bundle.  This is unexpected and bad.',
                         bundle,
                     )
-                    return self.emergency_retry_nasty_bundle(bundle)
+                    return self._emergency_retry_nasty_bundle(bundle)
                 else:
-                    # Expected(?).  We're a backup and our bundle is
-                    # cancelled before we even got started.  Something
-                    # went bad in process_work_result (I acutually don't
-                    # see what?) but probably not worth worrying
-                    # about.  Let the original thread worry about
-                    # either finding the results or complaining about
-                    # it.
+                    # We're a backup and our bundle is cancelled
+                    # before we even got started.  Do nothing and let
+                    # the original bundle's thread worry about either
+                    # finding the results or complaining about it.
                     return None
 
         # Send input code / data to worker machine if it's not local.
@@ -820,10 +962,11 @@ class RemoteExecutor(BaseExecutor):
                 xfer_latency = time.time() - start_ts
                 logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
             except Exception as e:
-                self.release_worker(bundle)
+                self._release_worker(bundle)
                 if is_original:
-                    # Weird.  We tried to copy the code to the worker and it failed...
-                    # And we're the original bundle.  We have to retry.
+                    # Weird.  We tried to copy the code to the worker
+                    # and it failed...  And we're the original bundle.
+                    # We have to retry.
                     logger.exception(e)
                     logger.error(
                         "%s: Failed to send instructions to the worker machine?! "
@@ -831,12 +974,12 @@ class RemoteExecutor(BaseExecutor):
                         "be a race condition.  Attempting an emergency retry...",
                         bundle,
                     )
-                    return self.emergency_retry_nasty_bundle(bundle)
+                    return self._emergency_retry_nasty_bundle(bundle)
                 else:
                     # This is actually expected; we're a backup.
                     # There's a race condition where someone else
                     # already finished the work and removed the source
-                    # code file before we could copy it.  No biggie.
+                    # code_file before we could copy it.  Ignore.
                     logger.warning(
                         '%s: Failed to send instructions to the worker machine... '
                         'We\'re a backup and this may be caused by the original (or '
@@ -846,11 +989,11 @@ class RemoteExecutor(BaseExecutor):
                     return None
 
         # Kick off the work.  Note that if this fails we let
-        # wait_for_process deal with it.
+        # _wait_for_process deal with it.
         self.status.record_processing_began(uuid)
         cmd = (
             f'{SSH} {bundle.username}@{bundle.machine} '
-            f'"source py38-venv/bin/activate &&'
+            f'"source py39-venv/bin/activate &&'
             f' /home/scott/lib/python_modules/remote_worker.py'
             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
         )
@@ -858,21 +1001,40 @@ class RemoteExecutor(BaseExecutor):
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
         logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
-        return self.wait_for_process(p, bundle, 0)
+        return self._wait_for_process(p, bundle, 0)
 
 
-    def wait_for_process(
+    def _wait_for_process(
         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
     ) -> Any:
+        """At this point we've copied the bundle's pickled code to the remote
+        worker and started an ssh process that should be invoking the
+        remote worker to have it execute the user's code.  See how
+        that's going and wait for it to complete or fail.  Note that
+        this code is recursive: there are codepaths where we decide to
+        stop waiting for an ssh process (because another backup seems
+        to have finished) but then fail to fetch or parse the results
+        from that backup and thus call ourselves to continue waiting
+        on an active ssh process.  This is the purpose of the depth
+        argument: to curtail potential infinite recursion by giving up
+        eventually.
+
+        Args:
+            p: the Popen record of the ssh job
+            bundle: the bundle of work being executed remotely
+            depth: how many retries we've made so far.  Starts at zero.
+
+        """
+
         machine = bundle.machine
         assert p is not None
-        pid = p.pid
+        pid = p.pid  # pid of the ssh process
         if depth > 3:
             logger.error(
                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
             )
             p.terminate()
-            self.release_worker(bundle)
-            return self.emergency_retry_nasty_bundle(bundle)
+            self._release_worker(bundle)
+            return self._emergency_retry_nasty_bundle(bundle)
 
         # Spin until either the ssh job we scheduled finishes the
         # bundle or some backup worker signals that they finished it
@@ -881,7 +1043,7 @@ class RemoteExecutor(BaseExecutor):
             try:
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
-                if self.check_if_cancelled(bundle):
+                if self._check_if_cancelled(bundle):
                     logger.info('%s: looks like another worker finished bundle...', bundle)
                     break
             else:
@@ -892,9 +1054,9 @@ class RemoteExecutor(BaseExecutor):
         # If we get here we believe the bundle is done; either the ssh
         # subprocess finished (hopefully successfully) or we noticed
         # that some other worker seems to have completed the bundle
-        # and we're bailing out.
+        # before us and we're bailing out.
         try:
-            ret = self.process_work_result(bundle)
+            ret = self._process_work_result(bundle)
             if ret is not None and p is not None:
                 p.terminate()
             return ret
@@ -911,12 +1073,14 @@ class RemoteExecutor(BaseExecutor):
                 logger.warning(
                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
                 )
-                return self.wait_for_process(p, bundle, depth + 1)
+                return self._wait_for_process(p, bundle, depth + 1)
             else:
-                self.release_worker(bundle)
-                return self.emergency_retry_nasty_bundle(bundle)
+                self._release_worker(bundle)
+                return self._emergency_retry_nasty_bundle(bundle)
+
+    def _process_work_result(self, bundle: BundleDetails) -> Any:
+        """A bundle seems to be completed.  Check on the results."""
 
 
-    def process_work_result(self, bundle: BundleDetails) -> Any:
         with self.status.lock:
             is_original = bundle.src_bundle is None
             was_cancelled = bundle.was_cancelled
@@ -942,7 +1106,7 @@ class RemoteExecutor(BaseExecutor):
                     )
 
                     # If either of these throw they are handled in
-                    # wait_for_process.
+                    # _wait_for_process.
                     attempts = 0
                     while True:
                         try:
@@ -954,6 +1118,7 @@ class RemoteExecutor(BaseExecutor):
                         else:
                             break
 
+                    # Cleanup remote /tmp files.
                     run_silently(
                         f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                     )
@@ -966,7 +1131,8 @@ class RemoteExecutor(BaseExecutor):
         # original is also the only job that may delete result_file
         # from disk.  Note that the original may have been cancelled
         # if one of the backups finished first; it still must read the
-        # result from disk.
+        # result from disk.  It still does that here with is_cancelled
+        # set.
         if is_original:
             logger.debug("%s: Unpickling %s.", bundle, result_file)
             try:
@@ -976,10 +1142,10 @@ class RemoteExecutor(BaseExecutor):
             except Exception as e:
                 logger.exception(e)
                 logger.error('Failed to load %s... this is bad news.', result_file)
-                self.release_worker(bundle)
+                self._release_worker(bundle)
 
 
-                # Re-raise the exception; the code in wait_for_process may
-                # decide to emergency_retry_nasty_bundle here.
+                # Re-raise the exception; the code in _wait_for_process may
+                # decide to _emergency_retry_nasty_bundle here.
                 raise e
             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
             os.remove(result_file)
@@ -1012,13 +1178,15 @@ class RemoteExecutor(BaseExecutor):
                     '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
                 )
                 orig_bundle.is_cancelled.set()
-        self.release_worker(bundle, was_cancelled=was_cancelled)
+        self._release_worker(bundle, was_cancelled=was_cancelled)
         return result
 
-    def create_original_bundle(self, pickle, fname: str):
-        from string_utils import generate_uuid
+    def _create_original_bundle(self, pickle, function_name: str):
+        """Creates a bundle that is not a backup of any other bundle but
+        rather represents a user task.
+        """
 
 
-        uuid = generate_uuid(omit_dashes=True)
+        uuid = string_utils.generate_uuid(omit_dashes=True)
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
@@ -1029,7 +1197,7 @@ class RemoteExecutor(BaseExecutor):
         bundle = BundleDetails(
             pickled_code=pickle,
             uuid=uuid,
-            fname=fname,
+            function_name=function_name,
             worker=None,
             username=None,
             machine=None,
@@ -1051,7 +1219,11 @@ class RemoteExecutor(BaseExecutor):
         logger.debug('%s: Created an original bundle', bundle)
         return bundle
 
-    def create_backup_bundle(self, src_bundle: BundleDetails):
+    def _create_backup_bundle(self, src_bundle: BundleDetails):
+        """Creates a bundle that is a backup of another bundle that is
+        running too slowly."""
+
+        assert self.status.lock.locked()
         assert src_bundle.backup_bundles is not None
         n = len(src_bundle.backup_bundles)
         uuid = src_bundle.uuid + f'_backup#{n}'
@@ -1059,7 +1231,7 @@ class RemoteExecutor(BaseExecutor):
         backup_bundle = BundleDetails(
             pickled_code=src_bundle.pickled_code,
             uuid=uuid,
-            fname=src_bundle.fname,
+            function_name=src_bundle.function_name,
             worker=None,
             username=None,
             machine=None,
@@ -1082,20 +1254,28 @@ class RemoteExecutor(BaseExecutor):
         logger.debug('%s: Created a backup bundle', backup_bundle)
         return backup_bundle
 
-    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+    def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+        """Schedule a backup of src_bundle."""
+
         assert self.status.lock.locked()
         assert src_bundle is not None
-        backup_bundle = self.create_backup_bundle(src_bundle)
+        backup_bundle = self._create_backup_bundle(src_bundle)
         logger.debug(
-            '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
+            '%s/%s: Scheduling backup for execution...',
+            backup_bundle.uuid,
+            backup_bundle.function_name,
         )
-        self._helper_executor.submit(self.launch, backup_bundle)
+        self._helper_executor.submit(self._launch, backup_bundle)
 
         # Results from backups don't matter; if they finish first
         # they will move the result_file to this machine and let
-        # the original pick them up and unpickle them.
+        # the original pick them up and unpickle them (and return
+        # a result).
+
+    def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
+        """Something unexpectedly failed with bundle.  Either retry it
+        from the beginning or throw in the towel and give up on it."""
 
 
-    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -1126,19 +1306,22 @@ class RemoteExecutor(BaseExecutor):
             msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
             logger.warning(msg)
             warnings.warn(msg)
-            return self.launch(bundle, avoid_last_machine)
+            return self._launch(bundle, avoid_last_machine)
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        """Submit work to be done.  This is the user entry point of this
+        class."""
         if self.already_shutdown:
             raise Exception('Submitted work after shutdown.')
-        pickle = make_cloud_pickle(function, *args, **kwargs)
-        bundle = self.create_original_bundle(pickle, function.__name__)
+        pickle = _make_cloud_pickle(function, *args, **kwargs)
+        bundle = self._create_original_bundle(pickle, function.__name__)
         self.total_bundles_submitted += 1
-        return self._helper_executor.submit(self.launch, bundle)
+        return self._helper_executor.submit(self._launch, bundle)
 
     @overrides
     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        """Shutdown the executor."""
         if not self.already_shutdown:
             logging.debug('Shutting down RemoteExecutor %s', self.title)
             self.heartbeat_stop_event.set()
@@ -1153,8 +1336,39 @@ class RemoteExecutor(BaseExecutor):
 class DefaultExecutors(object):
     """A container for a default thread, process and remote executor.
     These are not created until needed and we take care to clean up
-    before process exit.
+    before process exit automatically for the caller's convenience.
+    Instead of creating your own executor, consider using the one
+    from this pool.  e.g.::
+
+        @par.parallelize(method=par.Method.PROCESS)
+        def do_work(
+            solutions: List[Work],
+            shard_num: int,
+            ...
+        ):
+            <do the work>
+
 
 
+        def start_do_work(all_work: List[Work]):
+            shards = []
+            logger.debug('Sharding work into groups of 10.')
+            for subset in list_utils.shard(all_work, 10):
+                shards.append([x for x in subset])
+
+            logger.debug('Kicking off helper pool.')
+            try:
+                for n, shard in enumerate(shards):
+                    results.append(
+                        do_work(
+                            shard, n, shared_cache.get_name(), max_letter_pop_per_word
+                        )
+                    )
+                smart_future.wait_all(results)
+            finally:
+                # Note: if you forget to do this it will clean itself up
+                # during program termination including tearing down any
+                # active ssh connections.
+                executors.DefaultExecutors().process_pool().shutdown()
     """
 
     def __init__(self):
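
A more direct (and equally hypothetical) use of the container, without the @par.parallelize decorator shown above; process_pool() is the same accessor used in the docstring's cleanup line:

    import executors

    default = executors.DefaultExecutors()
    future = default.process_pool().submit(sum, [1, 2, 3])
    print(future.result())  # 6
    default.process_pool().shutdown()
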
@@ -1163,10 +1377,10 @@ class DefaultExecutors(object):
         self.remote_executor: Optional[RemoteExecutor] = None
 
     @staticmethod
-    def ping(host) -> bool:
+    def _ping(host) -> bool:
         logger.debug('RUN> ping -c 1 %s', host)
         try:
-            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
+            x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
         except Exception:
             return False
@@ -1185,17 +1399,17 @@ class DefaultExecutors(object):
         if self.remote_executor is None:
             logger.info('Looking for some helper machines...')
             pool: List[RemoteWorkerRecord] = []
-            if self.ping('cheetah.house'):
+            if self._ping('cheetah.house'):
                 logger.info('Found cheetah.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='cheetah.house',
                         weight=24,
-                        count=6,
+                        count=5,
                     ),
                 )
-            if self.ping('meerkat.cabin'):
+            if self._ping('meerkat.cabin'):
                 logger.info('Found meerkat.cabin')
                 pool.append(
                     RemoteWorkerRecord(
@@ -1205,27 +1419,27 @@ class DefaultExecutors(object):
                         count=2,
                     ),
                 )
-            if self.ping('wannabe.house'):
+            if self._ping('wannabe.house'):
                 logger.info('Found wannabe.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='wannabe.house',
                         weight=14,
-                        count=8,
+                        count=2,
                     ),
                 )
-            if self.ping('puma.cabin'):
+            if self._ping('puma.cabin'):
                 logger.info('Found puma.cabin')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='puma.cabin',
                         weight=24,
-                        count=6,
+                        count=5,
                     ),
                 )
-            if self.ping('backup.house'):
+            if self._ping('backup.house'):
                 logger.info('Found backup.house')
                 pool.append(
                     RemoteWorkerRecord(
@@ -1240,7 +1454,7 @@ class DefaultExecutors(object):
             for record in pool:
                 if record.machine == platform.node() and record.count > 1:
                     logger.info('Reducing workload for %s.', record.machine)
-                    record.count = 1
+                    record.count = max(int(record.count / 2), 1)
 
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
             policy.register_worker_pool(pool)