[pyutils.git] / src / pyutils / parallelize / executors.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """Defines three executors: a thread executor for doing work using a
7 threadpool, a process executor for doing work in other processes on
8 the same machine, and a remote executor for farming out work to other
9 machines.
10
11 Also defines DefaultExecutors which is a container for references to
12 global executors / worker pools with automatic shutdown semantics."""
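# A rough usage sketch (illustrative only; nothing here runs at import
# time).  It assumes the executors below are constructed directly rather
# than fetched from the DefaultExecutors container, and my_function / its
# args are placeholders for the caller's own code:
#
#     executor = ThreadExecutor()
#     future = executor.submit(my_function, arg1, arg2)
#     print(future.result())
#     executor.shutdown()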
13
14 from __future__ import annotations
15
16 import concurrent.futures as fut
17 import logging
18 import os
19 import platform
20 import random
21 import subprocess
22 import threading
23 import time
24 import warnings
25 from abc import ABC, abstractmethod
26 from collections import defaultdict
27 from dataclasses import dataclass, fields
28 from typing import Any, Callable, Dict, List, Optional, Set
29
30 import cloudpickle  # type: ignore
31 import numpy
32 from overrides import overrides
33
34 import pyutils.typez.histogram as hist
35 from pyutils import argparse_utils, config, persistent, string_utils
36 from pyutils.ansi import bg, fg, reset, underline
37 from pyutils.decorator_utils import singleton
38 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
39 from pyutils.parallelize.thread_utils import background_thread
40
41 logger = logging.getLogger(__name__)
42
43 parser = config.add_commandline_args(
44     f"Executors ({__file__})", "Args related to processing executors."
45 )
46 parser.add_argument(
47     '--executors_threadpool_size',
48     type=int,
49     metavar='#THREADS',
50     help='Number of threads in the default threadpool, leave unset for default',
51     default=None,
52 )
53 parser.add_argument(
54     '--executors_processpool_size',
55     type=int,
56     metavar='#PROCESSES',
57     help='Number of processes in the default processpool, leave unset for default',
58     default=None,
59 )
60 parser.add_argument(
61     '--executors_schedule_remote_backups',
62     default=True,
63     action=argparse_utils.ActionNoYes,
64     help='Should we schedule duplicative backup work if a remote bundle is slow',
65 )
66 parser.add_argument(
67     '--executors_max_bundle_failures',
68     type=int,
69     default=3,
70     metavar='#FAILURES',
71     help='Maximum number of failures before giving up on a bundle',
72 )
73 parser.add_argument(
74     '--remote_worker_records_file',
75     type=str,
76     metavar='FILENAME',
77     help='Path of the remote worker records file (JSON)',
78     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
79 )
80
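# As a sketch of how these flags might be used, a program built on
# pyutils.config could be invoked along these lines (the --no_... spelling
# of the ActionNoYes negative form is an assumption, not verified here):
#
#     ./my_program.py --executors_threadpool_size 16 \
#                     --executors_processpool_size 4 \
#                     --no_executors_schedule_remote_backups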
81
82 SSH = '/usr/bin/ssh -oForwardX11=no'
83 SCP = '/usr/bin/scp -C'
84
85
86 def _make_cloud_pickle(fun, *args, **kwargs):
87     """Internal helper to create cloud pickles."""
88     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
89     return cloudpickle.dumps((fun, args, kwargs))
90
91
92 class BaseExecutor(ABC):
93     """The base executor interface definition.  The interface for
94     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
95     :class:`ThreadExecutor`.
96     """
97
98     def __init__(self, *, title=''):
99         self.title = title
100         self.histogram = hist.SimpleHistogram(
101             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
102         )
103         self.task_count = 0
104
105     @abstractmethod
106     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
107         pass
108
109     @abstractmethod
110     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
111         pass
112
113     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
114         """Shutdown the executor if it is idle (i.e. there are no
115         pending or active tasks) and return True.  Otherwise return False
116         without shutting down.  Note: this should only be called by the
117         launcher process.
118
119         """
120         if self.task_count == 0:
121             self.shutdown(wait=True, quiet=quiet)
122             return True
123         return False
124
125     def adjust_task_count(self, delta: int) -> None:
126         """Change the task count.  Note: do not call this method from a
127         worker, it should only be called by the launcher process /
128         thread / machine.
129
130         """
131         self.task_count += delta
132         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
133
134     def get_task_count(self) -> int:
135         """Return the current task count.  Note: do not call this method from a
136         worker, it should only be called by the launcher process /
137         thread / machine.
138
139         """
140         return self.task_count
141
142
143 class ThreadExecutor(BaseExecutor):
144     """A threadpool executor.  This executor uses python threads to
145     schedule tasks.  Note that, at least as of python3.10, because of
146     the global lock in the interpreter itself, these do not
147     the global interpreter lock (GIL), these threads do not
148     intensive tasks.
149
150     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
151     """
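    # A minimal sketch of typical use (illustrative; fetch and urls are
    # hypothetical stand-ins for the caller's own I/O-bound work):
    #
    #     executor = ThreadExecutor(max_workers=10)
    #     futures = [executor.submit(fetch, url) for url in urls]
    #     results = [f.result() for f in futures]
    #     executor.shutdown()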
152
153     def __init__(self, max_workers: Optional[int] = None):
154         super().__init__()
155         workers = None
156         if max_workers is not None:
157             workers = max_workers
158         elif 'executors_threadpool_size' in config.config:
159             workers = config.config['executors_threadpool_size']
160         if workers is not None:
161             logger.debug('Creating threadpool executor with %d workers', workers)
162         else:
163             logger.debug('Creating a default sized threadpool executor')
164         self._thread_pool_executor = fut.ThreadPoolExecutor(
165             max_workers=workers, thread_name_prefix="thread_executor_helper"
166         )
167         self.already_shutdown = False
168
169     # This is run on a different thread; do not adjust task count here.
170     @staticmethod
171     def run_local_bundle(fun, *args, **kwargs):
172         logger.debug("Running local bundle at %s", fun.__name__)
173         result = fun(*args, **kwargs)
174         return result
175
176     @overrides
177     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
178         if self.already_shutdown:
179             raise Exception('Submitted work after shutdown.')
180         self.adjust_task_count(+1)
181         newargs = []
182         newargs.append(function)
183         for arg in args:
184             newargs.append(arg)
185         start = time.time()
186         result = self._thread_pool_executor.submit(
187             ThreadExecutor.run_local_bundle, *newargs, **kwargs
188         )
189         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
190         result.add_done_callback(lambda _: self.adjust_task_count(-1))
191         return result
192
193     @overrides
194     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
195         if not self.already_shutdown:
196             logger.debug('Shutting down threadpool executor %s', self.title)
197             self._thread_pool_executor.shutdown(wait)
198             if not quiet:
199                 print(self.histogram.__repr__(label_formatter='%ds'))
200             self.already_shutdown = True
201
202
203 class ProcessExecutor(BaseExecutor):
204     """An executor which runs tasks in child processes.
205
206     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
207     """
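    # A minimal sketch of typical use (illustrative; crunch stands in for
    # the caller's own CPU-bound function).  Because work is cloudpickled,
    # it may include lambdas and closures that the stdlib pickler rejects:
    #
    #     executor = ProcessExecutor(max_workers=os.cpu_count())
    #     future = executor.submit(crunch, dataset_chunk)
    #     answer = future.result()
    #     executor.shutdown()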
208
209     def __init__(self, max_workers=None):
210         super().__init__()
211         workers = None
212         if max_workers is not None:
213             workers = max_workers
214         elif 'executors_processpool_size' in config.config:
215             workers = config.config['executors_processpool_size']
216         if workers is not None:
217             logger.debug('Creating processpool executor with %d workers.', workers)
218         else:
219             logger.debug('Creating a default sized processpool executor')
220         self._process_executor = fut.ProcessPoolExecutor(
221             max_workers=workers,
222         )
223         self.already_shutdown = False
224
225     # This is run in another process; do not adjust task count here.
226     @staticmethod
227     def run_cloud_pickle(pickle):
228         fun, args, kwargs = cloudpickle.loads(pickle)
229         logger.debug("Running pickled bundle at %s", fun.__name__)
230         result = fun(*args, **kwargs)
231         return result
232
233     @overrides
234     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
235         if self.already_shutdown:
236             raise Exception('Submitted work after shutdown.')
237         start = time.time()
238         self.adjust_task_count(+1)
239         pickle = _make_cloud_pickle(function, *args, **kwargs)
240         result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
241         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
242         result.add_done_callback(lambda _: self.adjust_task_count(-1))
243         return result
244
245     @overrides
246     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
247         if not self.already_shutdown:
248             logger.debug('Shutting down processpool executor %s', self.title)
249             self._process_executor.shutdown(wait)
250             if not quiet:
251                 print(self.histogram.__repr__(label_formatter='%ds'))
252             self.already_shutdown = True
253
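    # Dropping the underlying pool below keeps instances of this class
    # picklable (a live ProcessPoolExecutor holds locks and handles that
    # cannot be pickled), presumably so an executor reference captured by
    # submitted work does not break pickling.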
254     def __getstate__(self):
255         state = self.__dict__.copy()
256         state['_process_executor'] = None
257         return state
258
259
260 class RemoteExecutorException(Exception):
261     """Thrown when a bundle cannot be executed despite several retries."""
262
263     pass
264
265
266 @dataclass
267 class RemoteWorkerRecord:
268     """A record of info about a remote worker."""
269
270     username: str
271     """Username we can ssh into on this machine to run work."""
272
273     machine: str
274     """Machine address / name."""
275
276     weight: int
277     """Relative probability for the weighted policy to select this
278     machine for scheduling work."""
279
280     count: int
281     """If this machine is selected, what is the maximum number of tasks
282     that it can handle?"""
283
284     def __hash__(self):
285         return hash((self.username, self.machine))
286
287     def __repr__(self):
288         return f'{self.username}@{self.machine}'
289
290
291 @dataclass
292 class BundleDetails:
293     """All info necessary to define some unit of work that needs to be
294     done, where it is being run, its state, whether it is an original
295     bundle or a backup bundle, how many times it has failed, etc...
296     """
297
298     pickled_code: bytes
299     """The code to run, cloud pickled"""
300
301     uuid: str
302     """A unique identifier"""
303
304     function_name: str
305     """The name of the function we pickled"""
306
307     worker: Optional[RemoteWorkerRecord]
308     """The remote worker running this bundle or None if none (yet)"""
309
310     username: Optional[str]
311     """The remote username running this bundle or None if none (yet)"""
312
313     machine: Optional[str]
314     """The remote machine running this bundle or None if none (yet)"""
315
316     hostname: str
317     """The controller machine"""
318
319     code_file: str
320     """A unique filename to hold the work to be done"""
321
322     result_file: str
323     """Where the results should be placed / read from"""
324
325     pid: int
326     """The process id of the local subprocess watching the ssh connection
327     to the remote machine"""
328
329     start_ts: float
330     """Starting time"""
331
332     end_ts: float
333     """Ending time"""
334
335     slower_than_local_p95: bool
336     """Currently slower than 95% of other bundles on the remote host"""
337
338     slower_than_global_p95: bool
339     """Currently slower than 95% of other bundles globally"""
340
341     src_bundle: Optional[BundleDetails]
342     """If this is a backup bundle, this points to the original bundle
343     that it's backing up.  None otherwise."""
344
345     is_cancelled: threading.Event
346     """An event that can be signaled to indicate this bundle is cancelled.
347     This is set when another copy (backup or original) of this work has
348     completed successfully elsewhere."""
349
350     was_cancelled: bool
351     """True if this bundle was cancelled, False if it finished normally"""
352
353     backup_bundles: Optional[List[BundleDetails]]
354     """If we've created backups of this bundle, this is the list of them"""
355
356     failure_count: int
357     """How many times has this bundle failed already?"""
358
359     def __repr__(self):
360         uuid = self.uuid
361         if uuid[-9:-2] == '_backup':
362             uuid = uuid[:-9]
363             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
364         else:
365             suffix = uuid[-6:]
366
367         # We colorize the uuid based on some bits from it to make them
368         # stand out in the logging and help a reader correlate log messages
369         # related to the same bundle.
370         colorz = [
371             fg('violet red'),
372             fg('red'),
373             fg('orange'),
374             fg('peach orange'),
375             fg('yellow'),
376             fg('marigold yellow'),
377             fg('green yellow'),
378             fg('tea green'),
379             fg('cornflower blue'),
380             fg('turquoise blue'),
381             fg('tropical blue'),
382             fg('lavender purple'),
383             fg('medium purple'),
384         ]
385         c = colorz[int(uuid[-2:], 16) % len(colorz)]
386         function_name = (
387             self.function_name if self.function_name is not None else 'nofname'
388         )
389         machine = self.machine if self.machine is not None else 'nomachine'
390         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
391
392
393 class RemoteExecutorStatus:
394     """A status 'scoreboard' for a remote executor tracking various
395     metrics and able to render a periodic dump of global state.
396     """
397
398     def __init__(self, total_worker_count: int) -> None:
399         """C'tor.
400
401         Args:
402             total_worker_count: number of workers in the pool
403
404         """
405         self.worker_count: int = total_worker_count
406         self.known_workers: Set[RemoteWorkerRecord] = set()
407         self.start_time: float = time.time()
408         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
409         self.end_per_bundle: Dict[str, float] = defaultdict(float)
410         self.finished_bundle_timings_per_worker: Dict[
411             RemoteWorkerRecord, List[float]
412         ] = {}
413         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
414         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
415         self.finished_bundle_timings: List[float] = []
416         self.last_periodic_dump: Optional[float] = None
417         self.total_bundles_submitted: int = 0
418
419         # Protects reads and modification using self.  Also used
420         # as a memory fence for modifications to bundle.
421         self.lock: threading.Lock = threading.Lock()
422
423     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
424         """Record that bundle with uuid is assigned to a particular worker.
425
426         Args:
427             worker: the record of the worker to which uuid is assigned
428             uuid: the uuid of a bundle that has been assigned to a worker
429         """
430         with self.lock:
431             self.record_acquire_worker_already_locked(worker, uuid)
432
433     def record_acquire_worker_already_locked(
434         self, worker: RemoteWorkerRecord, uuid: str
435     ) -> None:
436         """Same as above but an entry point that doesn't acquire the lock
437         for codepaths where it's already held."""
438         assert self.lock.locked()
439         self.known_workers.add(worker)
440         self.start_per_bundle[uuid] = None
441         x = self.in_flight_bundles_by_worker.get(worker, set())
442         x.add(uuid)
443         self.in_flight_bundles_by_worker[worker] = x
444
445     def record_bundle_details(self, details: BundleDetails) -> None:
446         """Register the details about a bundle of work."""
447         with self.lock:
448             self.record_bundle_details_already_locked(details)
449
450     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
451         """Same as above but for codepaths that already hold the lock."""
452         assert self.lock.locked()
453         self.bundle_details_by_uuid[details.uuid] = details
454
455     def record_release_worker(
456         self,
457         worker: RemoteWorkerRecord,
458         uuid: str,
459         was_cancelled: bool,
460     ) -> None:
461         """Record that a bundle has released a worker."""
462         with self.lock:
463             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
464
465     def record_release_worker_already_locked(
466         self,
467         worker: RemoteWorkerRecord,
468         uuid: str,
469         was_cancelled: bool,
470     ) -> None:
471         """Same as above but for codepaths that already hold the lock."""
472         assert self.lock.locked()
473         ts = time.time()
474         self.end_per_bundle[uuid] = ts
475         self.in_flight_bundles_by_worker[worker].remove(uuid)
476         if not was_cancelled:
477             start = self.start_per_bundle[uuid]
478             assert start is not None
479             bundle_latency = ts - start
480             x = self.finished_bundle_timings_per_worker.get(worker, [])
481             x.append(bundle_latency)
482             self.finished_bundle_timings_per_worker[worker] = x
483             self.finished_bundle_timings.append(bundle_latency)
484
485     def record_processing_began(self, uuid: str):
486         """Record when work on a bundle begins."""
487         with self.lock:
488             self.start_per_bundle[uuid] = time.time()
489
490     def total_in_flight(self) -> int:
491         """How many bundles are in flight currently?"""
492         assert self.lock.locked()
493         total_in_flight = 0
494         for worker in self.known_workers:
495             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
496         return total_in_flight
497
498     def total_idle(self) -> int:
499         """How many idle workers are there currently?"""
500         assert self.lock.locked()
501         return self.worker_count - self.total_in_flight()
502
503     def __repr__(self):
504         assert self.lock.locked()
505         ts = time.time()
506         total_finished = len(self.finished_bundle_timings)
507         total_in_flight = self.total_in_flight()
508         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
509         qall = None
510         if len(self.finished_bundle_timings) > 1:
511             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
512             ret += (
513                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
514                 f'✅={total_finished}/{self.total_bundles_submitted}, '
515                 f'💻n={total_in_flight}/{self.worker_count}\n'
516             )
517         else:
518             ret += (
519                 f'⏱={ts-self.start_time:.1f}s, '
520                 f'✅={total_finished}/{self.total_bundles_submitted}, '
521                 f'💻n={total_in_flight}/{self.worker_count}\n'
522             )
523
524         for worker in self.known_workers:
525             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
526             timings = self.finished_bundle_timings_per_worker.get(worker, [])
527             count = len(timings)
528             qworker = None
529             if count > 1:
530                 qworker = numpy.quantile(timings, [0.5, 0.95])
531                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
532             else:
533                 ret += '\n'
534             if count > 0:
535                 ret += f'    ...finished {count} total bundle(s) so far\n'
536             in_flight = len(self.in_flight_bundles_by_worker[worker])
537             if in_flight > 0:
538                 ret += f'    ...{in_flight} bundles currently in flight:\n'
539                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
540                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
541                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
542                     if self.start_per_bundle[bundle_uuid] is not None:
543                         sec = ts - self.start_per_bundle[bundle_uuid]
544                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
545                     else:
546                         ret += f'       {details} setting up / copying data...'
547                         sec = 0.0
548
549                     if qworker is not None:
550                         if sec > qworker[1]:
551                             ret += f'{bg("red")}>💻p95{reset()} '
552                             if details is not None:
553                                 details.slower_than_local_p95 = True
554                         else:
555                             if details is not None:
556                                 details.slower_than_local_p95 = False
557
558                     if qall is not None:
559                         if sec > qall[1]:
560                             ret += f'{bg("red")}>∀p95{reset()} '
561                             if details is not None:
562                                 details.slower_than_global_p95 = True
563                         elif details is not None:
564                             details.slower_than_global_p95 = False
565                     ret += '\n'
566         return ret
567
568     def periodic_dump(self, total_bundles_submitted: int) -> None:
569         assert self.lock.locked()
570         self.total_bundles_submitted = total_bundles_submitted
571         ts = time.time()
572         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
573             print(self)
574             self.last_periodic_dump = ts
575
576
577 class RemoteWorkerSelectionPolicy(ABC):
578     """A policy for selecting a remote worker base class."""
579
580     def __init__(self):
581         self.workers: Optional[List[RemoteWorkerRecord]] = None
582
583     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
584         self.workers = workers
585
586     @abstractmethod
587     def is_worker_available(self) -> bool:
588         pass
589
590     @abstractmethod
591     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
592         pass
593
594
595 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
596     """A remote worker selector that uses weighted RNG."""
597
598     @overrides
599     def is_worker_available(self) -> bool:
600         if self.workers:
601             for worker in self.workers:
602                 if worker.count > 0:
603                     return True
604         return False
605
606     @overrides
607     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
608         grabbag = []
609         if self.workers:
610             for worker in self.workers:
611                 if worker.machine != machine_to_avoid:
612                     if worker.count > 0:
613                         for _ in range(worker.count * worker.weight):
614                             grabbag.append(worker)
615
616         if len(grabbag) == 0:
617             logger.debug(
618                 'There are no available workers that avoid %s', machine_to_avoid
619             )
620             if self.workers:
621                 for worker in self.workers:
622                     if worker.count > 0:
623                         for _ in range(worker.count * worker.weight):
624                             grabbag.append(worker)
625
626         if len(grabbag) == 0:
627             logger.warning('There are no available workers?!')
628             return None
629
630         worker = random.sample(grabbag, 1)[0]
631         assert worker.count > 0
632         worker.count -= 1
633         logger.debug('Selected worker %s', worker)
634         return worker
635
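# Worked example of the weighted grab bag above (illustrative numbers): a
# worker with weight=3 and count=4 free slots contributes 12 entries while
# a worker with weight=1 and count=2 contributes 2, so while both have
# capacity the first is chosen roughly 12 times out of 14.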
636
637 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
638     """A remote worker selector that just round robins."""
639
640     def __init__(self) -> None:
641         super().__init__()
642         self.index = 0
643
644     @overrides
645     def is_worker_available(self) -> bool:
646         if self.workers:
647             for worker in self.workers:
648                 if worker.count > 0:
649                     return True
650         return False
651
652     @overrides
653     def acquire_worker(
654         self, machine_to_avoid: Optional[str] = None
655     ) -> Optional[RemoteWorkerRecord]:
656         if self.workers:
657             x = self.index
658             while True:
659                 worker = self.workers[x]
660                 if worker.count > 0:
661                     worker.count -= 1
662                     x += 1
663                     if x >= len(self.workers):
664                         x = 0
665                     self.index = x
666                     logger.debug('Selected worker %s', worker)
667                     return worker
668                 x += 1
669                 if x >= len(self.workers):
670                     x = 0
671                 if x == self.index:
672                     logger.warning('Unexpectedly could not find a worker, retrying...')
673                     return None
674         return None
675
676
677 class RemoteExecutor(BaseExecutor):
678     """An executor that uses processes on remote machines to do work.  This
679     works by creating "bundles" of work with pickled code in each to be
680     executed.  Each bundle is assigned a remote worker based on some policy
681     heuristics.  Once assigned to a remote worker, a local subprocess is
682     created.  It copies the pickled code to the remote machine via ssh/scp
683     and then starts up work on the remote machine again using ssh.  When
684     the work is complete it copies the results back to the local machine.
685
686     So there is essentially one "controller" machine (which may also be
687     in the remote executor pool and therefore do task work in addition to
688     controlling) and N worker machines.  This code runs on the controller
689     whereas on the worker machines we invoke pickled user code via a
690     shim in :file:`remote_worker.py`.
691
692     Some redundancy and safety provisions are made; e.g. slower than
693     expected tasks have redundant backups created and if a task fails
694     repeatedly we consider it poisoned and give up on it.
695
696     .. warning::
697
698         The network overhead / latency of copying work from the
699         controller machine to the remote workers is relatively high.
700         This executor probably only makes sense to use with
701         computationally expensive tasks such as jobs that will execute
702         for ~30 seconds or longer.
703
704     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
705     """
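    # A rough construction sketch (illustrative only; the usernames and
    # machine names are placeholders and, in practice, the worker list
    # would come from --remote_worker_records_file via a pool provider):
    #
    #     workers = [
    #         RemoteWorkerRecord(username='me', machine='fastbox', weight=4, count=8),
    #         RemoteWorkerRecord(username='me', machine='slowbox', weight=1, count=2),
    #     ]
    #     policy = WeightedRandomRemoteWorkerSelectionPolicy()
    #     executor = RemoteExecutor(workers, policy)
    #     future = executor.submit(my_function, arg)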
706
707     def __init__(
708         self,
709         workers: List[RemoteWorkerRecord],
710         policy: RemoteWorkerSelectionPolicy,
711     ) -> None:
712         """C'tor.
713
714         Args:
715             workers: A list of remote workers we can call on to do tasks.
716             policy: A policy for selecting remote workers for tasks.
717         """
718
719         super().__init__()
720         self.workers = workers
721         self.policy = policy
722         self.worker_count = 0
723         for worker in self.workers:
724             self.worker_count += worker.count
725         if self.worker_count <= 0:
726             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
727             logger.critical(msg)
728             raise RemoteExecutorException(msg)
729         self.policy.register_worker_pool(self.workers)
730         self.cv = threading.Condition()
731         logger.debug(
732             'Creating %d local threads, one per remote worker.', self.worker_count
733         )
734         self._helper_executor = fut.ThreadPoolExecutor(
735             thread_name_prefix="remote_executor_helper",
736             max_workers=self.worker_count,
737         )
738         self.status = RemoteExecutorStatus(self.worker_count)
739         self.total_bundles_submitted = 0
740         self.backup_lock = threading.Lock()
741         self.last_backup = None
742         (
743             self.heartbeat_thread,
744             self.heartbeat_stop_event,
745         ) = self._run_periodic_heartbeat()
746         self.already_shutdown = False
747
748     @background_thread
749     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
750         """
751         We create a background thread to invoke :meth:`_heartbeat` regularly
752         while we are scheduling work.  It does some accounting such as
753         looking for slow bundles to tag for backup creation, checking for
754         unexpected failures, and printing a fancy message on stdout.
755         """
756         while not stop_event.is_set():
757             time.sleep(5.0)
758             logger.debug('Running periodic heartbeat code...')
759             self._heartbeat()
760         logger.debug('Periodic heartbeat thread shutting down.')
761
762     def _heartbeat(self) -> None:
763         # Note: this is invoked on a background thread, not an
764         # executor thread.  Be careful what you do with it b/c it
765         # needs to get back and dump status again periodically.
766         with self.status.lock:
767             self.status.periodic_dump(self.total_bundles_submitted)
768
769             # Look for bundles to reschedule via executor.submit
770             if config.config['executors_schedule_remote_backups']:
771                 self._maybe_schedule_backup_bundles()
772
773     def _maybe_schedule_backup_bundles(self):
774         """Maybe schedule backup bundles if we see a very slow bundle."""
775
776         assert self.status.lock.locked()
777         num_done = len(self.status.finished_bundle_timings)
778         num_idle_workers = self.worker_count - self.task_count
779         now = time.time()
780         if (
781             num_done >= 2
782             and num_idle_workers > 0
783             and (self.last_backup is None or (now - self.last_backup > 9.0))
784             and self.backup_lock.acquire(blocking=False)
785         ):
786             try:
787                 assert self.backup_lock.locked()
788
789                 bundle_to_backup = None
790                 best_score = None
791                 for (
792                     worker,
793                     bundle_uuids,
794                 ) in self.status.in_flight_bundles_by_worker.items():
795
796                     # Prefer to schedule backups of bundles running on
797                     # slower machines.
798                     base_score = 0
799                     for record in self.workers:
800                         if worker.machine == record.machine:
801                             base_score = float(record.weight)
802                             base_score = 1.0 / base_score
803                             base_score *= 200.0
804                             base_score = int(base_score)
805                             break
806
807                     for uuid in bundle_uuids:
808                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
809                         if (
810                             bundle is not None
811                             and bundle.src_bundle is None
812                             and bundle.backup_bundles is not None
813                         ):
814                             score = base_score
815
816                             # Schedule backups of bundles running
817                             # longer; especially those that are
818                             # unexpectedly slow.
819                             start_ts = self.status.start_per_bundle[uuid]
820                             if start_ts is not None:
821                                 runtime = now - start_ts
822                                 score += runtime
823                                 logger.debug(
824                                     'score[%s] => %.1f  # latency boost', bundle, score
825                                 )
826
827                                 if bundle.slower_than_local_p95:
828                                     score += runtime / 2
829                                     logger.debug(
830                                         'score[%s] => %.1f  # >worker p95',
831                                         bundle,
832                                         score,
833                                     )
834
835                                 if bundle.slower_than_global_p95:
836                                     score += runtime / 4
837                                     logger.debug(
838                                         'score[%s] => %.1f  # >global p95',
839                                         bundle,
840                                         score,
841                                     )
842
843                             # Prefer backups of bundles that don't
844                             # have backups already.
845                             backup_count = len(bundle.backup_bundles)
846                             if backup_count == 0:
847                                 score *= 2
848                             elif backup_count == 1:
849                                 score /= 2
850                             elif backup_count == 2:
851                                 score /= 8
852                             else:
853                                 score = 0
854                             logger.debug(
855                                 'score[%s] => %.1f  # %d dup backup factor',
856                                 bundle,
857                                 score, backup_count,
858                             )
859
860                             if score != 0 and (
861                                 best_score is None or score > best_score
862                             ):
863                                 bundle_to_backup = bundle
864                                 assert bundle is not None
865                                 assert bundle.backup_bundles is not None
866                                 assert bundle.src_bundle is None
867                                 best_score = score
868
869                 # Note: this is all still happening on the heartbeat
870                 # runner thread.  That's ok because
871                 # _schedule_backup_for_bundle uses the executor to
872                 # submit the bundle again which will cause it to be
873                 # picked up by a worker thread and allow this thread
874                 # to return to run future heartbeats.
875                 if bundle_to_backup is not None:
876                     self.last_backup = now
877                     logger.info(
878                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
879                         bundle_to_backup,
880                         best_score,
881                     )
882                     self._schedule_backup_for_bundle(bundle_to_backup)
883             finally:
884                 self.backup_lock.release()
885
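    # Worked example of the backup scoring above (illustrative numbers): a
    # bundle on a worker with weight=4 starts at 200 / 4 = 50; after running
    # for 30s the score becomes 80; exceeding that worker's p95 adds another
    # 30 / 2 = 15 (=> 95); and having no backups yet doubles it (=> 190).
    # The highest scoring bundle wins the single backup scheduled per pass.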
886     def _is_worker_available(self) -> bool:
887         """Is there a worker available currently?"""
888         return self.policy.is_worker_available()
889
890     def _acquire_worker(
891         self, machine_to_avoid: Optional[str] = None
892     ) -> Optional[RemoteWorkerRecord]:
893         """Try to acquire a worker."""
894         return self.policy.acquire_worker(machine_to_avoid)
895
896     def _find_available_worker_or_block(
897         self, machine_to_avoid: Optional[str] = None
898     ) -> RemoteWorkerRecord:
899         """Find a worker or block until one becomes available."""
900         with self.cv:
901             while not self._is_worker_available():
902                 self.cv.wait()
903             worker = self._acquire_worker(machine_to_avoid)
904             if worker is not None:
905                 return worker
906         msg = "We should never reach this point in the code"
907         logger.critical(msg)
908         raise Exception(msg)
909
910     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
911         """Release a previously acquired worker."""
912         worker = bundle.worker
913         assert worker is not None
914         logger.debug('Released worker %s', worker)
915         self.status.record_release_worker(
916             worker,
917             bundle.uuid,
918             was_cancelled,
919         )
920         with self.cv:
921             worker.count += 1
922             self.cv.notify()
923         self.adjust_task_count(-1)
924
925     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
926         """See if a particular bundle is cancelled.  Do not block."""
927         with self.status.lock:
928             if bundle.is_cancelled.wait(timeout=0.0):
929                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
930                 bundle.was_cancelled = True
931                 return True
932         return False
933
934     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
935         """Find a worker for bundle or block until one is available."""
936
937         self.adjust_task_count(+1)
938         uuid = bundle.uuid
939         hostname = bundle.hostname
940         avoid_machine = override_avoid_machine
941         is_original = bundle.src_bundle is None
942
943         # Try not to schedule a backup on the same host as the original.
944         if avoid_machine is None and bundle.src_bundle is not None:
945             avoid_machine = bundle.src_bundle.machine
946         worker = None
947         while worker is None:
948             worker = self._find_available_worker_or_block(avoid_machine)
949         assert worker is not None
950
951         # Ok, found a worker.
952         bundle.worker = worker
953         machine = bundle.machine = worker.machine
954         username = bundle.username = worker.username
955         self.status.record_acquire_worker(worker, uuid)
956         logger.debug('%s: Running bundle on %s...', bundle, worker)
957
958         # Before we do any work, make sure the bundle is still viable.
959         # It may have been some time between when it was submitted and
960         # now due to lack of worker availability and someone else may
961         # have already finished it.
962         if self._check_if_cancelled(bundle):
963             try:
964                 return self._process_work_result(bundle)
965             except Exception as e:
966                 logger.warning(
967                     '%s: bundle says it\'s cancelled upfront but no results?!', bundle
968                 )
969                 self._release_worker(bundle)
970                 if is_original:
971                     # Weird.  We are the original owner of this
972                     # bundle.  For it to have been cancelled, a backup
973                     # must have already started and completed before
974                     # we even got started.  Moreover, the backup says
975                     # it is done but we can't find the results it
976                     # should have copied over.  Reschedule the whole
977                     # thing.
978                     logger.exception(e)
979                     logger.error(
980                         '%s: We are the original owner thread and yet there are '
981                         'no results for this bundle.  This is unexpected and bad.',
982                         bundle,
983                     )
984                     return self._emergency_retry_nasty_bundle(bundle)
985                 else:
986                     # We're a backup and our bundle is cancelled
987                     # before we even got started.  Do nothing and let
988                     # the original bundle's thread worry about either
989                     # finding the results or complaining about it.
990                     return None
991
992         # Send input code / data to worker machine if it's not local.
993         if hostname not in machine:
994             try:
995                 cmd = (
996                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
997                 )
998                 start_ts = time.time()
999                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1000                 run_silently(cmd)
1001                 xfer_latency = time.time() - start_ts
1002                 logger.debug(
1003                     "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1004                 )
1005             except Exception as e:
1006                 self._release_worker(bundle)
1007                 if is_original:
1008                     # Weird.  We tried to copy the code to the worker
1009                     # and it failed...  And we're the original bundle.
1010                     # We have to retry.
1011                     logger.exception(e)
1012                     logger.error(
1013                         "%s: Failed to send instructions to the worker machine?! "
1014                         "This is not expected; we\'re the original bundle so this shouldn\'t "
1015                         "be a race condition.  Attempting an emergency retry...",
1016                         bundle,
1017                     )
1018                     return self._emergency_retry_nasty_bundle(bundle)
1019                 else:
1020                     # This is actually expected; we're a backup.
1021                     # There's a race condition where someone else
1022                     # already finished the work and removed the source
1023                     # code_file before we could copy it.  Ignore.
1024                     logger.warning(
1025                         '%s: Failed to send instructions to the worker machine... '
1026                         'We\'re a backup and this may be caused by the original (or '
1027                         'some other backup) already finishing this work.  Ignoring.',
1028                         bundle,
1029                     )
1030                     return None
1031
1032         # Kick off the work.  Note that if this fails we let
1033         # _wait_for_process deal with it.
1034         self.status.record_processing_began(uuid)
1035         cmd = (
1036             f'{SSH} {bundle.username}@{bundle.machine} '
1037             f'"source py39-venv/bin/activate &&'
1038             f' /home/scott/lib/python_modules/remote_worker.py'
1039             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1040         )
1041         logger.debug(
1042             '%s: Executing %s in the background to kick off work...', bundle, cmd
1043         )
1044         p = cmd_in_background(cmd, silent=True)
1045         bundle.pid = p.pid
1046         logger.debug(
1047             '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1048         )
1049         return self._wait_for_process(p, bundle, 0)
1050
1051     def _wait_for_process(
1052         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1053     ) -> Any:
1054         """At this point we've copied the bundle's pickled code to the remote
1055         worker and started an ssh process that should be invoking the
1056         remote worker to have it execute the user's code.  See how
1057         that's going and wait for it to complete or fail.  Note that
1058         this code is recursive: there are codepaths where we decide to
1059         stop waiting for an ssh process (because another backup seems
1060         to have finished) but then fail to fetch or parse the results
1061         from that backup and thus call ourselves to continue waiting
1062         on an active ssh process.  This is the purpose of the depth
1063         argument: to curtail potential infinite recursion by giving up
1064         eventually.
1065
1066         Args:
1067             p: the Popen record of the ssh job
1068             bundle: the bundle of work being executed remotely
1069             depth: how many retries we've made so far.  Starts at zero.
1070
1071         """
1072
1073         machine = bundle.machine
1074         assert p is not None
1075         pid = p.pid  # pid of the ssh process
1076         if depth > 3:
1077             logger.error(
1078                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1079                 pid,
1080             )
1081             p.terminate()
1082             self._release_worker(bundle)
1083             return self._emergency_retry_nasty_bundle(bundle)
1084
1085         # Spin until either the ssh job we scheduled finishes the
1086         # bundle or some backup worker signals that they finished it
1087         # before we could.
1088         while True:
1089             try:
1090                 p.wait(timeout=0.25)
1091             except subprocess.TimeoutExpired:
1092                 if self._check_if_cancelled(bundle):
1093                     logger.info(
1094                         '%s: looks like another worker finished bundle...', bundle
1095                     )
1096                     break
1097             else:
1098                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1099                 p = None
1100                 break
1101
1102         # If we get here we believe the bundle is done; either the ssh
1103         # subprocess finished (hopefully successfully) or we noticed
1104         # that some other worker seems to have completed the bundle
1105         # before us and we're bailing out.
1106         try:
1107             ret = self._process_work_result(bundle)
1108             if ret is not None and p is not None:
1109                 p.terminate()
1110             return ret
1111
1112         # Something went wrong; e.g. we could not copy the results
1113         # back, cleanup after ourselves on the remote machine, or
1114         # unpickle the results we got from the remote machine.  If we
1115         # still have an active ssh subprocess, keep waiting on it.
1116         # Otherwise, time for an emergency reschedule.
1117         except Exception as e:
1118             logger.exception(e)
1119             logger.error('%s: Something unexpected just happened...', bundle)
1120             if p is not None:
1121                 logger.warning(
1122                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1123                     bundle,
1124                 )
1125                 return self._wait_for_process(p, bundle, depth + 1)
1126             else:
1127                 self._release_worker(bundle)
1128                 return self._emergency_retry_nasty_bundle(bundle)
1129
1130     def _process_work_result(self, bundle: BundleDetails) -> Any:
1131         """A bundle seems to be completed.  Check on the results."""
1132
1133         with self.status.lock:
1134             is_original = bundle.src_bundle is None
1135             was_cancelled = bundle.was_cancelled
1136             username = bundle.username
1137             machine = bundle.machine
1138             result_file = bundle.result_file
1139             code_file = bundle.code_file
1140
1141             # Whether original or backup, if we finished first we must
1142             # fetch the results if the computation happened on a
1143             # remote machine.
1144             bundle.end_ts = time.time()
1145             if not was_cancelled:
1146                 assert bundle.machine is not None
1147                 if bundle.hostname not in bundle.machine:
1148                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1149                     logger.info(
1150                         "%s: Fetching results back from %s@%s via %s",
1151                         bundle,
1152                         username,
1153                         machine,
1154                         cmd,
1155                     )
1156
1157                     # If either of these throw they are handled in
1158                     # _wait_for_process.
1159                     attempts = 0
1160                     while True:
1161                         try:
1162                             run_silently(cmd)
1163                         except Exception as e:
1164                             attempts += 1
1165                             if attempts >= 3:
1166                                 raise e
1167                         else:
1168                             break
1169
1170                     # Cleanup remote /tmp files.
1171                     run_silently(
1172                         f'{SSH} {username}@{machine}'
1173                         f' "/bin/rm -f {code_file} {result_file}"'
1174                     )
1175                     logger.debug(
1176                         'Fetching results back took %.2fs', time.time() - bundle.end_ts
1177                     )
1178                 dur = bundle.end_ts - bundle.start_ts
1179                 self.histogram.add_item(dur)
1180
1181         # Only the original worker should unpickle the file contents
1182         # though since it's the only one whose result matters.  The
1183         # original is also the only job that may delete result_file
1184         # from disk.  Note that the original may have been cancelled
1185         # if one of the backups finished first; it still must read the
1186         # result from disk.  It still does that here with is_cancelled
1187         # set.
1188         if is_original:
1189             logger.debug("%s: Unpickling %s.", bundle, result_file)
1190             try:
1191                 with open(result_file, 'rb') as rb:
1192                     serialized = rb.read()
1193                 result = cloudpickle.loads(serialized)
1194             except Exception as e:
1195                 logger.exception(e)
1196                 logger.error('Failed to load %s... this is bad news.', result_file)
1197                 self._release_worker(bundle)
1198
1199                 # Re-raise the exception; the code in _wait_for_process may
1200                 # decide to _emergency_retry_nasty_bundle here.
1201                 raise e
1202             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1203             os.remove(result_file)
1204             os.remove(code_file)
1205
1206             # Notify any backups that the original is done so they
1207             # should stop ASAP.  Do this whether or not we
1208             # finished first since there could be more than one
1209             # backup.
1210             if bundle.backup_bundles is not None:
1211                 for backup in bundle.backup_bundles:
1212                     logger.debug(
1213                         '%s: Notifying backup %s that it\'s cancelled',
1214                         bundle,
1215                         backup.uuid,
1216                     )
1217                     backup.is_cancelled.set()
1218
1219         # This is a backup job and, by now, we have already fetched
1220         # the bundle results.
1221         else:
1222             # Backup results don't matter, they just need to leave the
1223             # result file in the right place for their originals to
1224             # read/unpickle later.
1225             result = None
1226
1227             # Tell the original to stop if we finished first.
1228             if not was_cancelled:
1229                 orig_bundle = bundle.src_bundle
1230                 assert orig_bundle is not None
1231                 logger.debug(
1232                     '%s: Notifying original %s we beat them to it.',
1233                     bundle,
1234                     orig_bundle.uuid,
1235                 )
1236                 orig_bundle.is_cancelled.set()
1237         self._release_worker(bundle, was_cancelled=was_cancelled)
1238         return result
1239
1240     def _create_original_bundle(self, pickle, function_name: str):
1241         """Creates a bundle that is not a backup of any other bundle but
1242         rather represents a user task.
1243         """
1244
1245         uuid = string_utils.generate_uuid(omit_dashes=True)
1246         code_file = f'/tmp/{uuid}.code.bin'
1247         result_file = f'/tmp/{uuid}.result.bin'
1248
1249         logger.debug('Writing pickled code to %s', code_file)
1250         with open(code_file, 'wb') as wb:
1251             wb.write(pickle)
1252
1253         bundle = BundleDetails(
1254             pickled_code=pickle,
1255             uuid=uuid,
1256             function_name=function_name,
1257             worker=None,
1258             username=None,
1259             machine=None,
1260             hostname=platform.node(),
1261             code_file=code_file,
1262             result_file=result_file,
1263             pid=0,
1264             start_ts=time.time(),
1265             end_ts=0.0,
1266             slower_than_local_p95=False,
1267             slower_than_global_p95=False,
1268             src_bundle=None,
1269             is_cancelled=threading.Event(),
1270             was_cancelled=False,
1271             backup_bundles=[],
1272             failure_count=0,
1273         )
1274         self.status.record_bundle_details(bundle)
1275         logger.debug('%s: Created an original bundle', bundle)
1276         return bundle
1277
1278     def _create_backup_bundle(self, src_bundle: BundleDetails):
1279         """Creates a bundle that is a backup of another bundle that is
1280         running too slowly."""
1281
1282         assert self.status.lock.locked()
1283         assert src_bundle.backup_bundles is not None
1284         n = len(src_bundle.backup_bundles)
1285         uuid = src_bundle.uuid + f'_backup#{n}'
1286
1287         backup_bundle = BundleDetails(
1288             pickled_code=src_bundle.pickled_code,
1289             uuid=uuid,
1290             function_name=src_bundle.function_name,
1291             worker=None,
1292             username=None,
1293             machine=None,
1294             hostname=src_bundle.hostname,
1295             code_file=src_bundle.code_file,
1296             result_file=src_bundle.result_file,
1297             pid=0,
1298             start_ts=time.time(),
1299             end_ts=0.0,
1300             slower_than_local_p95=False,
1301             slower_than_global_p95=False,
1302             src_bundle=src_bundle,
1303             is_cancelled=threading.Event(),
1304             was_cancelled=False,
1305             backup_bundles=None,  # backup backups not allowed
1306             failure_count=0,
1307         )
1308         src_bundle.backup_bundles.append(backup_bundle)
1309         self.status.record_bundle_details_already_locked(backup_bundle)
1310         logger.debug('%s: Created a backup bundle', backup_bundle)
1311         return backup_bundle
1312
1313     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1314         """Schedule a backup of src_bundle."""
1315
1316         assert self.status.lock.locked()
1317         assert src_bundle is not None
1318         backup_bundle = self._create_backup_bundle(src_bundle)
1319         logger.debug(
1320             '%s/%s: Scheduling backup for execution...',
1321             backup_bundle.uuid,
1322             backup_bundle.function_name,
1323         )
1324         self._helper_executor.submit(self._launch, backup_bundle)
1325
1326         # The backup's return value doesn't matter; if it finishes
1327         # first, it moves the result_file to this machine and the
1328         # original bundle picks it up, unpickles it and returns the
1329         # result.
1330
1331     def _emergency_retry_nasty_bundle(
1332         self, bundle: BundleDetails
1333     ) -> Optional[fut.Future]:
1334         """Something unexpectedly failed with bundle.  Either retry it
1335         from the beginning or throw in the towel and give up on it."""
1336
1337         is_original = bundle.src_bundle is None
1338         bundle.worker = None
1339         avoid_last_machine = bundle.machine
1340         bundle.machine = None
1341         bundle.username = None
1342         bundle.failure_count += 1
1343         if is_original:
1344             retry_limit = 3
1345         else:
1346             retry_limit = 2
1347
1348         if bundle.failure_count > retry_limit:
1349             logger.error(
1350                 '%s: Tried this bundle too many times already (%dx); giving up.',
1351                 bundle,
1352                 retry_limit,
1353             )
1354             if is_original:
1355                 raise RemoteExecutorException(
1356                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1357                 )
1358             else:
1359                 logger.error(
1360                     '%s: At least it\'s only a backup; better luck with the others.',
1361                     bundle,
1362                 )
1363             return None
1364         else:
1365             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1366             logger.warning(msg)
1367             warnings.warn(msg)
1368             return self._launch(bundle, avoid_last_machine)
1369
1370     @overrides
1371     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1372         """Submit work to be done.  This is the user entry point of this
1373         class."""
1374         if self.already_shutdown:
1375             raise Exception('Submitted work after shutdown.')
1376         pickle = _make_cloud_pickle(function, *args, **kwargs)
1377         bundle = self._create_original_bundle(pickle, function.__name__)
1378         self.total_bundles_submitted += 1
1379         return self._helper_executor.submit(self._launch, bundle)
1380
1381     @overrides
1382     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1383         """Shutdown the executor."""
1384         if not self.already_shutdown:
1385             logger.debug('Shutting down RemoteExecutor %s', self.title)
1386             self.heartbeat_stop_event.set()
1387             self.heartbeat_thread.join()
1388             self._helper_executor.shutdown(wait)
1389             if not quiet:
1390                 print(self.histogram.__repr__(label_formatter='%ds'))
1391             self.already_shutdown = True
1392
1393
1394 class RemoteWorkerPoolProvider(ABC):
1395     @abstractmethod
1396     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1397         pass
1398
1399
1400 @persistent.persistent_autoloaded_singleton()  # type: ignore
1401 class ConfigRemoteWorkerPoolProvider(
1402     RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1403 ):
1404     def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1405         self.remote_worker_pool: List[RemoteWorkerRecord] = []
1406         for record in json_remote_worker_pool['remote_worker_records']:
1407             self.remote_worker_pool.append(
1408                 self.dataclassFromDict(RemoteWorkerRecord, record)
1409             )
1410         assert len(self.remote_worker_pool) > 0
1411
1412     @staticmethod
1413     def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
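        # An illustrative sketch (hypothetical values; the authoritative field
        # list is the RemoteWorkerRecord dataclass defined earlier in this
        # module, and a real record must supply every required field):
        #
        #   dataclassFromDict(RemoteWorkerRecord,
        #                     {'machine': 'backup1', 'count': 4, 'bogus_key': 1})
        #
        # constructs a RemoteWorkerRecord from only the keys that match its
        # init fields; unknown keys such as 'bogus_key' are silently dropped.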
1414         fieldSet = {f.name for f in fields(clsName) if f.init}
1415         filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
1416         return clsName(**filteredArgDict)
1417
1418     @overrides
1419     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1420         return self.remote_worker_pool
1421
1422     @overrides
1423     def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1424         return self.remote_worker_pool
1425
1426     @staticmethod
1427     @overrides
1428     def get_filename() -> str:
1429         return config.config['remote_worker_records_file']
1430
1431     @staticmethod
1432     @overrides
1433     def should_we_load_data(filename: str) -> bool:
1434         return True
1435
1436     @staticmethod
1437     @overrides
1438     def should_we_save_data(filename: str) -> bool:
1439         return False
1440
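# A sketch of the JSON shape ConfigRemoteWorkerPoolProvider expects in the
# --remote_worker_records_file: a top-level 'remote_worker_records' list whose
# entries map onto RemoteWorkerRecord fields.  The per-record keys shown here
# (other than 'machine' and 'count', which this module reads directly) are
# assumptions; consult the RemoteWorkerRecord dataclass for the real names.
#
#   {
#       "remote_worker_records": [
#           {"username": "scott", "machine": "machine1", "weight": 24, "count": 5},
#           {"username": "scott", "machine": "machine2", "weight": 10, "count": 2}
#       ]
#   }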
1441
1442 @singleton
1443 class DefaultExecutors:
1444     """A container for a default thread, process and remote executor.
1445     These are not created until needed and we take care to clean up
1446     before process exit automatically for the caller's convenience.
1447     Instead of creating your own executor, consider using the one
1448     from this pool.  e.g.::
1449
1450         @par.parallelize(method=par.Method.PROCESS)
1451         def do_work(
1452             solutions: List[Work],
1453             shard_num: int,
1454             ...
1455         ):
1456             <do the work>
1457
1458
1459         def start_do_work(all_work: List[Work]):
1460             shards, results = [], []
1461             logger.debug('Sharding work into groups of 10.')
1462             for subset in list_utils.shard(all_work, 10):
1463                 shards.append(list(subset))
1464
1465             logger.debug('Kicking off helper pool.')
1466             try:
1467                 for n, shard in enumerate(shards):
1468                     results.append(
1469                         do_work(
1470                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1471                         )
1472                     )
1473                 smart_future.wait_all(results)
1474             finally:
1475                 # Note: if you forget to do this, it will clean itself up
1476                 # during program termination, including tearing down any
1477                 # active ssh connections.
1478                 executors.DefaultExecutors().process_pool().shutdown()
1479     """
1480
1481     def __init__(self):
1482         self.thread_executor: Optional[ThreadExecutor] = None
1483         self.process_executor: Optional[ProcessExecutor] = None
1484         self.remote_executor: Optional[RemoteExecutor] = None
1485
1486     @staticmethod
1487     def _ping(host: str) -> bool:
1488         logger.debug('RUN> ping -c 1 %s', host)
1489         try:
1490             x = cmd_exitcode(
1491                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1492             )
1493             return x == 0
1494         except Exception:
1495             return False
1496
1497     def thread_pool(self) -> ThreadExecutor:
1498         if self.thread_executor is None:
1499             self.thread_executor = ThreadExecutor()
1500         return self.thread_executor
1501
1502     def process_pool(self) -> ProcessExecutor:
1503         if self.process_executor is None:
1504             self.process_executor = ProcessExecutor()
1505         return self.process_executor
1506
1507     def remote_pool(self) -> RemoteExecutor:
1508         if self.remote_executor is None:
1509             logger.info('Looking for some helper machines...')
1510             provider = ConfigRemoteWorkerPoolProvider()
1511             all_machines = provider.get_remote_workers()
1512             pool = []
1513
1514             # Only keep machines that respond to a ping.
1515             for record in all_machines:
1516                 if self._ping(record.machine):
1517                     logger.info('%s is alive / responding to pings', record.machine)
1518                     pool.append(record)
1519
1520             # The controller machine has a lot to do; go easy on it.
1521             for record in pool:
1522                 if record.machine == platform.node() and record.count > 1:
1523                     logger.info('Reducing workload for %s.', record.machine)
1524                     record.count = max(int(record.count / 2), 1)
1525
1526             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1527             policy.register_worker_pool(pool)
1528             self.remote_executor = RemoteExecutor(pool, policy)
1529         return self.remote_executor
1530
1531     def shutdown(self) -> None:
1532         if self.thread_executor is not None:
1533             self.thread_executor.shutdown(wait=True, quiet=True)
1534             self.thread_executor = None
1535         if self.process_executor is not None:
1536             self.process_executor.shutdown(wait=True, quiet=True)
1537             self.process_executor = None
1538         if self.remote_executor is not None:
1539             self.remote_executor.shutdown(wait=True, quiet=True)
1540             self.remote_executor = None
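# A minimal end-to-end sketch of using the shared pools above (hypothetical
# call site; assumes this module was imported as 'executors' and that
# 'some_function' / 'some_arg' are the caller's own work):
#
#   pool = executors.DefaultExecutors().thread_pool()
#   future = pool.submit(some_function, some_arg)
#   print(future.result())
#   executors.DefaultExecutors().shutdown()  # optional; cleanup also happens at exit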