Reduce the doctest lease duration...
[python_utils.git] / executors.py
index 47b4a89a88d693d535ed2e036c6288829505a005..6485afa054689c3b668adb1e0708b7f2d29ed8b9 100644 (file)
@@ -1,34 +1,44 @@
 #!/usr/bin/env python3
 #!/usr/bin/env python3
+# -*- coding: utf-8 -*-
 
 
-from __future__ import annotations
+# © Copyright 2021-2022, Scott Gasch
 
 
-from abc import ABC, abstractmethod
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine and a remote executor for farming out work to other
+machines.
+
+Also defines DefaultExecutors which is a container for references to
+global executors / worker pools with automatic shutdown semantics."""
+
+from __future__ import annotations
 import concurrent.futures as fut
 import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
 import logging
 import logging
-import numpy
 import os
 import platform
 import random
 import subprocess
 import threading
 import time
 import os
 import platform
 import random
 import subprocess
 import threading
 import time
-from typing import Any, Callable, Dict, List, Optional, Set
 import warnings
 import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
 
 import cloudpickle  # type: ignore
+import numpy
 from overrides import overrides
 
 from overrides import overrides
 
-from ansi import bg, fg, underline, reset
 import argparse_utils
 import config
 import argparse_utils
 import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
 import histogram as hist
 import histogram as hist
+import string_utils
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_exitcode, cmd_in_background, run_silently
 from thread_utils import background_thread
 
 from thread_utils import background_thread
 
-
 logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
 logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
@@ -66,12 +76,18 @@ SSH = '/usr/bin/ssh -oForwardX11=no'
 SCP = '/usr/bin/scp -C'
 
 
 SCP = '/usr/bin/scp -C'
 
 
-def make_cloud_pickle(fun, *args, **kwargs):
-    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
+def _make_cloud_pickle(fun, *args, **kwargs):
+    """Internal helper to create cloud pickles."""
+    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
     return cloudpickle.dumps((fun, args, kwargs))
 
 
 class BaseExecutor(ABC):
     return cloudpickle.dumps((fun, args, kwargs))
 
 
 class BaseExecutor(ABC):
+    """The base executor interface definition.  The interface for
+    :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
+    :class:`ThreadExecutor`.
+    """
+
     def __init__(self, *, title=''):
         self.title = title
         self.histogram = hist.SimpleHistogram(
     def __init__(self, *, title=''):
         self.title = title
         self.histogram = hist.SimpleHistogram(
@@ -84,10 +100,10 @@ class BaseExecutor(ABC):
         pass
 
     @abstractmethod
         pass
 
     @abstractmethod
-    def shutdown(self, wait: bool = True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         pass
 
         pass
 
-    def shutdown_if_idle(self) -> bool:
+    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
         """Shutdown the executor and return True if the executor is idle
         (i.e. there are no pending or active tasks).  Return False
         otherwise.  Note: this should only be called by the launcher
         """Shutdown the executor and return True if the executor is idle
         (i.e. there are no pending or active tasks).  Return False
         otherwise.  Note: this should only be called by the launcher
@@ -95,7 +111,7 @@ class BaseExecutor(ABC):
 
         """
         if self.task_count == 0:
 
         """
         if self.task_count == 0:
-            self.shutdown()
+            self.shutdown(wait=True, quiet=quiet)
             return True
         return False
 
             return True
         return False
 
@@ -106,7 +122,7 @@ class BaseExecutor(ABC):
 
         """
         self.task_count += delta
 
         """
         self.task_count += delta
-        logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
+        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
 
     def get_task_count(self) -> int:
         """Change the task count.  Note: do not call this method from a
 
     def get_task_count(self) -> int:
         """Change the task count.  Note: do not call this method from a
@@ -118,6 +134,15 @@ class BaseExecutor(ABC):
 
 
 class ThreadExecutor(BaseExecutor):
 
 
 class ThreadExecutor(BaseExecutor):
+    """A threadpool executor.  This executor uses python threads to
+    schedule tasks.  Note that, at least as of python3.10, because of
+    the global lock in the interpreter itself, these do not
+    parallelize very well so this class is useful mostly for non-CPU
+    intensive tasks.
+
+    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
+    """
+
     def __init__(self, max_workers: Optional[int] = None):
         super().__init__()
         workers = None
     def __init__(self, max_workers: Optional[int] = None):
         super().__init__()
         workers = None
@@ -125,15 +150,19 @@ class ThreadExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_threadpool_size' in config.config:
             workers = config.config['executors_threadpool_size']
             workers = max_workers
         elif 'executors_threadpool_size' in config.config:
             workers = config.config['executors_threadpool_size']
-        logger.debug(f'Creating threadpool executor with {workers} workers')
+        if workers is not None:
+            logger.debug('Creating threadpool executor with %d workers', workers)
+        else:
+            logger.debug('Creating a default sized threadpool executor')
         self._thread_pool_executor = fut.ThreadPoolExecutor(
             max_workers=workers, thread_name_prefix="thread_executor_helper"
         )
         self.already_shutdown = False
 
     # This is run on a different thread; do not adjust task count here.
         self._thread_pool_executor = fut.ThreadPoolExecutor(
             max_workers=workers, thread_name_prefix="thread_executor_helper"
         )
         self.already_shutdown = False
 
     # This is run on a different thread; do not adjust task count here.
-    def run_local_bundle(self, fun, *args, **kwargs):
-        logger.debug(f"Running local bundle at {fun.__name__}")
+    @staticmethod
+    def run_local_bundle(fun, *args, **kwargs):
+        logger.debug("Running local bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
         return result
 
         result = fun(*args, **kwargs)
         return result
 
@@ -148,22 +177,28 @@ class ThreadExecutor(BaseExecutor):
             newargs.append(arg)
         start = time.time()
         result = self._thread_pool_executor.submit(
             newargs.append(arg)
         start = time.time()
         result = self._thread_pool_executor.submit(
-            self.run_local_bundle, *newargs, **kwargs
+            ThreadExecutor.run_local_bundle, *newargs, **kwargs
         )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
         )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
         if not self.already_shutdown:
-            logger.debug(f'Shutting down threadpool executor {self.title}')
-            print(self.histogram)
+            logger.debug('Shutting down threadpool executor %s', self.title)
             self._thread_pool_executor.shutdown(wait)
             self._thread_pool_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
 
 class ProcessExecutor(BaseExecutor):
             self.already_shutdown = True
 
 
 class ProcessExecutor(BaseExecutor):
+    """An executor which runs tasks in child processes.
+
+    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
+    """
+
     def __init__(self, max_workers=None):
         super().__init__()
         workers = None
     def __init__(self, max_workers=None):
         super().__init__()
         workers = None
@@ -171,16 +206,20 @@ class ProcessExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_processpool_size' in config.config:
             workers = config.config['executors_processpool_size']
             workers = max_workers
         elif 'executors_processpool_size' in config.config:
             workers = config.config['executors_processpool_size']
-        logger.debug(f'Creating processpool executor with {workers} workers.')
+        if workers is not None:
+            logger.debug('Creating processpool executor with %d workers.', workers)
+        else:
+            logger.debug('Creating a default sized processpool executor')
         self._process_executor = fut.ProcessPoolExecutor(
             max_workers=workers,
         )
         self.already_shutdown = False
 
     # This is run in another process; do not adjust task count here.
         self._process_executor = fut.ProcessPoolExecutor(
             max_workers=workers,
         )
         self.already_shutdown = False
 
     # This is run in another process; do not adjust task count here.
-    def run_cloud_pickle(self, pickle):
+    @staticmethod
+    def run_cloud_pickle(pickle):
         fun, args, kwargs = cloudpickle.loads(pickle)
         fun, args, kwargs = cloudpickle.loads(pickle)
-        logger.debug(f"Running pickled bundle at {fun.__name__}")
+        logger.debug("Running pickled bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
         return result
 
         result = fun(*args, **kwargs)
         return result
 
@@ -190,18 +229,19 @@ class ProcessExecutor(BaseExecutor):
             raise Exception('Submitted work after shutdown.')
         start = time.time()
         self.adjust_task_count(+1)
             raise Exception('Submitted work after shutdown.')
         start = time.time()
         self.adjust_task_count(+1)
-        pickle = make_cloud_pickle(function, *args, **kwargs)
-        result = self._process_executor.submit(self.run_cloud_pickle, pickle)
+        pickle = _make_cloud_pickle(function, *args, **kwargs)
+        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
         if not self.already_shutdown:
-            logger.debug(f'Shutting down processpool executor {self.title}')
+            logger.debug('Shutting down processpool executor %s', self.title)
             self._process_executor.shutdown(wait)
             self._process_executor.shutdown(wait)
-            print(self.histogram)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
     def __getstate__(self):
             self.already_shutdown = True
 
     def __getstate__(self):
@@ -218,10 +258,21 @@ class RemoteExecutorException(Exception):
 
 @dataclass
 class RemoteWorkerRecord:
 
 @dataclass
 class RemoteWorkerRecord:
+    """A record of info about a remote worker."""
+
     username: str
     username: str
+    """Username we can ssh into on this machine to run work."""
+
     machine: str
     machine: str
+    """Machine address / name."""
+
     weight: int
     weight: int
+    """Relative probability for the weighted policy to select this
+    machine for scheduling work."""
+
     count: int
     count: int
+    """If this machine is selected, what is the maximum number of task
+    that it can handle?"""
 
     def __hash__(self):
         return hash((self.username, self.machine))
 
     def __hash__(self):
         return hash((self.username, self.machine))
@@ -232,25 +283,71 @@ class RemoteWorkerRecord:
 
 @dataclass
 class BundleDetails:
 
 @dataclass
 class BundleDetails:
+    """All info necessary to define some unit of work that needs to be
+    done, where it is being run, its state, whether it is an original
+    bundle of a backup bundle, how many times it has failed, etc...
+    """
+
     pickled_code: bytes
     pickled_code: bytes
+    """The code to run, cloud pickled"""
+
     uuid: str
     uuid: str
-    fname: str
+    """A unique identifier"""
+
+    function_name: str
+    """The name of the function we pickled"""
+
     worker: Optional[RemoteWorkerRecord]
     worker: Optional[RemoteWorkerRecord]
+    """The remote worker running this bundle or None if none (yet)"""
+
     username: Optional[str]
     username: Optional[str]
+    """The remote username running this bundle or None if none (yet)"""
+
     machine: Optional[str]
     machine: Optional[str]
+    """The remote machine running this bundle or None if none (yet)"""
+
     hostname: str
     hostname: str
+    """The controller machine"""
+
     code_file: str
     code_file: str
+    """A unique filename to hold the work to be done"""
+
     result_file: str
     result_file: str
+    """Where the results should be placed / read from"""
+
     pid: int
     pid: int
+    """The process id of the local subprocess watching the ssh connection
+    to the remote machine"""
+
     start_ts: float
     start_ts: float
+    """Starting time"""
+
     end_ts: float
     end_ts: float
+    """Ending time"""
+
     slower_than_local_p95: bool
     slower_than_local_p95: bool
+    """Currently slower then 95% of other bundles on remote host"""
+
     slower_than_global_p95: bool
     slower_than_global_p95: bool
-    src_bundle: BundleDetails
+    """Currently slower than 95% of other bundles globally"""
+
+    src_bundle: Optional[BundleDetails]
+    """If this is a backup bundle, this points to the original bundle
+    that it's backing up.  None otherwise."""
+
     is_cancelled: threading.Event
     is_cancelled: threading.Event
