More documentation improvements.
[pyutils.git] / src/pyutils/parallelize/executors.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """
7 This module defines a :class:`BaseExecutor` interface and three
8 implementations:
9
10     - :class:`ThreadExecutor`
11     - :class:`ProcessExecutor`
12     - :class:`RemoteExecutor`
13
14 The :class:`ThreadExecutor` is used to dispatch work to background
15 threads in the same Python process for parallelized work.  Of course,
16 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
17 is not terribly useful for compute-bound code.  But it's good for
18 work that is mostly I/O bound.
19
20 The :class:`ProcessExecutor` is used to dispatch work to other
21 processes on the same machine and is more useful for compute-bound
22 workloads.
23
24 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
25 the `cloudpickle` dependency, and the `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
26 to dispatch work to a set of remote worker machines on your
27 network.  You can configure this pool via a JSON configuration file,
28 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
29
30 Finally, this file defines a :class:`DefaultExecutors` pool that
31 contains a pre-created and ready instance of each of the three
32 executors discussed.  It has the added benefit of being automatically
33 cleaned up at process termination time.
34
35 See instructions in :mod:`pyutils.parallelize.parallelize` for
36 setting up and using the framework.
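
To use an executor directly rather than via the decorator described
there, a minimal sketch (assuming `pyutils` is installed; error
handling omitted)::

    from pyutils.parallelize.executors import ThreadExecutor

    executor = ThreadExecutor(max_workers=4)
    future = executor.submit(sum, [1, 2, 3])
    assert future.result() == 6
    executor.shutdown(wait=True, quiet=True)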
37 """
38
39 from __future__ import annotations
40
41 import concurrent.futures as fut
42 import logging
43 import os
44 import platform
45 import random
46 import subprocess
47 import threading
48 import time
49 import warnings
50 from abc import ABC, abstractmethod
51 from collections import defaultdict
52 from dataclasses import dataclass, fields
53 from typing import Any, Callable, Dict, List, Optional, Set
54
55 import cloudpickle  # type: ignore
56 from overrides import overrides
57
58 import pyutils.typez.histogram as hist
59 from pyutils import argparse_utils, config, math_utils, persistent, string_utils
60 from pyutils.ansi import bg, fg, reset, underline
61 from pyutils.decorator_utils import singleton
62 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
63 from pyutils.parallelize.thread_utils import background_thread
64
65 logger = logging.getLogger(__name__)
66
67 parser = config.add_commandline_args(
68     f"Executors ({__file__})", "Args related to processing executors."
69 )
70 parser.add_argument(
71     '--executors_threadpool_size',
72     type=int,
73     metavar='#THREADS',
74     help='Number of threads in the default threadpool, leave unset for default',
75     default=None,
76 )
77 parser.add_argument(
78     '--executors_processpool_size',
79     type=int,
80     metavar='#PROCESSES',
81     help='Number of processes in the default processpool, leave unset for default',
82     default=None,
83 )
84 parser.add_argument(
85     '--executors_schedule_remote_backups',
86     default=True,
87     action=argparse_utils.ActionNoYes,
88     help='Should we schedule duplicative backup work if a remote bundle is slow',
89 )
90 parser.add_argument(
91     '--executors_max_bundle_failures',
92     type=int,
93     default=3,
94     metavar='#FAILURES',
95     help='Maximum number of failures before giving up on a bundle',
96 )
97 parser.add_argument(
98     '--remote_worker_records_file',
99     type=str,
100     metavar='FILENAME',
101     help='Path of the remote worker records file (JSON)',
102     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
103 )
104 parser.add_argument(
105     '--remote_worker_helper_path',
106     type=str,
107     metavar='PATH_TO_REMOTE_WORKER_PY',
108     help='Path to remote_worker.py on remote machines',
109     default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
110 )
111
112
113 SSH = '/usr/bin/ssh -oForwardX11=no'
114 SCP = '/usr/bin/scp -C'
115
116
117 def _make_cloud_pickle(fun, *args, **kwargs):
118     """Internal helper to create cloud pickles."""
119     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
120     return cloudpickle.dumps((fun, args, kwargs))
121
122
123 class BaseExecutor(ABC):
124     """The base executor interface definition.  The interface for
125     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
126     :class:`ThreadExecutor`.
127     """
128
129     def __init__(self, *, title=''):
130         """
131         Args:
132             title: the name of this executor.
133         """
134         self.title = title
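        # Track per-task latencies (in seconds) in 50 evenly spaced
        # buckets spanning 0..500s; shutdown() implementations dump this
        # histogram unless asked to be quiet.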
135         self.histogram = hist.SimpleHistogram(
136             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
137         )
138         self.task_count = 0
139
140     @abstractmethod
141     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
142         """Submit work for the executor to do.
143
144         Args:
145             function: the Callable to be executed.
146             *args: the positional arguments to pass to function
147             **kwargs: the keyword arguments to pass to function
148
149         Returns:
150             A concurrent :class:`Future` representing the result of the
151             work.
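
        Example (a sketch; `do_work` is a hypothetical callable and
        `executor` any concrete executor instance)::

            futures = [executor.submit(do_work, n) for n in range(10)]
            for f in concurrent.futures.as_completed(futures):
                print(f.result())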
152         """
153         pass
154
155     @abstractmethod
156     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
157         """Shutdown the executor.
158
159         Args:
160             wait: wait for the shutdown to complete before returning?
161             quiet: keep it quiet, please.
162         """
163         pass
164
165     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
166         """Shutdown the executor and return True if the executor is idle
167         (i.e. there are no pending or active tasks).  Return False
168         otherwise.  Note: this should only be called by the launcher
169         process.
170
171         Args:
172             quiet: keep it quiet, please.
173
174         Returns:
175             True if the executor could be shut down because it has no
176             pending work, False otherwise.
177         """
178         if self.task_count == 0:
179             self.shutdown(wait=True, quiet=quiet)
180             return True
181         return False
182
183     def adjust_task_count(self, delta: int) -> None:
184         """Change the task count.  Note: do not call this method from a
185         worker, it should only be called by the launcher process /
186         thread / machine.
187
188         Args:
189             delta: the delta value by which to adjust task count.
190         """
191         self.task_count += delta
192         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
193
194     def get_task_count(self) -> int:
195         """Change the task count.  Note: do not call this method from a
196         worker, it should only be called by the launcher process /
197         thread / machine.
198
199         Returns:
200             The executor's current task count.
201         """
202         return self.task_count
203
204
205 class ThreadExecutor(BaseExecutor):
206     """A threadpool executor.  This executor uses Python threads to
207     schedule tasks.  Note that, at least as of Python 3.10, because of
208     the Global Interpreter Lock (GIL) in the interpreter itself, these
209     do not parallelize very well, so this class is useful mostly for
210     non-CPU-intensive tasks.
211
212     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
213     """
214
215     def __init__(self, max_workers: Optional[int] = None):
216         """
217         Args:
218             max_workers: maximum number of threads to create in the pool.
219         """
220         super().__init__()
221         workers = None
222         if max_workers is not None:
223             workers = max_workers
224         elif 'executors_threadpool_size' in config.config:
225             workers = config.config['executors_threadpool_size']
226         if workers is not None:
227             logger.debug('Creating threadpool executor with %d workers', workers)
228         else:
229             logger.debug('Creating a default sized threadpool executor')
230         self._thread_pool_executor = fut.ThreadPoolExecutor(
231             max_workers=workers, thread_name_prefix="thread_executor_helper"
232         )
233         self.already_shutdown = False
234
235     # This is run on a different thread; do not adjust task count here.
236     @staticmethod
237     def _run_local_bundle(fun, *args, **kwargs):
238         logger.debug("Running local bundle at %s", fun.__name__)
239         result = fun(*args, **kwargs)
240         return result
241
242     @overrides
243     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
244         if self.already_shutdown:
245             raise Exception('Submitted work after shutdown.')
246         self.adjust_task_count(+1)
247         newargs = []
248         newargs.append(function)
249         for arg in args:
250             newargs.append(arg)
251         start = time.time()
252         result = self._thread_pool_executor.submit(
253             ThreadExecutor._run_local_bundle, *newargs, **kwargs
254         )
255         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
256         result.add_done_callback(lambda _: self.adjust_task_count(-1))
257         return result
258
259     @overrides
260     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
261         if not self.already_shutdown:
262             logger.debug('Shutting down threadpool executor %s', self.title)
263             self._thread_pool_executor.shutdown(wait)
264             if not quiet:
265                 print(self.histogram.__repr__(label_formatter='%ds'))
266             self.already_shutdown = True
267
268
269 class ProcessExecutor(BaseExecutor):
270     """An executor which runs tasks in child processes.
271
272     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
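
    A hedged sketch of direct use (the submitted callable and its
    arguments must be serializable with `cloudpickle`; lambdas and
    closures are fine)::

        executor = ProcessExecutor(max_workers=2)
        future = executor.submit(lambda x: x * x, 7)
        assert future.result() == 49
        executor.shutdown(wait=True, quiet=True)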
273     """
274
275     def __init__(self, max_workers=None):
276         """
277         Args:
278             max_workers: the max number of worker processes to create.
279         """
280         super().__init__()
281         workers = None
282         if max_workers is not None:
283             workers = max_workers
284         elif 'executors_processpool_size' in config.config:
285             workers = config.config['executors_processpool_size']
286         if workers is not None:
287             logger.debug('Creating processpool executor with %d workers.', workers)
288         else:
289             logger.debug('Creating a default sized processpool executor')
290         self._process_executor = fut.ProcessPoolExecutor(
291             max_workers=workers,
292         )
293         self.already_shutdown = False
294
295     # This is run in another process; do not adjust task count here.
296     @staticmethod
297     def _run_cloud_pickle(pickle):
298         fun, args, kwargs = cloudpickle.loads(pickle)
299         logger.debug("Running pickled bundle at %s", fun.__name__)
300         result = fun(*args, **kwargs)
301         return result
302
303     @overrides
304     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
305         if self.already_shutdown:
306             raise Exception('Submitted work after shutdown.')
307         start = time.time()
308         self.adjust_task_count(+1)
309         pickle = _make_cloud_pickle(function, *args, **kwargs)
310         result = self._process_executor.submit(
311             ProcessExecutor._run_cloud_pickle, pickle
312         )
313         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
314         result.add_done_callback(lambda _: self.adjust_task_count(-1))
315         return result
316
317     @overrides
318     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
319         if not self.already_shutdown:
320             logger.debug('Shutting down processpool executor %s', self.title)
321             self._process_executor.shutdown(wait)
322             if not quiet:
323                 print(self.histogram.__repr__(label_formatter='%ds'))
324             self.already_shutdown = True
325
326     def __getstate__(self):
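        # Note: ProcessPoolExecutor instances are not picklable, so if
        # this executor object itself gets pickled (e.g. by cloudpickle)
        # we drop the pool from the serialized state; the unpickled copy
        # ends up with _process_executor set to None.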
327         state = self.__dict__.copy()
328         state['_process_executor'] = None
329         return state
330
331
332 class RemoteExecutorException(Exception):
333     """Thrown when a bundle cannot be executed despite several retries."""
334
335     pass
336
337
338 @dataclass
339 class RemoteWorkerRecord:
340     """A record of info about a remote worker."""
341
342     username: str
343     """Username we can ssh into on this machine to run work."""
344
345     machine: str
346     """Machine address / name."""
347
348     weight: int
349     """Relative probability for the weighted policy to select this
350     machine for scheduling work."""
351
352     count: int
353     """If this machine is selected, what is the maximum number of task
354     that it can handle?"""
355
356     def __hash__(self):
357         return hash((self.username, self.machine))
358
359     def __repr__(self):
360         return f'{self.username}@{self.machine}'
361
362
363 @dataclass
364 class BundleDetails:
365     """All info necessary to define some unit of work that needs to be
366     done, where it is being run, its state, whether it is an original
367     bundle or a backup bundle, how many times it has failed, etc...
368     """
369
370     pickled_code: bytes
371     """The code to run, cloud pickled"""
372
373     uuid: str
374     """A unique identifier"""
375
376     function_name: str
377     """The name of the function we pickled"""
378
379     worker: Optional[RemoteWorkerRecord]
380     """The remote worker running this bundle or None if none (yet)"""
381
382     username: Optional[str]
383     """The remote username running this bundle or None if none (yet)"""
384
385     machine: Optional[str]
386     """The remote machine running this bundle or None if none (yet)"""
387
388     hostname: str
389     """The controller machine"""
390
391     code_file: str
392     """A unique filename to hold the work to be done"""
393
394     result_file: str
395     """Where the results should be placed / read from"""
396
397     pid: int
398     """The process id of the local subprocess watching the ssh connection
399     to the remote machine"""
400
401     start_ts: float
402     """Starting time"""
403
404     end_ts: float
405     """Ending time"""
406
407     slower_than_local_p95: bool
408     """Currently slower then 95% of other bundles on remote host"""
409
410     slower_than_global_p95: bool
411     """Currently slower than 95% of other bundles globally"""
412
413     src_bundle: Optional[BundleDetails]
414     """If this is a backup bundle, this points to the original bundle
415     that it's backing up.  None otherwise."""
416
417     is_cancelled: threading.Event
418     """An event that can be signaled to indicate this bundle is cancelled.
419     This is set when another copy (backup or original) of this work has
420     completed successfully elsewhere."""
421
422     was_cancelled: bool
423     """True if this bundle was cancelled, False if it finished normally"""
424
425     backup_bundles: Optional[List[BundleDetails]]
426     """If we've created backups of this bundle, this is the list of them"""
427
428     failure_count: int
429     """How many times has this bundle failed already?"""
430
431     def __repr__(self):
432         uuid = self.uuid
433         if uuid[-9:-2] == '_backup':
434             uuid = uuid[:-9]
435             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
436         else:
437             suffix = uuid[-6:]
438
439         # We colorize the uuid based on some bits from it to make them
440         # stand out in the logging and help a reader correlate log messages
441         # related to the same bundle.
442         colorz = [
443             fg('violet red'),
444             fg('red'),
445             fg('orange'),
446             fg('peach orange'),
447             fg('yellow'),
448             fg('marigold yellow'),
449             fg('green yellow'),
450             fg('tea green'),
451             fg('cornflower blue'),
452             fg('turquoise blue'),
453             fg('tropical blue'),
454             fg('lavender purple'),
455             fg('medium purple'),
456         ]
457         c = colorz[int(uuid[-2:], 16) % len(colorz)]
458         function_name = (
459             self.function_name if self.function_name is not None else 'nofname'
460         )
461         machine = self.machine if self.machine is not None else 'nomachine'
462         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
463
464
465 class RemoteExecutorStatus:
466     """A status 'scoreboard' for a remote executor tracking various
467     metrics and able to render a periodic dump of global state.
468     """
469
470     def __init__(self, total_worker_count: int) -> None:
471         """
472         Args:
473             total_worker_count: number of workers in the pool
474         """
475         self.worker_count: int = total_worker_count
476         self.known_workers: Set[RemoteWorkerRecord] = set()
477         self.start_time: float = time.time()
478         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
479         self.end_per_bundle: Dict[str, float] = defaultdict(float)
480         self.finished_bundle_timings_per_worker: Dict[
481             RemoteWorkerRecord, math_utils.NumericPopulation
482         ] = {}
483         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
484         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
485         self.finished_bundle_timings: math_utils.NumericPopulation = (
486             math_utils.NumericPopulation()
487         )
488         self.last_periodic_dump: Optional[float] = None
489         self.total_bundles_submitted: int = 0
490
491         # Protects reads and modifications of self.  Also used
492         # as a memory fence for modifications to bundle.
493         self.lock: threading.Lock = threading.Lock()
494
495     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
496         """Record that bundle with uuid is assigned to a particular worker.
497
498         Args:
499             worker: the record of the worker to which uuid is assigned
500             uuid: the uuid of a bundle that has been assigned to a worker
501         """
502         with self.lock:
503             self.record_acquire_worker_already_locked(worker, uuid)
504
505     def record_acquire_worker_already_locked(
506         self, worker: RemoteWorkerRecord, uuid: str
507     ) -> None:
508         """Same as above but an entry point that doesn't acquire the lock
509         for codepaths where it's already held."""
510         assert self.lock.locked()
511         self.known_workers.add(worker)
512         self.start_per_bundle[uuid] = None
513         x = self.in_flight_bundles_by_worker.get(worker, set())
514         x.add(uuid)
515         self.in_flight_bundles_by_worker[worker] = x
516
517     def record_bundle_details(self, details: BundleDetails) -> None:
518         """Register the details about a bundle of work."""
519         with self.lock:
520             self.record_bundle_details_already_locked(details)
521
522     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
523         """Same as above but for codepaths that already hold the lock."""
524         assert self.lock.locked()
525         self.bundle_details_by_uuid[details.uuid] = details
526
527     def record_release_worker(
528         self,
529         worker: RemoteWorkerRecord,
530         uuid: str,
531         was_cancelled: bool,
532     ) -> None:
533         """Record that a bundle has released a worker."""
534         with self.lock:
535             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
536
537     def record_release_worker_already_locked(
538         self,
539         worker: RemoteWorkerRecord,
540         uuid: str,
541         was_cancelled: bool,
542     ) -> None:
543         """Same as above but for codepaths that already hold the lock."""
544         assert self.lock.locked()
545         ts = time.time()
546         self.end_per_bundle[uuid] = ts
547         self.in_flight_bundles_by_worker[worker].remove(uuid)
548         if not was_cancelled:
549             start = self.start_per_bundle[uuid]
550             assert start is not None
551             bundle_latency = ts - start
552             x = self.finished_bundle_timings_per_worker.get(
553                 worker, math_utils.NumericPopulation()
554             )
555             x.add_number(bundle_latency)
556             self.finished_bundle_timings_per_worker[worker] = x
557             self.finished_bundle_timings.add_number(bundle_latency)
558
559     def record_processing_began(self, uuid: str):
560         """Record when work on a bundle begins."""
561         with self.lock:
562             self.start_per_bundle[uuid] = time.time()
563
564     def total_in_flight(self) -> int:
565         """How many bundles are in flight currently?"""
566         assert self.lock.locked()
567         total_in_flight = 0
568         for worker in self.known_workers:
569             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
570         return total_in_flight
571
572     def total_idle(self) -> int:
573         """How many idle workers are there currently?"""
574         assert self.lock.locked()
575         return self.worker_count - self.total_in_flight()
576
577     def __repr__(self):
578         assert self.lock.locked()
579         ts = time.time()
580         total_finished = len(self.finished_bundle_timings)
581         total_in_flight = self.total_in_flight()
582         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
583         qall = None
584         if len(self.finished_bundle_timings) > 1:
585             qall_median = self.finished_bundle_timings.get_median()
586             qall_p95 = self.finished_bundle_timings.get_percentile(95)
            qall = (qall_median, qall_p95)  # used below to flag bundles slower than the global p95
587             ret += (
588                 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
589                 f'✅={total_finished}/{self.total_bundles_submitted}, '
590                 f'💻n={total_in_flight}/{self.worker_count}\n'
591             )
592         else:
593             ret += (
594                 f'⏱={ts-self.start_time:.1f}s, '
595                 f'✅={total_finished}/{self.total_bundles_submitted}, '
596                 f'💻n={total_in_flight}/{self.worker_count}\n'
597             )
598
599         for worker in self.known_workers:
600             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
601             timings = self.finished_bundle_timings_per_worker.get(
602                 worker, math_utils.NumericPopulation()
603             )
604             count = len(timings)
605             qworker_median = None
606             qworker_p95 = None
607             if count > 1:
608                 qworker_median = timings.get_median()
609                 qworker_p95 = timings.get_percentile(95)
610                 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
611             else:
612                 ret += '\n'
613             if count > 0:
614                 ret += f'    ...finished {count} total bundle(s) so far\n'
615             in_flight = len(self.in_flight_bundles_by_worker[worker])
616             if in_flight > 0:
617                 ret += f'    ...{in_flight} bundles currently in flight:\n'
618                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
619                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
620                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
621                     if self.start_per_bundle[bundle_uuid] is not None:
622                         sec = ts - self.start_per_bundle[bundle_uuid]
623                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
624                     else:
625                         ret += f'       {details} setting up / copying data...'
626                         sec = 0.0
627
628                     if qworker_p95 is not None:
629                         if sec > qworker_p95:
630                             ret += f'{bg("red")}>💻p95{reset()} '
631                             if details is not None:
632                                 details.slower_than_local_p95 = True
633                         else:
634                             if details is not None:
635                                 details.slower_than_local_p95 = False
636
637                     if qall is not None:
638                         if sec > qall[1]:
639                             ret += f'{bg("red")}>∀p95{reset()} '
640                             if details is not None:
641                                 details.slower_than_global_p95 = True
642                         else:
643                             if details is not None:
                                details.slower_than_global_p95 = False
644                     ret += '\n'
645         return ret
646
647     def periodic_dump(self, total_bundles_submitted: int) -> None:
648         assert self.lock.locked()
649         self.total_bundles_submitted = total_bundles_submitted
650         ts = time.time()
651         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
652             print(self)
653             self.last_periodic_dump = ts
654
655
656 class RemoteWorkerSelectionPolicy(ABC):
657     """An interface definition of a policy for selecting a remote worker."""
658
659     def __init__(self):
660         self.workers: Optional[List[RemoteWorkerRecord]] = None
661
662     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
663         self.workers = workers
664
665     @abstractmethod
666     def is_worker_available(self) -> bool:
667         pass
668
669     @abstractmethod
670     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
671         pass
672
673
674 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
675     """A remote worker selector that uses weighted RNG."""
676
677     @overrides
678     def is_worker_available(self) -> bool:
679         if self.workers:
680             for worker in self.workers:
681                 if worker.count > 0:
682                     return True
683         return False
684
685     @overrides
686     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
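        # Build a "grab bag" in which each eligible worker appears
        # (count * weight) times; drawing uniformly from the bag then
        # selects workers with probability proportional to their
        # remaining capacity scaled by their relative speed.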
687         grabbag = []
688         if self.workers:
689             for worker in self.workers:
690                 if worker.machine != machine_to_avoid:
691                     if worker.count > 0:
692                         for _ in range(worker.count * worker.weight):
693                             grabbag.append(worker)
694
695         if len(grabbag) == 0:
696             logger.debug(
697                 'There are no available workers that avoid %s', machine_to_avoid
698             )
699             if self.workers:
700                 for worker in self.workers:
701                     if worker.count > 0:
702                         for _ in range(worker.count * worker.weight):
703                             grabbag.append(worker)
704
705         if len(grabbag) == 0:
706             logger.warning('There are no available workers?!')
707             return None
708
709         worker = random.sample(grabbag, 1)[0]
710         assert worker.count > 0
711         worker.count -= 1
712         logger.debug('Selected worker %s', worker)
713         return worker
714
715
716 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
717     """A remote worker selector that just round robins."""
718
719     def __init__(self) -> None:
720         super().__init__()
721         self.index = 0
722
723     @overrides
724     def is_worker_available(self) -> bool:
725         if self.workers:
726             for worker in self.workers:
727                 if worker.count > 0:
728                     return True
729         return False
730
731     @overrides
732     def acquire_worker(
733         self, machine_to_avoid: Optional[str] = None
734     ) -> Optional[RemoteWorkerRecord]:
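        # Scan forward from where the last search left off, wrapping
        # around, until we find a worker with remaining capacity; give
        # up if a full pass finds none.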
735         if self.workers:
736             x = self.index
737             while True:
738                 worker = self.workers[x]
739                 if worker.count > 0:
740                     worker.count -= 1
741                     x += 1
742                     if x >= len(self.workers):
743                         x = 0
744                     self.index = x
745                     logger.debug('Selected worker %s', worker)
746                     return worker
747                 x += 1
748                 if x >= len(self.workers):
749                     x = 0
750                 if x == self.index:
751                     logger.warning('Unexpectedly could not find a worker, retrying...')
752                     return None
753         return None
754
755
756 class RemoteExecutor(BaseExecutor):
757     """An executor that uses processes on remote machines to do work.
758     To do so, it requires that a pool of remote workers be properly
759     configured.  See instructions in
760     :mod:`pyutils.parallelize.parallelize`.
761
762     Each machine in a worker pool has a *weight* and a *count*.  A
763     *weight* captures the relative speed of a processor on that worker
764     and a *count* captures the number of synchronous tasks the worker
765     can accept (i.e. the number of cpus on the machine).
766
767     To dispatch work to a remote machine, this class pickles the code
768     to be executed remotely using `cloudpickle`.  For that to work,
769     the remote machine should be running the same version of Python as
770     this machine, ideally in a virtual environment with the same
771     import libraries installed.  Differences in operating system
772     and/or processor architecture don't seem to matter for most code,
773     though.
774
775     .. warning::
776
777         Mismatches in Python version or in the version numbers of
778         third-party libraries between machines can cause problems
779         when trying to unpickle and run code remotely.
780
781     Work to be dispatched is represented in this code by creating a
782     "bundle".  Each bundle is assigned to a remote worker based on
783     heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
784     general, it attempts to load all workers in the pool and maximize
785     throughput.  Once assigned to a remote worker, pickled code is
786     copied to that worker via `scp` and a remote command is issued via
787     `ssh` to execute a :file:`remote_worker.py` process on the remote
788     machine.  This process unpickles the code, runs it, and produces a
789     result which is then copied back to the local machine (again via
790     `scp`) where it can be processed by local code.
791
792     You can (and probably must) override the path of
793     :file:`remote_worker.py` on your pool machines using the
794     `--remote_worker_helper_path` commandline argument (or by
795     changing the default defined earlier in this file).
796
797     During remote work execution, this local machine acts as a
798     controller dispatching all work to the network, copying pickled
799     tasks out, and copying results back in.  It may also be a worker
800     in the pool but do not underestimate the cost of being a
801     controller -- it takes some cpu and a lot of network bandwidth.
802     The work dispatcher logic attempts to detect when a controller is
803     also a worker and reduce its load.
804
805     Some redundancy and safety provisions are made when scheduling
806     tasks to the worker pool; e.g. slower-than-expected tasks have
807     redundant backup tasks created for them, especially if there are
808     otherwise idle workers.  If a task fails repeatedly, the dispatcher
809     considers it poisoned and gives up on it.
810
811     .. warning::
812
813         This executor probably only makes sense to use with
814         computationally expensive tasks such as jobs that will execute
815         for ~30 seconds or longer.
816
817         The network overhead and latency of copying work from the
818         controller (local) machine to the remote workers and copying
819         results back again is relatively high.  Especially at startup,
820         the network can become a bottleneck.  Future versions of this
821         code may attempt to split the responsibility of being a
822         controller (distributing work to pool machines).
823
824     Instructions for how to set this up are provided in
825     :mod:`pyutils.parallelize.parallelize`.
826
827     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
828
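    A hedged sketch of constructing one by hand (normally
    :mod:`pyutils.parallelize.parallelize` builds this for you from the
    JSON worker records file; the machines, function and argument below
    are hypothetical)::

        workers = [
            RemoteWorkerRecord(username='me', machine='machine1', weight=2, count=4),
            RemoteWorkerRecord(username='me', machine='machine2', weight=1, count=2),
        ]
        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        executor = RemoteExecutor(workers, policy)
        future = executor.submit(some_function, some_argument)
        result = future.result()
        executor.shutdown()
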
829     """
830
831     def __init__(
832         self,
833         workers: List[RemoteWorkerRecord],
834         policy: RemoteWorkerSelectionPolicy,
835     ) -> None:
836         """
837         Args:
838             workers: A list of remote workers we can call on to do tasks.
839             policy: A policy for selecting remote workers for tasks.
840         """
841
842         super().__init__()
843         self.workers = workers
844         self.policy = policy
845         self.worker_count = 0
846         for worker in self.workers:
847             self.worker_count += worker.count
848         if self.worker_count <= 0:
849             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
850             logger.critical(msg)
851             raise RemoteExecutorException(msg)
852         self.policy.register_worker_pool(self.workers)
853         self.cv = threading.Condition()
854         logger.debug(
855             'Creating %d local threads, one per remote worker slot.', self.worker_count
856         )
857         self._helper_executor = fut.ThreadPoolExecutor(
858             thread_name_prefix="remote_executor_helper",
859             max_workers=self.worker_count,
860         )
861         self.status = RemoteExecutorStatus(self.worker_count)
862         self.total_bundles_submitted = 0
863         self.backup_lock = threading.Lock()
864         self.last_backup = None
865         (
866             self.heartbeat_thread,
867             self.heartbeat_stop_event,
868         ) = self._run_periodic_heartbeat()
869         self.already_shutdown = False
870
871     @background_thread
872     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
873         """
874         We create a background thread to invoke :meth:`_heartbeat` regularly
875         while we are scheduling work.  It does some accounting such as
876         looking for slow bundles to tag for backup creation, checking for
877         unexpected failures, and printing a fancy message on stdout.
878         """
879         while not stop_event.is_set():
880             time.sleep(5.0)
881             logger.debug('Running periodic heartbeat code...')
882             self._heartbeat()
883         logger.debug('Periodic heartbeat thread shutting down.')
884
885     def _heartbeat(self) -> None:
886         # Note: this is invoked on a background thread, not an
887         # executor thread.  Be careful what you do with it b/c it
888         # needs to get back and dump status again periodically.
889         with self.status.lock:
890             self.status.periodic_dump(self.total_bundles_submitted)
891
892             # Look for bundles to reschedule via executor.submit
893             if config.config['executors_schedule_remote_backups']:
894                 self._maybe_schedule_backup_bundles()
895
896     def _maybe_schedule_backup_bundles(self):
897         """Maybe schedule backup bundles if we see a very slow bundle."""
898
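        # Scoring sketch (illustrative numbers): a bundle on a weight=2
        # worker starts with a base score of 200 / 2 = 100.  If it has
        # been running for 30s and is slower than both its worker's p95
        # and the global p95, its score becomes 100 + 30 + 15 + 7.5 =
        # 152.5; having no existing backups then doubles it to 305.  The
        # highest nonzero-scoring bundle gets a backup scheduled.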
899         assert self.status.lock.locked()
900         num_done = len(self.status.finished_bundle_timings)
901         num_idle_workers = self.worker_count - self.task_count
902         now = time.time()
903         if (
904             num_done >= 2
905             and num_idle_workers > 0
906             and (self.last_backup is None or (now - self.last_backup > 9.0))
907             and self.backup_lock.acquire(blocking=False)
908         ):
909             try:
910                 assert self.backup_lock.locked()
911
912                 bundle_to_backup = None
913                 best_score = None
914                 for (
915                     worker,
916                     bundle_uuids,
917                 ) in self.status.in_flight_bundles_by_worker.items():
918
919                     # Prefer to schedule backups of bundles running on
920                     # slower machines.
921                     base_score = 0
922                     for record in self.workers:
923                         if worker.machine == record.machine:
924                             base_score = float(record.weight)
925                             base_score = 1.0 / base_score
926                             base_score *= 200.0
927                             base_score = int(base_score)
928                             break
929
930                     for uuid in bundle_uuids:
931                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
932                         if (
933                             bundle is not None
934                             and bundle.src_bundle is None
935                             and bundle.backup_bundles is not None
936                         ):
937                             score = base_score
938
939                             # Schedule backups of bundles running
940                             # longer; especially those that are
941                             # unexpectedly slow.
942                             start_ts = self.status.start_per_bundle[uuid]
943                             if start_ts is not None:
944                                 runtime = now - start_ts
945                                 score += runtime
946                                 logger.debug(
947                                     'score[%s] => %.1f  # latency boost', bundle, score
948                                 )
949
950                                 if bundle.slower_than_local_p95:
951                                     score += runtime / 2
952                                     logger.debug(
953                                         'score[%s] => %.1f  # >worker p95',
954                                         bundle,
955                                         score,
956                                     )
957
958                                 if bundle.slower_than_global_p95:
959                                     score += runtime / 4
960                                     logger.debug(
961                                         'score[%s] => %.1f  # >global p95',
962                                         bundle,
963                                         score,
964                                     )
965
966                             # Prefer backups of bundles that don't
967                             # have backups already.
968                             backup_count = len(bundle.backup_bundles)
969                             if backup_count == 0:
970                                 score *= 2
971                             elif backup_count == 1:
972                                 score /= 2
973                             elif backup_count == 2:
974                                 score /= 8
975                             else:
976                                 score = 0
977                             logger.debug(
978                                 'score[%s] => %.1f  # %d dup backup factor',
979                                 bundle,
980                                 score, backup_count,
981                             )
982
983                             if score != 0 and (
984                                 best_score is None or score > best_score
985                             ):
986                                 bundle_to_backup = bundle
987                                 assert bundle is not None
988                                 assert bundle.backup_bundles is not None
989                                 assert bundle.src_bundle is None
990                                 best_score = score
991
992                 # Note: this is all still happening on the heartbeat
993                 # runner thread.  That's ok because
994                 # _schedule_backup_for_bundle uses the executor to
995                 # submit the bundle again which will cause it to be
996                 # picked up by a worker thread and allow this thread
997                 # to return to run future heartbeats.
998                 if bundle_to_backup is not None:
999                     self.last_backup = now
1000                     logger.info(
1001                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1002                         bundle_to_backup,
1003                         best_score,
1004                     )
1005                     self._schedule_backup_for_bundle(bundle_to_backup)
1006             finally:
1007                 self.backup_lock.release()
1008
1009     def _is_worker_available(self) -> bool:
1010         """Is there a worker available currently?"""
1011         return self.policy.is_worker_available()
1012
1013     def _acquire_worker(
1014         self, machine_to_avoid: Optional[str] = None
1015     ) -> Optional[RemoteWorkerRecord]:
1016         """Try to acquire a worker."""
1017         return self.policy.acquire_worker(machine_to_avoid)
1018
1019     def _find_available_worker_or_block(
1020         self, machine_to_avoid: Optional[str] = None
1021     ) -> RemoteWorkerRecord:
1022         """Find a worker or block until one becomes available."""
1023         with self.cv:
1024             while not self._is_worker_available():
1025                 self.cv.wait()
1026             worker = self._acquire_worker(machine_to_avoid)
1027             if worker is not None:
1028                 return worker
1029         msg = "We should never reach this point in the code"
1030         logger.critical(msg)
1031         raise Exception(msg)
1032
1033     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1034         """Release a previously acquired worker."""
1035         worker = bundle.worker
1036         assert worker is not None
1037         logger.debug('Released worker %s', worker)
1038         self.status.record_release_worker(
1039             worker,
1040             bundle.uuid,
1041             was_cancelled,
1042         )
1043         with self.cv:
1044             worker.count += 1
1045             self.cv.notify()
1046         self.adjust_task_count(-1)
1047
1048     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1049         """See if a particular bundle is cancelled.  Do not block."""
1050         with self.status.lock:
1051             if bundle.is_cancelled.wait(timeout=0.0):
1052                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1053                 bundle.was_cancelled = True
1054                 return True
1055         return False
1056
1057     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1058         """Find a worker for bundle or block until one is available."""
1059
1060         self.adjust_task_count(+1)
1061         uuid = bundle.uuid
1062         hostname = bundle.hostname
1063         avoid_machine = override_avoid_machine
1064         is_original = bundle.src_bundle is None
1065
1066         # Try not to schedule a backup on the same host as the original.
1067         if avoid_machine is None and bundle.src_bundle is not None:
1068             avoid_machine = bundle.src_bundle.machine
1069         worker = None
1070         while worker is None:
1071             worker = self._find_available_worker_or_block(avoid_machine)
1072         assert worker is not None
1073
1074         # Ok, found a worker.
1075         bundle.worker = worker
1076         machine = bundle.machine = worker.machine
1077         username = bundle.username = worker.username
1078         self.status.record_acquire_worker(worker, uuid)
1079         logger.debug('%s: Running bundle on %s...', bundle, worker)
1080
1081         # Before we do any work, make sure the bundle is still viable.
1082         # It may have been some time between when it was submitted and
1083         # now due to lack of worker availability and someone else may
1084         # have already finished it.
1085         if self._check_if_cancelled(bundle):
1086             try:
1087                 return self._process_work_result(bundle)
1088             except Exception as e:
1089                 logger.warning(
1090                     '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1091                 )
1092                 self._release_worker(bundle)
1093                 if is_original:
1094                     # Weird.  We are the original owner of this
1095                     # bundle.  For it to have been cancelled, a backup
1096                     # must have already started and completed before
1097                     # we even for started.  Moreover, the backup says
1098                     # it is done but we can't find the results it
1099                     # should have copied over.  Reschedule the whole
1100                     # thing.
1101                     logger.exception(e)
1102                     logger.error(
1103                         '%s: We are the original owner thread and yet there are '
1104                         'no results for this bundle.  This is unexpected and bad.',
1105                         bundle,
1106                     )
1107                     return self._emergency_retry_nasty_bundle(bundle)
1108                 else:
1109                     # We're a backup and our bundle is cancelled
1110                     # before we even got started.  Do nothing and let
1111                     # the original bundle's thread worry about either
1112                     # finding the results or complaining about it.
1113                     return None
1114
1115         # Send input code / data to worker machine if it's not local.
1116         if hostname not in machine:
1117             try:
1118                 cmd = (
1119                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1120                 )
1121                 start_ts = time.time()
1122                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1123                 run_silently(cmd)
1124                 xfer_latency = time.time() - start_ts
1125                 logger.debug(
1126                     "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1127                 )
1128             except Exception as e:
1129                 self._release_worker(bundle)
1130                 if is_original:
1131                     # Weird.  We tried to copy the code to the worker
1132                     # and it failed...  And we're the original bundle.
1133                     # We have to retry.
1134                     logger.exception(e)
1135                     logger.error(
1136                         "%s: Failed to send instructions to the worker machine?! "
1137                         "This is not expected; we\'re the original bundle so this shouldn\'t "
1138                         "be a race condition.  Attempting an emergency retry...",
1139                         bundle,
1140                     )
1141                     return self._emergency_retry_nasty_bundle(bundle)
1142                 else:
1143                     # This is actually expected; we're a backup.
1144                     # There's a race condition where someone else
1145                     # already finished the work and removed the source
1146                     # code_file before we could copy it.  Ignore.
1147                     logger.warning(
1148                         '%s: Failed to send instructions to the worker machine... '
1149                         'We\'re a backup and this may be caused by the original (or '
1150                         'some other backup) already finishing this work.  Ignoring.',
1151                         bundle,
1152                     )
1153                     return None
1154
1155         # Kick off the work.  Note that if this fails we let
1156         # _wait_for_process deal with it.
1157         self.status.record_processing_began(uuid)
1158         helper_path = config.config['remote_worker_helper_path']
1159         cmd = (
1160             f'{SSH} {bundle.username}@{bundle.machine} '
1161             f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1162         )
1163         logger.debug(
1164             '%s: Executing %s in the background to kick off work...', bundle, cmd
1165         )
1166         p = cmd_in_background(cmd, silent=True)
1167         bundle.pid = p.pid
1168         logger.debug(
1169             '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1170         )
1171         return self._wait_for_process(p, bundle, 0)
1172
1173     def _wait_for_process(
1174         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1175     ) -> Any:
1176         """At this point we've copied the bundle's pickled code to the remote
1177         worker and started an ssh process that should be invoking the
1178         remote worker to have it execute the user's code.  See how
1179         that's going and wait for it to complete or fail.  Note that
1180         this code is recursive: there are codepaths where we decide to
1181         stop waiting for an ssh process (because another backup seems
1182         to have finished) but then fail to fetch or parse the results
1183         from that backup and thus call ourselves to continue waiting
1184         on an active ssh process.  This is the purpose of the depth
1185         argument: to curtail potential infinite recursion by giving up
1186         eventually.
1187
1188         Args:
1189             p: the Popen record of the ssh job
1190             bundle: the bundle of work being executed remotely
1191             depth: how many retries we've made so far.  Starts at zero.
1192
1193         """
1194
1195         machine = bundle.machine
1196         assert p is not None
1197         pid = p.pid  # pid of the ssh process
1198         if depth > 3:
1199             logger.error(
1200                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1201                 pid,
1202             )
1203             p.terminate()
1204             self._release_worker(bundle)
1205             return self._emergency_retry_nasty_bundle(bundle)
1206
1207         # Spin until either the ssh job we scheduled finishes the
1208         # bundle or some backup worker signals that they finished it
1209         # before we could.
1210         while True:
1211             try:
1212                 p.wait(timeout=0.25)
1213             except subprocess.TimeoutExpired:
1214                 if self._check_if_cancelled(bundle):
1215                     logger.info(
1216                         '%s: looks like another worker finished bundle...', bundle
1217                     )
1218                     break
1219             else:
1220                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1221                 p = None
1222                 break
1223
1224         # If we get here we believe the bundle is done; either the ssh
1225         # subprocess finished (hopefully successfully) or we noticed
1226         # that some other worker seems to have completed the bundle
1227         # before us and we're bailing out.
1228         try:
1229             ret = self._process_work_result(bundle)
1230             if ret is not None and p is not None:
1231                 p.terminate()
1232             return ret
1233
1234         # Something went wrong; e.g. we could not copy the results
1235         # back, cleanup after ourselves on the remote machine, or
1236         # unpickle the results we got from the remote machine.  If we
1237         # still have an active ssh subprocess, keep waiting on it.
1238         # Otherwise, time for an emergency reschedule.
1239         except Exception as e:
1240             logger.exception(e)
1241             logger.error('%s: Something unexpected just happened...', bundle)
1242             if p is not None:
1243                 logger.warning(
1244                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1245                     bundle,
1246                 )
1247                 return self._wait_for_process(p, bundle, depth + 1)
1248             else:
1249                 self._release_worker(bundle)
1250                 return self._emergency_retry_nasty_bundle(bundle)
1251
1252     def _process_work_result(self, bundle: BundleDetails) -> Any:
1253         """A bundle seems to be completed.  Check on the results."""
1254
1255         with self.status.lock:
1256             is_original = bundle.src_bundle is None
1257             was_cancelled = bundle.was_cancelled
1258             username = bundle.username
1259             machine = bundle.machine
1260             result_file = bundle.result_file
1261             code_file = bundle.code_file
1262
1263             # Whether original or backup, if we finished first we must
1264             # fetch the results if the computation happened on a
1265             # remote machine.
1266             bundle.end_ts = time.time()
1267             if not was_cancelled:
1268                 assert bundle.machine is not None
1269                 if bundle.hostname not in bundle.machine:
1270                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1271                     logger.info(
1272                         "%s: Fetching results back from %s@%s via %s",
1273                         bundle,
1274                         username,
1275                         machine,
1276                         cmd,
1277                     )
1278
1279                     # If either of these throw they are handled in
1280                     # _wait_for_process.
1281                     attempts = 0
1282                     while True:
1283                         try:
1284                             run_silently(cmd)
1285                         except Exception as e:
1286                             attempts += 1
1287                             if attempts >= 3:
1288                                 raise e
1289                         else:
1290                             break
1291
1292                     # Cleanup remote /tmp files.
1293                     run_silently(
1294                         f'{SSH} {username}@{machine}'
1295                         f' "/bin/rm -f {code_file} {result_file}"'
1296                     )
1297                     logger.debug(
1298                         'Fetching results back took %.2fs', time.time() - bundle.end_ts
1299                     )
1300                 dur = bundle.end_ts - bundle.start_ts
1301                 self.histogram.add_item(dur)
1302
1303         # Only the original worker should unpickle the file contents
1304         # though since it's the only one whose result matters.  The
1305         # original is also the only job that may delete result_file
1306         # from disk.  Note that the original may have been cancelled
1307         # if one of the backups finished first; it still must read the
1308         # result from disk.  It still does that here with is_cancelled
1309         # set.
1310         if is_original:
1311             logger.debug("%s: Unpickling %s.", bundle, result_file)
1312             try:
1313                 with open(result_file, 'rb') as rb:
1314                     serialized = rb.read()
1315                 result = cloudpickle.loads(serialized)
1316             except Exception as e:
1317                 logger.exception(e)
1318                 logger.error('Failed to load %s... this is bad news.', result_file)
1319                 self._release_worker(bundle)
1320
1321                 # Re-raise the exception; the code in _wait_for_process may
1322                 # decide to _emergency_retry_nasty_bundle here.
1323                 raise e
1324             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1325             os.remove(result_file)
1326             os.remove(code_file)
1327
1328             # Notify any backups that the original is done so they
1329             # should stop ASAP.  Do this whether or not we
1330             # finished first since there could be more than one
1331             # backup.
1332             if bundle.backup_bundles is not None:
1333                 for backup in bundle.backup_bundles:
1334                     logger.debug(
1335                         '%s: Notifying backup %s that it\'s cancelled',
1336                         bundle,
1337                         backup.uuid,
1338                     )
1339                     backup.is_cancelled.set()
1340
1341         # This is a backup job and, by now, we have already fetched
1342         # the bundle results.
1343         else:
1344             # Backup results don't matter; they just need to leave the
1345             # result file in the right place for their originals to
1346             # read/unpickle later.
1347             result = None
1348
1349             # Tell the original to stop if we finished first.
1350             if not was_cancelled:
1351                 orig_bundle = bundle.src_bundle
1352                 assert orig_bundle is not None
1353                 logger.debug(
1354                     '%s: Notifying original %s we beat them to it.',
1355                     bundle,
1356                     orig_bundle.uuid,
1357                 )
1358                 orig_bundle.is_cancelled.set()
1359         self._release_worker(bundle, was_cancelled=was_cancelled)
1360         return result
1361
1362     def _create_original_bundle(self, pickle, function_name: str):
1363         """Creates a bundle that is not a backup of any other bundle but
1364         rather represents a user task.
1365         """
1366
1367         uuid = string_utils.generate_uuid(omit_dashes=True)
1368         code_file = f'/tmp/{uuid}.code.bin'
1369         result_file = f'/tmp/{uuid}.result.bin'
1370
1371         logger.debug('Writing pickled code to %s', code_file)
1372         with open(code_file, 'wb') as wb:
1373             wb.write(pickle)
1374
1375         bundle = BundleDetails(
1376             pickled_code=pickle,
1377             uuid=uuid,
1378             function_name=function_name,
1379             worker=None,
1380             username=None,
1381             machine=None,
1382             hostname=platform.node(),
1383             code_file=code_file,
1384             result_file=result_file,
1385             pid=0,
1386             start_ts=time.time(),
1387             end_ts=0.0,
1388             slower_than_local_p95=False,
1389             slower_than_global_p95=False,
1390             src_bundle=None,
1391             is_cancelled=threading.Event(),
1392             was_cancelled=False,
1393             backup_bundles=[],
1394             failure_count=0,
1395         )
1396         self.status.record_bundle_details(bundle)
1397         logger.debug('%s: Created an original bundle', bundle)
1398         return bundle
1399
1400     def _create_backup_bundle(self, src_bundle: BundleDetails):
1401         """Creates a bundle that is a backup of another bundle that is
1402         running too slowly."""
1403
1404         assert self.status.lock.locked()
1405         assert src_bundle.backup_bundles is not None
1406         n = len(src_bundle.backup_bundles)
1407         uuid = src_bundle.uuid + f'_backup#{n}'
1408
1409         backup_bundle = BundleDetails(
1410             pickled_code=src_bundle.pickled_code,
1411             uuid=uuid,
1412             function_name=src_bundle.function_name,
1413             worker=None,
1414             username=None,
1415             machine=None,
1416             hostname=src_bundle.hostname,
1417             code_file=src_bundle.code_file,
1418             result_file=src_bundle.result_file,
1419             pid=0,
1420             start_ts=time.time(),
1421             end_ts=0.0,
1422             slower_than_local_p95=False,
1423             slower_than_global_p95=False,
1424             src_bundle=src_bundle,
1425             is_cancelled=threading.Event(),
1426             was_cancelled=False,
1427             backup_bundles=None,  # backup backups not allowed
1428             failure_count=0,
1429         )
1430         src_bundle.backup_bundles.append(backup_bundle)
1431         self.status.record_bundle_details_already_locked(backup_bundle)
1432         logger.debug('%s: Created a backup bundle', backup_bundle)
1433         return backup_bundle
1434
1435     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1436         """Schedule a backup of src_bundle."""
1437
1438         assert self.status.lock.locked()
1439         assert src_bundle is not None
1440         backup_bundle = self._create_backup_bundle(src_bundle)
1441         logger.debug(
1442             '%s/%s: Scheduling backup for execution...',
1443             backup_bundle.uuid,
1444             backup_bundle.function_name,
1445         )
1446         self._helper_executor.submit(self._launch, backup_bundle)
1447
1448         # Results from backups don't matter; if a backup finishes
1449         # first it will move the result_file to this machine and let
1450         # the original pick it up, unpickle it, and return the
1451         # result.
1452
1453     def _emergency_retry_nasty_bundle(
1454         self, bundle: BundleDetails
1455     ) -> Optional[fut.Future]:
1456         """Something unexpectedly failed with bundle.  Either retry it
1457         from the beginning or throw in the towel and give up on it."""
1458
1459         is_original = bundle.src_bundle is None
1460         bundle.worker = None
1461         avoid_last_machine = bundle.machine
1462         bundle.machine = None
1463         bundle.username = None
1464         bundle.failure_count += 1
1465         if is_original:
1466             retry_limit = 3
1467         else:
1468             retry_limit = 2
1469
1470         if bundle.failure_count > retry_limit:
1471             logger.error(
1472                 '%s: Tried this bundle too many times already (%dx); giving up.',
1473                 bundle,
1474                 retry_limit,
1475             )
1476             if is_original:
1477                 raise RemoteExecutorException(
1478                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1479                 )
1480             else:
1481                 logger.error(
1482                     '%s: At least it\'s only a backup; better luck with the others.',
1483                     bundle,
1484                 )
1485             return None
1486         else:
1487             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1488             logger.warning(msg)
1489             warnings.warn(msg)
1490             return self._launch(bundle, avoid_last_machine)
1491
1492     @overrides
1493     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1494         """Submit work to be done.  This is the user entry point of this
1495         class."""
1496         if self.already_shutdown:
1497             raise Exception('Submitted work after shutdown.')
1498         pickle = _make_cloud_pickle(function, *args, **kwargs)
1499         bundle = self._create_original_bundle(pickle, function.__name__)
1500         self.total_bundles_submitted += 1
1501         return self._helper_executor.submit(self._launch, bundle)
1502
1503     @overrides
1504     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1505         """Shutdown the executor."""
1506         if not self.already_shutdown:
1507             logger.debug('Shutting down RemoteExecutor %s', self.title)
1508             self.heartbeat_stop_event.set()
1509             self.heartbeat_thread.join()
1510             self._helper_executor.shutdown(wait)
1511             if not quiet:
1512                 print(self.histogram.__repr__(label_formatter='%ds'))
1513             self.already_shutdown = True
1514
1515
1516 class RemoteWorkerPoolProvider:
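    """An interface for objects that can enumerate the pool of
    :class:`RemoteWorkerRecord` instances available to a
    :class:`RemoteExecutor`."""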
1517     @abstractmethod
1518     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1519         pass
1520
1521
1522 @persistent.persistent_autoloaded_singleton()  # type: ignore
1523 class ConfigRemoteWorkerPoolProvider(
1524     RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1525 ):
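    """A :class:`RemoteWorkerPoolProvider` that reads the remote worker
    pool out of the JSON file named by the ``remote_worker_records_file``
    config setting.  The file must contain a top-level
    ``remote_worker_records`` list whose entries map onto the fields of
    :class:`RemoteWorkerRecord`; see the example config linked in the
    module docstring.  An illustrative (not authoritative) sketch, with
    hypothetical hostnames::

        {
            "remote_worker_records": [
                { "username": "worker", "machine": "machine1", "count": 4 },
                { "username": "worker", "machine": "machine2", "count": 2 }
            ]
        }

    Unknown keys in each record are silently ignored (see
    :meth:`dataclassFromDict`).
    """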
1526     def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1527         self.remote_worker_pool = []
1528         for record in json_remote_worker_pool['remote_worker_records']:
1529             self.remote_worker_pool.append(
1530                 self.dataclassFromDict(RemoteWorkerRecord, record)
1531             )
1532         assert len(self.remote_worker_pool) > 0
1533
1534     @staticmethod
1535     def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
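        """Construct an instance of dataclass ``clsName`` using only those
        keys of ``argDict`` that correspond to its init fields; any other
        keys are silently dropped."""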
1536         fieldSet = {f.name for f in fields(clsName) if f.init}
1537         filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
1538         return clsName(**filteredArgDict)
1539
1540     @overrides
1541     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
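        """Returns the list of remote worker records read from the config."""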
1542         return self.remote_worker_pool
1543
1544     @overrides
1545     def get_persistent_data(self) -> List[RemoteWorkerRecord]:
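        """Returns the in-memory worker pool for the persistence framework."""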
1546         return self.remote_worker_pool
1547
1548     @staticmethod
1549     @overrides
1550     def get_filename() -> str:
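        """Returns the path of the JSON file containing remote worker records."""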
1551         return config.config['remote_worker_records_file']
1552
1553     @staticmethod
1554     @overrides
1555     def should_we_load_data(filename: str) -> bool:
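        """Always load the worker records from the file."""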
1556         return True
1557
1558     @staticmethod
1559     @overrides
1560     def should_we_save_data(filename: str) -> bool:
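        """This provider never writes the records file back out."""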
1561         return False
1562
1563
1564 @singleton
1565 class DefaultExecutors(object):
1566     """A container for a default thread, process and remote executor.
1567     These are not created until needed and are cleaned up
1568     automatically before process exit for the caller's convenience.
1569     Instead of creating your own executor, consider using one from
1570     this pool, e.g.::
1571
1572         @par.parallelize(method=par.Method.PROCESS)
1573         def do_work(
1574             solutions: List[Work],
1575             shard_num: int,
1576             ...
1577         ):
1578             <do the work>
1579
1580
1581         def start_do_work(all_work: List[Work]):
1582             shards, results = [], []
1583             logger.debug('Sharding work into groups of 10.')
1584             for subset in list_utils.shard(all_work, 10):
1585                 shards.append([x for x in subset])
1586
1587             logger.debug('Kicking off helper pool.')
1588             try:
1589                 for n, shard in enumerate(shards):
1590                     results.append(
1591                         do_work(
1592                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1593                         )
1594                     )
1595                 smart_future.wait_all(results)
1596             finally:
1597                 # Note: if you forget to do this it will clean itself up
1598                 # during program termination including tearing down any
1599                 # active ssh connections.
1600                 executors.DefaultExecutors().process_pool().shutdown()
1601     """
1602
1603     def __init__(self):
1604         self.thread_executor: Optional[ThreadExecutor] = None
1605         self.process_executor: Optional[ProcessExecutor] = None
1606         self.remote_executor: Optional[RemoteExecutor] = None
1607
1608     @staticmethod
1609     def _ping(host) -> bool:
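        """Returns true if host answers a single ping within ~1s; used to
        drop dead or unreachable machines from the remote worker pool."""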
1610         logger.debug('RUN> ping -c 1 %s', host)
1611         try:
1612             x = cmd_exitcode(
1613                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1614             )
1615             return x == 0
1616         except Exception:
1617             return False
1618
1619     def thread_pool(self) -> ThreadExecutor:
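        """Lazily create (if necessary) and return the shared ThreadExecutor."""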
1620         if self.thread_executor is None:
1621             self.thread_executor = ThreadExecutor()
1622         return self.thread_executor
1623
1624     def process_pool(self) -> ProcessExecutor:
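        """Lazily create (if necessary) and return the shared ProcessExecutor."""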
1625         if self.process_executor is None:
1626             self.process_executor = ProcessExecutor()
1627         return self.process_executor
1628
1629     def remote_pool(self) -> RemoteExecutor:
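        """Lazily create (if necessary) and return the shared RemoteExecutor.
        On first use this reads the remote worker config, pings each machine
        to weed out unreachable ones, and reduces the workload assigned to
        the controller machine itself."""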
1630         if self.remote_executor is None:
1631             logger.info('Looking for some helper machines...')
1632             provider = ConfigRemoteWorkerPoolProvider()
1633             all_machines = provider.get_remote_workers()
1634             pool = []
1635
1636             # Make sure we can ping each machine.
1637             for record in all_machines:
1638                 if self._ping(record.machine):
1639                     logger.info('%s is alive / responding to pings', record.machine)
1640                     pool.append(record)
1641
1642             # The controller machine has a lot to do; go easy on it.
1643             for record in pool:
1644                 if record.machine == platform.node() and record.count > 1:
1645                     logger.info('Reducing workload for %s.', record.machine)
1646                     record.count = max(int(record.count / 2), 1)
1647
1648             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1649             policy.register_worker_pool(pool)
1650             self.remote_executor = RemoteExecutor(pool, policy)
1651         return self.remote_executor
1652
1653     def shutdown(self) -> None:
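        """Shuts down (and discards) any executors this pool has created."""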
1654         if self.thread_executor is not None:
1655             self.thread_executor.shutdown(wait=True, quiet=True)
1656             self.thread_executor = None
1657         if self.process_executor is not None:
1658             self.process_executor.shutdown(wait=True, quiet=True)
1659             self.process_executor = None
1660         if self.remote_executor is not None:
1661             self.remote_executor.shutdown(wait=True, quiet=True)
1662             self.remote_executor = None