Reduce the doctest lease duration...
[python_utils.git] / executors.py
index 399e32d14d33c3b3097a6457f762c76439785f5e..6485afa054689c3b668adb1e0708b7f2d29ed8b9 100644
@@ -1,34 +1,44 @@
 #!/usr/bin/env python3
+# -*- coding: utf-8 -*-
 
-from __future__ import annotations
+# © Copyright 2021-2022, Scott Gasch
 
-from abc import ABC, abstractmethod
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine, and a remote executor for farming out work to other
+machines.
+
+Also defines DefaultExecutors, which is a container for references to
+global executors / worker pools with automatic shutdown semantics."""
+
+from __future__ import annotations
 import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
 import logging
-import numpy
 import os
 import platform
 import random
 import subprocess
 import threading
 import time
-from typing import Any, Callable, Dict, List, Optional, Set
 import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
+import numpy
 from overrides import overrides
 
-from ansi import bg, fg, underline, reset
 import argparse_utils
 import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
 import histogram as hist
+import string_utils
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_exitcode, cmd_in_background, run_silently
 from thread_utils import background_thread
 
-
 logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
@@ -66,33 +76,73 @@ SSH = '/usr/bin/ssh -oForwardX11=no'
 SCP = '/usr/bin/scp -C'
 
 
-def make_cloud_pickle(fun, *args, **kwargs):
-    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
+def _make_cloud_pickle(fun, *args, **kwargs):
+    """Internal helper to create cloud pickles."""
+    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
     return cloudpickle.dumps((fun, args, kwargs))
 
 
 class BaseExecutor(ABC):
+    """The base executor interface definition.  The interface for
+    :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
+    :class:`ThreadExecutor`.
+    """
+
     def __init__(self, *, title=''):
         self.title = title
-        self.task_count = 0
         self.histogram = hist.SimpleHistogram(
             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
         )
+        self.task_count = 0
 
     @abstractmethod
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
         pass
 
     @abstractmethod
-    def shutdown(self, wait: bool = True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         pass
 
+    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
+        """Shutdown the executor and return True if the executor is idle
+        (i.e. there are no pending or active tasks).  Return False
+        otherwise.  Note: this should only be called by the launcher
+        process.
+
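+        A minimal usage sketch (the executor below stands in for any
+        concrete :class:`BaseExecutor` subclass instance)::
+
+            if not executor.shutdown_if_idle(quiet=True):
+                executor.shutdown(wait=True, quiet=True)
+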
+        """
+        if self.task_count == 0:
+            self.shutdown(wait=True, quiet=quiet)
+            return True
+        return False
+
     def adjust_task_count(self, delta: int) -> None:
+        """Change the task count.  Note: do not call this method from a
+        worker, it should only be called by the launcher process /
+        thread / machine.
+
+        """
         self.task_count += delta
-        logger.debug(f'Executor current task count is {self.task_count}')
+        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
+
+    def get_task_count(self) -> int:
+        """Change the task count.  Note: do not call this method from a
+        worker, it should only be called by the launcher process /
+        thread / machine.
+
+        """
+        return self.task_count
 
 
 class ThreadExecutor(BaseExecutor):
+    """A threadpool executor.  This executor uses python threads to
+    schedule tasks.  Note that, at least as of python3.10, because of
+    the global lock in the interpreter itself, these do not
+    parallelize very well, so this class is useful mostly for non-CPU
+    intensive tasks.
+
+    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
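+
+    A hedged usage sketch (the worker function below is a stand-in for
+    real work)::
+
+        def add(a: int, b: int) -> int:
+            return a + b
+
+        executor = ThreadExecutor(max_workers=4)
+        try:
+            future = executor.submit(add, 2, 3)
+            assert future.result() == 5
+        finally:
+            executor.shutdown(wait=True, quiet=True)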
+    """
+
     def __init__(self, max_workers: Optional[int] = None):
         super().__init__()
         workers = None
@@ -100,41 +150,55 @@ class ThreadExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_threadpool_size' in config.config:
             workers = config.config['executors_threadpool_size']
-        logger.debug(f'Creating threadpool executor with {workers} workers')
+        if workers is not None:
+            logger.debug('Creating threadpool executor with %d workers', workers)
+        else:
+            logger.debug('Creating a default sized threadpool executor')
         self._thread_pool_executor = fut.ThreadPoolExecutor(
             max_workers=workers, thread_name_prefix="thread_executor_helper"
         )
+        self.already_shutdown = False
 
-    def run_local_bundle(self, fun, *args, **kwargs):
-        logger.debug(f"Running local bundle at {fun.__name__}")
-        start = time.time()
+    # This is run on a different thread; do not adjust task count here.
+    @staticmethod
+    def run_local_bundle(fun, *args, **kwargs):
+        logger.debug("Running local bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
-        end = time.time()
-        self.adjust_task_count(-1)
-        duration = end - start
-        logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
-        self.histogram.add_item(duration)
         return result
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
         self.adjust_task_count(+1)
         newargs = []
         newargs.append(function)
         for arg in args:
             newargs.append(arg)
-        return self._thread_pool_executor.submit(
-            self.run_local_bundle, *newargs, **kwargs
+        start = time.time()
+        result = self._thread_pool_executor.submit(
+            ThreadExecutor.run_local_bundle, *newargs, **kwargs
         )
+        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+        result.add_done_callback(lambda _: self.adjust_task_count(-1))
+        return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
-        logger.debug(f'Shutting down threadpool executor {self.title}')
-        print(self.histogram)
-        self._thread_pool_executor.shutdown(wait)
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        if not self.already_shutdown:
+            logger.debug('Shutting down threadpool executor %s', self.title)
+            self._thread_pool_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
 
 
 class ProcessExecutor(BaseExecutor):
+    """An executor which runs tasks in child processes.
+
+    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
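+
+    A hedged usage sketch (the CPU-bound function below is illustrative;
+    work is shipped to the child process via cloudpickle)::
+
+        def fib(n: int) -> int:
+            return n if n < 2 else fib(n - 1) + fib(n - 2)
+
+        executor = ProcessExecutor(max_workers=2)
+        try:
+            future = executor.submit(fib, 20)
+            assert future.result() == 6765
+        finally:
+            executor.shutdown(wait=True, quiet=True)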
+    """
+
     def __init__(self, max_workers=None):
         super().__init__()
         workers = None
@@ -142,32 +206,43 @@ class ProcessExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_processpool_size' in config.config:
             workers = config.config['executors_processpool_size']
-        logger.debug(f'Creating processpool executor with {workers} workers.')
+        if workers is not None:
+            logger.debug('Creating processpool executor with %d workers.', workers)
+        else:
+            logger.debug('Creating a default sized processpool executor')
         self._process_executor = fut.ProcessPoolExecutor(
             max_workers=workers,
         )
+        self.already_shutdown = False
 
-    def run_cloud_pickle(self, pickle):
+    # This is run in another process; do not adjust task count here.
+    @staticmethod
+    def run_cloud_pickle(pickle):
         fun, args, kwargs = cloudpickle.loads(pickle)
-        logger.debug(f"Running pickled bundle at {fun.__name__}")
+        logger.debug("Running pickled bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
-        self.adjust_task_count(-1)
         return result
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
         start = time.time()
         self.adjust_task_count(+1)
-        pickle = make_cloud_pickle(function, *args, **kwargs)
-        result = self._process_executor.submit(self.run_cloud_pickle, pickle)
+        pickle = _make_cloud_pickle(function, *args, **kwargs)
+        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+        result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
-        logger.debug(f'Shutting down processpool executor {self.title}')
-        self._process_executor.shutdown(wait)
-        print(self.histogram)
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        if not self.already_shutdown:
+            logger.debug('Shutting down processpool executor %s', self.title)
+            self._process_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
 
     def __getstate__(self):
         state = self.__dict__.copy()
@@ -183,10 +258,21 @@ class RemoteExecutorException(Exception):
 
 @dataclass
 class RemoteWorkerRecord:
+    """A record of info about a remote worker."""
+
     username: str
+    """Username we can ssh into on this machine to run work."""
+
     machine: str
+    """Machine address / name."""
+
     weight: int
+    """Relative probability for the weighted policy to select this
+    machine for scheduling work."""
+
     count: int
+    """If this machine is selected, what is the maximum number of task
+    that it can handle?"""
 
     def __hash__(self):
         return hash((self.username, self.machine))
@@ -197,25 +283,71 @@ class RemoteWorkerRecord:
 
 @dataclass
 class BundleDetails:
+    """All info necessary to define some unit of work that needs to be
+    done, where it is being run, its state, whether it is an original
+    bundle or a backup bundle, how many times it has failed, etc.
+    """
+
     pickled_code: bytes
+    """The code to run, cloud pickled"""
+
     uuid: str
-    fname: str
+    """A unique identifier"""
+
+    function_name: str
+    """The name of the function we pickled"""
+
     worker: Optional[RemoteWorkerRecord]
+    """The remote worker running this bundle or None if none (yet)"""
+
     username: Optional[str]
+    """The remote username running this bundle or None if none (yet)"""
+
     machine: Optional[str]
+    """The remote machine running this bundle or None if none (yet)"""
+
     hostname: str
+    """The controller machine"""
+
     code_file: str
+    """A unique filename to hold the work to be done"""
+
     result_file: str
+    """Where the results should be placed / read from"""
+
     pid: int
+    """The process id of the local subprocess watching the ssh connection
+    to the remote machine"""
+
     start_ts: float
+    """Starting time"""
+
     end_ts: float
+    """Ending time"""
+
     slower_than_local_p95: bool
+    """Currently slower then 95% of other bundles on remote host"""
+
     slower_than_global_p95: bool
-    src_bundle: BundleDetails
+    """Currently slower than 95% of other bundles globally"""
+
+    src_bundle: Optional[BundleDetails]
+    """If this is a backup bundle, this points to the original bundle
+    that it's backing up.  None otherwise."""
+
     is_cancelled: threading.Event
+    """An event that can be signaled to indicate this bundle is cancelled.
+    This is set when another copy (backup or original) of this work has
+    completed successfully elsewhere."""
+
     was_cancelled: bool
+    """True if this bundle was cancelled, False if it finished normally"""
+
     backup_bundles: Optional[List[BundleDetails]]
+    """If we've created backups of this bundle, this is the list of them"""
+
     failure_count: int
+    """How many times has this bundle failed already?"""
 
     def __repr__(self):
         uuid = self.uuid
@@ -225,6 +357,9 @@ class BundleDetails:
         else:
             suffix = uuid[-6:]
 
+        # We colorize the uuid based on some bits from it to make them
+        # stand out in the logging and help a reader correlate log messages
+        # related to the same bundle.
         colorz = [
             fg('violet red'),
             fg('red'),
@@ -241,21 +376,29 @@ class BundleDetails:
             fg('medium purple'),
         ]
         c = colorz[int(uuid[-2:], 16) % len(colorz)]
-        fname = self.fname if self.fname is not None else 'nofname'
+        function_name = self.function_name if self.function_name is not None else 'nofname'
         machine = self.machine if self.machine is not None else 'nomachine'
-        return f'{c}{suffix}/{fname}/{machine}{reset()}'
+        return f'{c}{suffix}/{function_name}/{machine}{reset()}'
 
 
 class RemoteExecutorStatus:
+    """A status 'scoreboard' for a remote executor tracking various
+    metrics and able to render a periodic dump of global state.
+    """
+
     def __init__(self, total_worker_count: int) -> None:
+        """C'tor.
+
+        Args:
+            total_worker_count: number of workers in the pool
+
+        """
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
         self.start_time: float = time.time()
-        self.start_per_bundle: Dict[str, float] = defaultdict(float)
+        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
-        self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord, List[float]
-        ] = {}
+        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
@@ -267,12 +410,18 @@ class RemoteExecutorStatus:
         self.lock: threading.Lock = threading.Lock()
 
     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+        """Record that bundle with uuid is assigned to a particular worker.
+
+        Args:
+            worker: the record of the worker to which uuid is assigned
+            uuid: the uuid of a bundle that has been assigned to a worker
+        """
         with self.lock:
             self.record_acquire_worker_already_locked(worker, uuid)
 
-    def record_acquire_worker_already_locked(
-        self, worker: RemoteWorkerRecord, uuid: str
-    ) -> None:
+    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+        """Same as above but an entry point that doesn't acquire the lock
+        for codepaths where it's already held."""
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
@@ -281,10 +430,12 @@ class RemoteExecutorStatus:
         self.in_flight_bundles_by_worker[worker] = x
 
     def record_bundle_details(self, details: BundleDetails) -> None:
+        """Register the details about a bundle of work."""
         with self.lock:
             self.record_bundle_details_already_locked(details)
 
     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
+        """Same as above but for codepaths that already hold the lock."""
         assert self.lock.locked()
         self.bundle_details_by_uuid[details.uuid] = details
 
@@ -294,6 +445,7 @@ class RemoteExecutorStatus:
         uuid: str,
         was_cancelled: bool,
     ) -> None:
+        """Record that a bundle has released a worker."""
         with self.lock:
             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
 
@@ -303,22 +455,27 @@ class RemoteExecutorStatus:
         uuid: str,
         was_cancelled: bool,
     ) -> None:
+        """Same as above but for codepaths that already hold the lock."""
         assert self.lock.locked()
         ts = time.time()
         self.end_per_bundle[uuid] = ts
         self.in_flight_bundles_by_worker[worker].remove(uuid)
         if not was_cancelled:
-            bundle_latency = ts - self.start_per_bundle[uuid]
-            x = self.finished_bundle_timings_per_worker.get(worker, list())
+            start = self.start_per_bundle[uuid]
+            assert start is not None
+            bundle_latency = ts - start
+            x = self.finished_bundle_timings_per_worker.get(worker, [])
             x.append(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
             self.finished_bundle_timings.append(bundle_latency)
 
     def record_processing_began(self, uuid: str):
+        """Record when work on a bundle begins."""
         with self.lock:
             self.start_per_bundle[uuid] = time.time()
 
     def total_in_flight(self) -> int:
+        """How many bundles are in flight currently?"""
         assert self.lock.locked()
         total_in_flight = 0
         for worker in self.known_workers:
@@ -326,6 +483,7 @@ class RemoteExecutorStatus:
         return total_in_flight
 
     def total_idle(self) -> int:
+        """How many idle workers are there currently?"""
         assert self.lock.locked()
         return self.worker_count - self.total_in_flight()
 
@@ -404,7 +562,12 @@ class RemoteExecutorStatus:
 
 
 class RemoteWorkerSelectionPolicy(ABC):
-    def register_worker_pool(self, workers):
+    """A policy for selecting a remote worker base class."""
+
+    def __init__(self):
+        self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
         self.workers = workers
 
     @abstractmethod
@@ -417,73 +580,125 @@ class RemoteWorkerSelectionPolicy(ABC):
 
 
 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that uses weighted RNG."""
+
     @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
     @overrides
     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         grabbag = []
-        for worker in self.workers:
-            for x in range(0, worker.count):
-                for y in range(0, worker.weight):
-                    grabbag.append(worker)
-
-        for _ in range(0, 5):
-            random.shuffle(grabbag)
-            worker = grabbag[0]
-            if worker.machine != machine_to_avoid or _ > 2:
-                if worker.count > 0:
-                    worker.count -= 1
-                    logger.debug(f'Selected worker {worker}')
-                    return worker
-        msg = 'Unexpectedly could not find a worker, retrying...'
-        logger.warning(msg)
-        return None
+        if self.workers:
+            for worker in self.workers:
+                if worker.machine != machine_to_avoid:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
+
+        if len(grabbag) == 0:
+            logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+            if self.workers:
+                for worker in self.workers:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
+
+        if len(grabbag) == 0:
+            logger.warning('There are no available workers?!')
+            return None
+
+        worker = random.sample(grabbag, 1)[0]
+        assert worker.count > 0
+        worker.count -= 1
+        logger.debug('Selected worker %s', worker)
+        return worker
 
 
 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that just round robins."""
+
     def __init__(self) -> None:
+        super().__init__()
         self.index = 0
 
     @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
     @overrides
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
-        x = self.index
-        while True:
-            worker = self.workers[x]
-            if worker.count > 0:
-                worker.count -= 1
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+        if self.workers:
+            x = self.index
+            while True:
+                worker = self.workers[x]
+                if worker.count > 0:
+                    worker.count -= 1
+                    x += 1
+                    if x >= len(self.workers):
+                        x = 0
+                    self.index = x
+                    logger.debug('Selected worker %s', worker)
+                    return worker
                 x += 1
                 if x >= len(self.workers):
                     x = 0
-                self.index = x
-                logger.debug(f'Selected worker {worker}')
-                return worker
-            x += 1
-            if x >= len(self.workers):
-                x = 0
-            if x == self.index:
-                msg = 'Unexpectedly could not find a worker, retrying...'
-                logger.warning(msg)
-                return None
+                if x == self.index:
+                    logger.warning('Unexpectedly could not find a worker, retrying...')
+                    return None
+        return None
 
 
 class RemoteExecutor(BaseExecutor):
+    """An executor that uses processes on remote machines to do work.  This
+    works by creating "bundles" of work with pickled code in each to be
+    executed.  Each bundle is assigned a remote worker based on some policy
+    heuristics.  Once assigned to a remote worker, a local subprocess is
+    created.  It copies the pickled code to the remote machine via ssh/scp
+    and then starts up the work on the remote machine, again using ssh.
+    When the work is complete, it copies the results back to the local machine.
+
+    So there is essentially one "controller" machine (which may also be
+    in the remote executor pool and therefore do task work in addition to
+    controlling) and N worker machines.  This code runs on the controller
+    whereas on the worker machines we invoke pickled user code via a
+    shim in :file:`remote_worker.py`.
+
+    Some redundancy and safety provisions are made; e.g. slower than
+    expected tasks have redundant backups created, and if a task fails
+    repeatedly we consider it poisoned and give up on it.
+
+    .. warning::
+
+        The network overhead / latency of copying work from the
+        controller machine to the remote workers is relatively high.
+        This executor probably only makes sense to use with
+        computationally expensive tasks such as jobs that will execute
+        for ~30 seconds or longer.
+
+    See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
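+
+    A hedged construction sketch (usernames, hostnames, weights, counts,
+    and the submitted function below are placeholders, not real
+    configuration)::
+
+        workers = [
+            RemoteWorkerRecord(username='user', machine='host1', weight=2, count=4),
+            RemoteWorkerRecord(username='user', machine='host2', weight=1, count=2),
+        ]
+        policy = WeightedRandomRemoteWorkerSelectionPolicy()
+        executor = RemoteExecutor(workers, policy)
+        try:
+            future = executor.submit(expensive_function, some_arg)
+            result = future.result()
+        finally:
+            executor.shutdown()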
+    """
+
     def __init__(
-        self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy
+        self,
+        workers: List[RemoteWorkerRecord],
+        policy: RemoteWorkerSelectionPolicy,
     ) -> None:
+        """C'tor.
+
+        Args:
+            workers: A list of remote workers we can call on to do tasks.
+            policy: A policy for selecting remote workers for tasks.
+        """
+
         super().__init__()
         self.workers = workers
         self.policy = policy
@@ -496,9 +711,7 @@ class RemoteExecutor(BaseExecutor):
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
-        logger.debug(
-            f'Creating {self.worker_count} local threads, one per remote worker.'
-        )
+        logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
@@ -510,34 +723,45 @@ class RemoteExecutor(BaseExecutor):
         (
             self.heartbeat_thread,
             self.heartbeat_stop_event,
-        ) = self.run_periodic_heartbeat()
+        ) = self._run_periodic_heartbeat()
+        self.already_shutdown = False
 
     @background_thread
-    def run_periodic_heartbeat(self, stop_event) -> None:
+    def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+        """
+        We create a background thread to invoke :meth:`_heartbeat` regularly
+        while we are scheduling work.  It does some accounting such as
+        looking for slow bundles to tag for backup creation, checking for
+        unexpected failures, and printing a fancy message on stdout.
+        """
         while not stop_event.is_set():
             time.sleep(5.0)
             logger.debug('Running periodic heartbeat code...')
-            self.heartbeat()
+            self._heartbeat()
         logger.debug('Periodic heartbeat thread shutting down.')
 
-    def heartbeat(self) -> None:
+    def _heartbeat(self) -> None:
+        # Note: this is invoked on a background thread, not an
+        # executor thread.  Be careful what you do with it b/c it
+        # needs to get back and dump status again periodically.
         with self.status.lock:
-            # Dump regular progress report
             self.status.periodic_dump(self.total_bundles_submitted)
 
             # Look for bundles to reschedule via executor.submit
             if config.config['executors_schedule_remote_backups']:
-                self.maybe_schedule_backup_bundles()
+                self._maybe_schedule_backup_bundles()
+
+    def _maybe_schedule_backup_bundles(self):
+        """Maybe schedule backup bundles if we see a very slow bundle."""
 
-    def maybe_schedule_backup_bundles(self):
         assert self.status.lock.locked()
         num_done = len(self.status.finished_bundle_timings)
         num_idle_workers = self.worker_count - self.task_count
         now = time.time()
         if (
-            num_done > 2
-            and num_idle_workers > 1
-            and (self.last_backup is None or (now - self.last_backup > 6.0))
+            num_done >= 2
+            and num_idle_workers > 0
+            and (self.last_backup is None or (now - self.last_backup > 9.0))
             and self.backup_lock.acquire(blocking=False)
         ):
             try:
@@ -577,21 +801,15 @@ class RemoteExecutor(BaseExecutor):
                             if start_ts is not None:
                                 runtime = now - start_ts
                                 score += runtime
-                                logger.debug(
-                                    f'score[{bundle}] => {score}  # latency boost'
-                                )
+                                logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
 
                                 if bundle.slower_than_local_p95:
                                     score += runtime / 2
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >worker p95'
-                                    )
+                                    logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
 
                                 if bundle.slower_than_global_p95:
                                     score += runtime / 4
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >global p95'
-                                    )
+                                    logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
 
                             # Prefer backups of bundles that don't
                             # have backups already.
@@ -605,12 +823,12 @@ class RemoteExecutor(BaseExecutor):
                             else:
                                 score = 0
                             logger.debug(
-                                f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
+                                'score[%s] => %.1f  # %d dup backup factor',
+                                bundle,
+                                score,
+                                backup_count,
                             )
 
-                            if score != 0 and (
-                                best_score is None or score > best_score
-                            ):
+                            if score != 0 and (best_score is None or score > best_score):
                                 bundle_to_backup = bundle
                                 assert bundle is not None
                                 assert bundle.backup_bundles is not None
@@ -619,44 +837,46 @@ class RemoteExecutor(BaseExecutor):
 
                 # Note: this is all still happening on the heartbeat
                 # runner thread.  That's ok because
-                # schedule_backup_for_bundle uses the executor to
+                # _schedule_backup_for_bundle uses the executor to
                 # submit the bundle again which will cause it to be
                 # picked up by a worker thread and allow this thread
                 # to return to run future heartbeats.
                 if bundle_to_backup is not None:
                     self.last_backup = now
                     logger.info(
-                        f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
+                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+                        bundle_to_backup,
+                        best_score,
                     )
-                    self.schedule_backup_for_bundle(bundle_to_backup)
+                    self._schedule_backup_for_bundle(bundle_to_backup)
             finally:
                 self.backup_lock.release()
 
-    def is_worker_available(self) -> bool:
+    def _is_worker_available(self) -> bool:
+        """Is there a worker available currently?"""
         return self.policy.is_worker_available()
 
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+        """Try to acquire a worker."""
         return self.policy.acquire_worker(machine_to_avoid)
 
-    def find_available_worker_or_block(
-        self, machine_to_avoid: str = None
-    ) -> RemoteWorkerRecord:
+    def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+        """Find a worker or block until one becomes available."""
         with self.cv:
-            while not self.is_worker_available():
+            while not self._is_worker_available():
                 self.cv.wait()
-            worker = self.acquire_worker(machine_to_avoid)
+            worker = self._acquire_worker(machine_to_avoid)
             if worker is not None:
                 return worker
         msg = "We should never reach this point in the code"
         logger.critical(msg)
         raise Exception(msg)
 
-    def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+    def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+        """Release a previously acquired worker."""
         worker = bundle.worker
         assert worker is not None
-        logger.debug(f'Released worker {worker}')
+        logger.debug('Released worker %s', worker)
         self.status.record_release_worker(
             worker,
             bundle.uuid,
@@ -667,16 +887,18 @@ class RemoteExecutor(BaseExecutor):
             self.cv.notify()
         self.adjust_task_count(-1)
 
-    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
+    def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
+        """See if a particular bundle is cancelled.  Do not block."""
         with self.status.lock:
             if bundle.is_cancelled.wait(timeout=0.0):
-                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
+                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                 bundle.was_cancelled = True
                 return True
         return False
 
-    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
+    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
         """Find a worker for bundle or block until one is available."""
+
         self.adjust_task_count(+1)
         uuid = bundle.uuid
         hostname = bundle.hostname
@@ -688,28 +910,26 @@ class RemoteExecutor(BaseExecutor):
             avoid_machine = bundle.src_bundle.machine
         worker = None
         while worker is None:
-            worker = self.find_available_worker_or_block(avoid_machine)
-        assert worker
+            worker = self._find_available_worker_or_block(avoid_machine)
+        assert worker is not None
 
         # Ok, found a worker.
         bundle.worker = worker
         machine = bundle.machine = worker.machine
         username = bundle.username = worker.username
         self.status.record_acquire_worker(worker, uuid)
-        logger.debug(f'{bundle}: Running bundle on {worker}...')
+        logger.debug('%s: Running bundle on %s...', bundle, worker)
 
         # Before we do any work, make sure the bundle is still viable.
         # It may have been some time between when it was submitted and
         # now due to lack of worker availability and someone else may
         # have already finished it.
-        if self.check_if_cancelled(bundle):
+        if self._check_if_cancelled(bundle):
             try:
-                return self.process_work_result(bundle)
+                return self._process_work_result(bundle)
             except Exception as e:
-                logger.warning(
-                    f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
-                )
-                self.release_worker(bundle)
+                logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
+                self._release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
                     # bundle.  For it to have been cancelled, a backup
@@ -720,83 +940,101 @@ class RemoteExecutor(BaseExecutor):
                     # thing.
                     logger.exception(e)
                     logger.error(
-                        f'{bundle}: We are the original owner thread and yet there are '
-                        + 'no results for this bundle.  This is unexpected and bad.'
+                        '%s: We are the original owner thread and yet there are '
+                        'no results for this bundle.  This is unexpected and bad.',
+                        bundle,
                     )
-                    return self.emergency_retry_nasty_bundle(bundle)
+                    return self._emergency_retry_nasty_bundle(bundle)
                 else:
-                    # Expected(?).  We're a backup and our bundle is
-                    # cancelled before we even got started.  Something
-                    # went bad in process_work_result (I acutually don't
-                    # see what?) but probably not worth worrying
-                    # about.  Let the original thread worry about
-                    # either finding the results or complaining about
-                    # it.
+                    # We're a backup and our bundle is cancelled
+                    # before we even got started.  Do nothing and let
+                    # the original bundle's thread worry about either
+                    # finding the results or complaining about it.
                     return None
 
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
-                cmd = (
-                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
-                )
+                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
-                logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
+                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                 run_silently(cmd)
                 xfer_latency = time.time() - start_ts
-                logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
+                logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
             except Exception as e:
-                self.release_worker(bundle)
+                self._release_worker(bundle)
                 if is_original:
-                    # Weird.  We tried to copy the code to the worker and it failed...
-                    # And we're the original bundle.  We have to retry.
+                    # Weird.  We tried to copy the code to the worker
+                    # and it failed...  And we're the original bundle.
+                    # We have to retry.
                     logger.exception(e)
                     logger.error(
-                        f"{bundle}: Failed to send instructions to the worker machine?! "
-                        + "This is not expected; we\'re the original bundle so this shouldn\'t "
-                        + "be a race condition.  Attempting an emergency retry..."
+                        "%s: Failed to send instructions to the worker machine?! "
+                        "This is not expected; we\'re the original bundle so this shouldn\'t "
+                        "be a race condition.  Attempting an emergency retry...",
+                        bundle,
                     )
-                    return self.emergency_retry_nasty_bundle(bundle)
+                    return self._emergency_retry_nasty_bundle(bundle)
                 else:
                     # This is actually expected; we're a backup.
                     # There's a race condition where someone else
                     # already finished the work and removed the source
-                    # code file before we could copy it.  No biggie.
-                    msg = f'{bundle}: Failed to send instructions to the worker machine... '
-                    msg += 'We\'re a backup and this may be caused by the original (or some '
-                    msg += 'other backup) already finishing this work.  Ignoring this.'
-                    logger.warning(msg)
+                    # code_file before we could copy it.  Ignore.
+                    logger.warning(
+                        '%s: Failed to send instructions to the worker machine... '
+                        'We\'re a backup and this may be caused by the original (or '
+                        'some other backup) already finishing this work.  Ignoring.',
+                        bundle,
+                    )
                     return None
 
         # Kick off the work.  Note that if this fails we let
-        # wait_for_process deal with it.
+        # _wait_for_process deal with it.
         self.status.record_processing_began(uuid)
         cmd = (
             f'{SSH} {bundle.username}@{bundle.machine} '
-            f'"source py38-venv/bin/activate &&'
+            f'"source py39-venv/bin/activate &&'
             f' /home/scott/lib/python_modules/remote_worker.py'
             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
         )
-        logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
+        logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
-        logger.debug(
-            f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
-        )
-        return self.wait_for_process(p, bundle, 0)
+        logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
+        return self._wait_for_process(p, bundle, 0)
 
-    def wait_for_process(
-        self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+    def _wait_for_process(
+        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
     ) -> Any:
+        """At this point we've copied the bundle's pickled code to the remote
+        worker and started an ssh process that should be invoking the
+        remote worker to have it execute the user's code.  See how
+        that's going and wait for it to complete or fail.  Note that
+        this code is recursive: there are codepaths where we decide to
+        stop waiting for an ssh process (because another backup seems
+        to have finished) but then fail to fetch or parse the results
+        from that backup and thus call ourselves to continue waiting
+        on an active ssh process.  This is the purpose of the depth
+        argument: to curtail potential infinite recursion by giving up
+        eventually.
+
+        Args:
+            p: the Popen record of the ssh job
+            bundle: the bundle of work being executed remotely
+            depth: how many retries we've made so far.  Starts at zero.
+
+        """
+
         machine = bundle.machine
-        pid = p.pid
+        assert p is not None
+        pid = p.pid  # pid of the ssh process
         if depth > 3:
             logger.error(
-                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
             )
             p.terminate()
-            self.release_worker(bundle)
-            return self.emergency_retry_nasty_bundle(bundle)
+            self._release_worker(bundle)
+            return self._emergency_retry_nasty_bundle(bundle)
 
         # Spin until either the ssh job we scheduled finishes the
         # bundle or some backup worker signals that they finished it
@@ -805,22 +1043,20 @@ class RemoteExecutor(BaseExecutor):
             try:
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
-                if self.check_if_cancelled(bundle):
-                    logger.info(
-                        f'{bundle}: looks like another worker finished bundle...'
-                    )
+                if self._check_if_cancelled(bundle):
+                    logger.info('%s: looks like another worker finished bundle...', bundle)
                     break
             else:
-                logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
+                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                 p = None
                 break
 
         # If we get here we believe the bundle is done; either the ssh
         # subprocess finished (hopefully successfully) or we noticed
         # that some other worker seems to have completed the bundle
-        # and we're bailing out.
+        # before us and we're bailing out.
         try:
-            ret = self.process_work_result(bundle)
+            ret = self._process_work_result(bundle)
             if ret is not None and p is not None:
                 p.terminate()
             return ret
@@ -832,16 +1068,19 @@ class RemoteExecutor(BaseExecutor):
         # Otherwise, time for an emergency reschedule.
         except Exception as e:
             logger.exception(e)
-            logger.error(f'{bundle}: Something unexpected just happened...')
+            logger.error('%s: Something unexpected just happened...', bundle)
             if p is not None:
-                msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
-                logger.warning(msg)
-                return self.wait_for_process(p, bundle, depth + 1)
+                logger.warning(
+                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
+                )
+                return self._wait_for_process(p, bundle, depth + 1)
             else:
-                self.release_worker(bundle)
-                return self.emergency_retry_nasty_bundle(bundle)
+                self._release_worker(bundle)
+                return self._emergency_retry_nasty_bundle(bundle)
+
+    def _process_work_result(self, bundle: BundleDetails) -> Any:
+        """A bundle seems to be completed.  Check on the results."""
 
-    def process_work_result(self, bundle: BundleDetails) -> Any:
         with self.status.lock:
             is_original = bundle.src_bundle is None
             was_cancelled = bundle.was_cancelled
@@ -859,11 +1098,15 @@ class RemoteExecutor(BaseExecutor):
                 if bundle.hostname not in bundle.machine:
                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
-                        f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
+                        "%s: Fetching results back from %s@%s via %s",
+                        bundle,
+                        username,
+                        machine,
+                        cmd,
                     )
 
                     # If either of these throw they are handled in
-                    # wait_for_process.
+                    # _wait_for_process.
                     attempts = 0
                     while True:
                         try:
@@ -871,17 +1114,15 @@ class RemoteExecutor(BaseExecutor):
                         except Exception as e:
                             attempts += 1
                             if attempts >= 3:
-                                raise (e)
+                                raise e
                         else:
                             break
 
+                    # Cleanup remote /tmp files.
                     run_silently(
-                        f'{SSH} {username}@{machine}'
-                        f' "/bin/rm -f {code_file} {result_file}"'
-                    )
-                    logger.debug(
-                        f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+                        f'{SSH} {username}@{machine} "/bin/rm -f {code_file} {result_file}"'
                     )
+                    logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
@@ -890,25 +1131,25 @@ class RemoteExecutor(BaseExecutor):
         # original is also the only job that may delete result_file
         # from disk.  Note that the original may have been cancelled
         # if one of the backups finished first; it still must read the
-        # result from disk.
+        # result from disk.  It still does that here with is_cancelled
+        # set.
         if is_original:
-            logger.debug(f"{bundle}: Unpickling {result_file}.")
+            logger.debug("%s: Unpickling %s.", bundle, result_file)
             try:
                 with open(result_file, 'rb') as rb:
                     serialized = rb.read()
                 result = cloudpickle.loads(serialized)
             except Exception as e:
                 logger.exception(e)
-                msg = f'Failed to load {result_file}... this is bad news.'
-                logger.critical(msg)
-                self.release_worker(bundle)
+                logger.error('Failed to load %s... this is bad news.', result_file)
+                self._release_worker(bundle)
 
-                # Re-raise the exception; the code in wait_for_process may
-                # decide to emergency_retry_nasty_bundle here.
-                raise Exception(e)
-            logger.debug(f'Removing local (master) {code_file} and {result_file}.')
-            os.remove(f'{result_file}')
-            os.remove(f'{code_file}')
+                # Re-raise the exception; the code in _wait_for_process may
+                # decide to _emergency_retry_nasty_bundle here.
+                raise e
+            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
+            os.remove(result_file)
+            os.remove(code_file)
 
             # Notify any backups that the original is done so they
             # should stop ASAP.  Do this whether or not we
@@ -917,7 +1158,7 @@ class RemoteExecutor(BaseExecutor):
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
                     logger.debug(
-                        f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
+                        '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
                     )
                     backup.is_cancelled.set()
 
@@ -931,28 +1172,32 @@ class RemoteExecutor(BaseExecutor):
 
             # Tell the original to stop if we finished first.
             if not was_cancelled:
+                orig_bundle = bundle.src_bundle
+                assert orig_bundle is not None
                 logger.debug(
-                    f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+                    '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
                 )
-                bundle.src_bundle.is_cancelled.set()
-        self.release_worker(bundle, was_cancelled=was_cancelled)
+                orig_bundle.is_cancelled.set()
+        self._release_worker(bundle, was_cancelled=was_cancelled)
         return result
 
-    def create_original_bundle(self, pickle, fname: str):
-        from string_utils import generate_uuid
+    def _create_original_bundle(self, pickle, function_name: str):
+        """Creates a bundle that is not a backup of any other bundle but
+        rather represents a user task.
+        """
 
-        uuid = generate_uuid(omit_dashes=True)
+        uuid = string_utils.generate_uuid(omit_dashes=True)
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
-        logger.debug(f'Writing pickled code to {code_file}')
-        with open(f'{code_file}', 'wb') as wb:
+        logger.debug('Writing pickled code to %s', code_file)
+        with open(code_file, 'wb') as wb:
             wb.write(pickle)
 
         bundle = BundleDetails(
             pickled_code=pickle,
             uuid=uuid,
-            fname=fname,
+            function_name=function_name,
             worker=None,
             username=None,
             machine=None,
@@ -971,10 +1216,14 @@ class RemoteExecutor(BaseExecutor):
             failure_count=0,
         )
         self.status.record_bundle_details(bundle)
-        logger.debug(f'{bundle}: Created an original bundle')
+        logger.debug('%s: Created an original bundle', bundle)
         return bundle
 
-    def create_backup_bundle(self, src_bundle: BundleDetails):
+    def _create_backup_bundle(self, src_bundle: BundleDetails):
+        """Creates a bundle that is a backup of another bundle that is
+        running too slowly."""
+
+        assert self.status.lock.locked()
         assert src_bundle.backup_bundles is not None
         n = len(src_bundle.backup_bundles)
         uuid = src_bundle.uuid + f'_backup#{n}'
@@ -982,7 +1231,7 @@ class RemoteExecutor(BaseExecutor):
         backup_bundle = BundleDetails(
             pickled_code=src_bundle.pickled_code,
             uuid=uuid,
-            fname=src_bundle.fname,
+            function_name=src_bundle.function_name,
             worker=None,
             username=None,
             machine=None,
@@ -1002,23 +1251,31 @@ class RemoteExecutor(BaseExecutor):
         )
         src_bundle.backup_bundles.append(backup_bundle)
         self.status.record_bundle_details_already_locked(backup_bundle)
-        logger.debug(f'{backup_bundle}: Created a backup bundle')
+        logger.debug('%s: Created a backup bundle', backup_bundle)
         return backup_bundle
 
-    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+    def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+        """Schedule a backup of src_bundle."""
+
         assert self.status.lock.locked()
         assert src_bundle is not None
-        backup_bundle = self.create_backup_bundle(src_bundle)
+        backup_bundle = self._create_backup_bundle(src_bundle)
         logger.debug(
-            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+            '%s/%s: Scheduling backup for execution...',
+            backup_bundle.uuid,
+            backup_bundle.function_name,
         )
-        self._helper_executor.submit(self.launch, backup_bundle)
+        self._helper_executor.submit(self._launch, backup_bundle)
 
         # Results from backups don't matter; if they finish first
         # they will move the result_file to this machine and let
-        # the original pick them up and unpickle them.
+        # the original pick them up and unpickle them (and return
+        # a result).
+
+    def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
+        """Something unexpectedly failed with bundle.  Either retry it
+        from the beginning or throw in the towel and give up on it."""
 
-    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -1032,52 +1289,98 @@ class RemoteExecutor(BaseExecutor):
 
         if bundle.failure_count > retry_limit:
             logger.error(
-                f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+                '%s: Tried this bundle too many times already (%dx); giving up.',
+                bundle,
+                retry_limit,
             )
             if is_original:
                 raise RemoteExecutorException(
-                    f'{bundle}: This bundle can\'t be completed despite several backups and retries'
+                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                 )
             else:
                 logger.error(
-                    f'{bundle}: At least it\'s only a backup; better luck with the others.'
+                    '%s: At least it\'s only a backup; better luck with the others.', bundle
                 )
             return None
         else:
             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
             logger.warning(msg)
             warnings.warn(msg)
-            return self.launch(bundle, avoid_last_machine)
+            return self._launch(bundle, avoid_last_machine)
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
-        pickle = make_cloud_pickle(function, *args, **kwargs)
-        bundle = self.create_original_bundle(pickle, function.__name__)
+        """Submit work to be done.  This is the user entry point of this
+        class."""
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
+        pickle = _make_cloud_pickle(function, *args, **kwargs)
+        bundle = self._create_original_bundle(pickle, function.__name__)
         self.total_bundles_submitted += 1
-        return self._helper_executor.submit(self.launch, bundle)
+        return self._helper_executor.submit(self._launch, bundle)
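Because submit() is declared to return a concurrent.futures.Future, the usual Future idioms apply to work submitted here. A hedged sketch using a stdlib ThreadPoolExecutor as a stand-in (add() is an invented example function):

    import concurrent.futures as fut

    def add(a: int, b: int) -> int:
        return a + b

    with fut.ThreadPoolExecutor(max_workers=1) as pool:
        f = pool.submit(add, 1, 2)
        f.add_done_callback(lambda done: print('finished:', done.result()))
        print(f.result())  # blocks until the work completes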
 
     @overrides
-    def shutdown(self, wait=True) -> None:
-        logging.debug(f'Shutting down RemoteExecutor {self.title}')
-        self.heartbeat_stop_event.set()
-        self.heartbeat_thread.join()
-        self._helper_executor.shutdown(wait)
-        print(self.histogram)
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        """Shutdown the executor."""
+        if not self.already_shutdown:
+            logger.debug('Shutting down RemoteExecutor %s', self.title)
+            self.heartbeat_stop_event.set()
+            self.heartbeat_thread.join()
+            self._helper_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
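The heartbeat teardown above is the common stop-event-then-join pattern. A minimal standalone sketch, with the heartbeat body invented for illustration:

    import threading

    stop = threading.Event()

    def heartbeat() -> None:
        while not stop.wait(timeout=1.0):  # wake roughly once per second
            pass  # ...periodic bookkeeping would go here...

    t = threading.Thread(target=heartbeat, daemon=True)
    t.start()
    # ...later, at shutdown time:
    stop.set()  # ask the loop to exit
    t.join()    # wait for the thread to finish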
 
 
 @singleton
 class DefaultExecutors(object):
+    """A container for a default thread, process and remote executor.
+    These are not created until needed and we take care to clean up
+    before process exit automatically for the caller's convenience.
+    Instead of creating your own executor, consider using the one
+    from this pool.  e.g.::
+
+        @par.parallelize(method=par.Method.PROCESS)
+        def do_work(
+            solutions: List[Work],
+            shard_num: int,
+            ...
+        ):
+            <do the work>
+
+
+        def start_do_work(all_work: List[Work]):
+            shards = []
+            logger.debug('Sharding work into groups of 10.')
+            for subset in list_utils.shard(all_work, 10):
+                shards.append([x for x in subset])
+
+            logger.debug('Kicking off helper pool.')
+            results = []
+            try:
+                for n, shard in enumerate(shards):
+                    results.append(
+                        do_work(
+                            shard, n, shared_cache.get_name(), max_letter_pop_per_word
+                        )
+                    )
+                smart_future.wait_all(results)
+            finally:
+                # Note: if you forget to do this, the pool will clean itself
+                # up during program termination, including tearing down any
+                # active ssh connections.
+                executors.DefaultExecutors().process_pool().shutdown()
+    """
+
     def __init__(self):
         self.thread_executor: Optional[ThreadExecutor] = None
         self.process_executor: Optional[ProcessExecutor] = None
         self.remote_executor: Optional[RemoteExecutor] = None
 
-    def ping(self, host) -> bool:
-        logger.debug(f'RUN> ping -c 1 {host}')
+    @staticmethod
+    def _ping(host) -> bool:
+        logger.debug('RUN> ping -c 1 %s', host)
         try:
-            x = cmd_with_timeout(
-                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
-            )
+            x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
         except Exception:
             return False
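cmd_exitcode() comes from this project's exec_utils; for readers without it, roughly the same reachability probe can be written with the stdlib. A hedged sketch (host_is_reachable is an invented name):

    import subprocess

    def host_is_reachable(host: str, timeout: float = 1.0) -> bool:
        # Send one ICMP echo request and report whether it succeeded.
        try:
            proc = subprocess.run(
                ['ping', '-c', '1', host],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=timeout,
            )
            return proc.returncode == 0
        except (subprocess.TimeoutExpired, OSError):
            return False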
@@ -1096,17 +1399,17 @@ class DefaultExecutors(object):
         if self.remote_executor is None:
             logger.info('Looking for some helper machines...')
             pool: List[RemoteWorkerRecord] = []
-            if self.ping('cheetah.house'):
+            if self._ping('cheetah.house'):
                 logger.info('Found cheetah.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='cheetah.house',
-                        weight=25,
-                        count=6,
+                        weight=24,
+                        count=5,
                     ),
                 )
-            if self.ping('meerkat.cabin'):
+            if self._ping('meerkat.cabin'):
                 logger.info('Found meerkat.cabin')
                 pool.append(
                     RemoteWorkerRecord(
@@ -1116,33 +1419,33 @@ class DefaultExecutors(object):
                         count=2,
                     ),
                 )
-            if self.ping('wannabe.house'):
+            if self._ping('wannabe.house'):
                 logger.info('Found wannabe.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='wannabe.house',
-                        weight=30,
-                        count=10,
+                        weight=14,
+                        count=2,
                     ),
                 )
-            if self.ping('puma.cabin'):
+            if self._ping('puma.cabin'):
                 logger.info('Found puma.cabin')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='puma.cabin',
-                        weight=25,
-                        count=6,
+                        weight=24,
+                        count=5,
                     ),
                 )
-            if self.ping('backup.house'):
+            if self._ping('backup.house'):
                 logger.info('Found backup.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username='scott',
                         machine='backup.house',
-                        weight=7,
+                        weight=9,
                         count=2,
                     ),
                 )
@@ -1150,8 +1453,8 @@ class DefaultExecutors(object):
             # The controller machine has a lot to do; go easy on it.
             for record in pool:
                 if record.machine == platform.node() and record.count > 1:
-                    logger.info(f'Reducing workload for {record.machine}.')
-                    record.count = 1
+                    logger.info('Reducing workload for %s.', record.machine)
+                    record.count = max(int(record.count / 2), 1)
 
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
             policy.register_worker_pool(pool)
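WeightedRandomRemoteWorkerSelectionPolicy itself is defined elsewhere in this file; purely to illustrate weighted random selection over worker records, a hypothetical sketch (assumes each record exposes .weight and .count):

    import random

    def pick_weighted(records):
        # Favor heavier-weighted workers; skip records with no capacity left.
        eligible = [r for r in records if r.count > 0]
        if not eligible:
            return None
        weights = [r.weight for r in eligible]
        return random.choices(eligible, weights=weights, k=1)[0]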
@@ -1160,11 +1463,11 @@ class DefaultExecutors(object):
 
     def shutdown(self) -> None:
         if self.thread_executor is not None:
-            self.thread_executor.shutdown()
+            self.thread_executor.shutdown(wait=True, quiet=True)
             self.thread_executor = None
         if self.process_executor is not None:
-            self.process_executor.shutdown()
+            self.process_executor.shutdown(wait=True, quiet=True)
             self.process_executor = None
         if self.remote_executor is not None:
-            self.remote_executor.shutdown()
+            self.remote_executor.shutdown(wait=True, quiet=True)
             self.remote_executor = None
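End to end, the default pools are meant to be used roughly like this. A hedged sketch: it assumes the module is importable as executors, uses the process_pool() accessor shown in the class docstring, and square() must live at module scope so it pickles cleanly.

    import executors

    def square(x: int) -> int:
        return x * x

    pool = executors.DefaultExecutors().process_pool()
    try:
        futures = [pool.submit(square, n) for n in range(10)]
        print([f.result() for f in futures])
    finally:
        # DefaultExecutors is a singleton, so this tears down the same pools.
        executors.DefaultExecutors().shutdown()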