+    """An event that can be signaled to indicate this bundle is cancelled.
+    This is set when another copy (backup or original) of this work has
+    completed successfully elsewhere."""
+
     was_cancelled: bool
     was_cancelled: bool
+    """True if this bundle was cancelled, False if it finished normally"""
+
     backup_bundles: Optional[List[BundleDetails]]
     backup_bundles: Optional[List[BundleDetails]]
+    """If we've created backups of this bundle, this is the list of them"""
+
     failure_count: int
     failure_count: int
+    """How many times has this bundle failed already?"""
 
     def __repr__(self):
         uuid = self.uuid
 
     def __repr__(self):
         uuid = self.uuid
@@ -260,6 +357,9 @@ class BundleDetails:
         else:
             suffix = uuid[-6:]
 
         else:
             suffix = uuid[-6:]
 
+        # We colorize the uuid based on some bits from it to make them
+        # stand out in the logging and help a reader correlate log messages
+        # related to the same bundle.
         colorz = [
             fg('violet red'),
             fg('red'),
         colorz = [
             fg('violet red'),
             fg('red'),
@@ -276,21 +376,29 @@ class BundleDetails:
             fg('medium purple'),
         ]
         c = colorz[int(uuid[-2:], 16) % len(colorz)]
             fg('medium purple'),
         ]
         c = colorz[int(uuid[-2:], 16) % len(colorz)]
-        fname = self.fname if self.fname is not None else 'nofname'
+        function_name = self.function_name if self.function_name is not None else 'nofname'
         machine = self.machine if self.machine is not None else 'nomachine'
         machine = self.machine if self.machine is not None else 'nomachine'
-        return f'{c}{suffix}/{fname}/{machine}{reset()}'
+        return f'{c}{suffix}/{function_name}/{machine}{reset()}'
 
 
 class RemoteExecutorStatus:
 
 
 class RemoteExecutorStatus:
+    """A status 'scoreboard' for a remote executor tracking various
+    metrics and able to render a periodic dump of global state.
+    """
+
     def __init__(self, total_worker_count: int) -> None:
     def __init__(self, total_worker_count: int) -> None:
+        """C'tor.
+
+        Args:
+            total_worker_count: number of workers in the pool
+
+        """
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
         self.start_time: float = time.time()
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
         self.start_time: float = time.time()
-        self.start_per_bundle: Dict[str, float] = defaultdict(float)
+        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
-        self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord, List[float]
-        ] = {}
+        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
@@ -302,12 +410,18 @@ class RemoteExecutorStatus:
         self.lock: threading.Lock = threading.Lock()
 
     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
         self.lock: threading.Lock = threading.Lock()
 
     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+        """Record that bundle with uuid is assigned to a particular worker.
+
+        Args:
+            worker: the record of the worker to which uuid is assigned
+            uuid: the uuid of a bundle that has been assigned to a worker
+        """
         with self.lock:
             self.record_acquire_worker_already_locked(worker, uuid)
 
         with self.lock:
             self.record_acquire_worker_already_locked(worker, uuid)
 
