Easier and more self documenting patterns for loading/saving Persistent
diff --git a/executors.py b/executors.py
index a0273264339ef3b158c1370f457f110842decff1..2794ca18f6667fef64097272abd3bc4f58896298 100644
--- a/executors.py
+++ b/executors.py
@@ -1,43 +1,56 @@
 #!/usr/bin/env python3
+# -*- coding: utf-8 -*-
 
-from __future__ import annotations
+# © Copyright 2021-2022, Scott Gasch
 
-from abc import ABC, abstractmethod
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine, and a remote executor for farming out work to other
+machines.
+
+Also defines DefaultExecutors, which is a container for references to
+global executors / worker pools with automatic shutdown semantics."""
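As a quick orientation, here is a minimal usage sketch of the interface all three executors share: submit() returns a concurrent.futures.Future and shutdown() tears the pool down. The square() helper and the pool size are invented for illustration, and this assumes the project's config module has already parsed command-line arguments the way this library expects.

    import executors

    def square(x: int) -> int:
        return x * x

    pool = executors.ThreadExecutor(max_workers=4)
    futures = [pool.submit(square, i) for i in range(10)]
    print([f.result() for f in futures])
    pool.shutdown(wait=True, quiet=True)  # or pool.shutdown_if_idle()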
+
+from __future__ import annotations
 import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
 import logging
-import numpy
 import os
 import platform
 import random
 import subprocess
 import threading
 import time
+import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass, fields
 from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
+import numpy
 from overrides import overrides
 
-from ansi import bg, fg, underline, reset
 import argparse_utils
 import config
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
-from decorator_utils import singleton
 import histogram as hist
+import persistent
+import string_utils
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_exitcode, cmd_in_background, run_silently
+from thread_utils import background_thread
 
 logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
-    f"Executors ({__file__})",
-    "Args related to processing executors."
+    f"Executors ({__file__})", "Args related to processing executors."
 )
 parser.add_argument(
     '--executors_threadpool_size',
     type=int,
     metavar='#THREADS',
     help='Number of threads in the default threadpool, leave unset for default',
-    default=None
+    default=None,
 )
 parser.add_argument(
     '--executors_processpool_size',
@@ -59,137 +72,186 @@ parser.add_argument(
     metavar='#FAILURES',
     help='Maximum number of failures before giving up on a bundle',
 )
+parser.add_argument(
+    '--remote_worker_records_file',
+    type=str,
+    metavar='FILENAME',
+    help='Path of the remote worker records file (JSON)',
+    default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
+)
 
-RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
-SSH = 'ssh -oForwardX11=no'
 
+SSH = '/usr/bin/ssh -oForwardX11=no'
+SCP = '/usr/bin/scp -C'
 
-def make_cloud_pickle(fun, *args, **kwargs):
-    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
+
+def _make_cloud_pickle(fun, *args, **kwargs):
+    """Internal helper to create cloud pickles."""
+    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
     return cloudpickle.dumps((fun, args, kwargs))
 
 
 class BaseExecutor(ABC):
+    """The base executor interface definition.  The interface for
+    :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
+    :class:`ThreadExecutor`.
+    """
+
     def __init__(self, *, title=''):
         self.title = title
-        self.task_count = 0
         self.histogram = hist.SimpleHistogram(
-            hist.SimpleHistogram.n_evenly_spaced_buckets(
-                int(0), int(500), 50
-            )
+            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
         )
+        self.task_count = 0
 
     @abstractmethod
-    def submit(self,
-               function: Callable,
-               *args,
-               **kwargs) -> fut.Future:
+    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
         pass
 
     @abstractmethod
-    def shutdown(self,
-                 wait: bool = True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         pass
 
+    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
+        """Shutdown the executor and return True if the executor is idle
+        (i.e. there are no pending or active tasks).  Return False
+        otherwise.  Note: this should only be called by the launcher
+        process.
+
+        """
+        if self.task_count == 0:
+            self.shutdown(wait=True, quiet=quiet)
+            return True
+        return False
+
     def adjust_task_count(self, delta: int) -> None:
+        """Change the task count.  Note: do not call this method from a
+        worker; it should only be called by the launcher process /
+        thread / machine.
+
+        """
         self.task_count += delta
-        logger.debug(f'Executor current task count is {self.task_count}')
+        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
+
+    def get_task_count(self) -> int:
+        """Change the task count.  Note: do not call this method from a
+        worker, it should only be called by the launcher process /
+        thread / machine.
+
+        """
+        return self.task_count
 
 
 class ThreadExecutor(BaseExecutor):
-    def __init__(self,
-                 max_workers: Optional[int] = None):
+    """A threadpool executor.  This executor uses python threads to
+    schedule tasks.  Note that, at least as of python3.10, because of
+    the global lock in the interpreter itself, these do not
+    parallelize very well, so this class is useful mostly for tasks
+    that are not CPU-intensive.
+
+    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
+    """
+
+    def __init__(self, max_workers: Optional[int] = None):
         super().__init__()
         workers = None
         if max_workers is not None:
             workers = max_workers
         elif 'executors_threadpool_size' in config.config:
             workers = config.config['executors_threadpool_size']
-        logger.debug(f'Creating threadpool executor with {workers} workers')
+        if workers is not None:
+            logger.debug('Creating threadpool executor with %d workers', workers)
+        else:
+            logger.debug('Creating a default sized threadpool executor')
         self._thread_pool_executor = fut.ThreadPoolExecutor(
-            max_workers=workers,
-            thread_name_prefix="thread_executor_helper"
+            max_workers=workers, thread_name_prefix="thread_executor_helper"
         )
+        self.already_shutdown = False
 
-    def run_local_bundle(self, fun, *args, **kwargs):
-        logger.debug(f"Running local bundle at {fun.__name__}")
-        start = time.time()
+    # This is run on a different thread; do not adjust task count here.
+    @staticmethod
+    def run_local_bundle(fun, *args, **kwargs):
+        logger.debug("Running local bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
-        end = time.time()
-        self.adjust_task_count(-1)
-        duration = end - start
-        logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
-        self.histogram.add_item(duration)
         return result
 
     @overrides
-    def submit(self,
-               function: Callable,
-               *args,
-               **kwargs) -> fut.Future:
+    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
         self.adjust_task_count(+1)
         newargs = []
         newargs.append(function)
         for arg in args:
             newargs.append(arg)
-        return self._thread_pool_executor.submit(
-            self.run_local_bundle,
-            *newargs,
-            **kwargs)
+        start = time.time()
+        result = self._thread_pool_executor.submit(
+            ThreadExecutor.run_local_bundle, *newargs, **kwargs
+        )
+        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+        result.add_done_callback(lambda _: self.adjust_task_count(-1))
+        return result
 
     @overrides
-    def shutdown(self,
-                 wait = True) -> None:
-        logger.debug(f'Shutting down threadpool executor {self.title}')
-        print(self.histogram)
-        self._thread_pool_executor.shutdown(wait)
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        if not self.already_shutdown:
+            logger.debug('Shutting down threadpool executor %s', self.title)
+            self._thread_pool_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
 
 
 class ProcessExecutor(BaseExecutor):
-    def __init__(self,
-                 max_workers=None):
+    """An executor which runs tasks in child processes.
+
+    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
+    """
+
+    def __init__(self, max_workers=None):
         super().__init__()
         workers = None
         if max_workers is not None:
             workers = max_workers
         elif 'executors_processpool_size' in config.config:
             workers = config.config['executors_processpool_size']
-        logger.debug(f'Creating processpool executor with {workers} workers.')
+        if workers is not None:
+            logger.debug('Creating processpool executor with %d workers.', workers)
+        else:
+            logger.debug('Creating a default sized processpool executor')
         self._process_executor = fut.ProcessPoolExecutor(
             max_workers=workers,
         )
+        self.already_shutdown = False
 
-    def run_cloud_pickle(self, pickle):
+    # This is run in another process; do not adjust task count here.
+    @staticmethod
+    def run_cloud_pickle(pickle):
         fun, args, kwargs = cloudpickle.loads(pickle)
-        logger.debug(f"Running pickled bundle at {fun.__name__}")
+        logger.debug("Running pickled bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
-        self.adjust_task_count(-1)
         return result
 
     @overrides
-    def submit(self,
-               function: Callable,
-               *args,
-               **kwargs) -> fut.Future:
+    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
         start = time.time()
         self.adjust_task_count(+1)
-        pickle = make_cloud_pickle(function, *args, **kwargs)
-        result = self._process_executor.submit(
-            self.run_cloud_pickle,
-            pickle
-        )
-        result.add_done_callback(
-            lambda _: self.histogram.add_item(
-                time.time() - start
-            )
-        )
+        pickle = _make_cloud_pickle(function, *args, **kwargs)
+        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
+        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+        result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
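The cloudpickle round trip used by _make_cloud_pickle and run_cloud_pickle above is simply dumps/loads of a (function, args, kwargs) tuple; a standalone sketch with an invented function:

    import cloudpickle

    def add(a, b, *, scale=1):
        return (a + b) * scale

    blob = cloudpickle.dumps((add, (2, 3), {'scale': 10}))  # what _make_cloud_pickle builds
    fun, args, kwargs = cloudpickle.loads(blob)             # what run_cloud_pickle unpacks
    assert fun(*args, **kwargs) == 50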
 
     @overrides
-    def shutdown(self, wait=True) -> None:
-        logger.debug(f'Shutting down processpool executor {self.title}')
-        self._process_executor.shutdown(wait)
-        print(self.histogram)
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        if not self.already_shutdown:
+            logger.debug('Shutting down processpool executor %s', self.title)
+            self._process_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
 
     def __getstate__(self):
         state = self.__dict__.copy()
@@ -199,15 +261,27 @@ class ProcessExecutor(BaseExecutor):
 
 class RemoteExecutorException(Exception):
     """Thrown when a bundle cannot be executed despite several retries."""
+
     pass
 
 
 @dataclass
 class RemoteWorkerRecord:
+    """A record of info about a remote worker."""
+
     username: str
+    """Username we can ssh into on this machine to run work."""
+
     machine: str
+    """Machine address / name."""
+
     weight: int
+    """Relative probability for the weighted policy to select this
+    machine for scheduling work."""
+
     count: int
+    """If this machine is selected, what is the maximum number of task
+    that it can handle?"""
 
     def __hash__(self):
         return hash((self.username, self.machine))
@@ -218,66 +292,145 @@ class RemoteWorkerRecord:
 
 @dataclass
 class BundleDetails:
+    """All info necessary to define some unit of work that needs to be
+    done, where it is being run, its state, whether it is an original
+    bundle or a backup bundle, how many times it has failed, etc.
+    """
+
     pickled_code: bytes
+    """The code to run, cloud pickled"""
+
     uuid: str
-    fname: str
+    """A unique identifier"""
+
+    function_name: str
+    """The name of the function we pickled"""
+
     worker: Optional[RemoteWorkerRecord]
+    """The remote worker running this bundle or None if none (yet)"""
+
     username: Optional[str]
+    """The remote username running this bundle or None if none (yet)"""
+
     machine: Optional[str]
+    """The remote machine running this bundle or None if none (yet)"""
+
     hostname: str
+    """The controller machine"""
+
     code_file: str
+    """A unique filename to hold the work to be done"""
+
     result_file: str
+    """Where the results should be placed / read from"""
+
     pid: int
+    """The process id of the local subprocess watching the ssh connection
+    to the remote machine"""
+
     start_ts: float
+    """Starting time"""
+
     end_ts: float
-    too_slow: bool
-    super_slow: bool
-    src_bundle: BundleDetails
+    """Ending time"""
+
+    slower_than_local_p95: bool
+    """Currently slower then 95% of other bundles on remote host"""
+
+    slower_than_global_p95: bool
+    """Currently slower than 95% of other bundles globally"""
+
+    src_bundle: Optional[BundleDetails]
+    """If this is a backup bundle, this points to the original bundle
+    that it's backing up.  None otherwise."""
+
     is_cancelled: threading.Event
+    """An event that can be signaled to indicate this bundle is cancelled.
+    This is set when another copy (backup or original) of this work has
+    completed successfully elsewhere."""
+
     was_cancelled: bool
+    """True if this bundle was cancelled, False if it finished normally"""
+
     backup_bundles: Optional[List[BundleDetails]]
+    """If we've created backups of this bundle, this is the list of them"""
+
     failure_count: int
+    """How many times has this bundle failed already?"""
+
+    def __repr__(self):
+        uuid = self.uuid
+        if uuid[-9:-2] == '_backup':
+            uuid = uuid[:-9]
+            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
+        else:
+            suffix = uuid[-6:]
+
+        # We colorize the uuid based on some bits from it to make them
+        # stand out in the logging and help a reader correlate log messages
+        # related to the same bundle.
+        colorz = [
+            fg('violet red'),
+            fg('red'),
+            fg('orange'),
+            fg('peach orange'),
+            fg('yellow'),
+            fg('marigold yellow'),
+            fg('green yellow'),
+            fg('tea green'),
+            fg('cornflower blue'),
+            fg('turquoise blue'),
+            fg('tropical blue'),
+            fg('lavender purple'),
+            fg('medium purple'),
+        ]
+        c = colorz[int(uuid[-2:], 16) % len(colorz)]
+        function_name = self.function_name if self.function_name is not None else 'nofname'
+        machine = self.machine if self.machine is not None else 'nomachine'
+        return f'{c}{suffix}/{function_name}/{machine}{reset()}'
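The color choice above is a pure function of the uuid's last two hex digits; a worked example with an invented suffix:

    # int('3b', 16) == 59;  59 % len(colorz) == 59 % 13 == 7  ->  fg('tea green')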
 
 
 class RemoteExecutorStatus:
+    """A status 'scoreboard' for a remote executor tracking various
+    metrics and able to render a periodic dump of global state.
+    """
+
     def __init__(self, total_worker_count: int) -> None:
-        self.worker_count = total_worker_count
+        """C'tor.
+
+        Args:
+            total_worker_count: number of workers in the pool
+
+        """
+        self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
-        self.start_per_bundle: Dict[str, float] = defaultdict(float)
+        self.start_time: float = time.time()
+        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
-        self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord,
-            List[float]
-        ] = {}
-        self.in_flight_bundles_by_worker: Dict[
-            RemoteWorkerRecord,
-            Set[str]
-        ] = {}
+        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
+        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
         self.last_periodic_dump: Optional[float] = None
-        self.total_bundles_submitted = 0
+        self.total_bundles_submitted: int = 0
 
         # Protects reads and modification using self.  Also used
         # as a memory fence for modifications to bundle.
-        self.lock = threading.Lock()
+        self.lock: threading.Lock = threading.Lock()
 
-    def record_acquire_worker(
-            self,
-            worker: RemoteWorkerRecord,
-            uuid: str
-    ) -> None:
+    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+        """Record that bundle with uuid is assigned to a particular worker.
+
+        Args:
+            worker: the record of the worker to which uuid is assigned
+            uuid: the uuid of a bundle that has been assigned to a worker
+        """
         with self.lock:
-            self.record_acquire_worker_already_locked(
-                worker,
-                uuid
-            )
+            self.record_acquire_worker_already_locked(worker, uuid)
 
-    def record_acquire_worker_already_locked(
-            self,
-            worker: RemoteWorkerRecord,
-            uuid: str
-    ) -> None:
+    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
+        """Same as above but an entry point that doesn't acquire the lock
+        for codepaths where it's already held."""
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
@@ -285,51 +438,53 @@ class RemoteExecutorStatus:
         x.add(uuid)
         self.in_flight_bundles_by_worker[worker] = x
 
-    def record_bundle_details(
-            self,
-            details: BundleDetails) -> None:
+    def record_bundle_details(self, details: BundleDetails) -> None:
+        """Register the details about a bundle of work."""
         with self.lock:
             self.record_bundle_details_already_locked(details)
 
-    def record_bundle_details_already_locked(
-            self,
-            details: BundleDetails) -> None:
+    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
+        """Same as above but for codepaths that already hold the lock."""
         assert self.lock.locked()
         self.bundle_details_by_uuid[details.uuid] = details
 
     def record_release_worker(
-            self,
-            worker: RemoteWorkerRecord,
-            uuid: str,
-            was_cancelled: bool,
+        self,
+        worker: RemoteWorkerRecord,
+        uuid: str,
+        was_cancelled: bool,
     ) -> None:
+        """Record that a bundle has released a worker."""
         with self.lock:
-            self.record_release_worker_already_locked(
-                worker, uuid, was_cancelled
-            )
+            self.record_release_worker_already_locked(worker, uuid, was_cancelled)
 
     def record_release_worker_already_locked(
-            self,
-            worker: RemoteWorkerRecord,
-            uuid: str,
-            was_cancelled: bool,
+        self,
+        worker: RemoteWorkerRecord,
+        uuid: str,
+        was_cancelled: bool,
     ) -> None:
+        """Same as above but for codepaths that already hold the lock."""
         assert self.lock.locked()
         ts = time.time()
         self.end_per_bundle[uuid] = ts
         self.in_flight_bundles_by_worker[worker].remove(uuid)
         if not was_cancelled:
-            bundle_latency = ts - self.start_per_bundle[uuid]
-            x = self.finished_bundle_timings_per_worker.get(worker, list())
+            start = self.start_per_bundle[uuid]
+            assert start is not None
+            bundle_latency = ts - start
+            x = self.finished_bundle_timings_per_worker.get(worker, [])
             x.append(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
             self.finished_bundle_timings.append(bundle_latency)
 
     def record_processing_began(self, uuid: str):
+        """Record when work on a bundle begins."""
         with self.lock:
             self.start_per_bundle[uuid] = time.time()
 
     def total_in_flight(self) -> int:
+        """How many bundles are in flight currently?"""
         assert self.lock.locked()
         total_in_flight = 0
         for worker in self.known_workers:
@@ -337,6 +492,7 @@ class RemoteExecutorStatus:
         return total_in_flight
 
     def total_idle(self) -> int:
+        """How many idle workers are there currently?"""
         assert self.lock.locked()
         return self.worker_count - self.total_in_flight()
 
@@ -350,13 +506,14 @@ class RemoteExecutorStatus:
         if len(self.finished_bundle_timings) > 1:
             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
             ret += (
-                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
+                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                 f'✅={total_finished}/{self.total_bundles_submitted}, '
                 f'💻n={total_in_flight}/{self.worker_count}\n'
             )
         else:
             ret += (
-                f' ✅={total_finished}/{self.total_bundles_submitted}, '
+                f'⏱={ts-self.start_time:.1f}s, '
+                f'✅={total_finished}/{self.total_bundles_submitted}, '
                 f'💻n={total_in_flight}/{self.worker_count}\n'
             )
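The ∀p50/∀p95 values in this dump come straight from numpy.quantile over the finished bundle latencies; a small sketch with invented timings:

    import numpy

    finished_bundle_timings = [1.0, 2.0, 3.0, 4.0, 5.0]  # seconds; invented
    p50, p95 = numpy.quantile(finished_bundle_timings, [0.5, 0.95])
    print(f'⏱=∀p50:{p50:.1f}s, ∀p95:{p95:.1f}s')         # ⏱=∀p50:3.0s, ∀p95:4.8s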
 
@@ -376,36 +533,31 @@ class RemoteExecutorStatus:
             if in_flight > 0:
                 ret += f'    ...{in_flight} bundles currently in flight:\n'
                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
-                    details = self.bundle_details_by_uuid.get(
-                        bundle_uuid,
-                        None
-                    )
-                    pid = str(details.pid) if details is not None else "TBD"
+                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
+                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                     if self.start_per_bundle[bundle_uuid] is not None:
                         sec = ts - self.start_per_bundle[bundle_uuid]
-                        ret += f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
+                        ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
                     else:
-                        ret += f'       {bundle_uuid} setting up / copying data...'
+                        ret += f'       {details} setting up / copying data...'
                         sec = 0.0
 
                     if qworker is not None:
                         if sec > qworker[1]:
                             ret += f'{bg("red")}>💻p95{reset()} '
-                        elif sec > qworker[0]:
-                            ret += f'{fg("red")}>💻p50{reset()} '
-                    if qall is not None:
-                        if sec > qall[1] * 1.5:
-                            ret += f'{bg("red")}!!!{reset()}'
                             if details is not None:
-                                logger.debug(f'Flagging {details.uuid} for another backup')
-                                details.super_slow = True
-                        elif sec > qall[1]:
+                                details.slower_than_local_p95 = True
+                        else:
+                            if details is not None:
+                                details.slower_than_local_p95 = False
+
+                    if qall is not None:
+                        if sec > qall[1]:
                             ret += f'{bg("red")}>∀p95{reset()} '
                             if details is not None:
-                                logger.debug(f'Flagging {details.uuid} for a backup')
-                                details.too_slow = True
-                        elif sec > qall[0]:
-                            ret += f'{fg("red")}>∀p50{reset()}'
+                                details.slower_than_global_p95 = True
+                        else:
+                            if details is not None:
+                                details.slower_than_global_p95 = False
                     ret += '\n'
         return ret
 
@@ -413,17 +565,18 @@ class RemoteExecutorStatus:
         assert self.lock.locked()
         self.total_bundles_submitted = total_bundles_submitted
         ts = time.time()
-        if (
-                self.last_periodic_dump is None
-                or ts - self.last_periodic_dump > 5.0
-        ):
+        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
             print(self)
             self.last_periodic_dump = ts
 
 
 class RemoteWorkerSelectionPolicy(ABC):
-    def register_worker_pool(self, workers):
-        random.seed()
+    """A policy for selecting a remote worker base class."""
+
+    def __init__(self):
+        self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
         self.workers = workers
 
     @abstractmethod
@@ -431,79 +584,130 @@ class RemoteWorkerSelectionPolicy(ABC):
         pass
 
     @abstractmethod
-    def acquire_worker(
-            self,
-            machine_to_avoid = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         pass
 
 
 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that uses weighted RNG."""
+
+    @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
-    def acquire_worker(
-            self,
-            machine_to_avoid = None
-    ) -> Optional[RemoteWorkerRecord]:
+    @overrides
+    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         grabbag = []
-        for worker in self.workers:
-            for x in range(0, worker.count):
-                for y in range(0, worker.weight):
-                    grabbag.append(worker)
-
-        for _ in range(0, 5):
-            random.shuffle(grabbag)
-            worker = grabbag[0]
-            if worker.machine != machine_to_avoid or _ > 2:
-                if worker.count > 0:
-                    worker.count -= 1
-                    logger.debug(f'Selected worker {worker}')
-                    return worker
-        logger.warning("Couldn't find a worker; go fish.")
-        return None
+        if self.workers:
+            for worker in self.workers:
+                if worker.machine != machine_to_avoid:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
+
+        if len(grabbag) == 0:
+            logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+            if self.workers:
+                for worker in self.workers:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
+
+        if len(grabbag) == 0:
+            logger.warning('There are no available workers?!')
+            return None
+
+        worker = random.sample(grabbag, 1)[0]
+        assert worker.count > 0
+        worker.count -= 1
+        logger.debug('Selected worker %s', worker)
+        return worker
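To make the weighted draw concrete: each worker contributes count * weight entries to the grab bag, so its chance of being picked on a single acquire_worker() call is proportional to that product. A tiny simulation with invented numbers:

    import random

    grabbag = ['A'] * (2 * 3) + ['B'] * (4 * 1)  # A: count=2, weight=3; B: count=4, weight=1
    picks = [random.sample(grabbag, 1)[0] for _ in range(10_000)]
    print(picks.count('A') / len(picks))         # ~0.6, i.e. 6 / (6 + 4)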
 
 
 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that just round robins."""
+
     def __init__(self) -> None:
+        super().__init__()
         self.index = 0
 
+    @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
-    def acquire_worker(
-            self,
-            machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
-        x = self.index
-        while True:
-            worker = self.workers[x]
-            if worker.count > 0:
-                worker.count -= 1
+    @overrides
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+        if self.workers:
+            x = self.index
+            while True:
+                worker = self.workers[x]
+                if worker.count > 0:
+                    worker.count -= 1
+                    x += 1
+                    if x >= len(self.workers):
+                        x = 0
+                    self.index = x
+                    logger.debug('Selected worker %s', worker)
+                    return worker
                 x += 1
                 if x >= len(self.workers):
                     x = 0
-                self.index = x
-                logger.debug(f'Selected worker {worker}')
-                return worker
-            x += 1
-            if x >= len(self.workers):
-                x = 0
-            if x == self.index:
-                logger.warning("Couldn't find a worker; go fish.")
-                return None
+                if x == self.index:
+                    logger.warning('Unexpectedly could not find a worker, giving up.')
+                    return None
+        return None
 
 
 class RemoteExecutor(BaseExecutor):
-    def __init__(self,
-                 workers: List[RemoteWorkerRecord],
-                 policy: RemoteWorkerSelectionPolicy) -> None:
+    """An executor that uses processes on remote machines to do work.  This
+    works by creating "bundles" of work with pickled code in each to be
+    executed.  Each bundle is assigned a remote worker based on some policy
+    heuristics.  Once assigned to a remote worker, a local subprocess is
+    created.  It copies the pickled code to the remote machine via ssh/scp
+    and then starts up work on the remote machine again using ssh.  When
+    the work is complete it copies the results back to the local machine.
+
+    So there is essentially one "controller" machine (which may also be
+    in the remote executor pool and therefore do task work in addition to
+    controlling) and N worker machines.  This code runs on the controller
+    whereas on the worker machines we invoke pickled user code via a
+    shim in :file:`remote_worker.py`.
+
+    Some redundancy and safety provisions are made; e.g. backups are
+    created for slower-than-expected tasks, and if a task fails
+    repeatedly we consider it poisoned and give up on it.
+
+    .. warning::
+
+        The network overhead / latency of copying work from the
+        controller machine to the remote workers is relatively high.
+        This executor probably only makes sense to use with
+        computationally expensive tasks such as jobs that will execute
+        for ~30 seconds or longer.
+
+    See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
+    """
+
+    def __init__(
+        self,
+        workers: List[RemoteWorkerRecord],
+        policy: RemoteWorkerSelectionPolicy,
+    ) -> None:
+        """C'tor.
+
+        Args:
+            workers: A list of remote workers we can call on to do tasks.
+            policy: A policy for selecting remote workers for tasks.
+        """
+
         super().__init__()
         self.workers = workers
         self.policy = policy
@@ -516,113 +720,194 @@ class RemoteExecutor(BaseExecutor):
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
-        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
+        logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
         )
         self.status = RemoteExecutorStatus(self.worker_count)
         self.total_bundles_submitted = 0
+        self.backup_lock = threading.Lock()
+        self.last_backup = None
+        (
+            self.heartbeat_thread,
+            self.heartbeat_stop_event,
+        ) = self._run_periodic_heartbeat()
+        self.already_shutdown = False
+
+    @background_thread
+    def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
+        """
+        We create a background thread to invoke :meth:`_heartbeat` regularly
+        while we are scheduling work.  It does some accounting such as
+        looking for slow bundles to tag for backup creation, checking for
+        unexpected failures, and printing a fancy message on stdout.
+        """
+        while not stop_event.is_set():
+            time.sleep(5.0)
+            logger.debug('Running periodic heartbeat code...')
+            self._heartbeat()
+        logger.debug('Periodic heartbeat thread shutting down.')
+
+    def _heartbeat(self) -> None:
+        # Note: this is invoked on a background thread, not an
+        # executor thread.  Be careful what you do with it b/c it
+        # needs to get back and dump status again periodically.
+        with self.status.lock:
+            self.status.periodic_dump(self.total_bundles_submitted)
 
-    def bundle_prefix(self, bundle: BundleDetails) -> str:
-        colorz = [
-            fg('violet red'),
-            fg('red'),
-            fg('orange'),
-            fg('peach orange'),
-            fg('yellow'),
-            fg('marigold yellow'),
-            fg('green yellow'),
-            fg('tea green'),
-            fg('cornflower blue'),
-            fg('turquoise blue'),
-            fg('tropical blue'),
-            fg('lavender purple'),
-            fg('medium purple'),
-        ]
-        c = colorz[int(bundle.uuid[-2:], 16) % len(colorz)]
-        fname = bundle.fname if bundle.fname is not None else 'nofname'
-        machine = bundle.machine if bundle.machine is not None else 'nomachine'
-        return f'{c}{bundle.uuid[-8:]}/{fname}/{machine}{reset()}'
+            # Look for bundles to reschedule via executor.submit
+            if config.config['executors_schedule_remote_backups']:
+                self._maybe_schedule_backup_bundles()
 
-    def is_worker_available(self) -> bool:
+    def _maybe_schedule_backup_bundles(self):
+        """Maybe schedule backup bundles if we see a very slow bundle."""
+
+        assert self.status.lock.locked()
+        num_done = len(self.status.finished_bundle_timings)
+        num_idle_workers = self.worker_count - self.task_count
+        now = time.time()
+        if (
+            num_done >= 2
+            and num_idle_workers > 0
+            and (self.last_backup is None or (now - self.last_backup > 9.0))
+            and self.backup_lock.acquire(blocking=False)
+        ):
+            try:
+                assert self.backup_lock.locked()
+
+                bundle_to_backup = None
+                best_score = None
+                for (
+                    worker,
+                    bundle_uuids,
+                ) in self.status.in_flight_bundles_by_worker.items():
+
+                    # Prefer to schedule backups of bundles running on
+                    # slower machines.
+                    base_score = 0
+                    for record in self.workers:
+                        if worker.machine == record.machine:
+                            base_score = float(record.weight)
+                            base_score = 1.0 / base_score
+                            base_score *= 200.0
+                            base_score = int(base_score)
+                            break
+
+                    for uuid in bundle_uuids:
+                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
+                        if (
+                            bundle is not None
+                            and bundle.src_bundle is None
+                            and bundle.backup_bundles is not None
+                        ):
+                            score = base_score
+
+                            # Schedule backups of bundles running
+                            # longer; especially those that are
+                            # unexpectedly slow.
+                            start_ts = self.status.start_per_bundle[uuid]
+                            if start_ts is not None:
+                                runtime = now - start_ts
+                                score += runtime
+                                logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
+
+                                if bundle.slower_than_local_p95:
+                                    score += runtime / 2
+                                    logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
+
+                                if bundle.slower_than_global_p95:
+                                    score += runtime / 4
+                                    logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
+
+                            # Prefer backups of bundles that don't
+                            # have backups already.
+                            backup_count = len(bundle.backup_bundles)
+                            if backup_count == 0:
+                                score *= 2
+                            elif backup_count == 1:
+                                score /= 2
+                            elif backup_count == 2:
+                                score /= 8
+                            else:
+                                score = 0
+                            logger.debug(
+                                'score[%s] => %.1f  # %d dup backup factor',
+                                bundle,
+                                score,
+                                backup_count,
+                            )
+
+                            if score != 0 and (best_score is None or score > best_score):
+                                bundle_to_backup = bundle
+                                assert bundle is not None
+                                assert bundle.backup_bundles is not None
+                                assert bundle.src_bundle is None
+                                best_score = score
+
+                # Note: this is all still happening on the heartbeat
+                # runner thread.  That's ok because
+                # _schedule_backup_for_bundle uses the executor to
+                # submit the bundle again which will cause it to be
+                # picked up by a worker thread and allow this thread
+                # to return to run future heartbeats.
+                if bundle_to_backup is not None:
+                    self.last_backup = now
+                    logger.info(
+                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+                        bundle_to_backup,
+                        best_score,
+                    )
+                    self._schedule_backup_for_bundle(bundle_to_backup)
+            finally:
+                self.backup_lock.release()
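A worked example of the scoring with invented numbers: a bundle on a worker with weight 4 that has been running for 60s, is past both p95 thresholds, and has no backups yet:

    # base_score = int(200.0 * (1.0 / 4))             ->  50
    # score += runtime (60s)                          -> 110
    # score += runtime / 2  (slower_than_local_p95)   -> 140
    # score += runtime / 4  (slower_than_global_p95)  -> 155
    # score *= 2            (backup_count == 0)       -> 310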
+
+    def _is_worker_available(self) -> bool:
+        """Is there a worker available currently?"""
         return self.policy.is_worker_available()
 
-    def acquire_worker(
-            self,
-            machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+        """Try to acquire a worker."""
         return self.policy.acquire_worker(machine_to_avoid)
 
-    def find_available_worker_or_block(
-            self,
-            machine_to_avoid: str = None
-    ) -> RemoteWorkerRecord:
+    def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
+        """Find a worker or block until one becomes available."""
         with self.cv:
-            while not self.is_worker_available():
+            while not self._is_worker_available():
                 self.cv.wait()
-            worker = self.acquire_worker(machine_to_avoid)
+            worker = self._acquire_worker(machine_to_avoid)
             if worker is not None:
                 return worker
         msg = "We should never reach this point in the code"
         logger.critical(msg)
         raise Exception(msg)
 
-    def release_worker(self, worker: RemoteWorkerRecord) -> None:
-        logger.debug(f'Released worker {worker}')
+    def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
+        """Release a previously acquired worker."""
+        worker = bundle.worker
+        assert worker is not None
+        logger.debug('Released worker %s', worker)
+        self.status.record_release_worker(
+            worker,
+            bundle.uuid,
+            was_cancelled,
+        )
         with self.cv:
             worker.count += 1
             self.cv.notify()
+        self.adjust_task_count(-1)
 
-    def heartbeat(self) -> None:
-        with self.status.lock:
-            # Regular progress report
-            self.status.periodic_dump(self.total_bundles_submitted)
-
-            # Look for bundles to reschedule.
-            num_done = len(self.status.finished_bundle_timings)
-            if num_done > 7 or (num_done > 5 and self.is_worker_available()):
-                for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
-                    for uuid in bundle_uuids:
-                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
-                        if (
-                                bundle is not None and
-                                bundle.too_slow and
-                                bundle.src_bundle is None and
-                                config.config['executors_schedule_remote_backups']
-                        ):
-                            self.consider_backup_for_bundle(bundle)
-
-    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
-        assert self.status.lock.locked()
-        if (
-            bundle.too_slow
-            and len(bundle.backup_bundles) == 0       # one backup per
-        ):
-            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
-            logger.debug(msg)
-            self.schedule_backup_for_bundle(bundle)
-            return
-        elif (
-                bundle.super_slow
-                and len(bundle.backup_bundles) < 2    # two backups in dire situations
-                and self.status.total_idle() > 4
-        ):
-            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
-            logger.debug(msg)
-            self.schedule_backup_for_bundle(bundle)
-            return
-
-    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
+    def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
+        """See if a particular bundle is cancelled.  Do not block."""
         with self.status.lock:
             if bundle.is_cancelled.wait(timeout=0.0):
-                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
+                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                 bundle.was_cancelled = True
                 return True
         return False
 
-    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
+    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
         """Find a worker for bundle or block until one is available."""
+
         self.adjust_task_count(+1)
         uuid = bundle.uuid
         hostname = bundle.hostname
@@ -634,33 +919,26 @@ class RemoteExecutor(BaseExecutor):
             avoid_machine = bundle.src_bundle.machine
         worker = None
         while worker is None:
-            worker = self.find_available_worker_or_block(avoid_machine)
+            worker = self._find_available_worker_or_block(avoid_machine)
+        assert worker is not None
 
         # Ok, found a worker.
         bundle.worker = worker
         machine = bundle.machine = worker.machine
         username = bundle.username = worker.username
-        fname = bundle.fname
         self.status.record_acquire_worker(worker, uuid)
-        logger.debug(f'{self.bundle_prefix(bundle)}: Running bundle on {worker}...')
+        logger.debug('%s: Running bundle on %s...', bundle, worker)
 
         # Before we do any work, make sure the bundle is still viable.
-        if self.check_if_cancelled(bundle):
+        # It may have been some time between when it was submitted and
+        # now due to lack of worker availability and someone else may
+        # have already finished it.
+        if self._check_if_cancelled(bundle):
             try:
-                return self.post_launch_work(bundle)
+                return self._process_work_result(bundle)
             except Exception as e:
-                logger.exception(e)
-                logger.error(
-                    f'{self.bundle_prefix(bundle)}: bundle says it\'s cancelled upfront but no results?!'
-                )
-                assert bundle.worker is not None
-                self.status.record_release_worker(
-                    bundle.worker,
-                    bundle.uuid,
-                    True,
-                )
-                self.release_worker(bundle.worker)
-                self.adjust_task_count(-1)
+                logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
+                self._release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
                     # bundle.  For it to have been cancelled, a backup
@@ -669,79 +947,103 @@ class RemoteExecutor(BaseExecutor):
                     # it is done but we can't find the results it
                     # should have copied over.  Reschedule the whole
                     # thing.
-                    return self.emergency_retry_nasty_bundle(bundle)
+                    logger.exception(e)
+                    logger.error(
+                        '%s: We are the original owner thread and yet there are '
+                        'no results for this bundle.  This is unexpected and bad.',
+                        bundle,
+                    )
+                    return self._emergency_retry_nasty_bundle(bundle)
                 else:
-                    # Expected(?).  We're a backup and our bundle is
-                    # cancelled before we even got started.  Something
-                    # went bad in post_launch_work (I acutually don't
-                    # see what?) but probably not worth worrying
-                    # about.
+                    # We're a backup and our bundle is cancelled
+                    # before we even got started.  Do nothing and let
+                    # the original bundle's thread worry about either
+                    # finding the results or complaining about it.
                     return None
 
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
-                cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
+                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
-                logger.info(f"{self.bundle_prefix(bundle)}: Copying work to {worker} via {cmd}.")
+                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                 run_silently(cmd)
                 xfer_latency = time.time() - start_ts
-                logger.info(f"{self.bundle_prefix(bundle)}: Copying done to {worker} in {xfer_latency:.1f}s.")
+                logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
             except Exception as e:
-                logger.exception(e)
-                logger.error(
-                    f'{self.bundle_prefix(bundle)}: failed to send instructions to worker machine?!?'
-                )
-                assert bundle.worker is not None
-                self.status.record_release_worker(
-                    bundle.worker,
-                    bundle.uuid,
-                    True,
-                )
-                self.release_worker(bundle.worker)
-                self.adjust_task_count(-1)
+                self._release_worker(bundle)
                 if is_original:
-                    # Weird.  We tried to copy the code to the worker and it failed...
-                    # And we're the original bundle.  We have to retry.
-                    return self.emergency_retry_nasty_bundle(bundle)
+                    # Weird.  We tried to copy the code to the worker
+                    # and it failed...  And we're the original bundle.
+                    # We have to retry.
+                    logger.exception(e)
+                    logger.error(
+                        "%s: Failed to send instructions to the worker machine?! "
+                        "This is not expected; we\'re the original bundle so this shouldn\'t "
+                        "be a race condition.  Attempting an emergency retry...",
+                        bundle,
+                    )
+                    return self._emergency_retry_nasty_bundle(bundle)
                 else:
                     # This is actually expected; we're a backup.
                     # There's a race condition where someone else
                     # already finished the work and removed the source
-                    # code file before we could copy it.  No biggie.
+                    # code_file before we could copy it.  Ignore.
+                    logger.warning(
+                        '%s: Failed to send instructions to the worker machine... '
+                        'We\'re a backup and this may be caused by the original (or '
+                        'some other backup) already finishing this work.  Ignoring.',
+                        bundle,
+                    )
                     return None
 
         # Kick off the work.  Note that if this fails we let
-        # wait_for_process deal with it.
+        # _wait_for_process deal with it.
         self.status.record_processing_began(uuid)
-        cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
-               f'"source py39-venv/bin/activate &&'
-               f' /home/scott/lib/python_modules/remote_worker.py'
-               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
-        logger.debug(f'{self.bundle_prefix(bundle)}: Executing {cmd} in the background to kick off work...')
+        cmd = (
+            f'{SSH} {bundle.username}@{bundle.machine} '
+            f'"source py39-venv/bin/activate &&'
+            f' /home/scott/lib/python_modules/remote_worker.py'
+            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
+        )
+        logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
         p = cmd_in_background(cmd, silent=True)
-        bundle.pid = pid = p.pid
-        logger.debug(f'{self.bundle_prefix(bundle)}: Local ssh process pid={pid}; remote worker is {machine}.')
-        return self.wait_for_process(p, bundle, 0)
+        bundle.pid = p.pid
+        logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
+        return self._wait_for_process(p, bundle, 0)
+
+    def _wait_for_process(
+        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
+    ) -> Any:
+        """At this point we've copied the bundle's pickled code to the remote
+        worker and started an ssh process that should be invoking the
+        remote worker to have it execute the user's code.  See how
+        that's going and wait for it to complete or fail.  Note that
+        this code is recursive: there are codepaths where we decide to
+        stop waiting for an ssh process (because another backup seems
+        to have finished) but then fail to fetch or parse the results
+        from that backup and thus call ourselves to continue waiting
+        on an active ssh process.  This is the purpose of the depth
+        argument: to curtail potential infinite recursion by giving up
+        eventually.
+
+        Args:
+            p: the Popen record of the ssh job
+            bundle: the bundle of work being executed remotely
+            depth: how many retries we've made so far.  Starts at zero.
+
+        """
 
-    def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
-        uuid = bundle.uuid
         machine = bundle.machine
-        fname = bundle.fname
-        pid = p.pid
+        assert p is not None
+        pid = p.pid  # pid of the ssh process
         if depth > 3:
             logger.error(
-                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
             )
             p.terminate()
-            self.status.record_release_worker(
-                bundle.worker,
-                bundle.uuid,
-                True,
-            )
-            self.release_worker(bundle.worker)
-            self.adjust_task_count(-1)
-            return self.emergency_retry_nasty_bundle(bundle)
+            self._release_worker(bundle)
+            return self._emergency_retry_nasty_bundle(bundle)
 
         # Spin until either the ssh job we scheduled finishes the
         # bundle or some backup worker signals that they finished it
@@ -750,25 +1052,20 @@ class RemoteExecutor(BaseExecutor):
             try:
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
-                self.heartbeat()
-                if self.check_if_cancelled(bundle):
-                    logger.info(
-                        f'{self.bundle_prefix(bundle)}: another worker finished bundle, checking it out...'
-                    )
+                if self._check_if_cancelled(bundle):
+                    logger.info('%s: looks like another worker finished bundle...', bundle)
                     break
             else:
-                logger.info(
-                    f"{self.bundle_prefix(bundle)}: pid {pid} ({machine}) our ssh finished, checking it out..."
-                )
+                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                 p = None
                 break
 
         # If we get here we believe the bundle is done; either the ssh
         # subprocess finished (hopefully successfully) or we noticed
         # that some other worker seems to have completed the bundle
-        # and we're bailing out.
+        # before us and we're bailing out.
         try:
-            ret = self.post_launch_work(bundle)
+            ret = self._process_work_result(bundle)
             if ret is not None and p is not None:
                 p.terminate()
             return ret
@@ -780,23 +1077,19 @@ class RemoteExecutor(BaseExecutor):
         # Otherwise, time for an emergency reschedule.
         except Exception as e:
             logger.exception(e)
-            logger.error(f'{self.bundle_prefix(bundle)}: Something unexpected just happened...')
+            logger.error('%s: Something unexpected just happened...', bundle)
             if p is not None:
                 logger.warning(
-                    f"{self.bundle_prefix(bundle)}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
+                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
                 )
-                return self.wait_for_process(p, bundle, depth + 1)
+                return self._wait_for_process(p, bundle, depth + 1)
             else:
-                self.status.record_release_worker(
-                    bundle.worker,
-                    bundle.uuid,
-                    True,
-                )
-                self.release_worker(bundle.worker)
-                self.adjust_task_count(-1)
-                return self.emergency_retry_nasty_bundle(bundle)
+                self._release_worker(bundle)
+                return self._emergency_retry_nasty_bundle(bundle)
+
+    def _process_work_result(self, bundle: BundleDetails) -> Any:
+        """A bundle seems to have completed.  Check on the results."""
 
-    def post_launch_work(self, bundle: BundleDetails) -> Any:
         with self.status.lock:
             is_original = bundle.src_bundle is None
             was_cancelled = bundle.was_cancelled
@@ -804,8 +1097,6 @@ class RemoteExecutor(BaseExecutor):
             machine = bundle.machine
             result_file = bundle.result_file
             code_file = bundle.code_file
-            fname = bundle.fname
-            uuid = bundle.uuid
 
             # Whether original or backup, if we finished first we must
             # fetch the results if the computation happened on a
@@ -814,16 +1105,33 @@ class RemoteExecutor(BaseExecutor):
             if not was_cancelled:
                 assert bundle.machine is not None
                 if bundle.hostname not in bundle.machine:
-                    cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
+                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
-                        f"{self.bundle_prefix(bundle)}: Fetching results from {username}@{machine} via {cmd}"
+                        "%s: Fetching results back from %s@%s via %s",
+                        bundle,
+                        username,
+                        machine,
+                        cmd,
                     )
 
                     # If either of these throw they are handled in
-                    # wait_for_process.
-                    run_silently(cmd)
-                    run_silently(f'{SSH} {username}@{machine}'
-                                 f' "/bin/rm -f {code_file} {result_file}"')
+                    # _wait_for_process.
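+                    # Retry the fetch a few times; after three straight
+                    # failures the last exception propagates (and is then
+                    # handled as described above).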
+                    attempts = 0
+                    while True:
+                        try:
+                            run_silently(cmd)
+                        except Exception as e:
+                            attempts += 1
+                            if attempts >= 3:
+                                raise e
+                        else:
+                            break
+
+                    # Clean up remote /tmp files.
+                    run_silently(
+                        f'{SSH} {username}@{machine} "/bin/rm -f {code_file} {result_file}"'
+                    )
+                    logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
@@ -832,32 +1140,25 @@ class RemoteExecutor(BaseExecutor):
         # original is also the only job that may delete result_file
         # from disk.  Note that the original may have been cancelled
         # if one of the backups finished first; it still must read the
-        # result from disk.
+        # result from disk.  It still does that here, even when
+        # was_cancelled is set.
         if is_original:
-            logger.debug(f"{self.bundle_prefix(bundle)}: Unpickling {result_file}.")
+            logger.debug("%s: Unpickling %s.", bundle, result_file)
             try:
-                with open(f'{result_file}', 'rb') as rb:
+                with open(result_file, 'rb') as rb:
                     serialized = rb.read()
                 result = cloudpickle.loads(serialized)
             except Exception as e:
-                msg = f'Failed to load {result_file}, this is bad news.'
-                logger.critical(msg)
-                self.status.record_release_worker(
-                    bundle.worker,
-                    bundle.uuid,
-                    True,
-                )
-                self.release_worker(bundle.worker)
-
-                # Re-raise the exception; the code in wait_for_process may
-                # decide to emergency_retry_nasty_bundle here.
-                raise Exception(e)
+                logger.exception(e)
+                logger.error('Failed to load %s... this is bad news.', result_file)
+                self._release_worker(bundle)
 
-            logger.debug(
-                f'Removing local (master) {code_file} and {result_file}.'
-            )
-            os.remove(f'{result_file}')
-            os.remove(f'{code_file}')
+                # Re-raise the exception; the code in _wait_for_process may
+                # decide to _emergency_retry_nasty_bundle here.
+                raise e
+            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
+            os.remove(result_file)
+            os.remove(code_file)
 
             # Notify any backups that the original is done so they
             # should stop ASAP.  Do this whether or not we
@@ -866,7 +1167,7 @@ class RemoteExecutor(BaseExecutor):
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
                     logger.debug(
-                        f'{self.bundle_prefix(bundle)}: Notifying backup {backup.uuid} that it\'s cancelled'
+                        '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
                     )
                     backup.is_cancelled.set()
 
@@ -880,102 +1181,110 @@ class RemoteExecutor(BaseExecutor):
 
             # Tell the original to stop if we finished first.
             if not was_cancelled:
+                orig_bundle = bundle.src_bundle
+                assert orig_bundle is not None
                 logger.debug(
-                    f'{self.bundle_prefix(bundle)}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+                    '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
                 )
-                bundle.src_bundle.is_cancelled.set()
-
-        assert bundle.worker is not None
-        self.status.record_release_worker(
-            bundle.worker,
-            bundle.uuid,
-            was_cancelled,
-        )
-        self.release_worker(bundle.worker)
-        self.adjust_task_count(-1)
+                orig_bundle.is_cancelled.set()
+        self._release_worker(bundle, was_cancelled=was_cancelled)
         return result
 
-    def create_original_bundle(self, pickle, fname: str):
-        from string_utils import generate_uuid
-        uuid = generate_uuid(as_hex=True)
+    def _create_original_bundle(self, pickle, function_name: str):
+        """Creates a bundle that is not a backup of any other bundle but
+        rather represents a user task.
+        """
+
+        uuid = string_utils.generate_uuid(omit_dashes=True)
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
-        logger.debug(f'Writing pickled code to {code_file}')
-        with open(f'{code_file}', 'wb') as wb:
+        logger.debug('Writing pickled code to %s', code_file)
+        with open(code_file, 'wb') as wb:
             wb.write(pickle)
 
         bundle = BundleDetails(
-            pickled_code = pickle,
-            uuid = uuid,
-            fname = fname,
-            worker = None,
-            username = None,
-            machine = None,
-            hostname = platform.node(),
-            code_file = code_file,
-            result_file = result_file,
-            pid = 0,
-            start_ts = time.time(),
-            end_ts = 0.0,
-            too_slow = False,
-            super_slow = False,
-            src_bundle = None,
-            is_cancelled = threading.Event(),
-            was_cancelled = False,
-            backup_bundles = [],
-            failure_count = 0,
+            pickled_code=pickle,
+            uuid=uuid,
+            function_name=function_name,
+            worker=None,
+            username=None,
+            machine=None,
+            hostname=platform.node(),
+            code_file=code_file,
+            result_file=result_file,
+            pid=0,
+            start_ts=time.time(),
+            end_ts=0.0,
+            slower_than_local_p95=False,
+            slower_than_global_p95=False,
+            src_bundle=None,
+            is_cancelled=threading.Event(),
+            was_cancelled=False,
+            backup_bundles=[],
+            failure_count=0,
         )
         self.status.record_bundle_details(bundle)
-        logger.debug(f'{self.bundle_prefix(bundle)}: Created an original bundle')
+        logger.debug('%s: Created an original bundle', bundle)
         return bundle
 
-    def create_backup_bundle(self, src_bundle: BundleDetails):
+    def _create_backup_bundle(self, src_bundle: BundleDetails):
+        """Creates a bundle that is a backup of another bundle that is
+        running too slowly."""
+
+        assert self.status.lock.locked()
         assert src_bundle.backup_bundles is not None
         n = len(src_bundle.backup_bundles)
         uuid = src_bundle.uuid + f'_backup#{n}'
 
         backup_bundle = BundleDetails(
-            pickled_code = src_bundle.pickled_code,
-            uuid = uuid,
-            fname = src_bundle.fname,
-            worker = None,
-            username = None,
-            machine = None,
-            hostname = src_bundle.hostname,
-            code_file = src_bundle.code_file,
-            result_file = src_bundle.result_file,
-            pid = 0,
-            start_ts = time.time(),
-            end_ts = 0.0,
-            too_slow = False,
-            super_slow = False,
-            src_bundle = src_bundle,
-            is_cancelled = threading.Event(),
-            was_cancelled = False,
-            backup_bundles = None,    # backup backups not allowed
-            failure_count = 0,
+            pickled_code=src_bundle.pickled_code,
+            uuid=uuid,
+            function_name=src_bundle.function_name,
+            worker=None,
+            username=None,
+            machine=None,
+            hostname=src_bundle.hostname,
+            code_file=src_bundle.code_file,
+            result_file=src_bundle.result_file,
+            pid=0,
+            start_ts=time.time(),
+            end_ts=0.0,
+            slower_than_local_p95=False,
+            slower_than_global_p95=False,
+            src_bundle=src_bundle,
+            is_cancelled=threading.Event(),
+            was_cancelled=False,
+            backup_bundles=None,  # backup backups not allowed
+            failure_count=0,
         )
         src_bundle.backup_bundles.append(backup_bundle)
         self.status.record_bundle_details_already_locked(backup_bundle)
-        logger.debug(f'{self.bundle_prefix(bundle)}: Created a backup bundle')
+        logger.debug('%s: Created a backup bundle', backup_bundle)
         return backup_bundle
 
-    def schedule_backup_for_bundle(self,
-                                   src_bundle: BundleDetails):
+    def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
+        """Schedule a backup of src_bundle."""
+
         assert self.status.lock.locked()
-        backup_bundle = self.create_backup_bundle(src_bundle)
+        assert src_bundle is not None
+        backup_bundle = self._create_backup_bundle(src_bundle)
         logger.debug(
-            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+            '%s/%s: Scheduling backup for execution...',
+            backup_bundle.uuid,
+            backup_bundle.function_name,
         )
-        self._helper_executor.submit(self.launch, backup_bundle)
+        self._helper_executor.submit(self._launch, backup_bundle)
 
         # Results from backups don't matter; if they finish first
         # they will move the result_file to this machine and let
-        # the original pick them up and unpickle them.
+        # the original pick them up and unpickle them (and return
+        # a result).
+
+    def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
+        """Something unexpectedly failed with this bundle.  Either retry
+        it from the beginning or throw in the towel and give up on it."""
 
-    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
-        uuid = bundle.uuid
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -989,52 +1298,142 @@ class RemoteExecutor(BaseExecutor):
 
         if bundle.failure_count > retry_limit:
             logger.error(
-                f'{self.bundle_prefix(bundle)}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+                '%s: Tried this bundle too many times already (%dx); giving up.',
+                bundle,
+                retry_limit,
             )
             if is_original:
                 raise RemoteExecutorException(
-                    f'{self.bundle_prefix(bundle)}: This bundle can\'t be completed despite several backups and retries'
+                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                 )
             else:
-                logger.error(f'{self.bundle_prefix(bundle)}: At least it\'s only a backup; better luck with the others.')
+                logger.error(
+                    '%s: At least it\'s only a backup; better luck with the others.', bundle
+                )
             return None
         else:
-            logger.warning(
-                f'>>> Emergency rescheduling {self.bundle_prefix(bundle)} because of unexected errors (wtf?!) <<<'
-            )
-            return self.launch(bundle, avoid_last_machine)
+            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
+            logger.warning(msg)
+            warnings.warn(msg)
+            return self._launch(bundle, avoid_last_machine)
 
     @overrides
-    def submit(self,
-               function: Callable,
-               *args,
-               **kwargs) -> fut.Future:
-        pickle = make_cloud_pickle(function, *args, **kwargs)
-        bundle = self.create_original_bundle(pickle, function.__name__)
+    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        """Submit work to be done.  This is the user entry point of this
+        class."""
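+        # A sketch of typical use (here the executor would usually come
+        # from DefaultExecutors().remote_pool(); some_slow_function and
+        # its arguments are made up for illustration):
+        #
+        #     future = executor.submit(some_slow_function, 1, 2, verbose=True)
+        #     ...do other work...
+        #     print(future.result())
+        #     executor.shutdown()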
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
+        pickle = _make_cloud_pickle(function, *args, **kwargs)
+        bundle = self._create_original_bundle(pickle, function.__name__)
         self.total_bundles_submitted += 1
-        return self._helper_executor.submit(self.launch, bundle)
+        return self._helper_executor.submit(self._launch, bundle)
+
+    @overrides
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        """Shut down the executor."""
+        if not self.already_shutdown:
+            logging.debug('Shutting down RemoteExecutor %s', self.title)
+            self.heartbeat_stop_event.set()
+            self.heartbeat_thread.join()
+            self._helper_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
+
+
+class RemoteWorkerPoolProvider:
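+    """Interface for something that can produce the list of
+    RemoteWorkerRecords describing the available remote workers."""
+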
+    @abstractmethod
+    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
+        pass
+
+
+@persistent.persistent_autoloaded_singleton()  # type: ignore
+class ConfigRemoteWorkerPoolProvider(RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent):
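+    """Reads the remote worker pool from the JSON file named by the
+    remote_worker_records_file config option.  A minimal sketch of the
+    expected file contents, assuming the persistent layer hands the
+    parsed JSON straight to the constructor (the hostnames below are
+    just examples; the keys are the fields of RemoteWorkerRecord)::
+
+        {
+            "remote_worker_records": [
+                {"username": "scott", "machine": "cheetah.house",
+                 "weight": 12, "count": 4},
+                {"username": "scott", "machine": "meerkat.cabin",
+                 "weight": 5, "count": 2}
+            ]
+        }
+    """
+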
+    def __init__(self, json_remote_worker_pool: Dict[str, Any]):
+        self.remote_worker_pool = []
+        for record in json_remote_worker_pool['remote_worker_records']:
+            self.remote_worker_pool.append(self.dataclassFromDict(RemoteWorkerRecord, record))
+        assert len(self.remote_worker_pool) > 0
+
+    @staticmethod
+    def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
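+        """Build a clsName instance from argDict, silently dropping any
+        keys that aren't init fields of the dataclass.  A sketch of its
+        use (field names are those of RemoteWorkerRecord; the extra
+        'notes' key is made up and gets ignored)::
+
+            dataclassFromDict(
+                RemoteWorkerRecord,
+                {'username': 'scott', 'machine': 'meerkat.cabin',
+                 'weight': 5, 'count': 2, 'notes': 'my laptop'}
+            )
+        """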
+        fieldSet = {f.name for f in fields(clsName) if f.init}
+        filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
+        return clsName(**filteredArgDict)
+
+    @overrides
+    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
+        return self.remote_worker_pool
 
     @overrides
-    def shutdown(self, wait=True) -> None:
-        self._helper_executor.shutdown(wait)
-        logging.debug(f'Shutting down RemoteExecutor {self.title}')
-        print(self.histogram)
+    def get_persistent_data(self) -> List[RemoteWorkerRecord]:
+        return self.remote_worker_pool
+
+    @staticmethod
+    @overrides
+    def get_filename() -> str:
+        return config.config['remote_worker_records_file']
+
+    @staticmethod
+    @overrides
+    def should_we_load_data(filename: str) -> bool:
+        return True
+
+    @staticmethod
+    @overrides
+    def should_we_save_data(filename: str) -> bool:
+        return False
 
 
 @singleton
 class DefaultExecutors(object):
+    """A container for a default thread, process and remote executor.
+    These are not created until needed and we take care to clean them
+    up automatically before process exit, for the caller's convenience.
+    Instead of creating your own executor, consider using the one
+    from this pool.  e.g.::
+
+        @par.parallelize(method=par.Method.PROCESS)
+        def do_work(
+            solutions: List[Work],
+            shard_num: int,
+            ...
+        ):
+            <do the work>
+
+
+        def start_do_work(all_work: List[Work]):
+            shards = []
+            logger.debug('Sharding work into groups of 10.')
+            for subset in list_utils.shard(all_work, 10):
+                shards.append([x for x in subset])
+
+            logger.debug('Kicking off helper pool.')
+            results = []
+            try:
+                for n, shard in enumerate(shards):
+                    results.append(
+                        do_work(
+                            shard, n, shared_cache.get_name(), max_letter_pop_per_word
+                        )
+                    )
+                smart_future.wait_all(results)
+            finally:
+                # Note: if you forget to do this it will clean itself up
+                # during program termination including tearing down any
+                # active ssh connections.
+                executors.DefaultExecutors().process_pool().shutdown()
+    """
+
     def __init__(self):
         self.thread_executor: Optional[ThreadExecutor] = None
         self.process_executor: Optional[ProcessExecutor] = None
         self.remote_executor: Optional[RemoteExecutor] = None
 
-    def ping(self, host) -> bool:
-        logger.debug(f'RUN> ping -c 1 {host}')
+    @staticmethod
+    def _ping(host) -> bool:
+        logger.debug('RUN> ping -c 1 %s', host)
         try:
-            x = cmd_with_timeout(
-                f'ping -c 1 {host} >/dev/null 2>/dev/null',
-                timeout_seconds=1.0
-            )
+            x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
         except Exception:
             return False
@@ -1052,85 +1451,34 @@ class DefaultExecutors(object):
     def remote_pool(self) -> RemoteExecutor:
         if self.remote_executor is None:
             logger.info('Looking for some helper machines...')
-            pool: List[RemoteWorkerRecord] = []
-            if self.ping('cheetah.house'):
-                logger.info('Found cheetah.house')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'scott',
-                        machine = 'cheetah.house',
-                        weight = 12,
-                        count = 4,
-                    ),
-                )
-            if self.ping('video.house'):
-                logger.info('Found video.house')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'scott',
-                        machine = 'video.house',
-                        weight = 1,
-                        count = 4,
-                    ),
-                )
-            if self.ping('wannabe.house'):
-                logger.info('Found wannabe.house')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'scott',
-                        machine = 'wannabe.house',
-                        weight = 2,
-                        count = 4,
-                    ),
-                )
-            if self.ping('meerkat.cabin'):
-                logger.info('Found meerkat.cabin')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'scott',
-                        machine = 'meerkat.cabin',
-                        weight = 5,
-                        count = 2,
-                    ),
-                )
-            if self.ping('backup.house'):
-                logger.info('Found backup.house')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'scott',
-                        machine = 'backup.house',
-                        weight = 1,
-                        count = 4,
-                    ),
-                )
-            if self.ping('kiosk.house'):
-                logger.info('Found kiosk.house')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'pi',
-                        machine = 'kiosk.house',
-                        weight = 1,
-                        count = 2,
-                    ),
-                )
-            if self.ping('puma.cabin'):
-                logger.info('Found puma.cabin')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'scott',
-                        machine = 'puma.cabin',
-                        weight = 12,
-                        count = 4,
-                    ),
-                )
+            provider = ConfigRemoteWorkerPoolProvider()
+            all_machines = provider.get_remote_workers()
+            pool: List[RemoteWorkerRecord] = []
+
+            # Make sure we can ping each machine.
+            for record in all_machines:
+                if self._ping(record.machine):
+                    logger.info('%s is alive / responding to pings', record.machine)
+                    pool.append(record)
 
             # The controller machine has a lot to do; go easy on it.
             for record in pool:
                 if record.machine == platform.node() and record.count > 1:
-                    logger.info(f'Reducing workload for {record.machine}.')
-                    record.count = 1
+                    logger.info('Reducing workload for %s.', record.machine)
+                    record.count = max(int(record.count / 2), 1)
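+
+            # The policy below picks a worker for each bundle; judging by
+            # its name and the per-record weight field, presumably at
+            # random with probability proportional to a worker's weight.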
 
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
             policy.register_worker_pool(pool)
             self.remote_executor = RemoteExecutor(pool, policy)
         return self.remote_executor
+
+    def shutdown(self) -> None:
+        if self.thread_executor is not None:
+            self.thread_executor.shutdown(wait=True, quiet=True)
+            self.thread_executor = None
+        if self.process_executor is not None:
+            self.process_executor.shutdown(wait=True, quiet=True)
+            self.process_executor = None
+        if self.remote_executor is not None:
+            self.remote_executor.shutdown(wait=True, quiet=True)
+            self.remote_executor = None