executors.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """Defines three executors: a thread executor for doing work using a
7 threadpool, a process executor for doing work in other processes on
8 the same machine and a remote executor for farming out work to other
9 machines.
10
11 Also defines DefaultExecutors which is a container for references to
12 global executors / worker pools with automatic shutdown semantics."""
13
14 from __future__ import annotations
15 import concurrent.futures as fut
16 import logging
17 import os
18 import platform
19 import random
20 import subprocess
21 import threading
22 import time
23 import warnings
24 from abc import ABC, abstractmethod
25 from collections import defaultdict
26 from dataclasses import dataclass
27 from typing import Any, Callable, Dict, List, Optional, Set
28
29 import cloudpickle  # type: ignore
30 import numpy
31 from overrides import overrides
32
33 import argparse_utils
34 import config
35 import histogram as hist
36 import string_utils
37 from ansi import bg, fg, reset, underline
38 from decorator_utils import singleton
39 from exec_utils import cmd_exitcode, cmd_in_background, run_silently
40 from thread_utils import background_thread
41
42 logger = logging.getLogger(__name__)
43
44 parser = config.add_commandline_args(
45     f"Executors ({__file__})", "Args related to processing executors."
46 )
47 parser.add_argument(
48     '--executors_threadpool_size',
49     type=int,
50     metavar='#THREADS',
51     help='Number of threads in the default threadpool, leave unset for default',
52     default=None,
53 )
54 parser.add_argument(
55     '--executors_processpool_size',
56     type=int,
57     metavar='#PROCESSES',
58     help='Number of processes in the default processpool, leave unset for default',
59     default=None,
60 )
61 parser.add_argument(
62     '--executors_schedule_remote_backups',
63     default=True,
64     action=argparse_utils.ActionNoYes,
65     help='Should we schedule duplicative backup work if a remote bundle is slow',
66 )
67 parser.add_argument(
68     '--executors_max_bundle_failures',
69     type=int,
70     default=3,
71     metavar='#FAILURES',
72     help='Maximum number of failures before giving up on a bundle',
73 )
74
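# Illustrative only: these flags are registered with the config module, so a
# program importing this file might be invoked roughly like the line below
# (assuming the program initializes the config module at startup; the exact
# parsing mechanism depends on that module):
#
#   ./my_program.py --executors_threadpool_size 10 \
#                   --executors_processpool_size 4 \
#                   --executors_max_bundle_failures 5
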
75 SSH = '/usr/bin/ssh -oForwardX11=no'
76 SCP = '/usr/bin/scp -C'
77
78
79 def _make_cloud_pickle(fun, *args, **kwargs):
80     """Internal helper to create cloud pickles."""
81     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
82     return cloudpickle.dumps((fun, args, kwargs))
83
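# A minimal sketch of how these bundles round-trip (for illustration only;
# not part of this module's API):
#
#   def add(x: int, y: int) -> int:
#       return x + y
#
#   blob = _make_cloud_pickle(add, 1, 2)          # bytes
#   fun, args, kwargs = cloudpickle.loads(blob)   # unpickled on the worker
#   assert fun(*args, **kwargs) == 3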
84
85 class BaseExecutor(ABC):
86     """The base executor interface definition: the common interface
87     implemented by :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
88     :class:`ThreadExecutor`.
89     """
90
91     def __init__(self, *, title=''):
92         self.title = title
93         self.histogram = hist.SimpleHistogram(
94             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
95         )
96         self.task_count = 0
97
98     @abstractmethod
99     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
100         pass
101
102     @abstractmethod
103     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
104         pass
105
106     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
107         """Shutdown the executor if it is idle (i.e. there are no pending
108         or active tasks) and return True.  Otherwise return False without
109         shutting anything down.  Note: this should only be called by the
110         launcher process.

112         """
113         if self.task_count == 0:
114             self.shutdown(wait=True, quiet=quiet)
115             return True
116         return False
117
118     def adjust_task_count(self, delta: int) -> None:
119         """Change the task count.  Note: do not call this method from a
120         worker; it should only be called by the launcher process /
121         thread / machine.
122
123         """
124         self.task_count += delta
125         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
126
127     def get_task_count(self) -> int:
128         """Return the current task count.  Note: do not call this method
129         from a worker; it should only be called by the launcher process /
130         thread / machine.

132         """
133         return self.task_count
134
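# A minimal sketch of what a concrete subclass needs to provide (purely
# illustrative; the real subclasses in this file are ThreadExecutor,
# ProcessExecutor and RemoteExecutor below).  "InlineExecutor" is a
# hypothetical name used only for this example:
#
#   class InlineExecutor(BaseExecutor):
#       @overrides
#       def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
#           f: fut.Future = fut.Future()
#           f.set_result(function(*args, **kwargs))  # run synchronously
#           return f
#
#       @overrides
#       def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
#           pass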
135
136 class ThreadExecutor(BaseExecutor):
137     """A threadpool executor.  This executor uses Python threads to
138     schedule tasks.  Note that, at least as of Python 3.10, because of
139     the global interpreter lock, these threads do not parallelize very
140     well, so this class is useful mostly for non-CPU-intensive
141     tasks.
142
143     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
144     """
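    # Hedged usage sketch (illustrative only; assumes the config module has
    # been initialized, however this project does that, so config.config is
    # readable):
    #
    #   executor = ThreadExecutor(max_workers=4)
    #   futures = [executor.submit(len, s) for s in ('a', 'bb', 'ccc')]
    #   assert sorted(f.result() for f in futures) == [1, 2, 3]
    #   executor.shutdown(wait=True, quiet=True)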
145
146     def __init__(self, max_workers: Optional[int] = None):
147         super().__init__()
148         workers = None
149         if max_workers is not None:
150             workers = max_workers
151         elif 'executors_threadpool_size' in config.config:
152             workers = config.config['executors_threadpool_size']
153         if workers is not None:
154             logger.debug('Creating threadpool executor with %d workers', workers)
155         else:
156             logger.debug('Creating a default sized threadpool executor')
157         self._thread_pool_executor = fut.ThreadPoolExecutor(
158             max_workers=workers, thread_name_prefix="thread_executor_helper"
159         )
160         self.already_shutdown = False
161
162     # This is run on a different thread; do not adjust task count here.
163     @staticmethod
164     def run_local_bundle(fun, *args, **kwargs):
165         logger.debug("Running local bundle at %s", fun.__name__)
166         result = fun(*args, **kwargs)
167         return result
168
169     @overrides
170     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
171         if self.already_shutdown:
172             raise Exception('Submitted work after shutdown.')
173         self.adjust_task_count(+1)
174         newargs = []
175         newargs.append(function)
176         for arg in args:
177             newargs.append(arg)
178         start = time.time()
179         result = self._thread_pool_executor.submit(
180             ThreadExecutor.run_local_bundle, *newargs, **kwargs
181         )
182         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
183         result.add_done_callback(lambda _: self.adjust_task_count(-1))
184         return result
185
186     @overrides
187     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
188         if not self.already_shutdown:
189             logger.debug('Shutting down threadpool executor %s', self.title)
190             self._thread_pool_executor.shutdown(wait)
191             if not quiet:
192                 print(self.histogram.__repr__(label_formatter='%ds'))
193             self.already_shutdown = True
194
195
196 class ProcessExecutor(BaseExecutor):
197     """An executor which runs tasks in child processes.
198
199     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
200     """
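    # Hedged usage sketch (illustrative only).  Because work is shipped to a
    # child process via cloudpickle, the submitted callable and its arguments
    # must be picklable; on platforms that spawn child processes this should
    # run under an `if __name__ == '__main__':` guard:
    #
    #   executor = ProcessExecutor(max_workers=2)
    #   future = executor.submit(pow, 2, 10)
    #   assert future.result() == 1024
    #   executor.shutdown(wait=True, quiet=True)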
201
202     def __init__(self, max_workers=None):
203         super().__init__()
204         workers = None
205         if max_workers is not None:
206             workers = max_workers
207         elif 'executors_processpool_size' in config.config:
208             workers = config.config['executors_processpool_size']
209         if workers is not None:
210             logger.debug('Creating processpool executor with %d workers.', workers)
211         else:
212             logger.debug('Creating a default sized processpool executor')
213         self._process_executor = fut.ProcessPoolExecutor(
214             max_workers=workers,
215         )
216         self.already_shutdown = False
217
218     # This is run in another process; do not adjust task count here.
219     @staticmethod
220     def run_cloud_pickle(pickle):
221         fun, args, kwargs = cloudpickle.loads(pickle)
222         logger.debug("Running pickled bundle at %s", fun.__name__)
223         result = fun(*args, **kwargs)
224         return result
225
226     @overrides
227     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
228         if self.already_shutdown:
229             raise Exception('Submitted work after shutdown.')
230         start = time.time()
231         self.adjust_task_count(+1)
232         pickle = _make_cloud_pickle(function, *args, **kwargs)
233         result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
234         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
235         result.add_done_callback(lambda _: self.adjust_task_count(-1))
236         return result
237
238     @overrides
239     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
240         if not self.already_shutdown:
241             logger.debug('Shutting down processpool executor %s', self.title)
242             self._process_executor.shutdown(wait)
243             if not quiet:
244                 print(self.histogram.__repr__(label_formatter='%ds'))
245             self.already_shutdown = True
246
247     def __getstate__(self):
248         state = self.__dict__.copy()
249         state['_process_executor'] = None
250         return state
251
252
253 class RemoteExecutorException(Exception):
254     """Thrown when a bundle cannot be executed despite several retries."""
255
256     pass
257
258
259 @dataclass
260 class RemoteWorkerRecord:
261     """A record of info about a remote worker."""
262
263     username: str
264     """Username we can ssh into on this machine to run work."""
265
266     machine: str
267     """Machine address / name."""
268
269     weight: int
270     """Relative probability for the weighted policy to select this
271     machine for scheduling work."""
272
273     count: int
274     """If this machine is selected, what is the maximum number of tasks
275     that it can handle?"""
276
277     def __hash__(self):
278         return hash((self.username, self.machine))
279
280     def __repr__(self):
281         return f'{self.username}@{self.machine}'
282
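# Purely illustrative example of defining a small worker pool (the usernames,
# hostnames, weights and counts are made up):
#
#   pool = [
#       RemoteWorkerRecord(username='scott', machine='machine1', weight=2, count=4),
#       RemoteWorkerRecord(username='scott', machine='machine2', weight=1, count=2),
#   ]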
283
284 @dataclass
285 class BundleDetails:
286     """All info necessary to define some unit of work that needs to be
287     done, where it is being run, its state, whether it is an original
288     bundle or a backup bundle, how many times it has failed, etc...
289     """
290
291     pickled_code: bytes
292     """The code to run, cloud pickled"""
293
294     uuid: str
295     """A unique identifier"""
296
297     function_name: str
298     """The name of the function we pickled"""
299
300     worker: Optional[RemoteWorkerRecord]
301     """The remote worker running this bundle or None if none (yet)"""
302
303     username: Optional[str]
304     """The remote username running this bundle or None if none (yet)"""
305
306     machine: Optional[str]
307     """The remote machine running this bundle or None if none (yet)"""
308
309     hostname: str
310     """The controller machine"""
311
312     code_file: str
313     """A unique filename to hold the work to be done"""
314
315     result_file: str
316     """Where the results should be placed / read from"""
317
318     pid: int
319     """The process id of the local subprocess watching the ssh connection
320     to the remote machine"""
321
322     start_ts: float
323     """Starting time"""
324
325     end_ts: float
326     """Ending time"""
327
328     slower_than_local_p95: bool
329     """Currently slower than 95% of other bundles on the remote host"""
330
331     slower_than_global_p95: bool
332     """Currently slower than 95% of other bundles globally"""
333
334     src_bundle: Optional[BundleDetails]
335     """If this is a backup bundle, this points to the original bundle
336     that it's backing up.  None otherwise."""
337
338     is_cancelled: threading.Event
339     """An event that can be signaled to indicate this bundle is cancelled.
340     This is set when another copy (backup or original) of this work has
341     completed successfully elsewhere."""
342
343     was_cancelled: bool
344     """True if this bundle was cancelled, False if it finished normally"""
345
346     backup_bundles: Optional[List[BundleDetails]]
347     """If we've created backups of this bundle, this is the list of them"""
348
349     failure_count: int
350     """How many times has this bundle failed already?"""
351
352     def __repr__(self):
353         uuid = self.uuid
354         if uuid[-9:-2] == '_backup':
355             uuid = uuid[:-9]
356             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
357         else:
358             suffix = uuid[-6:]
359
360         # We colorize the uuid based on some bits from it to make them
361         # stand out in the logging and help a reader correlate log messages
362         # related to the same bundle.
363         colorz = [
364             fg('violet red'),
365             fg('red'),
366             fg('orange'),
367             fg('peach orange'),
368             fg('yellow'),
369             fg('marigold yellow'),
370             fg('green yellow'),
371             fg('tea green'),
372             fg('cornflower blue'),
373             fg('turquoise blue'),
374             fg('tropical blue'),
375             fg('lavender purple'),
376             fg('medium purple'),
377         ]
378         c = colorz[int(uuid[-2:], 16) % len(colorz)]
379         function_name = self.function_name if self.function_name is not None else 'nofname'
380         machine = self.machine if self.machine is not None else 'nomachine'
381         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
382
383
384 class RemoteExecutorStatus:
385     """A status 'scoreboard' for a remote executor tracking various
386     metrics and able to render a periodic dump of global state.
387     """
388
389     def __init__(self, total_worker_count: int) -> None:
390         """C'tor.
391
392         Args:
393             total_worker_count: number of workers in the pool
394
395         """
396         self.worker_count: int = total_worker_count
397         self.known_workers: Set[RemoteWorkerRecord] = set()
398         self.start_time: float = time.time()
399         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
400         self.end_per_bundle: Dict[str, float] = defaultdict(float)
401         self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
402         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
403         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
404         self.finished_bundle_timings: List[float] = []
405         self.last_periodic_dump: Optional[float] = None
406         self.total_bundles_submitted: int = 0
407
408         # Protects reads and modification using self.  Also used
409         # as a memory fence for modifications to bundle.
410         self.lock: threading.Lock = threading.Lock()
411
412     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
413         """Record that bundle with uuid is assigned to a particular worker.
414
415         Args:
416             worker: the record of the worker to which uuid is assigned
417             uuid: the uuid of a bundle that has been assigned to a worker
418         """
419         with self.lock:
420             self.record_acquire_worker_already_locked(worker, uuid)
421
422     def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
423         """Same as above but an entry point that doesn't acquire the lock
424         for codepaths where it's already held."""
425         assert self.lock.locked()
426         self.known_workers.add(worker)
427         self.start_per_bundle[uuid] = None
428         x = self.in_flight_bundles_by_worker.get(worker, set())
429         x.add(uuid)
430         self.in_flight_bundles_by_worker[worker] = x
431
432     def record_bundle_details(self, details: BundleDetails) -> None:
433         """Register the details about a bundle of work."""
434         with self.lock:
435             self.record_bundle_details_already_locked(details)
436
437     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
438         """Same as above but for codepaths that already hold the lock."""
439         assert self.lock.locked()
440         self.bundle_details_by_uuid[details.uuid] = details
441
442     def record_release_worker(
443         self,
444         worker: RemoteWorkerRecord,
445         uuid: str,
446         was_cancelled: bool,
447     ) -> None:
448         """Record that a bundle has released a worker."""
449         with self.lock:
450             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
451
452     def record_release_worker_already_locked(
453         self,
454         worker: RemoteWorkerRecord,
455         uuid: str,
456         was_cancelled: bool,
457     ) -> None:
458         """Same as above but for codepaths that already hold the lock."""
459         assert self.lock.locked()
460         ts = time.time()
461         self.end_per_bundle[uuid] = ts
462         self.in_flight_bundles_by_worker[worker].remove(uuid)
463         if not was_cancelled:
464             start = self.start_per_bundle[uuid]
465             assert start is not None
466             bundle_latency = ts - start
467             x = self.finished_bundle_timings_per_worker.get(worker, [])
468             x.append(bundle_latency)
469             self.finished_bundle_timings_per_worker[worker] = x
470             self.finished_bundle_timings.append(bundle_latency)
471
472     def record_processing_began(self, uuid: str):
473         """Record when work on a bundle begins."""
474         with self.lock:
475             self.start_per_bundle[uuid] = time.time()
476
477     def total_in_flight(self) -> int:
478         """How many bundles are in flight currently?"""
479         assert self.lock.locked()
480         total_in_flight = 0
481         for worker in self.known_workers:
482             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
483         return total_in_flight
484
485     def total_idle(self) -> int:
486         """How many idle workers are there currently?"""
487         assert self.lock.locked()
488         return self.worker_count - self.total_in_flight()
489
490     def __repr__(self):
491         assert self.lock.locked()
492         ts = time.time()
493         total_finished = len(self.finished_bundle_timings)
494         total_in_flight = self.total_in_flight()
495         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
496         qall = None
497         if len(self.finished_bundle_timings) > 1:
498             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
499             ret += (
500                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
501                 f'✅={total_finished}/{self.total_bundles_submitted}, '
502                 f'💻n={total_in_flight}/{self.worker_count}\n'
503             )
504         else:
505             ret += (
506                 f'⏱={ts-self.start_time:.1f}s, '
507                 f'✅={total_finished}/{self.total_bundles_submitted}, '
508                 f'💻n={total_in_flight}/{self.worker_count}\n'
509             )
510
511         for worker in self.known_workers:
512             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
513             timings = self.finished_bundle_timings_per_worker.get(worker, [])
514             count = len(timings)
515             qworker = None
516             if count > 1:
517                 qworker = numpy.quantile(timings, [0.5, 0.95])
518                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
519             else:
520                 ret += '\n'
521             if count > 0:
522                 ret += f'    ...finished {count} total bundle(s) so far\n'
523             in_flight = len(self.in_flight_bundles_by_worker[worker])
524             if in_flight > 0:
525                 ret += f'    ...{in_flight} bundles currently in flight:\n'
526                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
527                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
528                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
529                     if self.start_per_bundle[bundle_uuid] is not None:
530                         sec = ts - self.start_per_bundle[bundle_uuid]
531                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
532                     else:
533                         ret += f'       {details} setting up / copying data...'
534                         sec = 0.0
535
536                     if qworker is not None:
537                         if sec > qworker[1]:
538                             ret += f'{bg("red")}>💻p95{reset()} '
539                             if details is not None:
540                                 details.slower_than_local_p95 = True
541                         else:
542                             if details is not None:
543                                 details.slower_than_local_p95 = False
544
545                     if qall is not None:
546                         if sec > qall[1]:
547                             ret += f'{bg("red")}>∀p95{reset()} '
548                             if details is not None:
549                                 details.slower_than_global_p95 = True
550                         else:
551                             if details is not None:
                                    details.slower_than_global_p95 = False
552                     ret += '\n'
553         return ret
554
555     def periodic_dump(self, total_bundles_submitted: int) -> None:
556         assert self.lock.locked()
557         self.total_bundles_submitted = total_bundles_submitted
558         ts = time.time()
559         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
560             print(self)
561             self.last_periodic_dump = ts
562
563
564 class RemoteWorkerSelectionPolicy(ABC):
565     """Base class for a policy that selects a remote worker."""
566
567     def __init__(self):
568         self.workers: Optional[List[RemoteWorkerRecord]] = None
569
570     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
571         self.workers = workers
572
573     @abstractmethod
574     def is_worker_available(self) -> bool:
575         pass
576
577     @abstractmethod
578     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
579         pass
580
581
582 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
583     """A remote worker selector that uses weighted RNG."""
584
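    # How the weighting works (illustrative numbers): a worker with weight=2
    # and count=3 available slots contributes 2 * 3 = 6 entries to the
    # "grabbag" below, so it is roughly three times as likely to be picked as
    # a worker with weight=1 and count=2 (which contributes 2 entries).
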
585     @overrides
586     def is_worker_available(self) -> bool:
587         if self.workers:
588             for worker in self.workers:
589                 if worker.count > 0:
590                     return True
591         return False
592
593     @overrides
594     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
595         grabbag = []
596         if self.workers:
597             for worker in self.workers:
598                 if worker.machine != machine_to_avoid:
599                     if worker.count > 0:
600                         for _ in range(worker.count * worker.weight):
601                             grabbag.append(worker)
602
603         if len(grabbag) == 0:
604             logger.debug('There are no available workers that avoid %s', machine_to_avoid)
605             if self.workers:
606                 for worker in self.workers:
607                     if worker.count > 0:
608                         for _ in range(worker.count * worker.weight):
609                             grabbag.append(worker)
610
611         if len(grabbag) == 0:
612             logger.warning('There are no available workers?!')
613             return None
614
615         worker = random.sample(grabbag, 1)[0]
616         assert worker.count > 0
617         worker.count -= 1
618         logger.debug('Selected worker %s', worker)
619         return worker
620
621
622 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
623     """A remote worker selector that just round-robins."""
624
625     def __init__(self) -> None:
626         super().__init__()
627         self.index = 0
628
629     @overrides
630     def is_worker_available(self) -> bool:
631         if self.workers:
632             for worker in self.workers:
633                 if worker.count > 0:
634                     return True
635         return False
636
637     @overrides
638     def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
639         if self.workers:
640             x = self.index
641             while True:
642                 worker = self.workers[x]
643                 if worker.count > 0:
644                     worker.count -= 1
645                     x += 1
646                     if x >= len(self.workers):
647                         x = 0
648                     self.index = x
649                     logger.debug('Selected worker %s', worker)
650                     return worker
651                 x += 1
652                 if x >= len(self.workers):
653                     x = 0
654                 if x == self.index:
655                     logger.warning('Unexpectedly could not find an available worker; giving up.')
656                     return None
657         return None
658
659
660 class RemoteExecutor(BaseExecutor):
661     """An executor that uses processes on remote machines to do work.  This
662     works by creating "bundles" of work with pickled code in each to be
663     executed.  Each bundle is assigned a remote worker based on some policy
664     heuristics.  Once assigned to a remote worker, a local subprocess is
665     created.  It copies the pickled code to the remote machine via ssh/scp
666     and then kicks off work on the remote machine, also via ssh.  When
667     the work is complete, it copies the results back to the local machine.
668
669     So there is essentially one "controller" machine (which may also be
670     in the remote executor pool and therefore do task work in addition to
671     controlling) and N worker machines.  This code runs on the controller
672     whereas on the worker machines we invoke pickled user code via a
673     shim in :file:`remote_worker.py`.
674
675     Some redundancy and safety provisions are made; e.g. slower than
676     expected tasks have redundant backups created and if a task fails
677     repeatedly we consider it poisoned and give up on it.
678
679     .. warning::
680
681         The network overhead / latency of copying work from the
682         controller machine to the remote workers is relatively high.
683         This executor probably only makes sense to use with
684         computationally expensive tasks such as jobs that will execute
685         for ~30 seconds or longer.
686
687     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
688     """
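    # Hedged usage sketch (illustrative only; the usernames, hostnames,
    # weights and counts are made up, and the remote machines must be
    # reachable via passwordless ssh with remote_worker.py installed):
    #
    #   workers = [
    #       RemoteWorkerRecord(username='scott', machine='machine1', weight=2, count=4),
    #       RemoteWorkerRecord(username='scott', machine='machine2', weight=1, count=2),
    #   ]
    #   executor = RemoteExecutor(workers, WeightedRandomRemoteWorkerSelectionPolicy())
    #   future = executor.submit(expensive_function, some_arg)  # hypothetical callable
    #   result = future.result()
    #   executor.shutdown()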
689
690     def __init__(
691         self,
692         workers: List[RemoteWorkerRecord],
693         policy: RemoteWorkerSelectionPolicy,
694     ) -> None:
695         """C'tor.
696
697         Args:
698             workers: A list of remote workers we can call on to do tasks.
699             policy: A policy for selecting remote workers for tasks.
700         """
701
702         super().__init__()
703         self.workers = workers
704         self.policy = policy
705         self.worker_count = 0
706         for worker in self.workers:
707             self.worker_count += worker.count
708         if self.worker_count <= 0:
709             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
710             logger.critical(msg)
711             raise RemoteExecutorException(msg)
712         self.policy.register_worker_pool(self.workers)
713         self.cv = threading.Condition()
714         logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
715         self._helper_executor = fut.ThreadPoolExecutor(
716             thread_name_prefix="remote_executor_helper",
717             max_workers=self.worker_count,
718         )
719         self.status = RemoteExecutorStatus(self.worker_count)
720         self.total_bundles_submitted = 0
721         self.backup_lock = threading.Lock()
722         self.last_backup = None
723         (
724             self.heartbeat_thread,
725             self.heartbeat_stop_event,
726         ) = self._run_periodic_heartbeat()
727         self.already_shutdown = False
728
729     @background_thread
730     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
731         """
732         We create a background thread to invoke :meth:`_heartbeat` regularly
733         while we are scheduling work.  It does some accounting such as
734         looking for slow bundles to tag for backup creation, checking for
735         unexpected failures, and printing a fancy message on stdout.
736         """
737         while not stop_event.is_set():
738             time.sleep(5.0)
739             logger.debug('Running periodic heartbeat code...')
740             self._heartbeat()
741         logger.debug('Periodic heartbeat thread shutting down.')
742
743     def _heartbeat(self) -> None:
744         # Note: this is invoked on a background thread, not an
745         # executor thread.  Be careful what you do with it b/c it
746         # needs to get back and dump status again periodically.
747         with self.status.lock:
748             self.status.periodic_dump(self.total_bundles_submitted)
749
750             # Look for bundles to reschedule via executor.submit
751             if config.config['executors_schedule_remote_backups']:
752                 self._maybe_schedule_backup_bundles()
753
754     def _maybe_schedule_backup_bundles(self):
755         """Maybe schedule backup bundles if we see a very slow bundle."""
756
757         assert self.status.lock.locked()
758         num_done = len(self.status.finished_bundle_timings)
759         num_idle_workers = self.worker_count - self.task_count
760         now = time.time()
761         if (
762             num_done >= 2
763             and num_idle_workers > 0
764             and (self.last_backup is None or (now - self.last_backup > 9.0))
765             and self.backup_lock.acquire(blocking=False)
766         ):
767             try:
768                 assert self.backup_lock.locked()
769
770                 bundle_to_backup = None
771                 best_score = None
772                 for (
773                     worker,
774                     bundle_uuids,
775                 ) in self.status.in_flight_bundles_by_worker.items():
776
777                     # Prefer to schedule backups of bundles running on
778                     # slower machines.
779                     base_score = 0
780                     for record in self.workers:
781                         if worker.machine == record.machine:
782                             base_score = float(record.weight)
783                             base_score = 1.0 / base_score
784                             base_score *= 200.0
785                             base_score = int(base_score)
786                             break
787
788                     for uuid in bundle_uuids:
789                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
790                         if (
791                             bundle is not None
792                             and bundle.src_bundle is None
793                             and bundle.backup_bundles is not None
794                         ):
795                             score = base_score
796
797                             # Schedule backups of bundles running
798                             # longer; especially those that are
799                             # unexpectedly slow.
800                             start_ts = self.status.start_per_bundle[uuid]
801                             if start_ts is not None:
802                                 runtime = now - start_ts
803                                 score += runtime
804                                 logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
805
806                                 if bundle.slower_than_local_p95:
807                                     score += runtime / 2
808                                     logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
809
810                                 if bundle.slower_than_global_p95:
811                                     score += runtime / 4
812                                     logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
813
814                             # Prefer backups of bundles that don't
815                             # have backups already.
816                             backup_count = len(bundle.backup_bundles)
817                             if backup_count == 0:
818                                 score *= 2
819                             elif backup_count == 1:
820                                 score /= 2
821                             elif backup_count == 2:
822                                 score /= 8
823                             else:
824                                 score = 0
825                             logger.debug(
826                                 'score[%s] => %.1f  # %d dup backup factor',
827                                 bundle,
828                                 score,
                                backup_count,
829                             )
830
831                             if score != 0 and (best_score is None or score > best_score):
832                                 bundle_to_backup = bundle
833                                 assert bundle is not None
834                                 assert bundle.backup_bundles is not None
835                                 assert bundle.src_bundle is None
836                                 best_score = score
837
838                 # Note: this is all still happening on the heartbeat
839                 # runner thread.  That's ok because
840                 # _schedule_backup_for_bundle uses the executor to
841                 # submit the bundle again which will cause it to be
842                 # picked up by a worker thread and allow this thread
843                 # to return to run future heartbeats.
844                 if bundle_to_backup is not None:
845                     self.last_backup = now
846                     logger.info(
847                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
848                         bundle_to_backup,
849                         best_score,
850                     )
851                     self._schedule_backup_for_bundle(bundle_to_backup)
852             finally:
853                 self.backup_lock.release()
854
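    # Worked example of the scoring above (numbers are made up): a bundle on a
    # worker with weight=4 starts with base_score = int(200.0 / 4) = 50.  If it
    # has been running for 60s, is slower than its worker's p95 (+30) and the
    # global p95 (+15), its score is 50 + 60 + 30 + 15 = 155; having zero
    # backups so far then doubles it to 310, making it a very likely backup
    # candidate.
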
855     def _is_worker_available(self) -> bool:
856         """Is there a worker available currently?"""
857         return self.policy.is_worker_available()
858
859     def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
860         """Try to acquire a worker."""
861         return self.policy.acquire_worker(machine_to_avoid)
862
863     def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
864         """Find a worker or block until one becomes available."""
865         with self.cv:
866             while not self._is_worker_available():
867                 self.cv.wait()
868             worker = self._acquire_worker(machine_to_avoid)
869             if worker is not None:
870                 return worker
871         msg = "We should never reach this point in the code"
872         logger.critical(msg)
873         raise Exception(msg)
874
875     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
876         """Release a previously acquired worker."""
877         worker = bundle.worker
878         assert worker is not None
879         logger.debug('Released worker %s', worker)
880         self.status.record_release_worker(
881             worker,
882             bundle.uuid,
883             was_cancelled,
884         )
885         with self.cv:
886             worker.count += 1
887             self.cv.notify()
888         self.adjust_task_count(-1)
889
890     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
891         """See if a particular bundle is cancelled.  Do not block."""
892         with self.status.lock:
893             if bundle.is_cancelled.wait(timeout=0.0):
894                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
895                 bundle.was_cancelled = True
896                 return True
897         return False
898
899     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
900         """Find a worker for bundle or block until one is available."""
901
902         self.adjust_task_count(+1)
903         uuid = bundle.uuid
904         hostname = bundle.hostname
905         avoid_machine = override_avoid_machine
906         is_original = bundle.src_bundle is None
907
908         # Try not to schedule a backup on the same host as the original.
909         if avoid_machine is None and bundle.src_bundle is not None:
910             avoid_machine = bundle.src_bundle.machine
911         worker = None
912         while worker is None:
913             worker = self._find_available_worker_or_block(avoid_machine)
914         assert worker is not None
915
916         # Ok, found a worker.
917         bundle.worker = worker
918         machine = bundle.machine = worker.machine
919         username = bundle.username = worker.username
920         self.status.record_acquire_worker(worker, uuid)
921         logger.debug('%s: Running bundle on %s...', bundle, worker)
922
923         # Before we do any work, make sure the bundle is still viable.
924         # It may have been some time between when it was submitted and
925         # now due to lack of worker availability and someone else may
926         # have already finished it.
927         if self._check_if_cancelled(bundle):
928             try:
929                 return self._process_work_result(bundle)
930             except Exception as e:
931                 logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
932                 self._release_worker(bundle)
933                 if is_original:
934                     # Weird.  We are the original owner of this
935                     # bundle.  For it to have been cancelled, a backup
936                     # must have already started and completed before
937                     # we even got started.  Moreover, the backup says
938                     # it is done but we can't find the results it
939                     # should have copied over.  Reschedule the whole
940                     # thing.
941                     logger.exception(e)
942                     logger.error(
943                         '%s: We are the original owner thread and yet there are '
944                         'no results for this bundle.  This is unexpected and bad.',
945                         bundle,
946                     )
947                     return self._emergency_retry_nasty_bundle(bundle)
948                 else:
949                     # We're a backup and our bundle is cancelled
950                     # before we even got started.  Do nothing and let
951                     # the original bundle's thread worry about either
952                     # finding the results or complaining about it.
953                     return None
954
955         # Send input code / data to worker machine if it's not local.
956         if hostname not in machine:
957             try:
958                 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
959                 start_ts = time.time()
960                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
961                 run_silently(cmd)
962                 xfer_latency = time.time() - start_ts
963                 logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
964             except Exception as e:
965                 self._release_worker(bundle)
966                 if is_original:
967                     # Weird.  We tried to copy the code to the worker
968                     # and it failed...  And we're the original bundle.
969                     # We have to retry.
970                     logger.exception(e)
971                     logger.error(
972                         "%s: Failed to send instructions to the worker machine?! "
973                         "This is not expected; we\'re the original bundle so this shouldn\'t "
974                         "be a race condition.  Attempting an emergency retry...",
975                         bundle,
976                     )
977                     return self._emergency_retry_nasty_bundle(bundle)
978                 else:
979                     # This is actually expected; we're a backup.
980                     # There's a race condition where someone else
981                     # already finished the work and removed the source
982                     # code_file before we could copy it.  Ignore.
983                     logger.warning(
984                         '%s: Failed to send instructions to the worker machine... '
985                         'We\'re a backup and this may be caused by the original (or '
986                         'some other backup) already finishing this work.  Ignoring.',
987                         bundle,
988                     )
989                     return None
990
991         # Kick off the work.  Note that if this fails we let
992         # _wait_for_process deal with it.
993         self.status.record_processing_began(uuid)
994         cmd = (
995             f'{SSH} {bundle.username}@{bundle.machine} '
996             f'"source py39-venv/bin/activate &&'
997             f' /home/scott/lib/python_modules/remote_worker.py'
998             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
999         )
1000         logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
1001         p = cmd_in_background(cmd, silent=True)
1002         bundle.pid = p.pid
1003         logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
1004         return self._wait_for_process(p, bundle, 0)
1005
1006     def _wait_for_process(
1007         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1008     ) -> Any:
1009         """At this point we've copied the bundle's pickled code to the remote
1010         worker and started an ssh process that should be invoking the
1011         remote worker to have it execute the user's code.  See how
1012         that's going and wait for it to complete or fail.  Note that
1013         this code is recursive: there are codepaths where we decide to
1014         stop waiting for an ssh process (because another backup seems
1015         to have finished) but then fail to fetch or parse the results
1016         from that backup and thus call ourselves to continue waiting
1017         on an active ssh process.  This is the purpose of the depth
1018         argument: to curtail potential infinite recursion by giving up
1019         eventually.
1020
1021         Args:
1022             p: the Popen record of the ssh job
1023             bundle: the bundle of work being executed remotely
1024             depth: how many retries we've made so far.  Starts at zero.
1025
1026         """
1027
1028         machine = bundle.machine
1029         assert p is not None
1030         pid = p.pid  # pid of the ssh process
1031         if depth > 3:
1032             logger.error(
1033                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
1034             )
1035             p.terminate()
1036             self._release_worker(bundle)
1037             return self._emergency_retry_nasty_bundle(bundle)
1038
1039         # Spin until either the ssh job we scheduled finishes the
1040         # bundle or some backup worker signals that they finished it
1041         # before we could.
1042         while True:
1043             try:
1044                 p.wait(timeout=0.25)
1045             except subprocess.TimeoutExpired:
1046                 if self._check_if_cancelled(bundle):
1047                     logger.info('%s: looks like another worker finished bundle...', bundle)
1048                     break
1049             else:
1050                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1051                 p = None
1052                 break
1053
1054         # If we get here we believe the bundle is done; either the ssh
1055         # subprocess finished (hopefully successfully) or we noticed
1056         # that some other worker seems to have completed the bundle
1057         # before us and we're bailing out.
1058         try:
1059             ret = self._process_work_result(bundle)
1060             if ret is not None and p is not None:
1061                 p.terminate()
1062             return ret
1063
1064         # Something went wrong; e.g. we could not copy the results
1065         # back, cleanup after ourselves on the remote machine, or
1066         # unpickle the results we got from the remote machine.  If we
1067         # still have an active ssh subprocess, keep waiting on it.
1068         # Otherwise, time for an emergency reschedule.
1069         except Exception as e:
1070             logger.exception(e)
1071             logger.error('%s: Something unexpected just happened...', bundle)
1072             if p is not None:
1073                 logger.warning(
1074                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
1075                 )
1076                 return self._wait_for_process(p, bundle, depth + 1)
1077             else:
1078                 self._release_worker(bundle)
1079                 return self._emergency_retry_nasty_bundle(bundle)
1080
1081     def _process_work_result(self, bundle: BundleDetails) -> Any:
1082         """A bundle seems to be completed.  Check on the results."""
1083
1084         with self.status.lock:
1085             is_original = bundle.src_bundle is None
1086             was_cancelled = bundle.was_cancelled
1087             username = bundle.username
1088             machine = bundle.machine
1089             result_file = bundle.result_file
1090             code_file = bundle.code_file
1091
1092             # Whether original or backup, if we finished first we must
1093             # fetch the results if the computation happened on a
1094             # remote machine.
1095             bundle.end_ts = time.time()
1096             if not was_cancelled:
1097                 assert bundle.machine is not None
1098                 if bundle.hostname not in bundle.machine:
1099                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1100                     logger.info(
1101                         "%s: Fetching results back from %s@%s via %s",
1102                         bundle,
1103                         username,
1104                         machine,
1105                         cmd,
1106                     )
1107
1108                     # If either of these throw they are handled in
1109                     # _wait_for_process.
1110                     attempts = 0
1111                     while True:
1112                         try:
1113                             run_silently(cmd)
1114                         except Exception as e:
1115                             attempts += 1
1116                             if attempts >= 3:
1117                                 raise e
1118                         else:
1119                             break
1120
1121                     # Cleanup remote /tmp files.
1122                     run_silently(
1123                         f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
1124                     )
1125                     logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
1126                 dur = bundle.end_ts - bundle.start_ts
1127                 self.histogram.add_item(dur)
1128
1129         # Only the original worker should unpickle the file contents
1130         # though since it's the only one whose result matters.  The
1131         # original is also the only job that may delete result_file
1132         # from disk.  Note that the original may have been cancelled
1133         # if one of the backups finished first; it still must read the
1134         # result from disk.  It still does that here with is_cancelled
1135         # set.
1136         if is_original:
1137             logger.debug("%s: Unpickling %s.", bundle, result_file)
1138             try:
1139                 with open(result_file, 'rb') as rb:
1140                     serialized = rb.read()
1141                 result = cloudpickle.loads(serialized)
1142             except Exception as e:
1143                 logger.exception(e)
1144                 logger.error('Failed to load %s... this is bad news.', result_file)
1145                 self._release_worker(bundle)
1146
1147                 # Re-raise the exception; the code in _wait_for_process may
1148                 # decide to _emergency_retry_nasty_bundle here.
1149                 raise e
1150             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1151             os.remove(result_file)
1152             os.remove(code_file)
1153
1154             # Notify any backups that the original is done so they
1155             # should stop ASAP.  Do this whether or not we
1156             # finished first since there could be more than one
1157             # backup.
1158             if bundle.backup_bundles is not None:
1159                 for backup in bundle.backup_bundles:
1160                     logger.debug(
1161                         '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
1162                     )
1163                     backup.is_cancelled.set()
1164
1165         # This is a backup job and, by now, we have already fetched
1166         # the bundle results.
1167         else:
1168             # Backup results don't matter, they just need to leave the
1169             # result file in the right place for their originals to
1170             # read/unpickle later.
1171             result = None
1172
1173             # Tell the original to stop if we finished first.
1174             if not was_cancelled:
1175                 orig_bundle = bundle.src_bundle
1176                 assert orig_bundle is not None
1177                 logger.debug(
1178                     '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
1179                 )
1180                 orig_bundle.is_cancelled.set()
1181         self._release_worker(bundle, was_cancelled=was_cancelled)
1182         return result
1183
1184     def _create_original_bundle(self, pickle, function_name: str):
1185         """Creates a bundle that is not a backup of any other bundle but
1186         rather represents a user task.
1187         """
1188
1189         uuid = string_utils.generate_uuid(omit_dashes=True)
1190         code_file = f'/tmp/{uuid}.code.bin'
1191         result_file = f'/tmp/{uuid}.result.bin'
1192
1193         logger.debug('Writing pickled code to %s', code_file)
1194         with open(code_file, 'wb') as wb:
1195             wb.write(pickle)
1196
1197         bundle = BundleDetails(
1198             pickled_code=pickle,
1199             uuid=uuid,
1200             function_name=function_name,
1201             worker=None,
1202             username=None,
1203             machine=None,
1204             hostname=platform.node(),
1205             code_file=code_file,
1206             result_file=result_file,
1207             pid=0,
1208             start_ts=time.time(),
1209             end_ts=0.0,
1210             slower_than_local_p95=False,
1211             slower_than_global_p95=False,
1212             src_bundle=None,
1213             is_cancelled=threading.Event(),
1214             was_cancelled=False,
1215             backup_bundles=[],
1216             failure_count=0,
1217         )
1218         self.status.record_bundle_details(bundle)
1219         logger.debug('%s: Created an original bundle', bundle)
1220         return bundle
1221
1222     def _create_backup_bundle(self, src_bundle: BundleDetails):
1223         """Creates a bundle that is a backup of another bundle that is
1224         running too slowly."""
1225
1226         assert self.status.lock.locked()
1227         assert src_bundle.backup_bundles is not None
1228         n = len(src_bundle.backup_bundles)
1229         uuid = src_bundle.uuid + f'_backup#{n}'
1230
1231         backup_bundle = BundleDetails(
1232             pickled_code=src_bundle.pickled_code,
1233             uuid=uuid,
1234             function_name=src_bundle.function_name,
1235             worker=None,
1236             username=None,
1237             machine=None,
1238             hostname=src_bundle.hostname,
1239             code_file=src_bundle.code_file,
1240             result_file=src_bundle.result_file,
1241             pid=0,
1242             start_ts=time.time(),
1243             end_ts=0.0,
1244             slower_than_local_p95=False,
1245             slower_than_global_p95=False,
1246             src_bundle=src_bundle,
1247             is_cancelled=threading.Event(),
1248             was_cancelled=False,
1249             backup_bundles=None,  # backup backups not allowed
1250             failure_count=0,
1251         )
1252         src_bundle.backup_bundles.append(backup_bundle)
1253         self.status.record_bundle_details_already_locked(backup_bundle)
1254         logger.debug('%s: Created a backup bundle', backup_bundle)
1255         return backup_bundle
1256
1257     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1258         """Schedule a backup of src_bundle."""
1259
1260         assert self.status.lock.locked()
1261         assert src_bundle is not None
1262         backup_bundle = self._create_backup_bundle(src_bundle)
1263         logger.debug(
1264             '%s/%s: Scheduling backup for execution...',
1265             backup_bundle.uuid,
1266             backup_bundle.function_name,
1267         )
1268         self._helper_executor.submit(self._launch, backup_bundle)
1269
1270         # Results from backups don't matter; if they finish first
1271         # they will move the result_file to this machine and let
1272         # the original pick them up and unpickle them (and return
1273         # a result).
1274
1275     def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
1276         """Something unexpectedly failed with bundle.  Either retry it
1277         from the beginning or throw in the towel and give up on it."""
1278
1279         is_original = bundle.src_bundle is None
1280         bundle.worker = None
1281         avoid_last_machine = bundle.machine
1282         bundle.machine = None
1283         bundle.username = None
1284         bundle.failure_count += 1
1285         if is_original:
1286             retry_limit = 3
1287         else:
1288             retry_limit = 2
1289
1290         if bundle.failure_count > retry_limit:
1291             logger.error(
1292                 '%s: Tried this bundle too many times already (%dx); giving up.',
1293                 bundle,
1294                 retry_limit,
1295             )
1296             if is_original:
1297                 raise RemoteExecutorException(
1298                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1299                 )
1300             else:
1301                 logger.error(
1302                     '%s: At least it\'s only a backup; better luck with the others.', bundle
1303                 )
1304             return None
1305         else:
1306             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1307             logger.warning(msg)
1308             warnings.warn(msg)
1309             return self._launch(bundle, avoid_last_machine)
1310
1311     @overrides
1312     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1313         """Submit work to be done.  This is the user entry point of this
1314         class."""
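        # Usage sketch (hedged; ``executor`` and ``work_fn`` below are
        # hypothetical names for a constructed RemoteExecutor and a
        # picklable function):
        #
        #     future = executor.submit(work_fn, arg1, arg2)
        #     ...
        #     result = future.result()  # blocks until some worker finishes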
1315         if self.already_shutdown:
1316             raise Exception('Submitted work after shutdown.')
1317         pickle = _make_cloud_pickle(function, *args, **kwargs)
1318         bundle = self._create_original_bundle(pickle, function.__name__)
1319         self.total_bundles_submitted += 1
1320         return self._helper_executor.submit(self._launch, bundle)
1321
1322     @overrides
1323     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1324         """Shutdown the executor."""
1325         if not self.already_shutdown:
1326             logger.debug('Shutting down RemoteExecutor %s', self.title)
1327             self.heartbeat_stop_event.set()
1328             self.heartbeat_thread.join()
1329             self._helper_executor.shutdown(wait=wait)
1330             if not quiet:
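                # Show the histogram of task runtimes collected while this
                # executor ran (bucket labels formatted as whole seconds).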
1331                 print(self.histogram.__repr__(label_formatter='%ds'))
1332             self.already_shutdown = True
1333
1334
1335 @singleton
1336 class DefaultExecutors(object):
1337     """A container for a default thread, process and remote executor.
1338     These are created lazily, on first use, and are cleaned up
1339     automatically before process exit for the caller's convenience.
1340     Instead of creating your own executor, consider using one from
1341     this pool.  e.g.::
1342
1343         @par.parallelize(method=par.Method.PROCESS)
1344         def do_work(
1345             solutions: List[Work],
1346             shard_num: int,
1347             ...
1348         ):
1349             <do the work>
1350
1351
1352         def start_do_work(all_work: List[Work]):
1353             shards, results = [], []
1354             logger.debug('Sharding work into groups of 10.')
1355             for subset in list_utils.shard(all_work, 10):
1356                 shards.append(list(subset))
1357
1358             logger.debug('Kicking off helper pool.')
1359             try:
1360                 for n, shard in enumerate(shards):
1361                     results.append(
1362                         do_work(
1363                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1364                         )
1365                     )
1366                 smart_future.wait_all(results)
1367             finally:
1368                 # Note: if you forget to do this, the DefaultExecutors
1369                 # container cleans itself up at program termination,
1370                 # including tearing down any active ssh connections.
1371                 executors.DefaultExecutors().process_pool().shutdown()
1372     """
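    # Minimal usage sketch (hedged; assumes the config module's commandline
    # parsing has already run and ``some_function`` is a hypothetical
    # callable):
    #
    #     pool = DefaultExecutors().thread_pool()
    #     future = pool.submit(some_function, some_arg)
    #     print(future.result())
    #     DefaultExecutors().shutdown()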
1373
1374     def __init__(self):
1375         self.thread_executor: Optional[ThreadExecutor] = None
1376         self.process_executor: Optional[ProcessExecutor] = None
1377         self.remote_executor: Optional[RemoteExecutor] = None
1378
1379     @staticmethod
1380     def _ping(host) -> bool:
1381         logger.debug('RUN> ping -c 1 %s', host)
1382         try:
1383             x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
1384             return x == 0
1385         except Exception:
1386             return False
1387
1388     def thread_pool(self) -> ThreadExecutor:
1389         if self.thread_executor is None:
1390             self.thread_executor = ThreadExecutor()
1391         return self.thread_executor
1392
1393     def process_pool(self) -> ProcessExecutor:
1394         if self.process_executor is None:
1395             self.process_executor = ProcessExecutor()
1396         return self.process_executor
1397
1398     def remote_pool(self) -> RemoteExecutor:
1399         if self.remote_executor is None:
1400             logger.info('Looking for some helper machines...')
1401             pool: List[RemoteWorkerRecord] = []
1402             if self._ping('cheetah.house'):
1403                 logger.info('Found cheetah.house')
1404                 pool.append(
1405                     RemoteWorkerRecord(
1406                         username='scott',
1407                         machine='cheetah.house',
1408                         weight=24,
1409                         count=5,
1410                     ),
1411                 )
1412             if self._ping('meerkat.cabin'):
1413                 logger.info('Found meerkat.cabin')
1414                 pool.append(
1415                     RemoteWorkerRecord(
1416                         username='scott',
1417                         machine='meerkat.cabin',
1418                         weight=12,
1419                         count=2,
1420                     ),
1421                 )
1422             if self._ping('wannabe.house'):
1423                 logger.info('Found wannabe.house')
1424                 pool.append(
1425                     RemoteWorkerRecord(
1426                         username='scott',
1427                         machine='wannabe.house',
1428                         weight=14,
1429                         count=2,
1430                     ),
1431                 )
1432             if self._ping('puma.cabin'):
1433                 logger.info('Found puma.cabin')
1434                 pool.append(
1435                     RemoteWorkerRecord(
1436                         username='scott',
1437                         machine='puma.cabin',
1438                         weight=24,
1439                         count=5,
1440                     ),
1441                 )
1442             if self._ping('backup.house'):
1443                 logger.info('Found backup.house')
1444                 pool.append(
1445                     RemoteWorkerRecord(
1446                         username='scott',
1447                         machine='backup.house',
1448                         weight=9,
1449                         count=2,
1450                     ),
1451                 )
1452
1453             # The controller machine has a lot to do; go easy on it.
1454             for record in pool:
1455                 if record.machine == platform.node() and record.count > 1:
1456                     logger.info('Reducing workload for %s.', record.machine)
1457                     record.count = max(int(record.count / 2), 1)
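                    # e.g. a local record with count=5 drops to
                    # max(int(5 / 2), 1) == 2 worker slots.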
1458
1459             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1460             policy.register_worker_pool(pool)
1461             self.remote_executor = RemoteExecutor(pool, policy)
1462         return self.remote_executor
1463
1464     def shutdown(self) -> None:
1465         if self.thread_executor is not None:
1466             self.thread_executor.shutdown(wait=True, quiet=True)
1467             self.thread_executor = None
1468         if self.process_executor is not None:
1469             self.process_executor.shutdown(wait=True, quiet=True)
1470             self.process_executor = None
1471         if self.remote_executor is not None:
1472             self.remote_executor.shutdown(wait=True, quiet=True)
1473             self.remote_executor = None