-    def record_acquire_worker_already_locked(
-        self, worker: RemoteWorkerRecord, uuid: str
-    ) -> None:
+    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+        """Same as above but an entry point that doesn't acquire the lock
+        for codepaths where it's already held."""
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
@@ -316,10 +430,12 @@ class RemoteExecutorStatus:
         self.in_flight_bundles_by_worker[worker] = x
 
     def record_bundle_details(self, details: BundleDetails) -> None:
         self.in_flight_bundles_by_worker[worker] = x
 
     def record_bundle_details(self, details: BundleDetails) -> None:
+        """Register the details about a bundle of work."""
         with self.lock:
             self.record_bundle_details_already_locked(details)
 
     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
         with self.lock:
             self.record_bundle_details_already_locked(details)
 
     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
+        """Same as above but for codepaths that already hold the lock."""
         assert self.lock.locked()
         self.bundle_details_by_uuid[details.uuid] = details
 
         assert self.lock.locked()
         self.bundle_details_by_uuid[details.uuid] = details
 
@@ -329,6 +445,7 @@ class RemoteExecutorStatus:
         uuid: str,
         was_cancelled: bool,
     ) -> None:
         uuid: str,
         was_cancelled: bool,
     ) -> None:
+        """Record that a bundle has released a worker."""
         with self.lock:
             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
 
         with self.lock:
             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
 
@@ -338,22 +455,27 @@ class RemoteExecutorStatus:
         uuid: str,
         was_cancelled: bool,
     ) -> None:
         uuid: str,
         was_cancelled: bool,
     ) -> None:
+        """Same as above but for codepaths that already hold the lock."""
         assert self.lock.locked()
         ts = time.time()
         self.end_per_bundle[uuid] = ts
         self.in_flight_bundles_by_worker[worker].remove(uuid)
         if not was_cancelled:
         assert self.lock.locked()
         ts = time.time()
         self.end_per_bundle[uuid] = ts
         self.in_flight_bundles_by_worker[worker].remove(uuid)
         if not was_cancelled:
