Improve docstrings for sphinx.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """Defines three executors: a thread executor for doing work using a
7 threadpool, a process executor for doing work in other processes on
8 the same machine and a remote executor for farming out work to other
9 machines.
10
11 Also defines DefaultExecutors which is a container for references to
12 global executors / worker pools with automatic shutdown semantics."""
13
14 from __future__ import annotations
15 import concurrent.futures as fut
16 import logging
17 import os
18 import platform
19 import random
20 import subprocess
21 import threading
22 import time
23 import warnings
24 from abc import ABC, abstractmethod
25 from collections import defaultdict
26 from dataclasses import dataclass
27 from typing import Any, Callable, Dict, List, Optional, Set
28
29 import cloudpickle  # type: ignore
30 import numpy
31 from overrides import overrides
32
33 import argparse_utils
34 import config
35 import histogram as hist
36 import string_utils
37 from ansi import bg, fg, reset, underline
38 from decorator_utils import singleton
39 from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
40 from thread_utils import background_thread
41
42 logger = logging.getLogger(__name__)
43
44 parser = config.add_commandline_args(
45     f"Executors ({__file__})", "Args related to processing executors."
46 )
47 parser.add_argument(
48     '--executors_threadpool_size',
49     type=int,
50     metavar='#THREADS',
51     help='Number of threads in the default threadpool, leave unset for default',
52     default=None,
53 )
54 parser.add_argument(
55     '--executors_processpool_size',
56     type=int,
57     metavar='#PROCESSES',
58     help='Number of processes in the default processpool, leave unset for default',
59     default=None,
60 )
61 parser.add_argument(
62     '--executors_schedule_remote_backups',
63     default=True,
64     action=argparse_utils.ActionNoYes,
65     help='Should we schedule duplicative backup work if a remote bundle is slow',
66 )
67 parser.add_argument(
68     '--executors_max_bundle_failures',
69     type=int,
70     default=3,
71     metavar='#FAILURES',
72     help='Maximum number of failures before giving up on a bundle',
73 )
74
75 SSH = '/usr/bin/ssh -oForwardX11=no'
76 SCP = '/usr/bin/scp -C'
77
78
79 def _make_cloud_pickle(fun, *args, **kwargs):
80     """Internal helper to create cloud pickles."""
81     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
82     return cloudpickle.dumps((fun, args, kwargs))
83
84
85 class BaseExecutor(ABC):
86     """The base executor interface definition.  The interface for
87     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
88     :class:`ThreadExecutor`.
89     """
90
91     def __init__(self, *, title=''):
92         self.title = title
93         self.histogram = hist.SimpleHistogram(
94             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
95         )
96         self.task_count = 0
97
98     @abstractmethod
99     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
100         pass
101
102     @abstractmethod
103     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
104         pass
105
106     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
107         """Shutdown the executor and return True if the executor is idle
108         (i.e. there are no pending or active tasks).  Return False
109         otherwise.  Note: this should only be called by the launcher
110         process.
111
112         """
113         if self.task_count == 0:
114             self.shutdown(wait=True, quiet=quiet)
115             return True
116         return False
117
118     def adjust_task_count(self, delta: int) -> None:
119         """Change the task count.  Note: do not call this method from a
120         worker, it should only be called by the launcher process /
121         thread / machine.
122
123         """
124         self.task_count += delta
125         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
126
127     def get_task_count(self) -> int:
128         """Change the task count.  Note: do not call this method from a
129         worker, it should only be called by the launcher process /
130         thread / machine.
131
132         """
133         return self.task_count
134
135
136 class ThreadExecutor(BaseExecutor):
137     """A threadpool executor.  This executor uses python threads to
138     schedule tasks.  Note that, at least as of python3.10, because of
139     the global lock in the interpreter itself, these do not
140     parallelize very well so this class is useful mostly for non-CPU
141     intensive tasks.
142
143     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
144     """
145
146     def __init__(self, max_workers: Optional[int] = None):
147         super().__init__()
148         workers = None
149         if max_workers is not None:
150             workers = max_workers
151         elif 'executors_threadpool_size' in config.config:
152             workers = config.config['executors_threadpool_size']
153         logger.debug('Creating threadpool executor with %d workers', workers)
154         self._thread_pool_executor = fut.ThreadPoolExecutor(
155             max_workers=workers, thread_name_prefix="thread_executor_helper"
156         )
157         self.already_shutdown = False
158
159     # This is run on a different thread; do not adjust task count here.
160     @staticmethod
161     def run_local_bundle(fun, *args, **kwargs):
162         logger.debug("Running local bundle at %s", fun.__name__)
163         result = fun(*args, **kwargs)
164         return result
165
166     @overrides
167     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
168         if self.already_shutdown:
169             raise Exception('Submitted work after shutdown.')
170         self.adjust_task_count(+1)
171         newargs = []
172         newargs.append(function)
173         for arg in args:
174             newargs.append(arg)
175         start = time.time()
176         result = self._thread_pool_executor.submit(
177             ThreadExecutor.run_local_bundle, *newargs, **kwargs
178         )
179         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
180         result.add_done_callback(lambda _: self.adjust_task_count(-1))
181         return result
182
183     @overrides
184     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
185         if not self.already_shutdown:
186             logger.debug('Shutting down threadpool executor %s', self.title)
187             self._thread_pool_executor.shutdown(wait)
188             if not quiet:
189                 print(self.histogram.__repr__(label_formatter='%ds'))
190             self.already_shutdown = True
191
192
193 class ProcessExecutor(BaseExecutor):
194     """An executor which runs tasks in child processes.
195
196     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
197     """
198
199     def __init__(self, max_workers=None):
200         super().__init__()
201         workers = None
202         if max_workers is not None:
203             workers = max_workers
204         elif 'executors_processpool_size' in config.config:
205             workers = config.config['executors_processpool_size']
206         logger.debug('Creating processpool executor with %d workers.', workers)
207         self._process_executor = fut.ProcessPoolExecutor(
208             max_workers=workers,
209         )
210         self.already_shutdown = False
211
212     # This is run in another process; do not adjust task count here.
213     @staticmethod
214     def run_cloud_pickle(pickle):
215         fun, args, kwargs = cloudpickle.loads(pickle)
216         logger.debug("Running pickled bundle at %s", fun.__name__)
217         result = fun(*args, **kwargs)
218         return result
219
220     @overrides
221     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
222         if self.already_shutdown:
223             raise Exception('Submitted work after shutdown.')
224         start = time.time()
225         self.adjust_task_count(+1)
226         pickle = _make_cloud_pickle(function, *args, **kwargs)
227         result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
228         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
229         result.add_done_callback(lambda _: self.adjust_task_count(-1))
230         return result
231
232     @overrides
233     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
234         if not self.already_shutdown:
235             logger.debug('Shutting down processpool executor %s', self.title)
236             self._process_executor.shutdown(wait)
237             if not quiet:
238                 print(self.histogram.__repr__(label_formatter='%ds'))
239             self.already_shutdown = True
240
241     def __getstate__(self):
242         state = self.__dict__.copy()
243         state['_process_executor'] = None
244         return state
245
246
247 class RemoteExecutorException(Exception):
248     """Thrown when a bundle cannot be executed despite several retries."""
249
250     pass
251
252
253 @dataclass
254 class RemoteWorkerRecord:
255     """A record of info about a remote worker."""
256
257     username: str
258     """Username we can ssh into on this machine to run work."""
259
260     machine: str
261     """Machine address / name."""
262
263     weight: int
264     """Relative probability for the weighted policy to select this
265     machine for scheduling work."""
266
267     count: int
268     """If this machine is selected, what is the maximum number of task
269     that it can handle?"""
270
271     def __hash__(self):
272         return hash((self.username, self.machine))
273
274     def __repr__(self):
275         return f'{self.username}@{self.machine}'
276
277
278 @dataclass
279 class BundleDetails:
280     """All info necessary to define some unit of work that needs to be
281     done, where it is being run, its state, whether it is an original
282     bundle of a backup bundle, how many times it has failed, etc...
283     """
284
285     pickled_code: bytes
286     """The code to run, cloud pickled"""
287
288     uuid: str
289     """A unique identifier"""
290
291     function_name: str
292     """The name of the function we pickled"""
293
294     worker: Optional[RemoteWorkerRecord]
295     """The remote worker running this bundle or None if none (yet)"""
296
297     username: Optional[str]
298     """The remote username running this bundle or None if none (yet)"""
299
300     machine: Optional[str]
301     """The remote machine running this bundle or None if none (yet)"""
302
303     hostname: str
304     """The controller machine"""
305
306     code_file: str
307     """A unique filename to hold the work to be done"""
308
309     result_file: str
310     """Where the results should be placed / read from"""
311
312     pid: int
313     """The process id of the local subprocess watching the ssh connection
314     to the remote machine"""
315
316     start_ts: float
317     """Starting time"""
318
319     end_ts: float
320     """Ending time"""
321
322     slower_than_local_p95: bool
323     """Currently slower then 95% of other bundles on remote host"""
324
325     slower_than_global_p95: bool
326     """Currently slower than 95% of other bundles globally"""
327
328     src_bundle: Optional[BundleDetails]
329     """If this is a backup bundle, this points to the original bundle
330     that it's backing up.  None otherwise."""
331
332     is_cancelled: threading.Event
333     """An event that can be signaled to indicate this bundle is cancelled.
334     This is set when another copy (backup or original) of this work has
335     completed successfully elsewhere."""
336
337     was_cancelled: bool
338     """True if this bundle was cancelled, False if it finished normally"""
339
340     backup_bundles: Optional[List[BundleDetails]]
341     """If we've created backups of this bundle, this is the list of them"""
342
343     failure_count: int
344     """How many times has this bundle failed already?"""
345
346     def __repr__(self):
347         uuid = self.uuid
348         if uuid[-9:-2] == '_backup':
349             uuid = uuid[:-9]
350             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
351         else:
352             suffix = uuid[-6:]
353
354         # We colorize the uuid based on some bits from it to make them
355         # stand out in the logging and help a reader correlate log messages
356         # related to the same bundle.
357         colorz = [
358             fg('violet red'),
359             fg('red'),
360             fg('orange'),
361             fg('peach orange'),
362             fg('yellow'),
363             fg('marigold yellow'),
364             fg('green yellow'),
365             fg('tea green'),
366             fg('cornflower blue'),
367             fg('turquoise blue'),
368             fg('tropical blue'),
369             fg('lavender purple'),
370             fg('medium purple'),
371         ]
372         c = colorz[int(uuid[-2:], 16) % len(colorz)]
373         function_name = self.function_name if self.function_name is not None else 'nofname'
374         machine = self.machine if self.machine is not None else 'nomachine'
375         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
376
377
378 class RemoteExecutorStatus:
379     """A status 'scoreboard' for a remote executor tracking various
380     metrics and able to render a periodic dump of global state.
381     """
382
383     def __init__(self, total_worker_count: int) -> None:
384         """C'tor.
385
386         Args:
387             total_worker_count: number of workers in the pool
388
389         """
390         self.worker_count: int = total_worker_count
391         self.known_workers: Set[RemoteWorkerRecord] = set()
392         self.start_time: float = time.time()
393         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
394         self.end_per_bundle: Dict[str, float] = defaultdict(float)
395         self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
396         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
397         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
398         self.finished_bundle_timings: List[float] = []
399         self.last_periodic_dump: Optional[float] = None
400         self.total_bundles_submitted: int = 0
401
402         # Protects reads and modification using self.  Also used
403         # as a memory fence for modifications to bundle.
404         self.lock: threading.Lock = threading.Lock()
405
406     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
407         """Record that bundle with uuid is assigned to a particular worker.
408
409         Args:
410             worker: the record of the worker to which uuid is assigned
411             uuid: the uuid of a bundle that has been assigned to a worker
412         """
413         with self.lock:
414             self.record_acquire_worker_already_locked(worker, uuid)
415
416     def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
417         """Same as above but an entry point that doesn't acquire the lock
418         for codepaths where it's already held."""
419         assert self.lock.locked()
420         self.known_workers.add(worker)
421         self.start_per_bundle[uuid] = None
422         x = self.in_flight_bundles_by_worker.get(worker, set())
423         x.add(uuid)
424         self.in_flight_bundles_by_worker[worker] = x
425
426     def record_bundle_details(self, details: BundleDetails) -> None:
427         """Register the details about a bundle of work."""
428         with self.lock:
429             self.record_bundle_details_already_locked(details)
430
431     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
432         """Same as above but for codepaths that already hold the lock."""
433         assert self.lock.locked()
434         self.bundle_details_by_uuid[details.uuid] = details
435
436     def record_release_worker(
437         self,
438         worker: RemoteWorkerRecord,
439         uuid: str,
440         was_cancelled: bool,
441     ) -> None:
442         """Record that a bundle has released a worker."""
443         with self.lock:
444             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
445
446     def record_release_worker_already_locked(
447         self,
448         worker: RemoteWorkerRecord,
449         uuid: str,
450         was_cancelled: bool,
451     ) -> None:
452         """Same as above but for codepaths that already hold the lock."""
453         assert self.lock.locked()
454         ts = time.time()
455         self.end_per_bundle[uuid] = ts
456         self.in_flight_bundles_by_worker[worker].remove(uuid)
457         if not was_cancelled:
458             start = self.start_per_bundle[uuid]
459             assert start is not None
460             bundle_latency = ts - start
461             x = self.finished_bundle_timings_per_worker.get(worker, [])
462             x.append(bundle_latency)
463             self.finished_bundle_timings_per_worker[worker] = x
464             self.finished_bundle_timings.append(bundle_latency)
465
466     def record_processing_began(self, uuid: str):
467         """Record when work on a bundle begins."""
468         with self.lock:
469             self.start_per_bundle[uuid] = time.time()
470
471     def total_in_flight(self) -> int:
472         """How many bundles are in flight currently?"""
473         assert self.lock.locked()
474         total_in_flight = 0
475         for worker in self.known_workers:
476             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
477         return total_in_flight
478
479     def total_idle(self) -> int:
480         """How many idle workers are there currently?"""
481         assert self.lock.locked()
482         return self.worker_count - self.total_in_flight()
483
484     def __repr__(self):
485         assert self.lock.locked()
486         ts = time.time()
487         total_finished = len(self.finished_bundle_timings)
488         total_in_flight = self.total_in_flight()
489         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
490         qall = None
491         if len(self.finished_bundle_timings) > 1:
492             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
493             ret += (
494                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
495                 f'✅={total_finished}/{self.total_bundles_submitted}, '
496                 f'💻n={total_in_flight}/{self.worker_count}\n'
497             )
498         else:
499             ret += (
500                 f'⏱={ts-self.start_time:.1f}s, '
501                 f'✅={total_finished}/{self.total_bundles_submitted}, '
502                 f'💻n={total_in_flight}/{self.worker_count}\n'
503             )
504
505         for worker in self.known_workers:
506             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
507             timings = self.finished_bundle_timings_per_worker.get(worker, [])
508             count = len(timings)
509             qworker = None
510             if count > 1:
511                 qworker = numpy.quantile(timings, [0.5, 0.95])
512                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
513             else:
514                 ret += '\n'
515             if count > 0:
516                 ret += f'    ...finished {count} total bundle(s) so far\n'
517             in_flight = len(self.in_flight_bundles_by_worker[worker])
518             if in_flight > 0:
519                 ret += f'    ...{in_flight} bundles currently in flight:\n'
520                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
521                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
522                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
523                     if self.start_per_bundle[bundle_uuid] is not None:
524                         sec = ts - self.start_per_bundle[bundle_uuid]
525                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
526                     else:
527                         ret += f'       {details} setting up / copying data...'
528                         sec = 0.0
529
530                     if qworker is not None:
531                         if sec > qworker[1]:
532                             ret += f'{bg("red")}>💻p95{reset()} '
533                             if details is not None:
534                                 details.slower_than_local_p95 = True
535                         else:
536                             if details is not None:
537                                 details.slower_than_local_p95 = False
538
539                     if qall is not None:
540                         if sec > qall[1]:
541                             ret += f'{bg("red")}>∀p95{reset()} '
542                             if details is not None:
543                                 details.slower_than_global_p95 = True
544                         else:
545                             details.slower_than_global_p95 = False
546                     ret += '\n'
547         return ret
548
549     def periodic_dump(self, total_bundles_submitted: int) -> None:
550         assert self.lock.locked()
551         self.total_bundles_submitted = total_bundles_submitted
552         ts = time.time()
553         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
554             print(self)
555             self.last_periodic_dump = ts
556
557
558 class RemoteWorkerSelectionPolicy(ABC):
559     """A policy for selecting a remote worker base class."""
560
561     def __init__(self):
562         self.workers: Optional[List[RemoteWorkerRecord]] = None
563
564     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
565         self.workers = workers
566
567     @abstractmethod
568     def is_worker_available(self) -> bool:
569         pass
570
571     @abstractmethod
572     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
573         pass
574
575
576 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
577     """A remote worker selector that uses weighted RNG."""
578
579     @overrides
580     def is_worker_available(self) -> bool:
581         if self.workers:
582             for worker in self.workers:
583                 if worker.count > 0:
584                     return True
585         return False
586
587     @overrides
588     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
589         grabbag = []
590         if self.workers:
591             for worker in self.workers:
592                 if worker.machine != machine_to_avoid:
593                     if worker.count > 0:
594                         for _ in range(worker.count * worker.weight):
595                             grabbag.append(worker)
596
597         if len(grabbag) == 0:
598             logger.debug('There are no available workers that avoid %s', machine_to_avoid)
599             if self.workers:
600                 for worker in self.workers:
601                     if worker.count > 0:
602                         for _ in range(worker.count * worker.weight):
603                             grabbag.append(worker)
604
605         if len(grabbag) == 0:
606             logger.warning('There are no available workers?!')
607             return None
608
609         worker = random.sample(grabbag, 1)[0]
610         assert worker.count > 0
611         worker.count -= 1
612         logger.debug('Selected worker %s', worker)
613         return worker
614
615
616 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
617     """A remote worker selector that just round robins."""
618
619     def __init__(self) -> None:
620         super().__init__()
621         self.index = 0
622
623     @overrides
624     def is_worker_available(self) -> bool:
625         if self.workers:
626             for worker in self.workers:
627                 if worker.count > 0:
628                     return True
629         return False
630
631     @overrides
632     def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
633         if self.workers:
634             x = self.index
635             while True:
636                 worker = self.workers[x]
637                 if worker.count > 0:
638                     worker.count -= 1
639                     x += 1
640                     if x >= len(self.workers):
641                         x = 0
642                     self.index = x
643                     logger.debug('Selected worker %s', worker)
644                     return worker
645                 x += 1
646                 if x >= len(self.workers):
647                     x = 0
648                 if x == self.index:
649                     logger.warning('Unexpectedly could not find a worker, retrying...')
650                     return None
651         return None
652
653
654 class RemoteExecutor(BaseExecutor):
655     """An executor that uses processes on remote machines to do work.  This
656     works by creating "bundles" of work with pickled code in each to be
657     executed.  Each bundle is assigned a remote worker based on some policy
658     heuristics.  Once assigned to a remote worker, a local subprocess is
659     created.  It copies the pickled code to the remote machine via ssh/scp
660     and then starts up work on the remote machine again using ssh.  When
661     the work is complete it copies the results back to the local machine.
662
663     So there is essentially one "controller" machine (which may also be
664     in the remote executor pool and therefore do task work in addition to
665     controlling) and N worker machines.  This code runs on the controller
666     whereas on the worker machines we invoke pickled user code via a
667     shim in :file:`remote_worker.py`.
668
669     Some redundancy and safety provisions are made; e.g. slower than
670     expected tasks have redundant backups created and if a task fails
671     repeatedly we consider it poisoned and give up on it.
672
673     .. warning::
674
675         The network overhead / latency of copying work from the
676         controller machine to the remote workers is relatively high.
677         This executor probably only makes sense to use with
678         computationally expensive tasks such as jobs that will execute
679         for ~30 seconds or longer.
680
681     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
682     """
683
684     def __init__(
685         self,
686         workers: List[RemoteWorkerRecord],
687         policy: RemoteWorkerSelectionPolicy,
688     ) -> None:
689         """C'tor.
690
691         Args:
692             workers: A list of remote workers we can call on to do tasks.
693             policy: A policy for selecting remote workers for tasks.
694         """
695
696         super().__init__()
697         self.workers = workers
698         self.policy = policy
699         self.worker_count = 0
700         for worker in self.workers:
701             self.worker_count += worker.count
702         if self.worker_count <= 0:
703             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
704             logger.critical(msg)
705             raise RemoteExecutorException(msg)
706         self.policy.register_worker_pool(self.workers)
707         self.cv = threading.Condition()
708         logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
709         self._helper_executor = fut.ThreadPoolExecutor(
710             thread_name_prefix="remote_executor_helper",
711             max_workers=self.worker_count,
712         )
713         self.status = RemoteExecutorStatus(self.worker_count)
714         self.total_bundles_submitted = 0
715         self.backup_lock = threading.Lock()
716         self.last_backup = None
717         (
718             self.heartbeat_thread,
719             self.heartbeat_stop_event,
720         ) = self._run_periodic_heartbeat()
721         self.already_shutdown = False
722
723     @background_thread
724     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
725         """
726         We create a background thread to invoke :meth:`_heartbeat` regularly
727         while we are scheduling work.  It does some accounting such as
728         looking for slow bundles to tag for backup creation, checking for
729         unexpected failures, and printing a fancy message on stdout.
730         """
731         while not stop_event.is_set():
732             time.sleep(5.0)
733             logger.debug('Running periodic heartbeat code...')
734             self._heartbeat()
735         logger.debug('Periodic heartbeat thread shutting down.')
736
737     def _heartbeat(self) -> None:
738         # Note: this is invoked on a background thread, not an
739         # executor thread.  Be careful what you do with it b/c it
740         # needs to get back and dump status again periodically.
741         with self.status.lock:
742             self.status.periodic_dump(self.total_bundles_submitted)
743
744             # Look for bundles to reschedule via executor.submit
745             if config.config['executors_schedule_remote_backups']:
746                 self._maybe_schedule_backup_bundles()
747
748     def _maybe_schedule_backup_bundles(self):
749         """Maybe schedule backup bundles if we see a very slow bundle."""
750
751         assert self.status.lock.locked()
752         num_done = len(self.status.finished_bundle_timings)
753         num_idle_workers = self.worker_count - self.task_count
754         now = time.time()
755         if (
756             num_done >= 2
757             and num_idle_workers > 0
758             and (self.last_backup is None or (now - self.last_backup > 9.0))
759             and self.backup_lock.acquire(blocking=False)
760         ):
761             try:
762                 assert self.backup_lock.locked()
763
764                 bundle_to_backup = None
765                 best_score = None
766                 for (
767                     worker,
768                     bundle_uuids,
769                 ) in self.status.in_flight_bundles_by_worker.items():
770
771                     # Prefer to schedule backups of bundles running on
772                     # slower machines.
773                     base_score = 0
774                     for record in self.workers:
775                         if worker.machine == record.machine:
776                             base_score = float(record.weight)
777                             base_score = 1.0 / base_score
778                             base_score *= 200.0
779                             base_score = int(base_score)
780                             break
781
782                     for uuid in bundle_uuids:
783                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
784                         if (
785                             bundle is not None
786                             and bundle.src_bundle is None
787                             and bundle.backup_bundles is not None
788                         ):
789                             score = base_score
790
791                             # Schedule backups of bundles running
792                             # longer; especially those that are
793                             # unexpectedly slow.
794                             start_ts = self.status.start_per_bundle[uuid]
795                             if start_ts is not None:
796                                 runtime = now - start_ts
797                                 score += runtime
798                                 logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
799
800                                 if bundle.slower_than_local_p95:
801                                     score += runtime / 2
802                                     logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
803
804                                 if bundle.slower_than_global_p95:
805                                     score += runtime / 4
806                                     logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
807
808                             # Prefer backups of bundles that don't
809                             # have backups already.
810                             backup_count = len(bundle.backup_bundles)
811                             if backup_count == 0:
812                                 score *= 2
813                             elif backup_count == 1:
814                                 score /= 2
815                             elif backup_count == 2:
816                                 score /= 8
817                             else:
818                                 score = 0
819                             logger.debug(
820                                 'score[%s] => %.1f  # {backup_count} dup backup factor',
821                                 bundle,
822                                 score,
823                             )
824
825                             if score != 0 and (best_score is None or score > best_score):
826                                 bundle_to_backup = bundle
827                                 assert bundle is not None
828                                 assert bundle.backup_bundles is not None
829                                 assert bundle.src_bundle is None
830                                 best_score = score
831
832                 # Note: this is all still happening on the heartbeat
833                 # runner thread.  That's ok because
834                 # _schedule_backup_for_bundle uses the executor to
835                 # submit the bundle again which will cause it to be
836                 # picked up by a worker thread and allow this thread
837                 # to return to run future heartbeats.
838                 if bundle_to_backup is not None:
839                     self.last_backup = now
840                     logger.info(
841                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
842                         bundle_to_backup,
843                         best_score,
844                     )
845                     self._schedule_backup_for_bundle(bundle_to_backup)
846             finally:
847                 self.backup_lock.release()
848
849     def _is_worker_available(self) -> bool:
850         """Is there a worker available currently?"""
851         return self.policy.is_worker_available()
852
853     def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
854         """Try to acquire a worker."""
855         return self.policy.acquire_worker(machine_to_avoid)
856
857     def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
858         """Find a worker or block until one becomes available."""
859         with self.cv:
860             while not self._is_worker_available():
861                 self.cv.wait()
862             worker = self._acquire_worker(machine_to_avoid)
863             if worker is not None:
864                 return worker
865         msg = "We should never reach this point in the code"
866         logger.critical(msg)
867         raise Exception(msg)
868
869     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
870         """Release a previously acquired worker."""
871         worker = bundle.worker
872         assert worker is not None
873         logger.debug('Released worker %s', worker)
874         self.status.record_release_worker(
875             worker,
876             bundle.uuid,
877             was_cancelled,
878         )
879         with self.cv:
880             worker.count += 1
881             self.cv.notify()
882         self.adjust_task_count(-1)
883
884     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
885         """See if a particular bundle is cancelled.  Do not block."""
886         with self.status.lock:
887             if bundle.is_cancelled.wait(timeout=0.0):
888                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
889                 bundle.was_cancelled = True
890                 return True
891         return False
892
893     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
894         """Find a worker for bundle or block until one is available."""
895
896         self.adjust_task_count(+1)
897         uuid = bundle.uuid
898         hostname = bundle.hostname
899         avoid_machine = override_avoid_machine
900         is_original = bundle.src_bundle is None
901
902         # Try not to schedule a backup on the same host as the original.
903         if avoid_machine is None and bundle.src_bundle is not None:
904             avoid_machine = bundle.src_bundle.machine
905         worker = None
906         while worker is None:
907             worker = self._find_available_worker_or_block(avoid_machine)
908         assert worker is not None
909
910         # Ok, found a worker.
911         bundle.worker = worker
912         machine = bundle.machine = worker.machine
913         username = bundle.username = worker.username
914         self.status.record_acquire_worker(worker, uuid)
915         logger.debug('%s: Running bundle on %s...', bundle, worker)
916
917         # Before we do any work, make sure the bundle is still viable.
918         # It may have been some time between when it was submitted and
919         # now due to lack of worker availability and someone else may
920         # have already finished it.
921         if self._check_if_cancelled(bundle):
922             try:
923                 return self._process_work_result(bundle)
924             except Exception as e:
925                 logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
926                 self._release_worker(bundle)
927                 if is_original:
928                     # Weird.  We are the original owner of this
929                     # bundle.  For it to have been cancelled, a backup
930                     # must have already started and completed before
931                     # we even for started.  Moreover, the backup says
932                     # it is done but we can't find the results it
933                     # should have copied over.  Reschedule the whole
934                     # thing.
935                     logger.exception(e)
936                     logger.error(
937                         '%s: We are the original owner thread and yet there are '
938                         'no results for this bundle.  This is unexpected and bad.',
939                         bundle,
940                     )
941                     return self._emergency_retry_nasty_bundle(bundle)
942                 else:
943                     # We're a backup and our bundle is cancelled
944                     # before we even got started.  Do nothing and let
945                     # the original bundle's thread worry about either
946                     # finding the results or complaining about it.
947                     return None
948
949         # Send input code / data to worker machine if it's not local.
950         if hostname not in machine:
951             try:
952                 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
953                 start_ts = time.time()
954                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
955                 run_silently(cmd)
956                 xfer_latency = time.time() - start_ts
957                 logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
958             except Exception as e:
959                 self._release_worker(bundle)
960                 if is_original:
961                     # Weird.  We tried to copy the code to the worker
962                     # and it failed...  And we're the original bundle.
963                     # We have to retry.
964                     logger.exception(e)
965                     logger.error(
966                         "%s: Failed to send instructions to the worker machine?! "
967                         "This is not expected; we\'re the original bundle so this shouldn\'t "
968                         "be a race condition.  Attempting an emergency retry...",
969                         bundle,
970                     )
971                     return self._emergency_retry_nasty_bundle(bundle)
972                 else:
973                     # This is actually expected; we're a backup.
974                     # There's a race condition where someone else
975                     # already finished the work and removed the source
976                     # code_file before we could copy it.  Ignore.
977                     logger.warning(
978                         '%s: Failed to send instructions to the worker machine... '
979                         'We\'re a backup and this may be caused by the original (or '
980                         'some other backup) already finishing this work.  Ignoring.',
981                         bundle,
982                     )
983                     return None
984
985         # Kick off the work.  Note that if this fails we let
986         # _wait_for_process deal with it.
987         self.status.record_processing_began(uuid)
988         cmd = (
989             f'{SSH} {bundle.username}@{bundle.machine} '
990             f'"source py38-venv/bin/activate &&'
991             f' /home/scott/lib/python_modules/remote_worker.py'
992             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
993         )
994         logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
995         p = cmd_in_background(cmd, silent=True)
996         bundle.pid = p.pid
997         logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
998         return self._wait_for_process(p, bundle, 0)
999
1000     def _wait_for_process(
1001         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1002     ) -> Any:
1003         """At this point we've copied the bundle's pickled code to the remote
1004         worker and started an ssh process that should be invoking the
1005         remote worker to have it execute the user's code.  See how
1006         that's going and wait for it to complete or fail.  Note that
1007         this code is recursive: there are codepaths where we decide to
1008         stop waiting for an ssh process (because another backup seems
1009         to have finished) but then fail to fetch or parse the results
1010         from that backup and thus call ourselves to continue waiting
1011         on an active ssh process.  This is the purpose of the depth
1012         argument: to curtail potential infinite recursion by giving up
1013         eventually.
1014
1015         Args:
1016             p: the Popen record of the ssh job
1017             bundle: the bundle of work being executed remotely
1018             depth: how many retries we've made so far.  Starts at zero.
1019
1020         """
1021
1022         machine = bundle.machine
1023         assert p is not None
1024         pid = p.pid  # pid of the ssh process
1025         if depth > 3:
1026             logger.error(
1027                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
1028             )
1029             p.terminate()
1030             self._release_worker(bundle)
1031             return self._emergency_retry_nasty_bundle(bundle)
1032
1033         # Spin until either the ssh job we scheduled finishes the
1034         # bundle or some backup worker signals that they finished it
1035         # before we could.
1036         while True:
1037             try:
1038                 p.wait(timeout=0.25)
1039             except subprocess.TimeoutExpired:
1040                 if self._check_if_cancelled(bundle):
1041                     logger.info('%s: looks like another worker finished bundle...', bundle)
1042                     break
1043             else:
1044                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1045                 p = None
1046                 break
1047
1048         # If we get here we believe the bundle is done; either the ssh
1049         # subprocess finished (hopefully successfully) or we noticed
1050         # that some other worker seems to have completed the bundle
1051         # before us and we're bailing out.
1052         try:
1053             ret = self._process_work_result(bundle)
1054             if ret is not None and p is not None:
1055                 p.terminate()
1056             return ret
1057
1058         # Something went wrong; e.g. we could not copy the results
1059         # back, cleanup after ourselves on the remote machine, or
1060         # unpickle the results we got from the remove machine.  If we
1061         # still have an active ssh subprocess, keep waiting on it.
1062         # Otherwise, time for an emergency reschedule.
1063         except Exception as e:
1064             logger.exception(e)
1065             logger.error('%s: Something unexpected just happened...', bundle)
1066             if p is not None:
1067                 logger.warning(
1068                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
1069                 )
1070                 return self._wait_for_process(p, bundle, depth + 1)
1071             else:
1072                 self._release_worker(bundle)
1073                 return self._emergency_retry_nasty_bundle(bundle)
1074
1075     def _process_work_result(self, bundle: BundleDetails) -> Any:
1076         """A bundle seems to be completed.  Check on the results."""
1077
1078         with self.status.lock:
1079             is_original = bundle.src_bundle is None
1080             was_cancelled = bundle.was_cancelled
1081             username = bundle.username
1082             machine = bundle.machine
1083             result_file = bundle.result_file
1084             code_file = bundle.code_file
1085
1086             # Whether original or backup, if we finished first we must
1087             # fetch the results if the computation happened on a
1088             # remote machine.
1089             bundle.end_ts = time.time()
1090             if not was_cancelled:
1091                 assert bundle.machine is not None
1092                 if bundle.hostname not in bundle.machine:
1093                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1094                     logger.info(
1095                         "%s: Fetching results back from %s@%s via %s",
1096                         bundle,
1097                         username,
1098                         machine,
1099                         cmd,
1100                     )
1101
1102                     # If either of these throw they are handled in
1103                     # _wait_for_process.
1104                     attempts = 0
1105                     while True:
1106                         try:
1107                             run_silently(cmd)
1108                         except Exception as e:
1109                             attempts += 1
1110                             if attempts >= 3:
1111                                 raise e
1112                         else:
1113                             break
1114
1115                     # Cleanup remote /tmp files.
1116                     run_silently(
1117                         f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
1118                     )
1119                     logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
1120                 dur = bundle.end_ts - bundle.start_ts
1121                 self.histogram.add_item(dur)
1122
1123         # Only the original worker should unpickle the file contents
1124         # though since it's the only one whose result matters.  The
1125         # original is also the only job that may delete result_file
1126         # from disk.  Note that the original may have been cancelled
1127         # if one of the backups finished first; it still must read the
1128         # result from disk.  It still does that here with is_cancelled
1129         # set.
1130         if is_original:
1131             logger.debug("%s: Unpickling %s.", bundle, result_file)
1132             try:
1133                 with open(result_file, 'rb') as rb:
1134                     serialized = rb.read()
1135                 result = cloudpickle.loads(serialized)
1136             except Exception as e:
1137                 logger.exception(e)
1138                 logger.error('Failed to load %s... this is bad news.', result_file)
1139                 self._release_worker(bundle)
1140
1141                 # Re-raise the exception; the code in _wait_for_process may
1142                 # decide to _emergency_retry_nasty_bundle here.
1143                 raise e
1144             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1145             os.remove(result_file)
1146             os.remove(code_file)
1147
1148             # Notify any backups that the original is done so they
1149             # should stop ASAP.  Do this whether or not we
1150             # finished first since there could be more than one
1151             # backup.
1152             if bundle.backup_bundles is not None:
1153                 for backup in bundle.backup_bundles:
1154                     logger.debug(
1155                         '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
1156                     )
1157                     backup.is_cancelled.set()
1158
1159         # This is a backup job and, by now, we have already fetched
1160         # the bundle results.
1161         else:
1162             # Backup results don't matter, they just need to leave the
1163             # result file in the right place for their originals to
1164             # read/unpickle later.
1165             result = None
1166
1167             # Tell the original to stop if we finished first.
1168             if not was_cancelled:
1169                 orig_bundle = bundle.src_bundle
1170                 assert orig_bundle is not None
1171                 logger.debug(
1172                     '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
1173                 )
1174                 orig_bundle.is_cancelled.set()
1175         self._release_worker(bundle, was_cancelled=was_cancelled)
1176         return result
1177
1178     def _create_original_bundle(self, pickle, function_name: str):
1179         """Creates a bundle that is not a backup of any other bundle but
1180         rather represents a user task.
1181         """
1182
1183         uuid = string_utils.generate_uuid(omit_dashes=True)
1184         code_file = f'/tmp/{uuid}.code.bin'
1185         result_file = f'/tmp/{uuid}.result.bin'
1186
1187         logger.debug('Writing pickled code to %s', code_file)
1188         with open(code_file, 'wb') as wb:
1189             wb.write(pickle)
1190
1191         bundle = BundleDetails(
1192             pickled_code=pickle,
1193             uuid=uuid,
1194             function_name=function_name,
1195             worker=None,
1196             username=None,
1197             machine=None,
1198             hostname=platform.node(),
1199             code_file=code_file,
1200             result_file=result_file,
1201             pid=0,
1202             start_ts=time.time(),
1203             end_ts=0.0,
1204             slower_than_local_p95=False,
1205             slower_than_global_p95=False,
1206             src_bundle=None,
1207             is_cancelled=threading.Event(),
1208             was_cancelled=False,
1209             backup_bundles=[],
1210             failure_count=0,
1211         )
1212         self.status.record_bundle_details(bundle)
1213         logger.debug('%s: Created an original bundle', bundle)
1214         return bundle
1215
1216     def _create_backup_bundle(self, src_bundle: BundleDetails):
1217         """Creates a bundle that is a backup of another bundle that is
1218         running too slowly."""
1219
1220         assert self.status.lock.locked()
1221         assert src_bundle.backup_bundles is not None
1222         n = len(src_bundle.backup_bundles)
1223         uuid = src_bundle.uuid + f'_backup#{n}'
1224
1225         backup_bundle = BundleDetails(
1226             pickled_code=src_bundle.pickled_code,
1227             uuid=uuid,
1228             function_name=src_bundle.function_name,
1229             worker=None,
1230             username=None,
1231             machine=None,
1232             hostname=src_bundle.hostname,
1233             code_file=src_bundle.code_file,
1234             result_file=src_bundle.result_file,
1235             pid=0,
1236             start_ts=time.time(),
1237             end_ts=0.0,
1238             slower_than_local_p95=False,
1239             slower_than_global_p95=False,
1240             src_bundle=src_bundle,
1241             is_cancelled=threading.Event(),
1242             was_cancelled=False,
1243             backup_bundles=None,  # backup backups not allowed
1244             failure_count=0,
1245         )
1246         src_bundle.backup_bundles.append(backup_bundle)
1247         self.status.record_bundle_details_already_locked(backup_bundle)
1248         logger.debug('%s: Created a backup bundle', backup_bundle)
1249         return backup_bundle
1250
1251     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1252         """Schedule a backup of src_bundle."""
1253
1254         assert self.status.lock.locked()
1255         assert src_bundle is not None
1256         backup_bundle = self._create_backup_bundle(src_bundle)
1257         logger.debug(
1258             '%s/%s: Scheduling backup for execution...',
1259             backup_bundle.uuid,
1260             backup_bundle.function_name,
1261         )
1262         self._helper_executor.submit(self._launch, backup_bundle)
1263
1264         # Results from backups don't matter; if they finish first
1265         # they will move the result_file to this machine and let
1266         # the original pick them up and unpickle them (and return
1267         # a result).
1268
1269     def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
1270         """Something unexpectedly failed with bundle.  Either retry it
1271         from the beginning or throw in the towel and give up on it."""
1272
1273         is_original = bundle.src_bundle is None
1274         bundle.worker = None
1275         avoid_last_machine = bundle.machine
1276         bundle.machine = None
1277         bundle.username = None
1278         bundle.failure_count += 1
1279         if is_original:
1280             retry_limit = 3
1281         else:
1282             retry_limit = 2
1283
1284         if bundle.failure_count > retry_limit:
1285             logger.error(
1286                 '%s: Tried this bundle too many times already (%dx); giving up.',
1287                 bundle,
1288                 retry_limit,
1289             )
1290             if is_original:
1291                 raise RemoteExecutorException(
1292                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1293                 )
1294             else:
1295                 logger.error(
1296                     '%s: At least it\'s only a backup; better luck with the others.', bundle
1297                 )
1298             return None
1299         else:
1300             msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
1301             logger.warning(msg)
1302             warnings.warn(msg)
1303             return self._launch(bundle, avoid_last_machine)
1304
1305     @overrides
1306     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1307         """Submit work to be done.  This is the user entry point of this
1308         class."""
1309         if self.already_shutdown:
1310             raise Exception('Submitted work after shutdown.')
1311         pickle = _make_cloud_pickle(function, *args, **kwargs)
1312         bundle = self._create_original_bundle(pickle, function.__name__)
1313         self.total_bundles_submitted += 1
1314         return self._helper_executor.submit(self._launch, bundle)
1315
1316     @overrides
1317     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1318         """Shutdown the executor."""
1319         if not self.already_shutdown:
1320             logging.debug('Shutting down RemoteExecutor %s', self.title)
1321             self.heartbeat_stop_event.set()
1322             self.heartbeat_thread.join()
1323             self._helper_executor.shutdown(wait)
1324             if not quiet:
1325                 print(self.histogram.__repr__(label_formatter='%ds'))
1326             self.already_shutdown = True
1327
1328
1329 @singleton
1330 class DefaultExecutors(object):
1331     """A container for a default thread, process and remote executor.
1332     These are not created until needed and we take care to clean up
1333     before process exit automatically for the caller's convenience.
1334     Instead of creating your own executor, consider using the one
1335     from this pool.  e.g.::
1336
1337         @par.parallelize(method=par.Method.PROCESS)
1338         def do_work(
1339             solutions: List[Work],
1340             shard_num: int,
1341             ...
1342         ):
1343             <do the work>
1344
1345
1346         def start_do_work(all_work: List[Work]):
1347             shards = []
1348             logger.debug('Sharding work into groups of 10.')
1349             for subset in list_utils.shard(all_work, 10):
1350                 shards.append([x for x in subset])
1351
1352             logger.debug('Kicking off helper pool.')
1353             try:
1354                 for n, shard in enumerate(shards):
1355                     results.append(
1356                         do_work(
1357                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1358                         )
1359                     )
1360                 smart_future.wait_all(results)
1361             finally:
1362                 # Note: if you forget to do this it will clean itself up
1363                 # during program termination including tearing down any
1364                 # active ssh connections.
1365                 executors.DefaultExecutors().process_pool().shutdown()
1366     """
1367
1368     def __init__(self):
1369         self.thread_executor: Optional[ThreadExecutor] = None
1370         self.process_executor: Optional[ProcessExecutor] = None
1371         self.remote_executor: Optional[RemoteExecutor] = None
1372
1373     @staticmethod
1374     def _ping(host) -> bool:
1375         logger.debug('RUN> ping -c 1 %s', host)
1376         try:
1377             x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
1378             return x == 0
1379         except Exception:
1380             return False
1381
1382     def thread_pool(self) -> ThreadExecutor:
1383         if self.thread_executor is None:
1384             self.thread_executor = ThreadExecutor()
1385         return self.thread_executor
1386
1387     def process_pool(self) -> ProcessExecutor:
1388         if self.process_executor is None:
1389             self.process_executor = ProcessExecutor()
1390         return self.process_executor
1391
1392     def remote_pool(self) -> RemoteExecutor:
1393         if self.remote_executor is None:
1394             logger.info('Looking for some helper machines...')
1395             pool: List[RemoteWorkerRecord] = []
1396             if self._ping('cheetah.house'):
1397                 logger.info('Found cheetah.house')
1398                 pool.append(
1399                     RemoteWorkerRecord(
1400                         username='scott',
1401                         machine='cheetah.house',
1402                         weight=24,
1403                         count=5,
1404                     ),
1405                 )
1406             if self._ping('meerkat.cabin'):
1407                 logger.info('Found meerkat.cabin')
1408                 pool.append(
1409                     RemoteWorkerRecord(
1410                         username='scott',
1411                         machine='meerkat.cabin',
1412                         weight=12,
1413                         count=2,
1414                     ),
1415                 )
1416             if self._ping('wannabe.house'):
1417                 logger.info('Found wannabe.house')
1418                 pool.append(
1419                     RemoteWorkerRecord(
1420                         username='scott',
1421                         machine='wannabe.house',
1422                         weight=14,
1423                         count=2,
1424                     ),
1425                 )
1426             if self._ping('puma.cabin'):
1427                 logger.info('Found puma.cabin')
1428                 pool.append(
1429                     RemoteWorkerRecord(
1430                         username='scott',
1431                         machine='puma.cabin',
1432                         weight=24,
1433                         count=5,
1434                     ),
1435                 )
1436             if self._ping('backup.house'):
1437                 logger.info('Found backup.house')
1438                 pool.append(
1439                     RemoteWorkerRecord(
1440                         username='scott',
1441                         machine='backup.house',
1442                         weight=9,
1443                         count=2,
1444                     ),
1445                 )
1446
1447             # The controller machine has a lot to do; go easy on it.
1448             for record in pool:
1449                 if record.machine == platform.node() and record.count > 1:
1450                     logger.info('Reducing workload for %s.', record.machine)
1451                     record.count = max(int(record.count / 2), 1)
1452
1453             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1454             policy.register_worker_pool(pool)
1455             self.remote_executor = RemoteExecutor(pool, policy)
1456         return self.remote_executor
1457
1458     def shutdown(self) -> None:
1459         if self.thread_executor is not None:
1460             self.thread_executor.shutdown(wait=True, quiet=True)
1461             self.thread_executor = None
1462         if self.process_executor is not None:
1463             self.process_executor.shutdown(wait=True, quiet=True)
1464             self.process_executor = None
1465         if self.remote_executor is not None:
1466             self.remote_executor.shutdown(wait=True, quiet=True)
1467             self.remote_executor = None