1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """Defines three executors: a thread executor for doing work using a
7 threadpool, a process executor for doing work in other processes on
8 the same machine, and a remote executor for farming out work to other
9 machines.
10
11 Also defines DefaultExecutors, which is a container for references to
12 global executors / worker pools with automatic shutdown semantics."""
13
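# Illustrative usage sketch (not executed here; assumes this project's config
# module has been initialized before an executor is constructed):
#
#     executor = ThreadExecutor(max_workers=4)
#     future = executor.submit(lambda x: x * x, 7)
#     assert future.result() == 49
#     executor.shutdown()
#
# ProcessExecutor and RemoteExecutor expose the same submit() / shutdown()
# interface via BaseExecutor below.
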
14 from __future__ import annotations
15 import concurrent.futures as fut
16 import logging
17 import os
18 import platform
19 import random
20 import subprocess
21 import threading
22 import time
23 import warnings
24 from abc import ABC, abstractmethod
25 from collections import defaultdict
26 from dataclasses import dataclass, fields
27 from typing import Any, Callable, Dict, List, Optional, Set
28
29 import cloudpickle  # type: ignore
30 import numpy
31 from overrides import overrides
32
33 import argparse_utils
34 import config
35 import histogram as hist
36 import persistent
37 import string_utils
38 from ansi import bg, fg, reset, underline
39 from decorator_utils import singleton
40 from exec_utils import cmd_exitcode, cmd_in_background, run_silently
41 from thread_utils import background_thread
42
43 logger = logging.getLogger(__name__)
44
45 parser = config.add_commandline_args(
46     f"Executors ({__file__})", "Args related to processing executors."
47 )
48 parser.add_argument(
49     '--executors_threadpool_size',
50     type=int,
51     metavar='#THREADS',
52     help='Number of threads in the default threadpool, leave unset for default',
53     default=None,
54 )
55 parser.add_argument(
56     '--executors_processpool_size',
57     type=int,
58     metavar='#PROCESSES',
59     help='Number of processes in the default processpool, leave unset for default',
60     default=None,
61 )
62 parser.add_argument(
63     '--executors_schedule_remote_backups',
64     default=True,
65     action=argparse_utils.ActionNoYes,
66     help='Should we schedule duplicative backup work if a remote bundle is slow',
67 )
68 parser.add_argument(
69     '--executors_max_bundle_failures',
70     type=int,
71     default=3,
72     metavar='#FAILURES',
73     help='Maximum number of failures before giving up on a bundle',
74 )
75 parser.add_argument(
76     '--remote_worker_records_file',
77     type=str,
78     metavar='FILENAME',
79     help='Path of the remote worker records file (JSON)',
80     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
81 )
82
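# These flags are registered with the project's config module and are read at
# executor construction time.  A hypothetical invocation of a program built
# on this library might look like:
#
#     ./some_program.py --executors_threadpool_size 10 --executors_processpool_size 4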
83
84 SSH = '/usr/bin/ssh -oForwardX11=no'
85 SCP = '/usr/bin/scp -C'
86
87
88 def _make_cloud_pickle(fun, *args, **kwargs):
89     """Internal helper to create cloud pickles."""
90     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
91     return cloudpickle.dumps((fun, args, kwargs))
92
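# Sketch of the round trip performed on the pickled bundle by the process and
# remote executors below (illustrative only):
#
#     blob = _make_cloud_pickle(max, 3, 7)
#     fun, args, kwargs = cloudpickle.loads(blob)
#     assert fun(*args, **kwargs) == 7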
93
94 class BaseExecutor(ABC):
95     """The base executor interface definition.  The interface for
96     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
97     :class:`ThreadExecutor`.
98     """
99
100     def __init__(self, *, title=''):
101         self.title = title
102         self.histogram = hist.SimpleHistogram(
103             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
104         )
105         self.task_count = 0
106
107     @abstractmethod
108     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
109         pass
110
111     @abstractmethod
112     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
113         pass
114
115     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
116         """Shutdown the executor and return True if the executor is idle
117         (i.e. there are no pending or active tasks).  Return False
118         otherwise.  Note: this should only be called by the launcher
119         process.
120
121         """
122         if self.task_count == 0:
123             self.shutdown(wait=True, quiet=quiet)
124             return True
125         return False
126
127     def adjust_task_count(self, delta: int) -> None:
128         """Change the task count.  Note: do not call this method from a
129         worker, it should only be called by the launcher process /
130         thread / machine.
131
132         """
133         self.task_count += delta
134         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
135
136     def get_task_count(self) -> int:
137         """Change the task count.  Note: do not call this method from a
138         worker, it should only be called by the launcher process /
139         thread / machine.
140
141         """
142         return self.task_count
143
144
145 class ThreadExecutor(BaseExecutor):
146     """A threadpool executor.  This executor uses python threads to
147     schedule tasks.  Note that, at least as of python3.10, because of
148     the global lock in the interpreter itself, these do not
149     parallelize very well so this class is useful mostly for non-CPU
150     intensive tasks.
151
152     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
153     """
154
155     def __init__(self, max_workers: Optional[int] = None):
156         super().__init__()
157         workers = None
158         if max_workers is not None:
159             workers = max_workers
160         elif 'executors_threadpool_size' in config.config:
161             workers = config.config['executors_threadpool_size']
162         if workers is not None:
163             logger.debug('Creating threadpool executor with %d workers', workers)
164         else:
165             logger.debug('Creating a default sized threadpool executor')
166         self._thread_pool_executor = fut.ThreadPoolExecutor(
167             max_workers=workers, thread_name_prefix="thread_executor_helper"
168         )
169         self.already_shutdown = False
170
171     # This is run on a different thread; do not adjust task count here.
172     @staticmethod
173     def run_local_bundle(fun, *args, **kwargs):
174         logger.debug("Running local bundle at %s", fun.__name__)
175         result = fun(*args, **kwargs)
176         return result
177
178     @overrides
179     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
180         if self.already_shutdown:
181             raise Exception('Submitted work after shutdown.')
182         self.adjust_task_count(+1)
183         newargs = []
184         newargs.append(function)
185         for arg in args:
186             newargs.append(arg)
187         start = time.time()
188         result = self._thread_pool_executor.submit(
189             ThreadExecutor.run_local_bundle, *newargs, **kwargs
190         )
191         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
192         result.add_done_callback(lambda _: self.adjust_task_count(-1))
193         return result
194
195     @overrides
196     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
197         if not self.already_shutdown:
198             logger.debug('Shutting down threadpool executor %s', self.title)
199             self._thread_pool_executor.shutdown(wait)
200             if not quiet:
201                 print(self.histogram.__repr__(label_formatter='%ds'))
202             self.already_shutdown = True
203
204
205 class ProcessExecutor(BaseExecutor):
206     """An executor which runs tasks in child processes.
207
208     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
209     """
210
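    # Because work is shipped to child processes as a cloudpickle, submitted
    # callables and their arguments must be cloudpickle-serializable.  An
    # illustrative sketch (assumes the project's config module has been
    # initialized):
    #
    #     executor = ProcessExecutor(max_workers=2)
    #     future = executor.submit(pow, 2, 10)
    #     assert future.result() == 1024
    #     executor.shutdown()
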
211     def __init__(self, max_workers=None):
212         super().__init__()
213         workers = None
214         if max_workers is not None:
215             workers = max_workers
216         elif 'executors_processpool_size' in config.config:
217             workers = config.config['executors_processpool_size']
218         if workers is not None:
219             logger.debug('Creating processpool executor with %d workers.', workers)
220         else:
221             logger.debug('Creating a default sized processpool executor')
222         self._process_executor = fut.ProcessPoolExecutor(
223             max_workers=workers,
224         )
225         self.already_shutdown = False
226
227     # This is run in another process; do not adjust task count here.
228     @staticmethod
229     def run_cloud_pickle(pickle):
230         fun, args, kwargs = cloudpickle.loads(pickle)
231         logger.debug("Running pickled bundle at %s", fun.__name__)
232         result = fun(*args, **kwargs)
233         return result
234
235     @overrides
236     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
237         if self.already_shutdown:
238             raise Exception('Submitted work after shutdown.')
239         start = time.time()
240         self.adjust_task_count(+1)
241         pickle = _make_cloud_pickle(function, *args, **kwargs)
242         result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
243         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
244         result.add_done_callback(lambda _: self.adjust_task_count(-1))
245         return result
246
247     @overrides
248     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
249         if not self.already_shutdown:
250             logger.debug('Shutting down processpool executor %s', self.title)
251             self._process_executor.shutdown(wait)
252             if not quiet:
253                 print(self.histogram.__repr__(label_formatter='%ds'))
254             self.already_shutdown = True
255
256     def __getstate__(self):
257         state = self.__dict__.copy()
258         state['_process_executor'] = None
259         return state
260
261
262 class RemoteExecutorException(Exception):
263     """Thrown when a bundle cannot be executed despite several retries."""
264
265     pass
266
267
268 @dataclass
269 class RemoteWorkerRecord:
270     """A record of info about a remote worker."""
271
272     username: str
273     """Username we can ssh into on this machine to run work."""
274
275     machine: str
276     """Machine address / name."""
277
278     weight: int
279     """Relative probability for the weighted policy to select this
280     machine for scheduling work."""
281
282     count: int
283     """If this machine is selected, what is the maximum number of task
284     that it can handle?"""
285
286     def __hash__(self):
287         return hash((self.username, self.machine))
288
289     def __repr__(self):
290         return f'{self.username}@{self.machine}'
291
292
293 @dataclass
294 class BundleDetails:
295     """All info necessary to define some unit of work that needs to be
296     done, where it is being run, its state, whether it is an original
297     bundle or a backup bundle, how many times it has failed, etc...
298     """
299
300     pickled_code: bytes
301     """The code to run, cloud pickled"""
302
303     uuid: str
304     """A unique identifier"""
305
306     function_name: str
307     """The name of the function we pickled"""
308
309     worker: Optional[RemoteWorkerRecord]
310     """The remote worker running this bundle or None if none (yet)"""
311
312     username: Optional[str]
313     """The remote username running this bundle or None if none (yet)"""
314
315     machine: Optional[str]
316     """The remote machine running this bundle or None if none (yet)"""
317
318     hostname: str
319     """The controller machine"""
320
321     code_file: str
322     """A unique filename to hold the work to be done"""
323
324     result_file: str
325     """Where the results should be placed / read from"""
326
327     pid: int
328     """The process id of the local subprocess watching the ssh connection
329     to the remote machine"""
330
331     start_ts: float
332     """Starting time"""
333
334     end_ts: float
335     """Ending time"""
336
337     slower_than_local_p95: bool
338     """Currently slower then 95% of other bundles on remote host"""
339
340     slower_than_global_p95: bool
341     """Currently slower than 95% of other bundles globally"""
342
343     src_bundle: Optional[BundleDetails]
344     """If this is a backup bundle, this points to the original bundle
345     that it's backing up.  None otherwise."""
346
347     is_cancelled: threading.Event
348     """An event that can be signaled to indicate this bundle is cancelled.
349     This is set when another copy (backup or original) of this work has
350     completed successfully elsewhere."""
351
352     was_cancelled: bool
353     """True if this bundle was cancelled, False if it finished normally"""
354
355     backup_bundles: Optional[List[BundleDetails]]
356     """If we've created backups of this bundle, this is the list of them"""
357
358     failure_count: int
359     """How many times has this bundle failed already?"""
360
361     def __repr__(self):
362         uuid = self.uuid
363         if uuid[-9:-2] == '_backup':
364             uuid = uuid[:-9]
365             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
366         else:
367             suffix = uuid[-6:]
368
369         # We colorize the uuid based on some bits from it to make them
370         # stand out in the logging and help a reader correlate log messages
371         # related to the same bundle.
372         colorz = [
373             fg('violet red'),
374             fg('red'),
375             fg('orange'),
376             fg('peach orange'),
377             fg('yellow'),
378             fg('marigold yellow'),
379             fg('green yellow'),
380             fg('tea green'),
381             fg('cornflower blue'),
382             fg('turquoise blue'),
383             fg('tropical blue'),
384             fg('lavender purple'),
385             fg('medium purple'),
386         ]
387         c = colorz[int(uuid[-2:], 16) % len(colorz)]
388         function_name = self.function_name if self.function_name is not None else 'nofname'
389         machine = self.machine if self.machine is not None else 'nomachine'
390         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
391
392
393 class RemoteExecutorStatus:
394     """A status 'scoreboard' for a remote executor tracking various
395     metrics and able to render a periodic dump of global state.
396     """
397
398     def __init__(self, total_worker_count: int) -> None:
399         """C'tor.
400
401         Args:
402             total_worker_count: number of workers in the pool
403
404         """
405         self.worker_count: int = total_worker_count
406         self.known_workers: Set[RemoteWorkerRecord] = set()
407         self.start_time: float = time.time()
408         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
409         self.end_per_bundle: Dict[str, float] = defaultdict(float)
410         self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
411         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
412         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
413         self.finished_bundle_timings: List[float] = []
414         self.last_periodic_dump: Optional[float] = None
415         self.total_bundles_submitted: int = 0
416
417         # Protects reads and modification using self.  Also used
418         # as a memory fence for modifications to bundle.
419         self.lock: threading.Lock = threading.Lock()
420
421     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
422         """Record that bundle with uuid is assigned to a particular worker.
423
424         Args:
425             worker: the record of the worker to which uuid is assigned
426             uuid: the uuid of a bundle that has been assigned to a worker
427         """
428         with self.lock:
429             self.record_acquire_worker_already_locked(worker, uuid)
430
431     def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
432         """Same as above but an entry point that doesn't acquire the lock
433         for codepaths where it's already held."""
434         assert self.lock.locked()
435         self.known_workers.add(worker)
436         self.start_per_bundle[uuid] = None
437         x = self.in_flight_bundles_by_worker.get(worker, set())
438         x.add(uuid)
439         self.in_flight_bundles_by_worker[worker] = x
440
441     def record_bundle_details(self, details: BundleDetails) -> None:
442         """Register the details about a bundle of work."""
443         with self.lock:
444             self.record_bundle_details_already_locked(details)
445
446     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
447         """Same as above but for codepaths that already hold the lock."""
448         assert self.lock.locked()
449         self.bundle_details_by_uuid[details.uuid] = details
450
451     def record_release_worker(
452         self,
453         worker: RemoteWorkerRecord,
454         uuid: str,
455         was_cancelled: bool,
456     ) -> None:
457         """Record that a bundle has released a worker."""
458         with self.lock:
459             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
460
461     def record_release_worker_already_locked(
462         self,
463         worker: RemoteWorkerRecord,
464         uuid: str,
465         was_cancelled: bool,
466     ) -> None:
467         """Same as above but for codepaths that already hold the lock."""
468         assert self.lock.locked()
469         ts = time.time()
470         self.end_per_bundle[uuid] = ts
471         self.in_flight_bundles_by_worker[worker].remove(uuid)
472         if not was_cancelled:
473             start = self.start_per_bundle[uuid]
474             assert start is not None
475             bundle_latency = ts - start
476             x = self.finished_bundle_timings_per_worker.get(worker, [])
477             x.append(bundle_latency)
478             self.finished_bundle_timings_per_worker[worker] = x
479             self.finished_bundle_timings.append(bundle_latency)
480
481     def record_processing_began(self, uuid: str):
482         """Record when work on a bundle begins."""
483         with self.lock:
484             self.start_per_bundle[uuid] = time.time()
485
486     def total_in_flight(self) -> int:
487         """How many bundles are in flight currently?"""
488         assert self.lock.locked()
489         total_in_flight = 0
490         for worker in self.known_workers:
491             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
492         return total_in_flight
493
494     def total_idle(self) -> int:
495         """How many idle workers are there currently?"""
496         assert self.lock.locked()
497         return self.worker_count - self.total_in_flight()
498
499     def __repr__(self):
500         assert self.lock.locked()
501         ts = time.time()
502         total_finished = len(self.finished_bundle_timings)
503         total_in_flight = self.total_in_flight()
504         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
505         qall = None
506         if len(self.finished_bundle_timings) > 1:
507             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
508             ret += (
509                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
510                 f'✅={total_finished}/{self.total_bundles_submitted}, '
511                 f'💻n={total_in_flight}/{self.worker_count}\n'
512             )
513         else:
514             ret += (
515                 f'⏱={ts-self.start_time:.1f}s, '
516                 f'✅={total_finished}/{self.total_bundles_submitted}, '
517                 f'💻n={total_in_flight}/{self.worker_count}\n'
518             )
519
520         for worker in self.known_workers:
521             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
522             timings = self.finished_bundle_timings_per_worker.get(worker, [])
523             count = len(timings)
524             qworker = None
525             if count > 1:
526                 qworker = numpy.quantile(timings, [0.5, 0.95])
527                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
528             else:
529                 ret += '\n'
530             if count > 0:
531                 ret += f'    ...finished {count} total bundle(s) so far\n'
532             in_flight = len(self.in_flight_bundles_by_worker[worker])
533             if in_flight > 0:
534                 ret += f'    ...{in_flight} bundles currently in flight:\n'
535                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
536                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
537                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
538                     if self.start_per_bundle[bundle_uuid] is not None:
539                         sec = ts - self.start_per_bundle[bundle_uuid]
540                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
541                     else:
542                         ret += f'       {details} setting up / copying data...'
543                         sec = 0.0
544
545                     if qworker is not None:
546                         if sec > qworker[1]:
547                             ret += f'{bg("red")}>💻p95{reset()} '
548                             if details is not None:
549                                 details.slower_than_local_p95 = True
550                         else:
551                             if details is not None:
552                                 details.slower_than_local_p95 = False
553
554                     if qall is not None:
555                         if sec > qall[1]:
556                             ret += f'{bg("red")}>∀p95{reset()} '
557                             if details is not None:
558                                 details.slower_than_global_p95 = True
559                         elif details is not None:
560                             details.slower_than_global_p95 = False
561                     ret += '\n'
562         return ret
563
564     def periodic_dump(self, total_bundles_submitted: int) -> None:
565         assert self.lock.locked()
566         self.total_bundles_submitted = total_bundles_submitted
567         ts = time.time()
568         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
569             print(self)
570             self.last_periodic_dump = ts
571
572
573 class RemoteWorkerSelectionPolicy(ABC):
574     """A policy for selecting a remote worker base class."""
575
576     def __init__(self):
577         self.workers: Optional[List[RemoteWorkerRecord]] = None
578
579     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
580         self.workers = workers
581
582     @abstractmethod
583     def is_worker_available(self) -> bool:
584         pass
585
586     @abstractmethod
587     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
588         pass
589
590
591 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
592     """A remote worker selector that uses weighted RNG."""
593
594     @overrides
595     def is_worker_available(self) -> bool:
596         if self.workers:
597             for worker in self.workers:
598                 if worker.count > 0:
599                     return True
600         return False
601
602     @overrides
603     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
604         grabbag = []
605         if self.workers:
606             for worker in self.workers:
607                 if worker.machine != machine_to_avoid:
608                     if worker.count > 0:
609                         for _ in range(worker.count * worker.weight):
610                             grabbag.append(worker)
611
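        # Worked example of the weighting above: a worker with weight=3 and
        # count=2 contributes 6 entries to the grab bag, so it is three times
        # as likely to be selected as a worker with weight=1 and count=2
        # (which contributes 2 entries), assuming neither machine is avoided.
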
612         if len(grabbag) == 0:
613             logger.debug('There are no available workers that avoid %s', machine_to_avoid)
614             if self.workers:
615                 for worker in self.workers:
616                     if worker.count > 0:
617                         for _ in range(worker.count * worker.weight):
618                             grabbag.append(worker)
619
620         if len(grabbag) == 0:
621             logger.warning('There are no available workers?!')
622             return None
623
624         worker = random.sample(grabbag, 1)[0]
625         assert worker.count > 0
626         worker.count -= 1
627         logger.debug('Selected worker %s', worker)
628         return worker
629
630
631 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
632     """A remote worker selector that just round robins."""
633
634     def __init__(self) -> None:
635         super().__init__()
636         self.index = 0
637
638     @overrides
639     def is_worker_available(self) -> bool:
640         if self.workers:
641             for worker in self.workers:
642                 if worker.count > 0:
643                     return True
644         return False
645
646     @overrides
647     def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
648         if self.workers:
649             x = self.index
650             while True:
651                 worker = self.workers[x]
652                 if worker.count > 0:
653                     worker.count -= 1
654                     x += 1
655                     if x >= len(self.workers):
656                         x = 0
657                     self.index = x
658                     logger.debug('Selected worker %s', worker)
659                     return worker
660                 x += 1
661                 if x >= len(self.workers):
662                     x = 0
663                 if x == self.index:
664                     logger.warning('Unexpectedly could not find an available worker.')
665                     return None
666         return None
667
668
669 class RemoteExecutor(BaseExecutor):
670     """An executor that uses processes on remote machines to do work.  This
671     works by creating "bundles" of work with pickled code in each to be
672     executed.  Each bundle is assigned a remote worker based on some policy
673     heuristics.  Once assigned to a remote worker, a local subprocess is
674     created.  It copies the pickled code to the remote machine via ssh/scp
675     and then starts up work on the remote machine again using ssh.  When
676     the work is complete it copies the results back to the local machine.
677
678     So there is essentially one "controller" machine (which may also be
679     in the remote executor pool and therefore do task work in addition to
680     controlling) and N worker machines.  This code runs on the controller
681     whereas on the worker machines we invoke pickled user code via a
682     shim in :file:`remote_worker.py`.
683
684     Some redundancy and safety provisions are made; e.g. slower than
685     expected tasks have redundant backups created and if a task fails
686     repeatedly we consider it poisoned and give up on it.
687
688     .. warning::
689
690         The network overhead / latency of copying work from the
691         controller machine to the remote workers is relatively high.
692         This executor probably only makes sense to use with
693         computationally expensive tasks such as jobs that will execute
694         for ~30 seconds or longer.
695
696     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
697     """
698
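    # Illustrative construction sketch (assumes two workers reachable over the
    # ssh/scp mechanism described above; the host names and submitted function
    # are hypothetical):
    #
    #     workers = [
    #         RemoteWorkerRecord(username='me', machine='host1', weight=2, count=4),
    #         RemoteWorkerRecord(username='me', machine='host2', weight=1, count=2),
    #     ]
    #     executor = RemoteExecutor(workers, WeightedRandomRemoteWorkerSelectionPolicy())
    #     future = executor.submit(some_function, some_arg)
    #     result = future.result()
    #     executor.shutdown()
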
699     def __init__(
700         self,
701         workers: List[RemoteWorkerRecord],
702         policy: RemoteWorkerSelectionPolicy,
703     ) -> None:
704         """C'tor.
705
706         Args:
707             workers: A list of remote workers we can call on to do tasks.
708             policy: A policy for selecting remote workers for tasks.
709         """
710
711         super().__init__()
712         self.workers = workers
713         self.policy = policy
714         self.worker_count = 0
715         for worker in self.workers:
716             self.worker_count += worker.count
717         if self.worker_count <= 0:
718             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
719             logger.critical(msg)
720             raise RemoteExecutorException(msg)
721         self.policy.register_worker_pool(self.workers)
722         self.cv = threading.Condition()
723         logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
724         self._helper_executor = fut.ThreadPoolExecutor(
725             thread_name_prefix="remote_executor_helper",
726             max_workers=self.worker_count,
727         )
728         self.status = RemoteExecutorStatus(self.worker_count)
729         self.total_bundles_submitted = 0
730         self.backup_lock = threading.Lock()
731         self.last_backup = None
732         (
733             self.heartbeat_thread,
734             self.heartbeat_stop_event,
735         ) = self._run_periodic_heartbeat()
736         self.already_shutdown = False
737
738     @background_thread
739     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
740         """
741         We create a background thread to invoke :meth:`_heartbeat` regularly
742         while we are scheduling work.  It does some accounting such as
743         looking for slow bundles to tag for backup creation, checking for
744         unexpected failures, and printing a fancy message on stdout.
745         """
746         while not stop_event.is_set():
747             time.sleep(5.0)
748             logger.debug('Running periodic heartbeat code...')
749             self._heartbeat()
750         logger.debug('Periodic heartbeat thread shutting down.')
751
752     def _heartbeat(self) -> None:
753         # Note: this is invoked on a background thread, not an
754         # executor thread.  Be careful what you do with it b/c it
755         # needs to get back and dump status again periodically.
756         with self.status.lock:
757             self.status.periodic_dump(self.total_bundles_submitted)
758
759             # Look for bundles to reschedule via executor.submit
760             if config.config['executors_schedule_remote_backups']:
761                 self._maybe_schedule_backup_bundles()
762
763     def _maybe_schedule_backup_bundles(self):
764         """Maybe schedule backup bundles if we see a very slow bundle."""
765
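        # Worked example of the scoring below (illustrative numbers): a bundle
        # on a worker with weight=2 starts with base_score = int(200 / 2) = 100.
        # If it has been running for 60s and is slower than that worker's p95
        # (but not the global p95), score = 100 + 60 + 30 = 190.  With no
        # existing backups the score is then doubled to 380; with one existing
        # backup it would instead be halved to 95.
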
766         assert self.status.lock.locked()
767         num_done = len(self.status.finished_bundle_timings)
768         num_idle_workers = self.worker_count - self.task_count
769         now = time.time()
770         if (
771             num_done >= 2
772             and num_idle_workers > 0
773             and (self.last_backup is None or (now - self.last_backup > 9.0))
774             and self.backup_lock.acquire(blocking=False)
775         ):
776             try:
777                 assert self.backup_lock.locked()
778
779                 bundle_to_backup = None
780                 best_score = None
781                 for (
782                     worker,
783                     bundle_uuids,
784                 ) in self.status.in_flight_bundles_by_worker.items():
785
786                     # Prefer to schedule backups of bundles running on
787                     # slower machines.
788                     base_score = 0
789                     for record in self.workers:
790                         if worker.machine == record.machine:
791                             base_score = float(record.weight)
792                             base_score = 1.0 / base_score
793                             base_score *= 200.0
794                             base_score = int(base_score)
795                             break
796
797                     for uuid in bundle_uuids:
798                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
799                         if (
800                             bundle is not None
801                             and bundle.src_bundle is None
802                             and bundle.backup_bundles is not None
803                         ):
804                             score = base_score
805
806                             # Schedule backups of bundles running
807                             # longer; especially those that are
808                             # unexpectedly slow.
809                             start_ts = self.status.start_per_bundle[uuid]
810                             if start_ts is not None:
811                                 runtime = now - start_ts
812                                 score += runtime
813                                 logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
814
815                                 if bundle.slower_than_local_p95:
816                                     score += runtime / 2
817                                     logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
818
819                                 if bundle.slower_than_global_p95:
820                                     score += runtime / 4
821                                     logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
822
823                             # Prefer backups of bundles that don't
824                             # have backups already.
825                             backup_count = len(bundle.backup_bundles)
826                             if backup_count == 0:
827                                 score *= 2
828                             elif backup_count == 1:
829                                 score /= 2
830                             elif backup_count == 2:
831                                 score /= 8
832                             else:
833                                 score = 0
834                             logger.debug(
835                                 'score[%s] => %.1f  # %d dup backup factor',
836                                 bundle,
837                                 score, backup_count,
838                             )
839
840                             if score != 0 and (best_score is None or score > best_score):
841                                 bundle_to_backup = bundle
842                                 assert bundle is not None
843                                 assert bundle.backup_bundles is not None
844                                 assert bundle.src_bundle is None
845                                 best_score = score
846
847                 # Note: this is all still happening on the heartbeat
848                 # runner thread.  That's ok because
849                 # _schedule_backup_for_bundle uses the executor to
850                 # submit the bundle again which will cause it to be
851                 # picked up by a worker thread and allow this thread
852                 # to return to run future heartbeats.
853                 if bundle_to_backup is not None:
854                     self.last_backup = now
855                     logger.info(
856                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
857                         bundle_to_backup,
858                         best_score,
859                     )
860                     self._schedule_backup_for_bundle(bundle_to_backup)
861             finally:
862                 self.backup_lock.release()
863
864     def _is_worker_available(self) -> bool:
865         """Is there a worker available currently?"""
866         return self.policy.is_worker_available()
867
868     def _acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
869         """Try to acquire a worker."""
870         return self.policy.acquire_worker(machine_to_avoid)
871
872     def _find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
873         """Find a worker or block until one becomes available."""
874         with self.cv:
875             while not self._is_worker_available():
876                 self.cv.wait()
877             worker = self._acquire_worker(machine_to_avoid)
878             if worker is not None:
879                 return worker
880         msg = "We should never reach this point in the code"
881         logger.critical(msg)
882         raise Exception(msg)
883
884     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
885         """Release a previously acquired worker."""
886         worker = bundle.worker
887         assert worker is not None
888         logger.debug('Released worker %s', worker)
889         self.status.record_release_worker(
890             worker,
891             bundle.uuid,
892             was_cancelled,
893         )
894         with self.cv:
895             worker.count += 1
896             self.cv.notify()
897         self.adjust_task_count(-1)
898
899     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
900         """See if a particular bundle is cancelled.  Do not block."""
901         with self.status.lock:
902             if bundle.is_cancelled.wait(timeout=0.0):
903                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
904                 bundle.was_cancelled = True
905                 return True
906         return False
907
908     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
909         """Find a worker for bundle or block until one is available."""
910
911         self.adjust_task_count(+1)
912         uuid = bundle.uuid
913         hostname = bundle.hostname
914         avoid_machine = override_avoid_machine
915         is_original = bundle.src_bundle is None
916
917         # Try not to schedule a backup on the same host as the original.
918         if avoid_machine is None and bundle.src_bundle is not None:
919             avoid_machine = bundle.src_bundle.machine
920         worker = None
921         while worker is None:
922             worker = self._find_available_worker_or_block(avoid_machine)
923         assert worker is not None
924
925         # Ok, found a worker.
926         bundle.worker = worker
927         machine = bundle.machine = worker.machine
928         username = bundle.username = worker.username
929         self.status.record_acquire_worker(worker, uuid)
930         logger.debug('%s: Running bundle on %s...', bundle, worker)
931
932         # Before we do any work, make sure the bundle is still viable.
933         # It may have been some time between when it was submitted and
934         # now due to lack of worker availability and someone else may
935         # have already finished it.
936         if self._check_if_cancelled(bundle):
937             try:
938                 return self._process_work_result(bundle)
939             except Exception as e:
940                 logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
941                 self._release_worker(bundle)
942                 if is_original:
943                     # Weird.  We are the original owner of this
944                     # bundle.  For it to have been cancelled, a backup
945                     # must have already started and completed before
946                     # we even got started.  Moreover, the backup says
947                     # it is done but we can't find the results it
948                     # should have copied over.  Reschedule the whole
949                     # thing.
950                     logger.exception(e)
951                     logger.error(
952                         '%s: We are the original owner thread and yet there are '
953                         'no results for this bundle.  This is unexpected and bad.',
954                         bundle,
955                     )
956                     return self._emergency_retry_nasty_bundle(bundle)
957                 else:
958                     # We're a backup and our bundle is cancelled
959                     # before we even got started.  Do nothing and let
960                     # the original bundle's thread worry about either
961                     # finding the results or complaining about it.
962                     return None
963
964         # Send input code / data to worker machine if it's not local.
965         if hostname not in machine:
966             try:
967                 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
968                 start_ts = time.time()
969                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
970                 run_silently(cmd)
971                 xfer_latency = time.time() - start_ts
972                 logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
973             except Exception as e:
974                 self._release_worker(bundle)
975                 if is_original:
976                     # Weird.  We tried to copy the code to the worker
977                     # and it failed...  And we're the original bundle.
978                     # We have to retry.
979                     logger.exception(e)
980                     logger.error(
981                         "%s: Failed to send instructions to the worker machine?! "
982                         "This is not expected; we\'re the original bundle so this shouldn\'t "
983                         "be a race condition.  Attempting an emergency retry...",
984                         bundle,
985                     )
986                     return self._emergency_retry_nasty_bundle(bundle)
987                 else:
988                     # This is actually expected; we're a backup.
989                     # There's a race condition where someone else
990                     # already finished the work and removed the source
991                     # code_file before we could copy it.  Ignore.
992                     logger.warning(
993                         '%s: Failed to send instructions to the worker machine... '
994                         'We\'re a backup and this may be caused by the original (or '
995                         'some other backup) already finishing this work.  Ignoring.',
996                         bundle,
997                     )
998                     return None
999
1000         # Kick off the work.  Note that if this fails we let
1001         # _wait_for_process deal with it.
1002         self.status.record_processing_began(uuid)
1003         cmd = (
1004             f'{SSH} {bundle.username}@{bundle.machine} '
1005             f'"source py39-venv/bin/activate &&'
1006             f' /home/scott/lib/python_modules/remote_worker.py'
1007             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1008         )
1009         logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
1010         p = cmd_in_background(cmd, silent=True)
1011         bundle.pid = p.pid
1012         logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
1013         return self._wait_for_process(p, bundle, 0)
1014
1015     def _wait_for_process(
1016         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1017     ) -> Any:
1018         """At this point we've copied the bundle's pickled code to the remote
1019         worker and started an ssh process that should be invoking the
1020         remote worker to have it execute the user's code.  See how
1021         that's going and wait for it to complete or fail.  Note that
1022         this code is recursive: there are codepaths where we decide to
1023         stop waiting for an ssh process (because another backup seems
1024         to have finished) but then fail to fetch or parse the results
1025         from that backup and thus call ourselves to continue waiting
1026         on an active ssh process.  This is the purpose of the depth
1027         argument: to curtail potential infinite recursion by giving up
1028         eventually.
1029
1030         Args:
1031             p: the Popen record of the ssh job
1032             bundle: the bundle of work being executed remotely
1033             depth: how many retries we've made so far.  Starts at zero.
1034
1035         """
1036
1037         machine = bundle.machine
1038         assert p is not None
1039         pid = p.pid  # pid of the ssh process
1040         if depth > 3:
1041             logger.error(
1042                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
1043             )
1044             p.terminate()
1045             self._release_worker(bundle)
1046             return self._emergency_retry_nasty_bundle(bundle)
1047
1048         # Spin until either the ssh job we scheduled finishes the
1049         # bundle or some backup worker signals that they finished it
1050         # before we could.
1051         while True:
1052             try:
1053                 p.wait(timeout=0.25)
1054             except subprocess.TimeoutExpired:
1055                 if self._check_if_cancelled(bundle):
1056                     logger.info('%s: looks like another worker finished bundle...', bundle)
1057                     break
1058             else:
1059                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1060                 p = None
1061                 break
1062
1063         # If we get here we believe the bundle is done; either the ssh
1064         # subprocess finished (hopefully successfully) or we noticed
1065         # that some other worker seems to have completed the bundle
1066         # before us and we're bailing out.
1067         try:
1068             ret = self._process_work_result(bundle)
1069             if ret is not None and p is not None:
1070                 p.terminate()
1071             return ret
1072
1073         # Something went wrong; e.g. we could not copy the results
1074         # back, clean up after ourselves on the remote machine, or
1075         # unpickle the results we got from the remote machine.  If we
1076         # still have an active ssh subprocess, keep waiting on it.
1077         # Otherwise, time for an emergency reschedule.
1078         except Exception as e:
1079             logger.exception(e)
1080             logger.error('%s: Something unexpected just happened...', bundle)
1081             if p is not None:
1082                 logger.warning(
1083                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
1084                 )
1085                 return self._wait_for_process(p, bundle, depth + 1)
1086             else:
1087                 self._release_worker(bundle)
1088                 return self._emergency_retry_nasty_bundle(bundle)
1089
1090     def _process_work_result(self, bundle: BundleDetails) -> Any:
1091         """A bundle seems to be completed.  Check on the results."""
1092
1093         with self.status.lock:
1094             is_original = bundle.src_bundle is None
1095             was_cancelled = bundle.was_cancelled
1096             username = bundle.username
1097             machine = bundle.machine
1098             result_file = bundle.result_file
1099             code_file = bundle.code_file
1100
1101             # Whether original or backup, if we finished first we must
1102             # fetch the results if the computation happened on a
1103             # remote machine.
1104             bundle.end_ts = time.time()
1105             if not was_cancelled:
1106                 assert bundle.machine is not None
1107                 if bundle.hostname not in bundle.machine:
1108                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1109                     logger.info(
1110                         "%s: Fetching results back from %s@%s via %s",
1111                         bundle,
1112                         username,
1113                         machine,
1114                         cmd,
1115                     )
1116
1117                     # If either of these throw they are handled in
1118                     # _wait_for_process.
1119                     attempts = 0
1120                     while True:
1121                         try:
1122                             run_silently(cmd)
1123                         except Exception as e:
1124                             attempts += 1
1125                             if attempts >= 3:
1126                                 raise e
1127                         else:
1128                             break
1129
1130                     # Cleanup remote /tmp files.
1131                     run_silently(
1132                         f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
1133                     )
1134                     logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
1135                 dur = bundle.end_ts - bundle.start_ts
1136                 self.histogram.add_item(dur)
1137
1138         # Only the original worker should unpickle the file contents
1139         # though since it's the only one whose result matters.  The
1140         # original is also the only job that may delete result_file
1141         # from disk.  Note that the original may have been cancelled
1142         # if one of the backups finished first; it still must read the
1143         # result from disk.  It still does that here with is_cancelled
1144         # set.
1145         if is_original:
1146             logger.debug("%s: Unpickling %s.", bundle, result_file)
1147             try:
1148                 with open(result_file, 'rb') as rb:
1149                     serialized = rb.read()
1150                 result = cloudpickle.loads(serialized)
1151             except Exception as e:
1152                 logger.exception(e)
1153                 logger.error('Failed to load %s... this is bad news.', result_file)
1154                 self._release_worker(bundle)
1155
1156                 # Re-raise the exception; the code in _wait_for_process may
1157                 # decide to _emergency_retry_nasty_bundle here.
1158                 raise e
1159             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1160             os.remove(result_file)
1161             os.remove(code_file)
1162
1163             # Notify any backups that the original is done so they
1164             # should stop ASAP.  Do this whether or not we
1165             # finished first since there could be more than one
1166             # backup.
1167             if bundle.backup_bundles is not None:
1168                 for backup in bundle.backup_bundles:
1169                     logger.debug(
1170                         '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
1171                     )
1172                     backup.is_cancelled.set()
1173
1174         # This is a backup job and, by now, we have already fetched
1175         # the bundle results.
1176         else:
1177             # Backup results don't matter, they just need to leave the
1178             # result file in the right place for their originals to
1179             # read/unpickle later.
1180             result = None
1181
1182             # Tell the original to stop if we finished first.
1183             if not was_cancelled:
1184                 orig_bundle = bundle.src_bundle
1185                 assert orig_bundle is not None
1186                 logger.debug(
1187                     '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
1188                 )
1189                 orig_bundle.is_cancelled.set()
1190         self._release_worker(bundle, was_cancelled=was_cancelled)
1191         return result
1192
1193     def _create_original_bundle(self, pickle, function_name: str):
1194         """Creates a bundle that is not a backup of any other bundle but
1195         rather represents a user task.
1196         """
1197
1198         uuid = string_utils.generate_uuid(omit_dashes=True)
1199         code_file = f'/tmp/{uuid}.code.bin'
1200         result_file = f'/tmp/{uuid}.result.bin'
1201
1202         logger.debug('Writing pickled code to %s', code_file)
1203         with open(code_file, 'wb') as wb:
1204             wb.write(pickle)
1205
1206         bundle = BundleDetails(
1207             pickled_code=pickle,
1208             uuid=uuid,
1209             function_name=function_name,
1210             worker=None,
1211             username=None,
1212             machine=None,
1213             hostname=platform.node(),
1214             code_file=code_file,
1215             result_file=result_file,
1216             pid=0,
1217             start_ts=time.time(),
1218             end_ts=0.0,
1219             slower_than_local_p95=False,
1220             slower_than_global_p95=False,
1221             src_bundle=None,
1222             is_cancelled=threading.Event(),
1223             was_cancelled=False,
1224             backup_bundles=[],
1225             failure_count=0,
1226         )
1227         self.status.record_bundle_details(bundle)
1228         logger.debug('%s: Created an original bundle', bundle)
1229         return bundle
1230
1231     def _create_backup_bundle(self, src_bundle: BundleDetails):
1232         """Creates a bundle that is a backup of another bundle that is
1233         running too slowly."""
1234
1235         assert self.status.lock.locked()
1236         assert src_bundle.backup_bundles is not None
1237         n = len(src_bundle.backup_bundles)
1238         uuid = src_bundle.uuid + f'_backup#{n}'
1239
1240         backup_bundle = BundleDetails(
1241             pickled_code=src_bundle.pickled_code,
1242             uuid=uuid,
1243             function_name=src_bundle.function_name,
1244             worker=None,
1245             username=None,
1246             machine=None,
1247             hostname=src_bundle.hostname,
1248             code_file=src_bundle.code_file,
1249             result_file=src_bundle.result_file,
1250             pid=0,
1251             start_ts=time.time(),
1252             end_ts=0.0,
1253             slower_than_local_p95=False,
1254             slower_than_global_p95=False,
1255             src_bundle=src_bundle,
1256             is_cancelled=threading.Event(),
1257             was_cancelled=False,
1258             backup_bundles=None,  # backup backups not allowed
1259             failure_count=0,
1260         )
1261         src_bundle.backup_bundles.append(backup_bundle)
1262         self.status.record_bundle_details_already_locked(backup_bundle)
1263         logger.debug('%s: Created a backup bundle', backup_bundle)
1264         return backup_bundle
1265
1266     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1267         """Schedule a backup of src_bundle."""
1268
1269         assert self.status.lock.locked()
1270         assert src_bundle is not None
1271         backup_bundle = self._create_backup_bundle(src_bundle)
1272         logger.debug(
1273             '%s/%s: Scheduling backup for execution...',
1274             backup_bundle.uuid,
1275             backup_bundle.function_name,
1276         )
1277         self._helper_executor.submit(self._launch, backup_bundle)
1278
1279         # Results from backups don't matter; if they finish first
1280         # they will move the result_file to this machine and let
1281         # the original pick them up and unpickle them (and return
1282         # a result).
1283
1284     def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
1285         """Something unexpectedly failed with bundle.  Either retry it
1286         from the beginning or throw in the towel and give up on it."""
1287
1288         is_original = bundle.src_bundle is None
1289         bundle.worker = None
1290         avoid_last_machine = bundle.machine
1291         bundle.machine = None
1292         bundle.username = None
1293         bundle.failure_count += 1
        if is_original:
            retry_limit = 3
        else:
            retry_limit = 2

        if bundle.failure_count > retry_limit:
            logger.error(
                '%s: Tried this bundle too many times already (%dx); giving up.',
                bundle,
                retry_limit,
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                )
            else:
                logger.error(
                    '%s: At least it\'s only a backup; better luck with the others.', bundle
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            warnings.warn(msg)
            return self._launch(bundle, avoid_last_machine)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Submit work to be done.  This is the user entry point of this
        class.
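
        e.g. (a minimal sketch; the `add` function and `executor` variable
        below are hypothetical names, not part of this module)::

            executor = DefaultExecutors().remote_pool()

            def add(a: int, b: int) -> int:
                return a + b

            future = executor.submit(add, 1, 2)
            print(future.result())    # blocks until the work completes
        """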
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        bundle = self._create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self._launch, bundle)

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shutdown the executor."""
        if not self.already_shutdown:
            logger.debug('Shutting down RemoteExecutor %s', self.title)
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True


class RemoteWorkerPoolProvider(ABC):
    """The interface for something that can enumerate the remote
    workers available to a RemoteExecutor."""

    @abstractmethod
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        pass


@persistent.persistent_autoloaded_singleton()  # type: ignore
class ConfigRemoteWorkerPoolProvider(RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent):
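    """Loads the set of remote worker records from the JSON file named by
    the --remote_worker_records_file commandline argument.  The file must
    contain a top-level 'remote_worker_records' list; keys that are not
    fields of RemoteWorkerRecord are silently ignored.  A hypothetical
    (illustrative only, not normative) file might look like::

        {
            "remote_worker_records": [
                {"machine": "machine1", "count": 4},
                {"machine": "machine2", "count": 8}
            ]
        }
    """
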
    def __init__(self, json_remote_worker_pool: Dict[str, Any]):
        self.remote_worker_pool = []
        for record in json_remote_worker_pool['remote_worker_records']:
            self.remote_worker_pool.append(self.dataclassFromDict(RemoteWorkerRecord, record))
        assert len(self.remote_worker_pool) > 0

    @staticmethod
    def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
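        """Construct an instance of the dataclass `clsName` using `argDict`
        as keyword arguments, dropping any keys that are not init fields
        of the dataclass."""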
        fieldSet = {f.name for f in fields(clsName) if f.init}
        filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
        return clsName(**filteredArgDict)

    @overrides
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        return self.remote_worker_pool

    @overrides
    def get_persistent_data(self) -> List[RemoteWorkerRecord]:
        return self.remote_worker_pool

    @staticmethod
    @overrides
    def get_filename() -> str:
        return config.config['remote_worker_records_file']

    @staticmethod
    @overrides
    def should_we_load_data(filename: str) -> bool:
        """Always attempt to load the remote worker records file."""
        return True

    @staticmethod
    @overrides
    def should_we_save_data(filename: str) -> bool:
        """The records are read-only from this class' perspective; never save."""
        return False


@singleton
class DefaultExecutors(object):
    """A container for a default thread, process and remote executor.
    These are not created until needed and we take care to clean up
    before process exit automatically for the caller's convenience.
    Instead of creating your own executor, consider using the one
    from this pool.  e.g.::

        @par.parallelize(method=par.Method.PROCESS)
        def do_work(
            solutions: List[Work],
            shard_num: int,
            ...
        ):
            <do the work>


        def start_do_work(all_work: List[Work]):
            shards = []
            results = []
            logger.debug('Sharding work into groups of 10.')
            for subset in list_utils.shard(all_work, 10):
                shards.append([x for x in subset])

            logger.debug('Kicking off helper pool.')
            try:
                for n, shard in enumerate(shards):
                    results.append(
                        do_work(
                            shard, n, shared_cache.get_name(), max_letter_pop_per_word
                        )
                    )
                smart_future.wait_all(results)
            finally:
                # Note: if you forget to do this it will clean itself up
                # during program termination including tearing down any
                # active ssh connections.
                executors.DefaultExecutors().process_pool().shutdown()
    """

    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def _ping(host) -> bool:
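        """Return True if `host` answers a single ping within ~1 second."""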
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
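        """Return the shared ThreadExecutor, creating it on first use."""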
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
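        """Return the shared ProcessExecutor, creating it on first use."""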
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
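        """Return the shared RemoteExecutor, creating it on first use by
        pinging the configured remote workers and pooling the ones that
        respond."""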
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            provider = ConfigRemoteWorkerPoolProvider()
            all_machines = provider.get_remote_workers()
            pool = []

            # Make sure we can ping each machine.
            for record in all_machines:
                if self._ping(record.machine):
                    logger.info('%s is alive / responding to pings', record.machine)
                    pool.append(record)

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info('Reducing workload for %s.', record.machine)
                    record.count = max(int(record.count / 2), 1)

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
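        """Shutdown any executors this instance has vended and drop the
        references so that fresh ones can be created later, if needed."""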
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None