-            bundle_latency = ts - self.start_per_bundle[uuid]
-            x = self.finished_bundle_timings_per_worker.get(worker, list())
+            start = self.start_per_bundle[uuid]
+            assert start is not None
+            bundle_latency = ts - start
+            x = self.finished_bundle_timings_per_worker.get(worker, [])
             x.append(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
             self.finished_bundle_timings.append(bundle_latency)
 
     def record_processing_began(self, uuid: str):
             x.append(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
             self.finished_bundle_timings.append(bundle_latency)
 
     def record_processing_began(self, uuid: str):
+        """Record when work on a bundle begins."""
         with self.lock:
             self.start_per_bundle[uuid] = time.time()
 
     def total_in_flight(self) -> int:
         with self.lock:
             self.start_per_bundle[uuid] = time.time()
 
     def total_in_flight(self) -> int:
+        """How many bundles are in flight currently?"""
         assert self.lock.locked()
         total_in_flight = 0
         for worker in self.known_workers:
         assert self.lock.locked()
         total_in_flight = 0
         for worker in self.known_workers:
@@ -361,6 +483,7 @@ class RemoteExecutorStatus:
         return total_in_flight
 
     def total_idle(self) -> int:
         return total_in_flight
 
     def total_idle(self) -> int:
+        """How many idle workers are there currently?"""
         assert self.lock.locked()
         return self.worker_count - self.total_in_flight()
 
         assert self.lock.locked()
         return self.worker_count - self.total_in_flight()
 
@@ -439,7 +562,12 @@ class RemoteExecutorStatus:
 
 
 class RemoteWorkerSelectionPolicy(ABC):
 
 
 class RemoteWorkerSelectionPolicy(ABC):
-    def register_worker_pool(self, workers):
+    """A policy for selecting a remote worker base class."""
+
+    def __init__(self):
+        self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
         self.workers = workers
 
     @abstractmethod
         self.workers = workers
 
     @abstractmethod
@@ -452,30 +580,33 @@ class RemoteWorkerSelectionPolicy(ABC):
 
 
 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
 
 
 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that uses weighted RNG."""
+
     @overrides
     def is_worker_available(self) -> bool:
     @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
     @overrides
     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         grabbag = []
         return False
 
     @overrides
     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         grabbag = []
-        for worker in self.workers:
-            if worker.machine != machine_to_avoid:
-                if worker.count > 0:
-                    for _ in range(worker.count * worker.weight):
-                        grabbag.append(worker)
+        if self.workers:
+            for worker in self.workers:
+                if worker.machine != machine_to_avoid:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
 
         if len(grabbag) == 0:
 
         if len(grabbag) == 0:
-            logger.debug(
-                f'There are no available workers that avoid {machine_to_avoid}...'
-            )
-            for worker in self.workers:
-                if worker.count > 0:
-                    for _ in range(worker.count * worker.weight):
-                        grabbag.append(worker)
+            logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+            if self.workers:
+                for worker in self.workers:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
 
         if len(grabbag) == 0:
             logger.warning('There are no available workers?!')
 
         if len(grabbag) == 0:
             logger.warning('There are no available workers?!')
@@ -484,51 +615,90 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
         worker = random.sample(grabbag, 1)[0]
         assert worker.count > 0
         worker.count -= 1
         worker = random.sample(grabbag, 1)[0]
         assert worker.count > 0
         worker.count -= 1
-        logger.debug(f'Chose worker {worker}')
+        logger.debug('Selected worker %s', worker)
         return worker
 
 
 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
         return worker
 
 
 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that just round robins."""
+
     def __init__(self) -> None:
     def __init__(self) -> None:
+        super().__init__()
         self.index = 0
 
     @overrides
     def is_worker_available(self) -> bool:
         self.index = 0
 
     @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
     @overrides
         return False
 
     @overrides
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
-        x = self.index
-        while True:
-            worker = self.workers[x]
-            if worker.count > 0:
-                worker.count -= 1
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+        if self.workers:
+            x = self.index
+            while True:
+                worker = self.workers[x]
+                if worker.count > 0:
+                    worker.count -= 1
+                    x += 1
+                    if x >= len(self.workers):
+                        x = 0
+                    self.index = x
+                    logger.debug('Selected worker %s', worker)
+                    return worker
                 x += 1
                 if x >= len(self.workers):
                     x = 0
                 x += 1
                 if x >= len(self.workers):
                     x = 0
-                self.index = x
-                logger.debug(f'Selected worker {worker}')
-                return worker
-            x += 1
-            if x >= len(self.workers):
-                x = 0
-            if x == self.index:
-                msg = 'Unexpectedly could not find a worker, retrying...'
-                logger.warning(msg)
-                return None
+                if x == self.index:
+                    logger.warning('Unexpectedly could not find a worker, retrying...')
+                    return None
+        return None
 
 
 class RemoteExecutor(BaseExecutor):
 
 
 class RemoteExecutor(BaseExecutor):
+    """An executor that uses processes on remote machines to do work.  This
+    works by creating "bundles" of work with pickled code in each to be
+    executed.  Each bundle is assigned a remote worker based on some policy
+    heuristics.  Once assigned to a remote worker, a local subprocess is
+    created.  It copies the pickled code to the remote machine via ssh/scp
+    and then starts up work on the remote machine again using ssh.  When
+    the work is complete it copies the results back to the local machine.
+
+    So there is essentially one "controller" machine (which may also be
+    in the remote executor pool and therefore do task work in addition to
+    controlling) and N worker machines.  This code runs on the controller
+    whereas on the worker machines we invoke pickled user code via a
+    shim in :file:`remote_worker.py`.
+
+    Some redundancy and safety provisions are made; e.g. slower than
+    expected tasks have redundant backups created and if a task fails
+    repeatedly we consider it poisoned and give up on it.
+
+    .. warning::
+
+        The network overhead / latency of copying work from the
+        controller machine to the remote workers is relatively high.
+        This executor probably only makes sense to use with
+        computationally expensive tasks such as jobs that will execute
+        for ~30 seconds or longer.
+
+    See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
+    """
+
     def __init__(
         self,
         workers: List[RemoteWorkerRecord],
         policy: RemoteWorkerSelectionPolicy,
     ) -> None:
     def __init__(
         self,
         workers: List[RemoteWorkerRecord],
         policy: RemoteWorkerSelectionPolicy,
     ) -> None:
+        """C'tor.
+
+        Args:
+            workers: A list of remote workers we can call on to do tasks.
+            policy: A policy for selecting remote workers for tasks.
+        """
+
         super().__init__()
         self.workers = workers
         self.policy = policy
         super().__init__()
         self.workers = workers
         self.policy = policy
@@ -541,9 +711,7 @@ class RemoteExecutor(BaseExecutor):
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
-        logger.debug(
-            f'Creating {self.worker_count} local threads, one per remote worker.'
-        )
+        logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
@@ -555,18 +723,24 @@ class RemoteExecutor(BaseExecutor):
         (
             self.heartbeat_thread,
             self.heartbeat_stop_event,
         (
             self.heartbeat_thread,
             self.heartbeat_stop_event,
-        ) = self.run_periodic_heartbeat()
+        ) = self._run_periodic_heartbeat()
         self.already_shutdown = False
 
     @background_thread
         self.already_shutdown = False
 
     @background_thread
-    def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+    def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+        """
+        We create a background thread to invoke :meth:`_heartbeat` regularly
+        while we are scheduling work.  It does some accounting such as
+        looking for slow bundles to tag for backup creation, checking for
+        unexpected failures, and printing a fancy message on stdout.
+        """
         while not stop_event.is_set():
             time.sleep(5.0)
             logger.debug('Running periodic heartbeat code...')
         while not stop_event.is_set():
             time.sleep(5.0)
             logger.debug('Running periodic heartbeat code...')
-            self.heartbeat()
+            self._heartbeat()
         logger.debug('Periodic heartbeat thread shutting down.')
 
         logger.debug('Periodic heartbeat thread shutting down.')
 
-    def heartbeat(self) -> None:
+    def _heartbeat(self) -> None:
         # Note: this is invoked on a background thread, not an
         # executor thread.  Be careful what you do with it b/c it
         # needs to get back and dump status again periodically.
         # Note: this is invoked on a background thread, not an
         # executor thread.  Be careful what you do with it b/c it
         # needs to get back and dump status again periodically.
@@ -575,16 +749,18 @@ class RemoteExecutor(BaseExecutor):
 
             # Look for bundles to reschedule via executor.submit
             if config.config['executors_schedule_remote_backups']:
 
             # Look for bundles to reschedule via executor.submit
             if config.config['executors_schedule_remote_backups']:
-                self.maybe_schedule_backup_bundles()
+                self._maybe_schedule_backup_bundles()
+
+    def _maybe_schedule_backup_bundles(self):
+        """Maybe schedule backup bundles if we see a very slow bundle."""
 
 
-    def maybe_schedule_backup_bundles(self):
         assert self.status.lock.locked()
         num_done = len(self.status.finished_bundle_timings)
         num_idle_workers = self.worker_count - self.task_count
         now = time.time()
         if (
         assert self.status.lock.locked()
         num_done = len(self.status.finished_bundle_timings)
         num_idle_workers = self.worker_count - self.task_count
         now = time.time()
         if (
-            num_done > 2
-            and num_idle_workers > 1
+            num_done >= 2
+            and num_idle_workers > 0
             and (self.last_backup is None or (now - self.last_backup > 9.0))
             and self.backup_lock.acquire(blocking=False)
         ):
             and (self.last_backup is None or (now - self.last_backup > 9.0))
             and self.backup_lock.acquire(blocking=False)
         ):
@@ -625,21 +801,15 @@ class RemoteExecutor(BaseExecutor):
                             if start_ts is not None:
                                 runtime = now - start_ts
                                 score += runtime
                             if start_ts is not None:
                                 runtime = now - start_ts
                                 score += runtime
-                                logger.debug(
-                                    f'score[{bundle}] => {score}  # latency boost'
-                                )
+                                logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
 
                                 if bundle.slower_than_local_p95:
                                     score += runtime / 2
 
                                 if bundle.slower_than_local_p95:
                                     score += runtime / 2
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >worker p95'
-                                    )
+                                    logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
 
                                 if bundle.slower_than_global_p95:
                                     score += runtime / 4
 
                                 if bundle.slower_than_global_p95:
                                     score += runtime / 4
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >global p95'
-                                    )
+                                    logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
 
                             # Prefer backups of bundles that don't
                             # have backups already.
 
                             # Prefer backups of bundles that don't
                             # have backups already.
@@ -653,12 +823,12 @@ class RemoteExecutor(BaseExecutor):
                             else:
                                 score = 0
                             logger.debug(
                             else:
                                 score = 0
                             logger.debug(
-                                f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
+                                'score[%s] => %.1f  # {backup_count} dup backup factor',
+                                bundle,
+                                score,
                             )
 
                             )
 
-                            if score != 0 and (
-                                best_score is None or score > best_score
-                            ):
+                            if score != 0 and (best_score is None or score > best_score):
                                 bundle_to_backup = bundle
                                 assert bundle is not None
                                 assert bundle.backup_bundles is not None
                                 bundle_to_backup = bundle
                                 assert bundle is not None
                                 assert bundle.backup_bundles is not None
@@ -667,44 +837,46 @@ class RemoteExecutor(BaseExecutor):
 
                 # Note: this is all still happening on the heartbeat
                 # runner thread.  That's ok because
 
                 # Note: this is all still happening on the heartbeat
                 # runner thread.  That's ok because
-                # schedule_backup_for_bundle uses the executor to
+                # _schedule_backup_for_bundle uses the executor to
                 # submit the bundle again which will cause it to be
                 # picked up by a worker thread and allow this thread
                 # to return to run future heartbeats.
                 if bundle_to_backup is not None:
                     self.last_backup = now
                     logger.info(
                 # submit the bundle again which will cause it to be
                 # picked up by a worker thread and allow this thread
                 # to return to run future heartbeats.
                 if bundle_to_backup is not None:
                     self.last_backup = now
                     logger.info(
-                        f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
+                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+                        bundle_to_backup,
+                        best_score,
                     )
                     )
-                    self.schedule_backup_for_bundle(bundle_to_backup)
+                    self._schedule_backup_for_bundle(bundle_to_backup)
             finally:
                 self.backup_lock.release()
 
             finally:
                 self.backup_lock.release()
 
