Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / executors.py
index 5b77a42dc3d29ca6f42673a369e23f0962343c62..e07933f454909d5a543e340320a578aab528d9ad 100644 (file)
@@ -1,34 +1,44 @@
 #!/usr/bin/env python3
+# -*- coding: utf-8 -*-
 
-from __future__ import annotations
+# © Copyright 2021-2022, Scott Gasch
 
-from abc import ABC, abstractmethod
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine and a remote executor for farming out work to other
+machines.
+
+Also defines DefaultExecutors which is a container for references to
+global executors / worker pools with automatic shutdown semantics."""
+
+from __future__ import annotations
 import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
 import logging
-import numpy
 import os
 import platform
 import random
 import subprocess
 import threading
 import time
-from typing import Any, Callable, Dict, List, Optional, Set
 import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
+import numpy
 from overrides import overrides
 
-from ansi import bg, fg, underline, reset
 import argparse_utils
 import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
 import histogram as hist
+import string_utils
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
 from thread_utils import background_thread
 
-
 logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
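
A minimal usage sketch of the surface described in the new module docstring, assuming submit() takes a callable plus its arguments and returns a concurrent.futures.Future (its definition is outside this hunk) and that config has already been bootstrapped:

    import executors

    def square(x: int) -> int:
        return x * x

    pool = executors.ThreadExecutor(max_workers=4)
    future = pool.submit(square, 12)      # assumed: submit(fn, *args, **kwargs) -> Future
    print(future.result())                # 144
    pool.shutdown(wait=True, quiet=True)  # new keyword-only signature; quiet skips the histogram dump
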
@@ -67,11 +77,13 @@ SCP = '/usr/bin/scp -C'
 
 
 def make_cloud_pickle(fun, *args, **kwargs):
-    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
+    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
     return cloudpickle.dumps((fun, args, kwargs))
 
 
 class BaseExecutor(ABC):
+    """The base executor interface definition."""
+
     def __init__(self, *, title=''):
         self.title = title
         self.histogram = hist.SimpleHistogram(
@@ -84,10 +96,10 @@ class BaseExecutor(ABC):
         pass
 
     @abstractmethod
-    def shutdown(self, wait: bool = True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         pass
 
-    def shutdown_if_idle(self) -> bool:
+    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
         """Shutdown the executor and return True if the executor is idle
         (i.e. there are no pending or active tasks).  Return False
         otherwise.  Note: this should only be called by the launcher
@@ -95,7 +107,7 @@ class BaseExecutor(ABC):
 
         """
         if self.task_count == 0:
-            self.shutdown()
+            self.shutdown(wait=True, quiet=quiet)
             return True
         return False
 
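
Both shutdown() and shutdown_if_idle() are now keyword-only and gain a quiet flag. A sketch of how a launcher might drive them (executor construction elided); shutdown_if_idle() only tears the pool down when no tasks are pending or active:

    if not executor.shutdown_if_idle(quiet=True):
        # Still busy: wait for in-flight work to drain, then print the
        # runtime histogram on the way out.
        executor.shutdown(wait=True, quiet=False)
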
@@ -106,7 +118,7 @@ class BaseExecutor(ABC):
 
         """
         self.task_count += delta
-        logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
+        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
 
     def get_task_count(self) -> int:
         """Change the task count.  Note: do not call this method from a
@@ -118,6 +130,8 @@ class BaseExecutor(ABC):
 
 
 class ThreadExecutor(BaseExecutor):
+    """A threadpool executor instance."""
+
     def __init__(self, max_workers: Optional[int] = None):
         super().__init__()
         workers = None
@@ -125,15 +139,16 @@ class ThreadExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_threadpool_size' in config.config:
             workers = config.config['executors_threadpool_size']
-        logger.debug(f'Creating threadpool executor with {workers} workers')
+        logger.debug('Creating threadpool executor with %d workers', workers)
         self._thread_pool_executor = fut.ThreadPoolExecutor(
             max_workers=workers, thread_name_prefix="thread_executor_helper"
         )
         self.already_shutdown = False
 
     # This is run on a different thread; do not adjust task count here.
-    def run_local_bundle(self, fun, *args, **kwargs):
-        logger.debug(f"Running local bundle at {fun.__name__}")
+    @staticmethod
+    def run_local_bundle(fun, *args, **kwargs):
+        logger.debug("Running local bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
         return result
 
@@ -148,21 +163,25 @@ class ThreadExecutor(BaseExecutor):
             newargs.append(arg)
         start = time.time()
         result = self._thread_pool_executor.submit(
-            self.run_local_bundle, *newargs, **kwargs
+            ThreadExecutor.run_local_bundle, *newargs, **kwargs
         )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
+        return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
-            logger.debug(f'Shutting down threadpool executor {self.title}')
-            print(self.histogram)
+            logger.debug('Shutting down threadpool executor %s', self.title)
             self._thread_pool_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
 
 class ProcessExecutor(BaseExecutor):
+    """A processpool executor."""
+
     def __init__(self, max_workers=None):
         super().__init__()
         workers = None
@@ -170,16 +189,17 @@ class ProcessExecutor(BaseExecutor):
             workers = max_workers
         elif 'executors_processpool_size' in config.config:
             workers = config.config['executors_processpool_size']
-        logger.debug(f'Creating processpool executor with {workers} workers.')
+        logger.debug('Creating processpool executor with %d workers.', workers)
         self._process_executor = fut.ProcessPoolExecutor(
             max_workers=workers,
         )
         self.already_shutdown = False
 
     # This is run in another process; do not adjust task count here.
-    def run_cloud_pickle(self, pickle):
+    @staticmethod
+    def run_cloud_pickle(pickle):
         fun, args, kwargs = cloudpickle.loads(pickle)
-        logger.debug(f"Running pickled bundle at {fun.__name__}")
+        logger.debug("Running pickled bundle at %s", fun.__name__)
         result = fun(*args, **kwargs)
         return result
 
@@ -190,17 +210,18 @@ class ProcessExecutor(BaseExecutor):
         start = time.time()
         self.adjust_task_count(+1)
         pickle = make_cloud_pickle(function, *args, **kwargs)
-        result = self._process_executor.submit(self.run_cloud_pickle, pickle)
+        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
-            logger.debug(f'Shutting down processpool executor {self.title}')
+            logger.debug('Shutting down processpool executor %s', self.title)
             self._process_executor.shutdown(wait)
-            print(self.histogram)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
     def __getstate__(self):
@@ -217,6 +238,8 @@ class RemoteExecutorException(Exception):
 
 @dataclass
 class RemoteWorkerRecord:
+    """A record of info about a remote worker."""
+
     username: str
     machine: str
     weight: int
@@ -231,6 +254,12 @@ class RemoteWorkerRecord:
 
 @dataclass
 class BundleDetails:
+    """All info necessary to define some unit of work that needs to be
+    done, where it is being run, its state, whether it is an original
+    bundle or a backup bundle, how many times it has failed, etc...
+
+    """
+
     pickled_code: bytes
     uuid: str
     fname: str
@@ -245,7 +274,7 @@ class BundleDetails:
     end_ts: float
     slower_than_local_p95: bool
     slower_than_global_p95: bool
-    src_bundle: BundleDetails
+    src_bundle: Optional[BundleDetails]
     is_cancelled: threading.Event
     was_cancelled: bool
     backup_bundles: Optional[List[BundleDetails]]
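
RemoteWorkerRecord is the unit the selection policies below operate on; a sketch of one record, mirroring the entries registered in DefaultExecutors at the bottom of this change:

    cheetah = RemoteWorkerRecord(
        username='scott',
        machine='cheetah.house',
        weight=24,  # relative machine speed; scales selection probability
        count=5,    # concurrent task slots; decremented by acquire_worker()
    )
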
@@ -281,15 +310,15 @@ class BundleDetails:
 
 
 class RemoteExecutorStatus:
+    """A status 'scoreboard' for a remote executor."""
+
     def __init__(self, total_worker_count: int) -> None:
         self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
         self.start_time: float = time.time()
-        self.start_per_bundle: Dict[str, float] = defaultdict(float)
+        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
-        self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord, List[float]
-        ] = {}
+        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
@@ -304,9 +333,7 @@ class RemoteExecutorStatus:
         with self.lock:
             self.record_acquire_worker_already_locked(worker, uuid)
 
-    def record_acquire_worker_already_locked(
-        self, worker: RemoteWorkerRecord, uuid: str
-    ) -> None:
+    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
         assert self.lock.locked()
         self.known_workers.add(worker)
         self.start_per_bundle[uuid] = None
@@ -342,8 +369,10 @@ class RemoteExecutorStatus:
         self.end_per_bundle[uuid] = ts
         self.in_flight_bundles_by_worker[worker].remove(uuid)
         if not was_cancelled:
-            bundle_latency = ts - self.start_per_bundle[uuid]
-            x = self.finished_bundle_timings_per_worker.get(worker, list())
+            start = self.start_per_bundle[uuid]
+            assert start is not None
+            bundle_latency = ts - start
+            x = self.finished_bundle_timings_per_worker.get(worker, [])
             x.append(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
             self.finished_bundle_timings.append(bundle_latency)
@@ -438,7 +467,12 @@ class RemoteExecutorStatus:
 
 
 class RemoteWorkerSelectionPolicy(ABC):
-    def register_worker_pool(self, workers):
+    """A base class for policies that select a remote worker."""
+
+    def __init__(self):
+        self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
         self.workers = workers
 
     @abstractmethod
@@ -451,30 +485,33 @@ class RemoteWorkerSelectionPolicy(ABC):
 
 
 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that uses weighted RNG."""
+
     @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
     @overrides
     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         grabbag = []
-        for worker in self.workers:
-            if worker.machine != machine_to_avoid:
-                if worker.count > 0:
-                    for _ in range(worker.count * worker.weight):
-                        grabbag.append(worker)
+        if self.workers:
+            for worker in self.workers:
+                if worker.machine != machine_to_avoid:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
 
         if len(grabbag) == 0:
-            logger.debug(
-                f'There are no available workers that avoid {machine_to_avoid}...'
-            )
-            for worker in self.workers:
-                if worker.count > 0:
-                    for _ in range(worker.count * worker.weight):
-                        grabbag.append(worker)
+            logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+            if self.workers:
+                for worker in self.workers:
+                    if worker.count > 0:
+                        for _ in range(worker.count * worker.weight):
+                            grabbag.append(worker)
 
         if len(grabbag) == 0:
             logger.warning('There are no available workers?!')
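
A worked example of the weighted draw above: each worker contributes count * weight entries to the grabbag, so its selection probability is proportional to that product. Using two of the pool entries from later in this change:

    # cheetah.house: count=5, weight=24 -> 120 entries
    # backup.house:  count=2, weight=9  ->  18 entries
    # P(cheetah) = 120/138 ~ 0.87; P(backup) = 18/138 ~ 0.13
    grabbag = ['cheetah.house'] * (5 * 24) + ['backup.house'] * (2 * 9)
    choice = random.sample(grabbag, 1)[0]
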
@@ -483,46 +520,51 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
         worker = random.sample(grabbag, 1)[0]
         assert worker.count > 0
         worker.count -= 1
-        logger.debug(f'Chose worker {worker}')
+        logger.debug('Selected worker %s', worker)
         return worker
 
 
 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+    """A remote worker selector that just round robins."""
+
     def __init__(self) -> None:
+        super().__init__()
         self.index = 0
 
     @overrides
     def is_worker_available(self) -> bool:
-        for worker in self.workers:
-            if worker.count > 0:
-                return True
+        if self.workers:
+            for worker in self.workers:
+                if worker.count > 0:
+                    return True
         return False
 
     @overrides
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
-        x = self.index
-        while True:
-            worker = self.workers[x]
-            if worker.count > 0:
-                worker.count -= 1
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+        if self.workers:
+            x = self.index
+            while True:
+                worker = self.workers[x]
+                if worker.count > 0:
+                    worker.count -= 1
+                    x += 1
+                    if x >= len(self.workers):
+                        x = 0
+                    self.index = x
+                    logger.debug('Selected worker %s', worker)
+                    return worker
                 x += 1
                 if x >= len(self.workers):
                     x = 0
-                self.index = x
-                logger.debug(f'Selected worker {worker}')
-                return worker
-            x += 1
-            if x >= len(self.workers):
-                x = 0
-            if x == self.index:
-                msg = 'Unexpectedly could not find a worker, retrying...'
-                logger.warning(msg)
-                return None
+                if x == self.index:
+                    logger.warning('Unexpectedly could not find a worker, retrying...')
+                    return None
+        return None
 
 
 class RemoteExecutor(BaseExecutor):
+    """A remote work executor."""
+
     def __init__(
         self,
         workers: List[RemoteWorkerRecord],
@@ -540,9 +582,7 @@ class RemoteExecutor(BaseExecutor):
             raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
-        logger.debug(
-            f'Creating {self.worker_count} local threads, one per remote worker.'
-        )
+        logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
         self._helper_executor = fut.ThreadPoolExecutor(
             thread_name_prefix="remote_executor_helper",
             max_workers=self.worker_count,
@@ -582,8 +622,8 @@ class RemoteExecutor(BaseExecutor):
         num_idle_workers = self.worker_count - self.task_count
         now = time.time()
         if (
-            num_done > 2
-            and num_idle_workers > 1
+            num_done >= 2
+            and num_idle_workers > 0
             and (self.last_backup is None or (now - self.last_backup > 9.0))
             and self.backup_lock.acquire(blocking=False)
         ):
@@ -624,21 +664,15 @@ class RemoteExecutor(BaseExecutor):
                             if start_ts is not None:
                                 runtime = now - start_ts
                                 score += runtime
-                                logger.debug(
-                                    f'score[{bundle}] => {score}  # latency boost'
-                                )
+                                logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
 
                                 if bundle.slower_than_local_p95:
                                     score += runtime / 2
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >worker p95'
-                                    )
+                                    logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
 
                                 if bundle.slower_than_global_p95:
                                     score += runtime / 4
-                                    logger.debug(
-                                        f'score[{bundle}] => {score}  # >global p95'
-                                    )
+                                    logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
 
                             # Prefer backups of bundles that don't
                             # have backups already.
@@ -652,12 +686,12 @@ class RemoteExecutor(BaseExecutor):
                             else:
                                 score = 0
                             logger.debug(
-                                f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
+                                'score[%s] => %.1f  # %d dup backup factor',
+                                bundle, score,
+                                backup_count,
                             )
 
-                            if score != 0 and (
-                                best_score is None or score > best_score
-                            ):
+                            if score != 0 and (best_score is None or score > best_score):
                                 bundle_to_backup = bundle
                                 assert bundle is not None
                                 assert bundle.backup_bundles is not None
@@ -673,7 +707,9 @@ class RemoteExecutor(BaseExecutor):
                 if bundle_to_backup is not None:
                     self.last_backup = now
                     logger.info(
-                        f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
+                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+                        bundle_to_backup,
+                        best_score,
                     )
                     self.schedule_backup_for_bundle(bundle_to_backup)
             finally:
@@ -682,14 +718,10 @@ class RemoteExecutor(BaseExecutor):
     def is_worker_available(self) -> bool:
         return self.policy.is_worker_available()
 
-    def acquire_worker(
-        self, machine_to_avoid: str = None
-    ) -> Optional[RemoteWorkerRecord]:
+    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
         return self.policy.acquire_worker(machine_to_avoid)
 
-    def find_available_worker_or_block(
-        self, machine_to_avoid: str = None
-    ) -> RemoteWorkerRecord:
+    def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
         with self.cv:
             while not self.is_worker_available():
                 self.cv.wait()
@@ -703,7 +735,7 @@ class RemoteExecutor(BaseExecutor):
     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
         worker = bundle.worker
         assert worker is not None
-        logger.debug(f'Released worker {worker}')
+        logger.debug('Released worker %s', worker)
         self.status.record_release_worker(
             worker,
             bundle.uuid,
@@ -717,13 +749,14 @@ class RemoteExecutor(BaseExecutor):
     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
         with self.status.lock:
             if bundle.is_cancelled.wait(timeout=0.0):
-                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
+                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                 bundle.was_cancelled = True
                 return True
         return False
 
     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
         """Find a worker for bundle or block until one is available."""
+
         self.adjust_task_count(+1)
         uuid = bundle.uuid
         hostname = bundle.hostname
@@ -736,14 +769,14 @@ class RemoteExecutor(BaseExecutor):
         worker = None
         while worker is None:
             worker = self.find_available_worker_or_block(avoid_machine)
-        assert worker
+        assert worker is not None
 
         # Ok, found a worker.
         bundle.worker = worker
         machine = bundle.machine = worker.machine
         username = bundle.username = worker.username
         self.status.record_acquire_worker(worker, uuid)
-        logger.debug(f'{bundle}: Running bundle on {worker}...')
+        logger.debug('%s: Running bundle on %s...', bundle, worker)
 
         # Before we do any work, make sure the bundle is still viable.
         # It may have been some time between when it was submitted and
@@ -753,9 +786,7 @@ class RemoteExecutor(BaseExecutor):
             try:
                 return self.process_work_result(bundle)
             except Exception as e:
-                logger.warning(
-                    f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
-                )
+                logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
                 self.release_worker(bundle)
                 if is_original:
                     # Weird.  We are the original owner of this
@@ -767,52 +798,52 @@ class RemoteExecutor(BaseExecutor):
                     # thing.
                     logger.exception(e)
                     logger.error(
-                        f'{bundle}: We are the original owner thread and yet there are '
-                        + 'no results for this bundle.  This is unexpected and bad.'
+                        '%s: We are the original owner thread and yet there are '
+                        'no results for this bundle.  This is unexpected and bad.',
+                        bundle,
                     )
                     return self.emergency_retry_nasty_bundle(bundle)
                 else:
-                    # Expected(?).  We're a backup and our bundle is
-                    # cancelled before we even got started.  Something
-                    # went bad in process_work_result (I acutually don't
-                    # see what?) but probably not worth worrying
-                    # about.  Let the original thread worry about
-                    # either finding the results or complaining about
-                    # it.
+                    # We're a backup and our bundle is cancelled
+                    # before we even got started.  Do nothing and let
+                    # the original bundle's thread worry about either
+                    # finding the results or complaining about it.
                     return None
 
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
-                cmd = (
-                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
-                )
+                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
-                logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
+                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                 run_silently(cmd)
                 xfer_latency = time.time() - start_ts
-                logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
+                logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
             except Exception as e:
                 self.release_worker(bundle)
                 if is_original:
-                    # Weird.  We tried to copy the code to the worker and it failed...
-                    # And we're the original bundle.  We have to retry.
+                    # Weird.  We tried to copy the code to the worker
+                    # and it failed...  And we're the original bundle.
+                    # We have to retry.
                     logger.exception(e)
                     logger.error(
-                        f"{bundle}: Failed to send instructions to the worker machine?! "
-                        + "This is not expected; we\'re the original bundle so this shouldn\'t "
-                        + "be a race condition.  Attempting an emergency retry..."
+                        "%s: Failed to send instructions to the worker machine?! "
+                        "This is not expected; we\'re the original bundle so this shouldn\'t "
+                        "be a race condition.  Attempting an emergency retry...",
+                        bundle,
                     )
                     return self.emergency_retry_nasty_bundle(bundle)
                 else:
                     # This is actually expected; we're a backup.
                     # There's a race condition where someone else
                     # already finished the work and removed the source
-                    # code file before we could copy it.  No biggie.
-                    msg = f'{bundle}: Failed to send instructions to the worker machine... '
-                    msg += 'We\'re a backup and this may be caused by the original (or some '
-                    msg += 'other backup) already finishing this work.  Ignoring this.'
-                    logger.warning(msg)
+                    # code_file before we could copy it.  Ignore.
+                    logger.warning(
+                        '%s: Failed to send instructions to the worker machine... '
+                        'We\'re a backup and this may be caused by the original (or '
+                        'some other backup) already finishing this work.  Ignoring.',
+                        bundle,
+                    )
                     return None
 
         # Kick off the work.  Note that if this fails we let
@@ -824,22 +855,21 @@ class RemoteExecutor(BaseExecutor):
             f' /home/scott/lib/python_modules/remote_worker.py'
             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
         )
-        logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
+        logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = p.pid
-        logger.debug(
-            f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
-        )
+        logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
         return self.wait_for_process(p, bundle, 0)
 
     def wait_for_process(
-        self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
     ) -> Any:
         machine = bundle.machine
+        assert p is not None
         pid = p.pid
         if depth > 3:
             logger.error(
-                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
             )
             p.terminate()
             self.release_worker(bundle)
@@ -853,12 +883,10 @@ class RemoteExecutor(BaseExecutor):
                 p.wait(timeout=0.25)
             except subprocess.TimeoutExpired:
                 if self.check_if_cancelled(bundle):
-                    logger.info(
-                        f'{bundle}: looks like another worker finished bundle...'
-                    )
+                    logger.info('%s: looks like another worker finished bundle...', bundle)
                     break
             else:
-                logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
+                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                 p = None
                 break
 
@@ -879,10 +907,11 @@ class RemoteExecutor(BaseExecutor):
         # Otherwise, time for an emergency reschedule.
         except Exception as e:
             logger.exception(e)
-            logger.error(f'{bundle}: Something unexpected just happened...')
+            logger.error('%s: Something unexpected just happened...', bundle)
             if p is not None:
-                msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
-                logger.warning(msg)
+                logger.warning(
+                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
+                )
                 return self.wait_for_process(p, bundle, depth + 1)
             else:
                 self.release_worker(bundle)
@@ -906,7 +935,11 @@ class RemoteExecutor(BaseExecutor):
                 if bundle.hostname not in bundle.machine:
                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
-                        f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
+                        "%s: Fetching results back from %s@%s via %s",
+                        bundle,
+                        username,
+                        machine,
+                        cmd,
                     )
 
                     # If either of these throw they are handled in
@@ -918,17 +951,15 @@ class RemoteExecutor(BaseExecutor):
                         except Exception as e:
                             attempts += 1
                             if attempts >= 3:
-                                raise (e)
+                                raise e
                         else:
                             break
 
+                    # Cleanup remote /tmp files.
                     run_silently(
-                        f'{SSH} {username}@{machine}'
-                        f' "/bin/rm -f {code_file} {result_file}"'
-                    )
-                    logger.debug(
-                        f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+                        f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                     )
+                    logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
                 dur = bundle.end_ts - bundle.start_ts
                 self.histogram.add_item(dur)
 
@@ -937,25 +968,25 @@ class RemoteExecutor(BaseExecutor):
         # original is also the only job that may delete result_file
         # from disk.  Note that the original may have been cancelled
         # if one of the backups finished first; it still must read the
-        # result from disk.
+        # result from disk.  It still does that here with is_cancelled
+        # set.
         if is_original:
-            logger.debug(f"{bundle}: Unpickling {result_file}.")
+            logger.debug("%s: Unpickling %s.", bundle, result_file)
             try:
                 with open(result_file, 'rb') as rb:
                     serialized = rb.read()
                 result = cloudpickle.loads(serialized)
             except Exception as e:
                 logger.exception(e)
-                msg = f'Failed to load {result_file}... this is bad news.'
-                logger.critical(msg)
+                logger.error('Failed to load %s... this is bad news.', result_file)
                 self.release_worker(bundle)
 
                 # Re-raise the exception; the code in wait_for_process may
                 # decide to emergency_retry_nasty_bundle here.
-                raise Exception(e)
-            logger.debug(f'Removing local (master) {code_file} and {result_file}.')
-            os.remove(f'{result_file}')
-            os.remove(f'{code_file}')
+                raise e
+            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
+            os.remove(result_file)
+            os.remove(code_file)
 
             # Notify any backups that the original is done so they
             # should stop ASAP.  Do this whether or not we
@@ -964,7 +995,7 @@ class RemoteExecutor(BaseExecutor):
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
                     logger.debug(
-                        f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
+                        '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
                     )
                     backup.is_cancelled.set()
 
@@ -978,22 +1009,22 @@ class RemoteExecutor(BaseExecutor):
 
             # Tell the original to stop if we finished first.
             if not was_cancelled:
+                orig_bundle = bundle.src_bundle
+                assert orig_bundle is not None
                 logger.debug(
-                    f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+                    '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
                 )
-                bundle.src_bundle.is_cancelled.set()
+                orig_bundle.is_cancelled.set()
         self.release_worker(bundle, was_cancelled=was_cancelled)
         return result
 
     def create_original_bundle(self, pickle, fname: str):
-        from string_utils import generate_uuid
-
-        uuid = generate_uuid(omit_dashes=True)
+        uuid = string_utils.generate_uuid(omit_dashes=True)
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
-        logger.debug(f'Writing pickled code to {code_file}')
-        with open(f'{code_file}', 'wb') as wb:
+        logger.debug('Writing pickled code to %s', code_file)
+        with open(code_file, 'wb') as wb:
             wb.write(pickle)
 
         bundle = BundleDetails(
@@ -1018,10 +1049,11 @@ class RemoteExecutor(BaseExecutor):
             failure_count=0,
         )
         self.status.record_bundle_details(bundle)
-        logger.debug(f'{bundle}: Created an original bundle')
+        logger.debug('%s: Created an original bundle', bundle)
         return bundle
 
     def create_backup_bundle(self, src_bundle: BundleDetails):
+        assert self.status.lock.locked()
         assert src_bundle.backup_bundles is not None
         n = len(src_bundle.backup_bundles)
         uuid = src_bundle.uuid + f'_backup#{n}'
@@ -1049,7 +1081,7 @@ class RemoteExecutor(BaseExecutor):
         )
         src_bundle.backup_bundles.append(backup_bundle)
         self.status.record_bundle_details_already_locked(backup_bundle)
-        logger.debug(f'{backup_bundle}: Created a backup bundle')
+        logger.debug('%s: Created a backup bundle', backup_bundle)
         return backup_bundle
 
     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
@@ -1057,15 +1089,16 @@ class RemoteExecutor(BaseExecutor):
         assert src_bundle is not None
         backup_bundle = self.create_backup_bundle(src_bundle)
         logger.debug(
-            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+            '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
         )
         self._helper_executor.submit(self.launch, backup_bundle)
 
         # Results from backups don't matter; if they finish first
         # they will move the result_file to this machine and let
-        # the original pick them up and unpickle them.
+        # the original pick them up and unpickle them (and return
+        # a result).
 
-    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
         is_original = bundle.src_bundle is None
         bundle.worker = None
         avoid_last_machine = bundle.machine
@@ -1079,15 +1112,17 @@ class RemoteExecutor(BaseExecutor):
 
         if bundle.failure_count > retry_limit:
             logger.error(
-                f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+                '%s: Tried this bundle too many times already (%dx); giving up.',
+                bundle,
+                retry_limit,
             )
             if is_original:
                 raise RemoteExecutorException(
-                    f'{bundle}: This bundle can\'t be completed despite several backups and retries'
+                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                 )
             else:
                 logger.error(
-                    f'{bundle}: At least it\'s only a backup; better luck with the others.'
+                    '%s: At least it\'s only a backup; better luck with the others.', bundle
                 )
             return None
         else:
@@ -1106,29 +1141,35 @@ class RemoteExecutor(BaseExecutor):
         return self._helper_executor.submit(self.launch, bundle)
 
     @overrides
-    def shutdown(self, wait=True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         if not self.already_shutdown:
-            logging.debug(f'Shutting down RemoteExecutor {self.title}')
+            logger.debug('Shutting down RemoteExecutor %s', self.title)
             self.heartbeat_stop_event.set()
             self.heartbeat_thread.join()
             self._helper_executor.shutdown(wait)
-            print(self.histogram)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
             self.already_shutdown = True
 
 
 @singleton
 class DefaultExecutors(object):
+    """A container for a default thread, process and remote executor.
+    These are not created until needed and we take care to clean up
+    before process exit.
+
+    """
+
     def __init__(self):
         self.thread_executor: Optional[ThreadExecutor] = None
         self.process_executor: Optional[ProcessExecutor] = None
         self.remote_executor: Optional[RemoteExecutor] = None
 
-    def ping(self, host) -> bool:
-        logger.debug(f'RUN> ping -c 1 {host}')
+    @staticmethod
+    def ping(host) -> bool:
+        logger.debug('RUN> ping -c 1 %s', host)
         try:
-            x = cmd_with_timeout(
-                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
-            )
+            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
             return x == 0
         except Exception:
             return False
@@ -1153,8 +1194,8 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='cheetah.house',
-                        weight=30,
-                        count=6,
+                        weight=24,
+                        count=5,
                     ),
                 )
             if self.ping('meerkat.cabin'):
@@ -1173,8 +1214,8 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='wannabe.house',
-                        weight=25,
-                        count=10,
+                        weight=14,
+                        count=2,
                     ),
                 )
             if self.ping('puma.cabin'):
@@ -1183,8 +1224,8 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='puma.cabin',
-                        weight=30,
-                        count=6,
+                        weight=24,
+                        count=5,
                     ),
                 )
             if self.ping('backup.house'):
@@ -1193,7 +1234,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='backup.house',
-                        weight=8,
+                        weight=9,
                         count=2,
                     ),
                 )
@@ -1201,8 +1242,8 @@ class DefaultExecutors(object):
             # The controller machine has a lot to do; go easy on it.
             for record in pool:
                 if record.machine == platform.node() and record.count > 1:
-                    logger.info(f'Reducing workload for {record.machine}.')
-                    record.count = 1
+                    logger.info('Reducing workload for %s.', record.machine)
+                    record.count = max(int(record.count / 2), 1)
 
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
             policy.register_worker_pool(pool)
@@ -1211,11 +1252,11 @@ class DefaultExecutors(object):
 
     def shutdown(self) -> None:
         if self.thread_executor is not None:
-            self.thread_executor.shutdown()
+            self.thread_executor.shutdown(wait=True, quiet=True)
             self.thread_executor = None
         if self.process_executor is not None:
-            self.process_executor.shutdown()
+            self.process_executor.shutdown(wait=True, quiet=True)
             self.process_executor = None
         if self.remote_executor is not None:
-            self.remote_executor.shutdown()
+            self.remote_executor.shutdown(wait=True, quiet=True)
             self.remote_executor = None
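
With this change DefaultExecutors.shutdown() tears down whichever pools were actually created, quietly. A sketch of explicit end-of-program cleanup; treating the @singleton-decorated class as returning one shared instance when called is an assumption about decorator_utils.singleton, not something shown in this diff:

    import atexit
    import executors

    defaults = executors.DefaultExecutors()  # assumed: @singleton yields the shared instance
    atexit.register(defaults.shutdown)       # each pool shuts down with wait=True, quiet=True
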