-    def is_worker_available(self) -> bool:
+    def _is_worker_available(self) -> bool:
+        """Is there a worker available currently?"""
         return self.policy.is_worker_available()
 
         return self.policy.is_worker_available()
 
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+        """Try to acquire a worker."""
         return self.policy.acquire_worker(machine_to_avoid)
 
         return self.policy.acquire_worker(machine_to_avoid)
 
-    def find_available_worker_or_block(
-        self, machine_to_avoid: str = None
-    ) -> RemoteWorkerRecord:
+    def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+        """Find a worker or block until one becomes available."""
         with self.cv:
         with self.cv:
-            while not self.is_worker_available():
+            while not self._is_worker_available():
                 self.cv.wait()
                 self.cv.wait()
-            worker = self.acquire_worker(machine_to_avoid)
+            worker = self._acquire_worker(machine_to_avoid)
             if worker is not None:
                 return worker
         msg = "We should never reach this point in the code"
         logger.critical(msg)
         raise Exception(msg)
 
             if worker is not None:
                 return worker
         msg = "We should never reach this point in the code"
         logger.critical(msg)
         raise Exception(msg)
 
-    def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+    def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+        """Release a previously acquired worker."""
         worker = bundle.worker
         assert worker is not None
         worker = bundle.worker
         assert worker is not None
-        logger.debug(f'Released worker {worker}')
+        logger.debug('Released worker %s', worker)
         self.status.record_release_worker(
             worker,
             bundle.uuid,
         self.status.record_release_worker(
             worker,
             bundle.uuid,
@@ -715,16 +887,18 @@ class RemoteExecutor(BaseExecutor):
             self.cv.notify()
         self.adjust_task_count(-1)
 
             self.cv.notify()
         self.adjust_task_count(-1)
 
-    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
+    def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
+        """See if a particular bundle is cancelled.  Do not block."""
         with self.status.lock:
             if bundle.is_cancelled.wait(timeout=0.0):
         with self.status.lock:
             if bundle.is_cancelled.wait(timeout=0.0):
-                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
+                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                 bundle.was_cancelled = True
                 return True
         return False
 
                 bundle.was_cancelled = True
                 return True
         return False
 
-    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
+    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
         """Find a worker for bundle or block until one is available."""
         """Find a worker for bundle or block until one is available."""
+
         self.adjust_task_count(+1)
         uuid = bundle.uuid
         hostname = bundle.hostname
         self.adjust_task_count(+1)
         uuid = bundle.uuid
         hostname = bundle.hostname
@@ -736,28 +910,26 @@ class RemoteExecutor(BaseExecutor):
             avoid_machine = bundle.src_bundle.machine
         worker = None
         while worker is None:
             avoid_machine = bundle.src_bundle.machine
         worker = None
         while worker is None:
-            worker = self.find_available_worker_or_block(avoid_machine)
-        assert worker
+            worker = self._find_available_worker_or_block(avoid_machine)
+        assert worker is not None
 
         # Ok, found a worker.
         bundle.worker = worker
         machine = bundle.machine = worker.machine
         username = bundle.username = worker.username
         self.status.record_acquire_worker(worker, uuid)
 
         # Ok, found a worker.
         bundle.worker = worker
         machine = bundle.machine = worker.machine
         username = bundle.username = worker.username
         self.status.record_acquire_worker(worker, uuid)
-        logger.debug(f'{bundle}: Running bundle on {worker}...')
+        logger.debug('%s: Running bundle on %s...', bundle, worker)
 
         # Before we do any work, make sure the bundle is still viable.
         # It may have been some time between when it was submitted and
         # now due to lack of worker availability and someone else may
         # have already finished it.
 
         # Before we do any work, make sure the bundle is still viable.
         # It may have been some time between when it was submitted and
         # now due to lack of worker availability and someone else may
         # have already finished it.
-        if self.check_if_cancelled(bundle):
+        if self._check_if_cancelled(bundle):
             try:
             try:
-                return self.process_work_result(bundle)
+                return self._process_work_result(bundle)
             except Exception as e:
             except Exception as e:
-                logger.warning(
-                    f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
-                )
-                self.release_worker(bundle)
+                logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
+                self._release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
                     # bundle.  For it to have been cancelled, a backup
                 if is_original:
                     # Weird.  We are the original owner of this
                     # bundle.  For it to have been cancelled, a backup
@@ -768,83 +940,101 @@ class RemoteExecutor(BaseExecutor):
                     # thing.
                     logger.exception(e)
                     logger.error(
                     # thing.
                     logger.exception(e)
                     logger.error(
-                        f'{bundle}: We are the original owner thread and yet there are '
-                        + 'no results for this bundle.  This is unexpected and bad.'
+                        '%s: We are the original owner thread and yet there are '
+                        'no results for this bundle.  This is unexpected and bad.',
+                        bundle,
                     )
                     )
-                    return self.emergency_retry_nasty_bundle(bundle)
+                    return self._emergency_retry_nasty_bundle(bundle)
                 else:
                 else:
-                    # Expected(?).  We're a backup and our bundle is
-                    # cancelled before we even got started.  Something
-                    # went bad in process_work_result (I acutually don't
-                    # see what?) but probably not worth worrying
-                    # about.  Let the original thread worry about
-                    # either finding the results or complaining about
-                    # it.
+                    # We're a backup and our bundle is cancelled
+                    # before we even got started.  Do nothing and let
+                    # the original bundle's thread worry about either
+                    # finding the results or complaining about it.
                     return None
 
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
                     return None
 
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
-                cmd = (
-                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
-                )
+                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
                 start_ts = time.time()
-                logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
+                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                 run_silently(cmd)
                 xfer_latency = time.time() - start_ts
                 run_silently(cmd)
                 xfer_latency = time.time() - start_ts
-                logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
+                logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
             except Exception as e:
             except Exception as e:
-                self.release_worker(bundle)
+                self._release_worker(bundle)
                 if is_original:
                 if is_original:
-                    # Weird.  We tried to copy the code to the worker and it failed...
-                    # And we're the original bundle.  We have to retry.
+                    # Weird.  We tried to copy the code to the worker
+                    # and it failed...  And we're the original bundle.
+                    # We have to retry.
                     logger.exception(e)
                     logger.error(
                     logger.exception(e)
                     logger.error(
-                        f"{bundle}: Failed to send instructions to the worker machine?! "
-                        + "This is not expected; we\'re the original bundle so this shouldn\'t "
-                        + "be a race condition.  Attempting an emergency retry..."
+                        "%s: Failed to send instructions to the worker machine?! "
+                        "This is not expected; we\'re the original bundle so this shouldn\'t "
+                        "be a race condition.  Attempting an emergency retry...",
+                        bundle,
                     )
                     )
-                    return self.emergency_retry_nasty_bundle(bundle)
+                    return self._emergency_retry_nasty_bundle(bundle)
                 else:
                     # This is actually expected; we're a backup.
                     # There's a race condition where someone else
                     # already finished the work and removed the source
                 else:
                     # This is actually expected; we're a backup.
                     # There's a race condition where someone else
                     # already finished the work and removed the source
-                    # code file before we could copy it.  No biggie.
-                    msg = f'{bundle}: Failed to send instructions to the worker machine... '
-                    msg += 'We\'re a backup and this may be caused by the original (or some '
-                    msg += 'other backup) already finishing this work.  Ignoring this.'
-                    logger.warning(msg)
+                    # code_file before we could copy it.  Ignore.
+                    logger.warning(
+                        '%s: Failed to send instructions to the worker machine... '
+                        'We\'re a backup and this may be caused by the original (or '
+                        'some other backup) already finishing this work.  Ignoring.',
+                        bundle,
+                    )
                     return None
 
         # Kick off the work.  Note that if this fails we let
                     return None
 
         # Kick off the work.  Note that if this fails we let
-        # wait_for_process deal with it.
+        # _wait_for_process deal with it.
         self.status.record_processing_began(uuid)
         cmd = (
             f'{SSH} {bundle.username}@{bundle.machine} '
         self.status.record_processing_began(uuid)
         cmd = (
             f'{SSH} {bundle.username}@{bundle.machine} '
-            f'"source py38-venv/bin/activate &&'
+            f'"source py39-venv/bin/activate &&'
             f' /home/scott/lib/python_modules/remote_worker.py'
             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
         )
             f' /home/scott/lib/python_modules/remote_worker.py'
             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
         )
-        logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
+        logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
-        logger.debug(
-            f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
-        )
-        return self.wait_for_process(p, bundle, 0)
+        logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
+        return self._wait_for_process(p, bundle, 0)
 
 
-    def wait_for_process(
-        self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+    def _wait_for_process(
+        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
     ) -> Any:
     ) -> Any:
+        """At this point we've copied the bundle's pickled code to the remote
+        worker and started an ssh process that should be invoking the
+        remote worker to have it execute the user's code.  See how
+        that's going and wait for it to complete or fail.  Note that
+        this code is recursive: there are codepaths where we decide to
+        stop waiting for an ssh process (because another backup seems
+        to have finished) but then fail to fetch or parse the results
+        from that backup and thus call ourselves to continue waiting
+        on an active ssh process.  This is the purpose of the depth
+        argument: to curtail potential infinite recursion by giving up
+        eventually.
+
+        Args:
+            p: the Popen record of the ssh job
+            bundle: the bundle of work being executed remotely
+            depth: how many retries we've made so far.  Starts at zero.
+
+        """
+
         machine = bundle.machine
         machine = bundle.machine
-        pid = p.pid
+        assert p is not None
+        pid = p.pid  # pid of the ssh process
         if depth > 3:
             logger.error(
         if depth > 3:
             logger.error(
-                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
             )
             p.terminate()
             )
             p.terminate()
-            self.release_worker(bundle)
-            return self.emergency_retry_nasty_bundle(bundle)
+            self._release_worker(bundle)
+            return self._emergency_retry_nasty_bundle(bundle)
 
         # Spin until either the ssh job we scheduled finishes the
         # bundle or some backup worker signals that they finished it
 
         # Spin until either the ssh job we scheduled finishes the
         # bundle or some backup worker signals that they finished it
@@ -853,22 +1043,20 @@ class RemoteExecutor(BaseExecutor):
             try:
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
             try:
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
-                if self.check_if_cancelled(bundle):
-                    logger.info(
-                        f'{bundle}: looks like another worker finished bundle...'
-                    )
+                if self._check_if_cancelled(bundle):
+                    logger.info('%s: looks like another worker finished bundle...', bundle)
                     break
             else:
                     break
             else:
-                logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
+                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                 p = None
                 break
 
         # If we get here we believe the bundle is done; either the ssh
         # subprocess finished (hopefully successfully) or we noticed
         # that some other worker seems to have completed the bundle
                 p = None
                 break
 
         # If we get here we believe the bundle is done; either the ssh
         # subprocess finished (hopefully successfully) or we noticed
         # that some other worker seems to have completed the bundle
-        # and we're bailing out.
+        # before us and we're bailing out.
         try:
         try:
-            ret = self.process_work_result(bundle)
+            ret = self._process_work_result(bundle)
             if ret is not None and p is not None:
                 p.terminate()
             return ret
             if ret is not None and p is not None:
                 p.terminate()
             return ret
@@ -880,16 +1068,19 @@ class RemoteExecutor(BaseExecutor):
         # Otherwise, time for an emergency reschedule.
         except Exception as e:
             logger.exception(e)
         # Otherwise, time for an emergency reschedule.
         except Exception as e:
             logger.exception(e)
-            logger.error(f'{bundle}: Something unexpected just happened...')
+            logger.error('%s: Something unexpected just happened...', bundle)
             if p is not None:
             if p is not None:
-                msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
-                logger.warning(msg)
-                return self.wait_for_process(p, bundle, depth + 1)
+                logger.warning(
+                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
+                )
+                return self._wait_for_process(p, bundle, depth + 1)
             else:
             else:
-                self.release_worker(bundle)
-                return self.emergency_retry_nasty_bundle(bundle)
+                self._release_worker(bundle)
+                return self._emergency_retry_nasty_bundle(bundle)
+
+    def _process_work_result(self, bundle: BundleDetails) -> Any:
+        """A bundle seems to be completed.  Check on the results."""
 
 
-    def process_work_result(self, bundle: BundleDetails) -> Any:
         with self.status.lock:
             is_original = bundle.src_bundle is None
             was_cancelled = bundle.was_cancelled
         with self.status.lock:
             is_original = bundle.src_bundle is None
             was_cancelled = bundle.was_cancelled
@@ -907,11 +1098,15 @@ class RemoteExecutor(BaseExecutor):
                 if bundle.hostname not in bundle.machine:
                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
                 if bundle.hostname not in bundle.machine:
                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
-                        f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
+                        "%s: Fetching results back from %s@%s via %s",
+                        bundle,
+                        username,
+                        machine,
+                        cmd,
                     )
 
                     # If either of these throw they are handled in
                     )
 
                     # If either of these throw they are handled in
-                    # wait_for_process.
+                    # _wait_for_process.
                     attempts = 0
                     while True:
                         try:
                     attempts = 0
                     while True:
                         try:
@@ -919,17 +1114,15 @@ class RemoteExecutor(BaseExecutor):
                         except Exception as e:
                             attempts += 1
                             if attempts >= 3:
                         except Exception as e:
                             attempts += 1
                             if attempts >= 3:
-                                raise (e)
+                                raise e
                         else:
                             break
 
                         else:
                             break
 
+                    # Cleanup remote /tmp files.
                     run_silently(
                     run_silently(
-                        f'{SSH} {username}@{machine}'
-                        f' "/bin/rm -f {code_file} {result_file}"'
-                    )
-                    logger.debug(
-                        f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+                        f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                     )
                     )
+                    logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
@@ -938,25 +1131,25 @@ class RemoteExecutor(BaseExecutor):
         # original is also the only job that may delete result_file
         # from disk.  Note that the original may have been cancelled
         # if one of the backups finished first; it still must read the
         # original is also the only job that may delete result_file
         # from disk.  Note that the original may have been cancelled
         # if one of the backups finished first; it still must read the
-        # result from disk.
+        # result from disk.  It still does that here with is_cancelled
+        # set.
         if is_original:
         if is_original:
-            logger.debug(f"{bundle}: Unpickling {result_file}.")
+            logger.debug("%s: Unpickling %s.", bundle, result_file)
             try:
                 with open(result_file, 'rb') as rb:
                     serialized = rb.read()
                 result = cloudpickle.loads(serialized)
             except Exception as e:
                 logger.exception(e)
             try:
                 with open(result_file, 'rb') as rb:
                     serialized = rb.read()
                 result = cloudpickle.loads(serialized)
             except Exception as e:
                 logger.exception(e)
-                msg = f'Failed to load {result_file}... this is bad news.'
-                logger.critical(msg)
-                self.release_worker(bundle)
+                logger.error('Failed to load %s... this is bad news.', result_file)
+                self._release_worker(bundle)
 
 
-                # Re-raise the exception; the code in wait_for_process may
-                # decide to emergency_retry_nasty_bundle here.
-                raise Exception(e)
-            logger.debug(f'Removing local (master) {code_file} and {result_file}.')
-            os.remove(f'{result_file}')
-            os.remove(f'{code_file}')
+                # Re-raise the exception; the code in _wait_for_process may
+                # decide to _emergency_retry_nasty_bundle here.
+                raise e
+            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
+            os.remove(result_file)
+            os.remove(code_file)
 
             # Notify any backups that the original is done so they
             # should stop ASAP.  Do this whether or not we
 
             # Notify any backups that the original is done so they
             # should stop ASAP.  Do this whether or not we
@@ -965,7 +1158,7 @@ class RemoteExecutor(BaseExecutor):
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
                     logger.debug(
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
                     logger.debug(
-                        f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
+                        '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
                     )
                     backup.is_cancelled.set()
 
                     )
                     backup.is_cancelled.set()
 
@@ -979,28 +1172,32 @@ class RemoteExecutor(BaseExecutor):
 
             # Tell the original to stop if we finished first.
             if not was_cancelled:
 
             # Tell the original to stop if we finished first.
             if not was_cancelled:
+                orig_bundle = bundle.src_bundle
+                assert orig_bundle is not None
                 logger.debug(
                 logger.debug(
-                    f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+                    '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
                 )
                 )
-                bundle.src_bundle.is_cancelled.set()
-        self.release_worker(bundle, was_cancelled=was_cancelled)
+                orig_bundle.is_cancelled.set()
+        self._release_worker(bundle, was_cancelled=was_cancelled)
         return result
 
         return result
 
-    def create_original_bundle(self, pickle, fname: str):
-        from string_utils import generate_uuid
+    def _create_original_bundle(self, pickle, function_name: str):
+        """Creates a bundle that is not a backup of any other bundle but
+        rather represents a user task.
+        """
 
 
-        uuid = generate_uuid(omit_dashes=True)
+        uuid = string_utils.generate_uuid(omit_dashes=True)
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
-        logger.debug(f'Writing pickled code to {code_file}')
-        with open(f'{code_file}', 'wb') as wb:
+        logger.debug('Writing pickled code to %s', code_file)
+        with open(code_file, 'wb') as wb:
             wb.write(pickle)
 
         bundle = BundleDetails(
             pickled_code=pickle,
             uuid=uuid,
             wb.write(pickle)
 
         bundle = BundleDetails(
             pickled_code=pickle,
             uuid=uuid,
-            fname=fname,
+            function_name=function_name,
             worker=None,
             username=None,
             machine=None,
             worker=None,
             username=None,
             machine=None,
@@ -1019,10 +1216,14 @@ class RemoteExecutor(BaseExecutor):
             failure_count=0,
         )
         self.status.record_bundle_details(bundle)
             failure_count=0,
         )
         self.status.record_bundle_details(bundle)
-        logger.debug(f'{bundle}: Created an original bundle')
+        logger.debug('%s: Created an original bundle', bundle)
         return bundle
 
         return bundle
 
-    def create_backup_bundle(self, src_bundle: BundleDetails):
+    def _create_backup_bundle(self, src_bundle: BundleDetails):
+        """Creates a bundle that is a backup of another bundle that is
+        running too slowly."""
+
+        assert self.status.lock.locked()
         assert src_bundle.backup_bundles is not None
         n = len(src_bundle.backup_bundles)
         uuid = src_bundle.uuid + f'_backup#{n}'
         assert src_bundle.backup_bundles is not None
         n = len(src_bundle.backup_bundles)
         uuid = src_bundle.uuid + f'_backup#{n}'
@@ -1030,7 +1231,7 @@ class RemoteExecutor(BaseExecutor):
         backup_bundle = BundleDetails(
             pickled_code=src_bundle.pickled_code,
             uuid=uuid,
         backup_bundle = BundleDetails(
             pickled_code=src_bundle.pickled_code,
             uuid=uuid,
-            fname=src_bundle.fname,
+            function_name=src_bundle.function_name,
             worker=None,
             username=None,
             machine=None,
             worker=None,
             username=None,
             machine=None,
@@ -1050,23 +1251,31 @@ class RemoteExecutor(BaseExecutor):
         )
         src_bundle.backup_bundles.append(backup_bundle)
         self.status.record_bundle_details_already_locked(backup_bundle)
         )
         src_bundle.backup_bundles.append(backup_bundle)
         self.status.record_bundle_details_already_locked(backup_bundle)
-        logger.debug(f'{backup_bundle}: Created a backup bundle')
+        logger.debug('%s: Created a backup bundle', backup_bundle)
         return backup_bundle
 
         return backup_bundle
 
-    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+    def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+        """Schedule a backup of src_bundle."""
+
         assert self.status.lock.locked()
         assert src_bundle is not None
         assert self.status.lock.locked()
         assert src_bundle is not None
-        backup_bundle = self.create_backup_bundle(src_bundle)
+        backup_bundle = self._create_backup_bundle(src_bundle)
         logger.debug(
         logger.debug(
-            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+            '%s/%s: Scheduling backup for execution...',
+            backup_bundle.uuid,
+            backup_bundle.function_name,
         )
         )
-        self._helper_executor.submit(self.launch, backup_bundle)
+        self._helper_executor.submit(self._launch, backup_bundle)
 
         # Results from backups don't matter; if they finish first
         # they will move the result_file to this machine and let
 
         # Results from backups don't matter; if they finish first
         # they will move the result_file to this machine and let
-        # the original pick them up and unpickle them.
+        # the original pick them up and unpickle them (and return
+        # a result).
+
+    def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
+        """Something unexpectedly failed with bundle.  Either retry it
+        from the beginning or throw in the towel and give up on it."""
 
 
-    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -1080,56 +1289,98 @@ class RemoteExecutor(BaseExecutor):
 
         if bundle.failure_count > retry_limit:
             logger.error(
 
         if bundle.failure_count > retry_limit:
             logger.error(
-                f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+                '%s: Tried this bundle too many times already (%dx); giving up.',
+                bundle,
+                retry_limit,
             )
             if is_original:
                 raise RemoteExecutorException(
             )
             if is_original:
                 raise RemoteExecutorException(
-                    f'{bundle}: This bundle can\'t be completed despite several backups and retries'
+                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                 )
             else:
                 logger.error(
                 )
             else:
                 logger.error(
-                    f'{bundle}: At least it\'s only a backup; better luck with the others.'
+                    '%s: At least it\'s only a backup; better luck with the others.', bundle
                 )
             return None
         else:
             msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
             logger.warning(msg)
             warnings.warn(msg)
                 )
             return None
         else:
             msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
             logger.warning(msg)
             warnings.warn(msg)
-            return self.launch(bundle, avoid_last_machine)
+            return self._launch(bundle, avoid_last_machine)
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        """Submit work to be done.  This is the user entry point of this
+        class."""
         if self.already_shutdown:
             raise Exception('Submitted work after shutdown.')
         if self.already_shutdown:
             raise Exception('Submitted work after shutdown.')
-        pickle = make_cloud_pickle(function, *args, **kwargs)
-        bundle = self.create_original_bundle(pickle, function.__name__)
+        pickle = _make_cloud_pickle(function, *args, **kwargs)
+        bundle = self._create_original_bundle(pickle, function.__name__)
         self.total_bundles_submitted += 1
         self.total_bundles_submitted += 1
-        return self._helper_executor.submit(self.launch, bundle)
+        return self._helper_executor.submit(self._launch, bundle)
 
     @overrides
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        """Shutdown the executor."""
         if not self.already_shutdown:
         if not self.already_shutdown:
-            logging.debug(f'Shutting down RemoteExecutor {self.title}')
+            logging.debug('Shutting down RemoteExecutor %s', self.title)
             self.heartbeat_stop_event.set()
             self.heartbeat_thread.join()
             self._helper_executor.shutdown(wait)
             self.heartbeat_stop_event.set()
             self.heartbeat_thread.join()
             self._helper_executor.shutdown(wait)
-            print(self.histogram)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
 
 @singleton
 class DefaultExecutors(object):
             self.already_shutdown = True
 
 
 @singleton
 class DefaultExecutors(object):
+    """A container for a default thread, process and remote executor.
+    These are not created until needed and we take care to clean up
+    before process exit automatically for the caller's convenience.
+    Instead of creating your own executor, consider using the one
+    from this pool.  e.g.::
+
+        @par.parallelize(method=par.Method.PROCESS)
+        def do_work(
+            solutions: List[Work],
+            shard_num: int,
+            ...
+        ):
+            <do the work>
+
+
+        def start_do_work(all_work: List[Work]):
+            shards = []
+            logger.debug('Sharding work into groups of 10.')
+            for subset in list_utils.shard(all_work, 10):
+                shards.append([x for x in subset])
+
+            logger.debug('Kicking off helper pool.')
+            try:
+                for n, shard in enumerate(shards):
+                    results.append(
+                        do_work(
+                            shard, n, shared_cache.get_name(), max_letter_pop_per_word
+                        )
+                    )
+                smart_future.wait_all(results)
+            finally:
+                # Note: if you forget to do this it will clean itself up
+                # during program termination including tearing down any
+                # active ssh connections.
+                executors.DefaultExecutors().process_pool().shutdown()
+    """
+
     def __init__(self):
         self.thread_executor: Optional[ThreadExecutor] = None
         self.process_executor: Optional[ProcessExecutor] = None
         self.remote_executor: Optional[RemoteExecutor] = None
 
     def __init__(self):
         self.thread_executor: Optional[ThreadExecutor] = None
         self.process_executor: Optional[ProcessExecutor] = None
         self.remote_executor: Optional[RemoteExecutor] = None
 
-    def ping(self, host) -> bool:
-        logger.debug(f'RUN> ping -c 1 {host}')
+    @staticmethod
+    def _ping(host) -> bool:
+        logger.debug('RUN> ping -c 1 %s', host)
         try:
         try:
-            x = cmd_with_timeout(
-                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
-            )
+            x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
         except Exception:
             return False
             return x == 0
         except Exception:
             return False
@@ -1148,17 +1399,17 @@ class DefaultExecutors(object):
         if self.remote_executor is None:
             logger.info('Looking for some helper machines...')
             pool: List[RemoteWorkerRecord] = []
         if self.remote_executor is None:
             logger.info('Looking for some helper machines...')
             pool: List[RemoteWorkerRecord] = []
-            if self.ping('cheetah.house'):
+            if self._ping('cheetah.house'):
                 logger.info('Found cheetah.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='cheetah.house',
                 logger.info('Found cheetah.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='cheetah.house',
-                        weight=30,
-                        count=6,
+                        weight=24,
+                        count=5,
                     ),
                 )
                     ),
                 )
-            if self.ping('meerkat.cabin'):
+            if self._ping('meerkat.cabin'):
                 logger.info('Found meerkat.cabin')
                 pool.append(
                     RemoteWorkerRecord(
                 logger.info('Found meerkat.cabin')
                 pool.append(
                     RemoteWorkerRecord(
@@ -1168,33 +1419,33 @@ class DefaultExecutors(object):
                         count=2,
                     ),
                 )
                         count=2,
                     ),
                 )
-            if self.ping('wannabe.house'):
+            if self._ping('wannabe.house'):
                 logger.info('Found wannabe.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='wannabe.house',
                 logger.info('Found wannabe.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='wannabe.house',
-                        weight=25,
-                        count=10,
+                        weight=14,
+                        count=2,
                     ),
                 )
                     ),
                 )
-            if self.ping('puma.cabin'):
+            if self._ping('puma.cabin'):
                 logger.info('Found puma.cabin')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='puma.cabin',
                 logger.info('Found puma.cabin')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='puma.cabin',
-                        weight=30,
-                        count=6,
+                        weight=24,
+                        count=5,
                     ),
                 )
                     ),
                 )
-            if self.ping('backup.house'):
+            if self._ping('backup.house'):
                 logger.info('Found backup.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='backup.house',
                 logger.info('Found backup.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='backup.house',
-                        weight=8,
+                        weight=9,
                         count=2,
                     ),
                 )
                         count=2,
                     ),
                 )
@@ -1202,8 +1453,8 @@ class DefaultExecutors(object):
             # The controller machine has a lot to do; go easy on it.
             for record in pool:
                 if record.machine == platform.node() and record.count > 1:
             # The controller machine has a lot to do; go easy on it.
             for record in pool:
                 if record.machine == platform.node() and record.count > 1:
-                    logger.info(f'Reducing workload for {record.machine}.')
-                    record.count = 1
+                    logger.info('Reducing workload for %s.', record.machine)
+                    record.count = max(int(record.count / 2), 1)
 
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
             policy.register_worker_pool(pool)
 
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
             policy.register_worker_pool(pool)
@@ -1212,11 +1463,11 @@ class DefaultExecutors(object):
 
     def shutdown(self) -> None:
         if self.thread_executor is not None:
 
     def shutdown(self) -> None:
         if self.thread_executor is not None:
-            self.thread_executor.shutdown()
+            self.thread_executor.shutdown(wait=True, quiet=True)
             self.thread_executor = None
         if self.process_executor is not None:
             self.thread_executor = None
         if self.process_executor is not None:
-            self.process_executor.shutdown()
+            self.process_executor.shutdown(wait=True, quiet=True)
             self.process_executor = None
         if self.remote_executor is not None:
             self.process_executor = None
         if self.remote_executor is not None:
-            self.remote_executor.shutdown()
+            self.remote_executor.shutdown(wait=True, quiet=True)
             self.remote_executor = None
             self.remote_executor = None