1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # pylint: disable=too-many-instance-attributes
4 # pylint: disable=too-many-nested-blocks
5
6 # © Copyright 2021-2023, Scott Gasch
7
8 """
9 This module defines a :class:`BaseExecutor` interface and three
10 implementations:
11
12     - :class:`ThreadExecutor`
13     - :class:`ProcessExecutor`
14     - :class:`RemoteExecutor`
15
16 The :class:`ThreadExecutor` is used to dispatch work to background
17 threads in the same Python process for parallelized work.  Of course,
18 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
19 is not terribly useful for compute-bound code.  But it's good for
20 work that is mostly I/O bound.
21
22 The :class:`ProcessExecutor` is used to dispatch work to other
23 processes on the same machine and is more useful for compute-bound
24 workloads.
25
26 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
27 the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
28 to dispatch work to a set of remote worker machines on your
29 network.  You can configure this pool via a JSON configuration file,
30 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
31
32 Finally, this file defines a :class:`DefaultExecutors` pool that
33 contains a pre-created and ready instance of each of the three
34 executors discussed.  It has the added benefit of being automatically
35 cleaned up at process termination time.
36
37 See instructions in :mod:`pyutils.parallelize.parallelize` for
38 setting up and using the framework.
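
A minimal sketch of driving one of these executors directly (this
assumes the usual pyutils commandline / config initialization has
already run; most callers will probably prefer the decorator described
in :mod:`pyutils.parallelize.parallelize` instead)::

    from pyutils.parallelize.executors import ThreadExecutor

    executor = ThreadExecutor()
    future = executor.submit(sum, [1, 2, 3])
    print(future.result())    # 6
    executor.shutdown(quiet=True)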
39 """
40
41 from __future__ import annotations
42
43 import concurrent.futures as fut
44 import logging
45 import os
46 import platform
47 import random
48 import subprocess
49 import threading
50 import time
51 import warnings
52 from abc import ABC, abstractmethod
53 from collections import defaultdict
54 from dataclasses import dataclass
55 from typing import Any, Callable, Dict, List, Optional, Set
56
57 import cloudpickle  # type: ignore
58 from overrides import overrides
59
60 import pyutils.typez.histogram as hist
61 from pyutils import argparse_utils, config, dataclass_utils, math_utils, string_utils
62 from pyutils.ansi import bg, fg, reset, underline
63 from pyutils.decorator_utils import singleton
64 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
65 from pyutils.parallelize.thread_utils import background_thread
66 from pyutils.typez import persistent, type_utils
67
68 logger = logging.getLogger(__name__)
69
70 parser = config.add_commandline_args(
71     f"Executors ({__file__})", "Args related to processing executors."
72 )
73 parser.add_argument(
74     '--executors_threadpool_size',
75     type=int,
76     metavar='#THREADS',
77     help='Number of threads in the default threadpool, leave unset for default',
78     default=None,
79 )
80 parser.add_argument(
81     '--executors_processpool_size',
82     type=int,
83     metavar='#PROCESSES',
84     help='Number of processes in the default processpool, leave unset for default',
85     default=None,
86 )
87 parser.add_argument(
88     '--executors_schedule_remote_backups',
89     default=True,
90     action=argparse_utils.ActionNoYes,
91     help='Should we schedule duplicative backup work if a remote bundle is slow',
92 )
93 parser.add_argument(
94     '--executors_max_bundle_failures',
95     type=int,
96     default=3,
97     metavar='#FAILURES',
98     help='Maximum number of failures before giving up on a bundle',
99 )
100 parser.add_argument(
101     '--remote_worker_records_file',
102     type=str,
103     metavar='FILENAME',
104     help='Path of the remote worker records file (JSON)',
105     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
106 )
107 parser.add_argument(
108     '--remote_worker_helper_path',
109     type=str,
110     metavar='PATH_TO_REMOTE_WORKER_PY',
111     help='Path to remote_worker.py on remote machines',
112     default=f'source py39-venv/bin/activate && {os.environ.get("HOME", ".")}/pyutils/src/pyutils/remote_worker.py',
113 )
114
115
116 SSH = '/usr/bin/ssh -oForwardX11=no'
117 SCP = '/usr/bin/scp -C'
118
119
120 def _make_cloud_pickle(fun, *args, **kwargs):
121     """Internal helper to create cloud pickles."""
122     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
123     return cloudpickle.dumps((fun, args, kwargs))
124
125
126 class BaseExecutor(ABC):
127     """The base executor interface definition.  The interface for
128     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
129     :class:`ThreadExecutor`.
130     """
131
132     def __init__(self, *, title=''):
133         """
134         Args:
135             title: the name of this executor.
136         """
137         self.title = title
138         self.histogram = hist.SimpleHistogram(
139             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
140         )
141         self.task_count = 0
142
143     @abstractmethod
144     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
145         """Submit work for the executor to do.
146
147         Args:
148             function: the Callable to be executed.
149             *args: the positional arguments to pass to function
150             **kwargs: the keyword arguments to pass to function
151
152         Returns:
153             A concurrent :class:`Future` representing the result of the
154             work.
155         """
156         pass
157
158     @abstractmethod
159     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
160         """Shutdown the executor.
161
162         Args:
163             wait: wait for the shutdown to complete before returning?
164             quiet: keep it quiet, please.
165         """
166         pass
167
168     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
169         """Shutdown the executor and return True if the executor is idle
170         (i.e. there are no pending or active tasks).  Return False
171         otherwise.  Note: this should only be called by the launcher
172         process.
173
174         Args:
175             quiet: keep it quiet, please.
176
177         Returns:
178             True if the executor could be shut down because it has no
179             pending work, False otherwise.
180         """
181         if self.task_count == 0:
182             self.shutdown(wait=True, quiet=quiet)
183             return True
184         return False
185
186     def adjust_task_count(self, delta: int) -> None:
187         """Change the task count.  Note: do not call this method from a
188         worker, it should only be called by the launcher process /
189         thread / machine.
190
191         Args:
192             delta: the delta value by which to adjust task count.
193         """
194         self.task_count += delta
195         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
196
197     def get_task_count(self) -> int:
198         """Change the task count.  Note: do not call this method from a
199         worker, it should only be called by the launcher process /
200         thread / machine.
201
202         Returns:
203             The executor's current task count.
204         """
205         return self.task_count
206
207
208 class ThreadExecutor(BaseExecutor):
209     """A threadpool executor.  This executor uses Python threads to
210     schedule tasks.  Note that, at least as of python3.10, because of
211     the global lock in the interpreter itself, these do not
212     parallelize very well, so this class is useful mostly for non-CPU
213     intensive tasks.
214
215     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
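
    A rough sketch of overlapping several I/O-bound calls (the URL and
    the `fetch` helper below are illustrative only, not part of this
    module)::

        import urllib.request

        def fetch(url: str) -> bytes:
            with urllib.request.urlopen(url) as response:
                return response.read()

        executor = ThreadExecutor(max_workers=4)
        futures = [
            executor.submit(fetch, f'https://example.com/?page={n}')
            for n in range(4)
        ]
        pages = [f.result() for f in futures]
        executor.shutdown(quiet=True)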
216     """
217
218     def __init__(self, max_workers: Optional[int] = None):
219         """
220         Args:
221             max_workers: maximum number of threads to create in the pool.
222         """
223         super().__init__()
224         workers = None
225         if max_workers is not None:
226             workers = max_workers
227         elif 'executors_threadpool_size' in config.config:
228             workers = config.config['executors_threadpool_size']
229         if workers is not None:
230             logger.debug('Creating threadpool executor with %d workers', workers)
231         else:
232             logger.debug('Creating a default sized threadpool executor')
233         self._thread_pool_executor = fut.ThreadPoolExecutor(
234             max_workers=workers, thread_name_prefix="thread_executor_helper"
235         )
236         self.already_shutdown = False
237
238     # This is run on a different thread; do not adjust task count here.
239     @staticmethod
240     def _run_local_bundle(fun, *args, **kwargs):
241         logger.debug("Running local bundle at %s", fun.__name__)
242         result = fun(*args, **kwargs)
243         return result
244
245     @overrides
246     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
247         """
248         Raises:
249             Exception: executor is shutting down already.
250         """
251         if self.already_shutdown:
252             raise Exception('Submitted work after shutdown.')
253         self.adjust_task_count(+1)
254         newargs = []
255         newargs.append(function)
256         for arg in args:
257             newargs.append(arg)
258         start = time.time()
259         result = self._thread_pool_executor.submit(
260             ThreadExecutor._run_local_bundle, *newargs, **kwargs
261         )
262         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
263         result.add_done_callback(lambda _: self.adjust_task_count(-1))
264         return result
265
266     @overrides
267     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
268         if not self.already_shutdown:
269             logger.debug('Shutting down threadpool executor %s', self.title)
270             self._thread_pool_executor.shutdown(wait)
271             if not quiet:
272                 print(self.histogram.__repr__(label_formatter='%ds'))
273             self.already_shutdown = True
274
275
276 class ProcessExecutor(BaseExecutor):
277     """An executor which runs tasks in child processes.
278
279     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
280     """
281
282     def __init__(self, max_workers=None):
283         """
284         Args:
285             max_workers: the max number of worker processes to create.
286         """
287         super().__init__()
288         workers = None
289         if max_workers is not None:
290             workers = max_workers
291         elif 'executors_processpool_size' in config.config:
292             workers = config.config['executors_processpool_size']
293         if workers is not None:
294             logger.debug('Creating processpool executor with %d workers.', workers)
295         else:
296             logger.debug('Creating a default sized processpool executor')
297         self._process_executor = fut.ProcessPoolExecutor(
298             max_workers=workers,
299         )
300         self.already_shutdown = False
301
302     # This is run in another process; do not adjust task count here.
303     @staticmethod
304     def _run_cloud_pickle(pickle):
305         fun, args, kwargs = cloudpickle.loads(pickle)
306         logger.debug("Running pickled bundle at %s", fun.__name__)
307         result = fun(*args, **kwargs)
308         return result
309
310     @overrides
311     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
312         """
313         Raises:
314             Exception: executor is shutting down already.
315         """
316         if self.already_shutdown:
317             raise Exception('Submitted work after shutdown.')
318         start = time.time()
319         self.adjust_task_count(+1)
320         pickle = _make_cloud_pickle(function, *args, **kwargs)
321         result = self._process_executor.submit(
322             ProcessExecutor._run_cloud_pickle, pickle
323         )
324         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
325         result.add_done_callback(lambda _: self.adjust_task_count(-1))
326         return result
327
328     @overrides
329     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
330         if not self.already_shutdown:
331             logger.debug('Shutting down processpool executor %s', self.title)
332             self._process_executor.shutdown(wait)
333             if not quiet:
334                 print(self.histogram.__repr__(label_formatter='%ds'))
335             self.already_shutdown = True
336
337     def __getstate__(self):
338         state = self.__dict__.copy()
339         state['_process_executor'] = None
340         return state
341
342
343 class RemoteExecutorException(Exception):
344     """Thrown when a bundle cannot be executed despite several retries."""
345
346     pass
347
348
349 @dataclass
350 class RemoteWorkerRecord:
351     """A record of info about a remote worker."""
352
353     username: str
354     """Username we can ssh into on this machine to run work."""
355
356     machine: str
357     """Machine address / name."""
358
359     weight: int
360     """Relative probability for the weighted policy to select this
361     machine for scheduling work."""
362
363     count: int
364     """If this machine is selected, what is the maximum number of task
365     that it can handle?"""
366
367     def __hash__(self):
368         return hash((self.username, self.machine))
369
370     def __repr__(self):
371         return f'{self.username}@{self.machine}'
372
373
374 @dataclass
375 class BundleDetails:
376     """All info necessary to define some unit of work that needs to be
377     done, where it is being run, its state, whether it is an original
378     bundle of a backup bundle, how many times it has failed, etc...
379     """
380
381     pickled_code: bytes
382     """The code to run, cloud pickled"""
383
384     uuid: str
385     """A unique identifier"""
386
387     function_name: str
388     """The name of the function we pickled"""
389
390     worker: Optional[RemoteWorkerRecord]
391     """The remote worker running this bundle or None if none (yet)"""
392
393     username: Optional[str]
394     """The remote username running this bundle or None if none (yet)"""
395
396     machine: Optional[str]
397     """The remote machine running this bundle or None if none (yet)"""
398
399     controller: str
400     """The controller machine"""
401
402     code_file: str
403     """A unique filename to hold the work to be done"""
404
405     result_file: str
406     """Where the results should be placed / read from"""
407
408     pid: int
409     """The process id of the local subprocess watching the ssh connection
410     to the remote machine"""
411
412     start_ts: float
413     """Starting time"""
414
415     end_ts: float
416     """Ending time"""
417
418     slower_than_local_p95: bool
419     """Currently slower then 95% of other bundles on remote host"""
420
421     slower_than_global_p95: bool
422     """Currently slower than 95% of other bundles globally"""
423
424     src_bundle: Optional[BundleDetails]
425     """If this is a backup bundle, this points to the original bundle
426     that it's backing up.  None otherwise."""
427
428     is_cancelled: threading.Event
429     """An event that can be signaled to indicate this bundle is cancelled.
430     This is set when another copy (backup or original) of this work has
431     completed successfully elsewhere."""
432
433     was_cancelled: bool
434     """True if this bundle was cancelled, False if it finished normally"""
435
436     backup_bundles: Optional[List[BundleDetails]]
437     """If we've created backups of this bundle, this is the list of them"""
438
439     failure_count: int
440     """How many times has this bundle failed already?"""
441
442     def __repr__(self):
443         uuid = self.uuid
444         if uuid[-9:-2] == '_backup':
445             uuid = uuid[:-9]
446             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
447         else:
448             suffix = uuid[-6:]
449
450         # We colorize the uuid based on some bits from it to make them
451         # stand out in the logging and help a reader correlate log messages
452         # related to the same bundle.
453         colorz = [
454             fg('violet red'),
455             fg('red'),
456             fg('orange'),
457             fg('peach orange'),
458             fg('yellow'),
459             fg('marigold yellow'),
460             fg('green yellow'),
461             fg('tea green'),
462             fg('cornflower blue'),
463             fg('turquoise blue'),
464             fg('tropical blue'),
465             fg('lavender purple'),
466             fg('medium purple'),
467         ]
468         c = colorz[int(uuid[-2:], 16) % len(colorz)]
469         function_name = (
470             self.function_name if self.function_name is not None else 'nofname'
471         )
472         machine = self.machine if self.machine is not None else 'nomachine'
473         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
474
475
476 class RemoteExecutorStatus:
477     """A status 'scoreboard' for a remote executor tracking various
478     metrics and able to render a periodic dump of global state.
479     """
480
481     def __init__(self, total_worker_count: int) -> None:
482         """
483         Args:
484             total_worker_count: number of workers in the pool
485         """
486         self.worker_count: int = total_worker_count
487         self.known_workers: Set[RemoteWorkerRecord] = set()
488         self.start_time: float = time.time()
489         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
490         self.end_per_bundle: Dict[str, float] = defaultdict(float)
491         self.finished_bundle_timings_per_worker: Dict[
492             RemoteWorkerRecord, math_utils.NumericPopulation
493         ] = {}
494         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
495         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
496         self.finished_bundle_timings: math_utils.NumericPopulation = (
497             math_utils.NumericPopulation()
498         )
499         self.last_periodic_dump: Optional[float] = None
500         self.total_bundles_submitted: int = 0
501
502         # Protects reads and modification using self.  Also used
503         # as a memory fence for modifications to bundle.
504         self.lock: threading.Lock = threading.Lock()
505
506     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
507         """Record that bundle with uuid is assigned to a particular worker.
508
509         Args:
510             worker: the record of the worker to which uuid is assigned
511             uuid: the uuid of a bundle that has been assigned to a worker
512         """
513         with self.lock:
514             self.record_acquire_worker_already_locked(worker, uuid)
515
516     def record_acquire_worker_already_locked(
517         self, worker: RemoteWorkerRecord, uuid: str
518     ) -> None:
519         """Same as above but an entry point that doesn't acquire the lock
520         for codepaths where it's already held."""
521         assert self.lock.locked()
522         self.known_workers.add(worker)
523         self.start_per_bundle[uuid] = None
524         x = self.in_flight_bundles_by_worker.get(worker, set())
525         x.add(uuid)
526         self.in_flight_bundles_by_worker[worker] = x
527
528     def record_bundle_details(self, details: BundleDetails) -> None:
529         """Register the details about a bundle of work."""
530         with self.lock:
531             self.record_bundle_details_already_locked(details)
532
533     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
534         """Same as above but for codepaths that already hold the lock."""
535         assert self.lock.locked()
536         self.bundle_details_by_uuid[details.uuid] = details
537
538     def record_release_worker(
539         self,
540         worker: RemoteWorkerRecord,
541         uuid: str,
542         was_cancelled: bool,
543     ) -> None:
544         """Record that a bundle has released a worker."""
545         with self.lock:
546             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
547
548     def record_release_worker_already_locked(
549         self,
550         worker: RemoteWorkerRecord,
551         uuid: str,
552         was_cancelled: bool,
553     ) -> None:
554         """Same as above but for codepaths that already hold the lock."""
555         assert self.lock.locked()
556         ts = time.time()
557         self.end_per_bundle[uuid] = ts
558         self.in_flight_bundles_by_worker[worker].remove(uuid)
559         if not was_cancelled:
560             start = self.start_per_bundle[uuid]
561             assert start is not None
562             bundle_latency = ts - start
563             x = self.finished_bundle_timings_per_worker.get(
564                 worker, math_utils.NumericPopulation()
565             )
566             x.add_number(bundle_latency)
567             self.finished_bundle_timings_per_worker[worker] = x
568             self.finished_bundle_timings.add_number(bundle_latency)
569
570     def record_processing_began(self, uuid: str):
571         """Record when work on a bundle begins."""
572         with self.lock:
573             self.start_per_bundle[uuid] = time.time()
574
575     def total_in_flight(self) -> int:
576         """How many bundles are in flight currently?"""
577         assert self.lock.locked()
578         total_in_flight = 0
579         for worker in self.known_workers:
580             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
581         return total_in_flight
582
583     def total_idle(self) -> int:
584         """How many idle workers are there currently?"""
585         assert self.lock.locked()
586         return self.worker_count - self.total_in_flight()
587
588     def __repr__(self):
589         assert self.lock.locked()
590         ts = time.time()
591         total_finished = len(self.finished_bundle_timings)
592         total_in_flight = self.total_in_flight()
593         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
594         qall_median = None
595         qall_p95 = None
596         if len(self.finished_bundle_timings) > 1:
597             qall_median = self.finished_bundle_timings.get_median()
598             qall_p95 = self.finished_bundle_timings.get_percentile(95)
599             ret += (
600                 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
601                 f'✅={total_finished}/{self.total_bundles_submitted}, '
602                 f'💻n={total_in_flight}/{self.worker_count}\n'
603             )
604         else:
605             ret += (
606                 f'⏱={ts-self.start_time:.1f}s, '
607                 f'✅={total_finished}/{self.total_bundles_submitted}, '
608                 f'💻n={total_in_flight}/{self.worker_count}\n'
609             )
610
611         for worker in self.known_workers:
612             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
613             timings = self.finished_bundle_timings_per_worker.get(
614                 worker, math_utils.NumericPopulation()
615             )
616             count = len(timings)
617             qworker_median = None
618             qworker_p95 = None
619             if count > 1:
620                 qworker_median = timings.get_median()
621                 qworker_p95 = timings.get_percentile(95)
622                 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
623             else:
624                 ret += '\n'
625             if count > 0:
626                 ret += f'    ...finished {count} total bundle(s) so far\n'
627             in_flight = len(self.in_flight_bundles_by_worker[worker])
628             if in_flight > 0:
629                 ret += f'    ...{in_flight} bundles currently in flight:\n'
630                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
631                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
632                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
633                     if self.start_per_bundle[bundle_uuid] is not None:
634                         sec = ts - self.start_per_bundle[bundle_uuid]
635                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
636                     else:
637                         ret += f'       {details} setting up / copying data...'
638                         sec = 0.0
639
640                     if qworker_p95 is not None:
641                         if sec > qworker_p95:
642                             ret += f'{bg("red")}>💻p95{reset()} '
643                             if details is not None:
644                                 details.slower_than_local_p95 = True
645                         else:
646                             if details is not None:
647                                 details.slower_than_local_p95 = False
648
649                     if qall_p95 is not None:
650                         if sec > qall_p95:
651                             ret += f'{bg("red")}>∀p95{reset()} '
652                             if details is not None:
653                                 details.slower_than_global_p95 = True
654                         elif details is not None:
655                             details.slower_than_global_p95 = False
656                     ret += '\n'
657         return ret
658
659     def periodic_dump(self, total_bundles_submitted: int) -> None:
660         assert self.lock.locked()
661         self.total_bundles_submitted = total_bundles_submitted
662         ts = time.time()
663         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
664             print(self)
665             self.last_periodic_dump = ts
666
667
668 class RemoteWorkerSelectionPolicy(ABC):
669     """An interface definition of a policy for selecting a remote worker."""
670
671     def __init__(self):
672         self.workers: Optional[List[RemoteWorkerRecord]] = None
673
674     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
675         self.workers = workers
676
677     @abstractmethod
678     def is_worker_available(self) -> bool:
679         pass
680
681     @abstractmethod
682     def acquire_worker(
683         self, machine_to_avoid: Optional[str] = None
684     ) -> Optional[RemoteWorkerRecord]:
685         pass
686
687
688 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
689     """A remote worker selector that uses weighted RNG."""
690
691     @overrides
692     def is_worker_available(self) -> bool:
693         if self.workers:
694             for worker in self.workers:
695                 if worker.count > 0:
696                     return True
697         return False
698
699     @overrides
700     def acquire_worker(
701         self, machine_to_avoid: Optional[str] = None
702     ) -> Optional[RemoteWorkerRecord]:
703         grabbag = []
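        # Build a grab bag in which each available worker (one with
        # spare capacity that isn't machine_to_avoid) appears
        # (count * weight) times; drawing uniformly from the bag thus
        # favors faster workers with more free slots.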
704         if self.workers:
705             for worker in self.workers:
706                 if worker.machine != machine_to_avoid:
707                     if worker.count > 0:
708                         for _ in range(worker.count * worker.weight):
709                             grabbag.append(worker)
710
711         if len(grabbag) == 0:
712             logger.debug(
713                 'There are no available workers that avoid %s', machine_to_avoid
714             )
715             if self.workers:
716                 for worker in self.workers:
717                     if worker.count > 0:
718                         for _ in range(worker.count * worker.weight):
719                             grabbag.append(worker)
720
721         if len(grabbag) == 0:
722             logger.warning('There are no available workers?!')
723             return None
724
725         worker = random.sample(grabbag, 1)[0]
726         assert worker.count > 0
727         worker.count -= 1
728         logger.debug('Selected worker %s', worker)
729         return worker
730
731
732 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
733     """A remote worker selector that just round robins."""
734
735     def __init__(self) -> None:
736         super().__init__()
737         self.index = 0
738
739     @overrides
740     def is_worker_available(self) -> bool:
741         if self.workers:
742             for worker in self.workers:
743                 if worker.count > 0:
744                     return True
745         return False
746
747     def _increment_index(self, index: int) -> None:
748         if self.workers:
749             index += 1
750             if index >= len(self.workers):
751                 index = 0
752             self.index = index
753
754     @overrides
755     def acquire_worker(
756         self, machine_to_avoid: Optional[str] = None
757     ) -> Optional[RemoteWorkerRecord]:
758         if self.workers:
759             x = self.index
760             while True:
761                 worker = self.workers[x]
762                 if worker.machine != machine_to_avoid and worker.count > 0:
763                     worker.count -= 1
764                     self._increment_index(x)
765                     logger.debug('Selected worker %s', worker)
766                     return worker
767                 x += 1
768                 if x >= len(self.workers):
769                     x = 0
770                 if x == self.index:
771                     logger.warning('Unexpectedly could not find a worker, retrying...')
772                     return None
773         return None
774
775
776 class RemoteExecutor(BaseExecutor):
777     """An executor that uses processes on remote machines to do work.
778     To do so, it requires that a pool of remote workers be properly
779     configured.  See instructions in
780     :mod:`pyutils.parallelize.parallelize`.
781
782     Each machine in a worker pool has a *weight* and a *count*.  A
783     *weight* captures the relative speed of a processor on that worker
784     and a *count* captures the number of synchronous tasks the worker
785     can accept (i.e. the number of cpus on the machine).
786
787     To dispatch work to a remote machine, this class pickles the code
788     to be executed remotely using `cloudpickle`.  For that to work,
789     the remote machine should be running the same version of Python as
790     this machine, ideally in a virtual environment with the same
791     import libraries installed.  Differences in operating system
792     and/or processor architecture don't seem to matter for most code,
793     though.
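
    At its core, the exchange resembles this simplified sketch of what
    :func:`_make_cloud_pickle` on the controller and `remote_worker.py`
    on the worker do at either end of the wire::

        import math

        import cloudpickle

        # Controller side: serialize the callable and its arguments.
        blob = cloudpickle.dumps((math.sqrt, (16,), {}))

        # Worker side: deserialize and execute.
        fun, args, kwargs = cloudpickle.loads(blob)
        assert fun(*args, **kwargs) == 4.0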
794
795     .. warning::
796
797         Mismatches in Python version or in the version numbers of
798         third-party libraries between machines can cause problems
799         when trying to unpickle and run code remotely.
800
801     Work to be dispatched is represented in this code by creating a
802     "bundle".  Each bundle is assigned to a remote worker based on
803     heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
804     general, it attempts to load all workers in the pool and maximize
805     throughput.  Once assigned to a remote worker, pickled code is
806     copied to that worker via `scp` and a remote command is issued via
807     `ssh` to execute a :file:`remote_worker.py` process on the remote
808     machine.  This process unpickles the code, runs it, and produces a
809     result which is then copied back to the local machine (again via
810     `scp`) where it can be processed by local code.
811
812     You can and probably must override the path of
813     :file:`remote_worker.py` on your pool machines using the
814     `--remote_worker_helper_path` commandline argument (or by just
815     changing the default in code, see above in this file's code).
816
817     During remote work execution, this local machine acts as a
818     controller dispatching all work to the network, copying pickled
819     tasks out, and copying results back in.  It may also be a worker
820     in the pool but do not underestimate the cost of being a
821     controller -- it takes some cpu and a lot of network bandwidth.
822     The work dispatcher logic attempts to detect when a controller is
823     also a worker and reduce its load.
824
825     Some redundancy and safety provisions are made when scheduling
826     tasks to the worker pool; e.g. slower than expected tasks have
827     redundant backup tasks created, especially if there are otherwise
828     idle workers.  If a task fails repeatedly, the dispatcher considers
829     it poisoned and gives up on it.
830
831     .. warning::
832
833         This executor probably only makes sense to use with
834         computationally expensive tasks such as jobs that will execute
835         for ~30 seconds or longer.
836
837         The network overhead and latency of copying work from the
838         controller (local) machine to the remote workers and copying
839         results back again is relatively high.  Especially at startup,
840         the network can become a bottleneck.  Future versions of this
841         code may attempt to split the responsibility of being a
842         controller (distributing work to pool machines).
843
844     Instructions for how to set this up are provided in
845     :mod:`pyutils.parallelize.parallelize`.
846
847     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
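
    A very rough construction sketch (it assumes passwordless `ssh`
    access to the remote machines and a deployed `remote_worker.py`;
    the usernames, hostnames, weights and counts below are made up)::

        workers = [
            RemoteWorkerRecord(
                username='me', machine='machine1', weight=2, count=4
            ),
            RemoteWorkerRecord(
                username='me', machine='machine2', weight=1, count=2
            ),
        ]
        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        executor = RemoteExecutor(workers, policy)
        future = executor.submit(sum, [1, 2, 3])
        print(future.result())    # 6
        executor.shutdown(quiet=True)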
848
849     """
850
851     def __init__(
852         self,
853         workers: List[RemoteWorkerRecord],
854         policy: RemoteWorkerSelectionPolicy,
855     ) -> None:
856         """
857         Args:
858             workers: A list of remote workers we can call on to do tasks.
859             policy: A policy for selecting remote workers for tasks.
860
861         Raises:
862             RemoteExecutorException: unable to find a place to schedule work.
863         """
864
865         super().__init__()
866         self.workers = workers
867         self.policy = policy
868         self.worker_count = 0
869         for worker in self.workers:
870             self.worker_count += worker.count
871         if self.worker_count <= 0:
872             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
873             logger.critical(msg)
874             raise RemoteExecutorException(msg)
875         self.policy.register_worker_pool(self.workers)
876         self.cv = threading.Condition()
877         logger.debug(
878             'Creating %d local threads, one per remote worker.', self.worker_count
879         )
880         self._helper_executor = fut.ThreadPoolExecutor(
881             thread_name_prefix="remote_executor_helper",
882             max_workers=self.worker_count,
883         )
884         self.status = RemoteExecutorStatus(self.worker_count)
885         self.total_bundles_submitted = 0
886         self.backup_lock = threading.Lock()
887         self.last_backup = None
888         (
889             self.heartbeat_thread,
890             self.heartbeat_stop_event,
891         ) = self._run_periodic_heartbeat()
892         self.already_shutdown = False
893
894     @background_thread
895     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
896         """
897         We create a background thread to invoke :meth:`_heartbeat` regularly
898         while we are scheduling work.  It does some accounting such as
899         looking for slow bundles to tag for backup creation, checking for
900         unexpected failures, and printing a fancy message on stdout.
901         """
902         while not stop_event.is_set():
903             time.sleep(5.0)
904             logger.debug('Running periodic heartbeat code...')
905             self._heartbeat()
906         logger.debug('Periodic heartbeat thread shutting down.')
907
908     def _heartbeat(self) -> None:
909         # Note: this is invoked on a background thread, not an
910         # executor thread.  Be careful what you do with it b/c it
911         # needs to get back and dump status again periodically.
912         with self.status.lock:
913             self.status.periodic_dump(self.total_bundles_submitted)
914
915             # Look for bundles to reschedule via executor.submit
916             if config.config['executors_schedule_remote_backups']:
917                 self._maybe_schedule_backup_bundles()
918
919     def _maybe_schedule_backup_bundles(self):
920         """Maybe schedule backup bundles if we see a very slow bundle."""
921
922         assert self.status.lock.locked()
923         num_done = len(self.status.finished_bundle_timings)
924         num_idle_workers = self.worker_count - self.task_count
925         now = time.time()
926         if (
927             num_done >= 2
928             and num_idle_workers > 0
929             and (self.last_backup is None or (now - self.last_backup > 9.0))
930             and self.backup_lock.acquire(blocking=False)
931         ):
932             try:
933                 assert self.backup_lock.locked()
934
935                 bundle_to_backup = None
936                 best_score = None
937                 for (
938                     worker,
939                     bundle_uuids,
940                 ) in self.status.in_flight_bundles_by_worker.items():
941
942                     # Prefer to schedule backups of bundles running on
943                     # slower machines.
944                     base_score = 0
945                     for record in self.workers:
946                         if worker.machine == record.machine:
947                             temp_score = float(record.weight)
948                             temp_score = 1.0 / temp_score
949                             temp_score *= 200.0
950                             base_score = int(temp_score)
951                             break
952
953                     for uuid in bundle_uuids:
954                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
955                         if (
956                             bundle is not None
957                             and bundle.src_bundle is None
958                             and bundle.backup_bundles is not None
959                         ):
960                             score = base_score
961
962                             # Schedule backups of bundles running
963                             # longer; especially those that are
964                             # unexpectedly slow.
965                             start_ts = self.status.start_per_bundle[uuid]
966                             if start_ts is not None:
967                                 runtime = now - start_ts
968                                 score += runtime
969                                 logger.debug(
970                                     'score[%s] => %.1f  # latency boost', bundle, score
971                                 )
972
973                                 if bundle.slower_than_local_p95:
974                                     score += runtime / 2
975                                     logger.debug(
976                                         'score[%s] => %.1f  # >worker p95',
977                                         bundle,
978                                         score,
979                                     )
980
981                                 if bundle.slower_than_global_p95:
982                                     score += runtime / 4
983                                     logger.debug(
984                                         'score[%s] => %.1f  # >global p95',
985                                         bundle,
986                                         score,
987                                     )
988
989                             # Prefer backups of bundles that don't
990                             # have backups already.
991                             backup_count = len(bundle.backup_bundles)
992                             if backup_count == 0:
993                                 score *= 2
994                             elif backup_count == 1:
995                                 score /= 2
996                             elif backup_count == 2:
997                                 score /= 8
998                             else:
999                                 score = 0
1000                             logger.debug(
1001                                 'score[%s] => %.1f  # %d dup backup factor',
1002                                 bundle,
1003                                 score,
                                backup_count,
1004                             )
1005
1006                             if score != 0 and (
1007                                 best_score is None or score > best_score
1008                             ):
1009                                 bundle_to_backup = bundle
1010                                 assert bundle is not None
1011                                 assert bundle.backup_bundles is not None
1012                                 assert bundle.src_bundle is None
1013                                 best_score = score
1014
1015                 # Note: this is all still happening on the heartbeat
1016                 # runner thread.  That's ok because
1017                 # _schedule_backup_for_bundle uses the executor to
1018                 # submit the bundle again which will cause it to be
1019                 # picked up by a worker thread and allow this thread
1020                 # to return to run future heartbeats.
1021                 if bundle_to_backup is not None:
1022                     self.last_backup = now
1023                     logger.info(
1024                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1025                         bundle_to_backup,
1026                         best_score,
1027                     )
1028                     self._schedule_backup_for_bundle(bundle_to_backup)
1029             finally:
1030                 self.backup_lock.release()
1031
1032     def _is_worker_available(self) -> bool:
1033         """Is there a worker available currently?"""
1034         return self.policy.is_worker_available()
1035
1036     def _acquire_worker(
1037         self, machine_to_avoid: Optional[str] = None
1038     ) -> Optional[RemoteWorkerRecord]:
1039         """Try to acquire a worker."""
1040         return self.policy.acquire_worker(machine_to_avoid)
1041
1042     def _find_available_worker_or_block(
1043         self, machine_to_avoid: Optional[str] = None
1044     ) -> RemoteWorkerRecord:
1045         """Find a worker or block until one becomes available."""
1046         with self.cv:
1047             while not self._is_worker_available():
1048                 self.cv.wait()
1049             worker = self._acquire_worker(machine_to_avoid)
1050             if worker is not None:
1051                 return worker
1052         msg = "We should never reach this point in the code"
1053         logger.critical(msg)
1054         raise Exception(msg)
1055
1056     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1057         """Release a previously acquired worker."""
1058         worker = bundle.worker
1059         assert worker is not None
1060         logger.debug('Released worker %s', worker)
1061         self.status.record_release_worker(
1062             worker,
1063             bundle.uuid,
1064             was_cancelled,
1065         )
1066         with self.cv:
1067             worker.count += 1
1068             self.cv.notify()
1069         self.adjust_task_count(-1)
1070
1071     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1072         """See if a particular bundle is cancelled.  Do not block."""
1073         with self.status.lock:
1074             if bundle.is_cancelled.wait(timeout=0.0):
1075                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1076                 bundle.was_cancelled = True
1077                 return True
1078         return False
1079
1080     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1081         """Find a worker for bundle or block until one is available."""
1082
1083         self.adjust_task_count(+1)
1084         uuid = bundle.uuid
1085         controller = bundle.controller
1086         avoid_machine = override_avoid_machine
1087         is_original = bundle.src_bundle is None
1088
1089         # Try not to schedule a backup on the same host as the original.
1090         if avoid_machine is None and bundle.src_bundle is not None:
1091             avoid_machine = bundle.src_bundle.machine
1092         worker = None
1093         while worker is None:
1094             worker = self._find_available_worker_or_block(avoid_machine)
1095         assert worker is not None
1096
1097         # Ok, found a worker.
1098         bundle.worker = worker
1099         machine = bundle.machine = worker.machine
1100         username = bundle.username = worker.username
1101         self.status.record_acquire_worker(worker, uuid)
1102         logger.debug('%s: Running bundle on %s...', bundle, worker)
1103
1104         # Before we do any work, make sure the bundle is still viable.
1105         # It may have been some time between when it was submitted and
1106         # now due to lack of worker availability and someone else may
1107         # have already finished it.
1108         if self._check_if_cancelled(bundle):
1109             try:
1110                 return self._process_work_result(bundle)
1111             except Exception:
1112                 logger.warning(
1113                     '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1114                 )
1115                 self._release_worker(bundle)
1116                 if is_original:
1117                     # Weird.  We are the original owner of this
1118                     # bundle.  For it to have been cancelled, a backup
1119                     # must have already started and completed before
1120                     # we even got started.  Moreover, the backup says
1121                     # it is done but we can't find the results it
1122                     # should have copied over.  Reschedule the whole
1123                     # thing.
1124                     logger.exception(
1125                         '%s: We are the original owner thread and yet there are '
1126                         'no results for this bundle.  This is unexpected and bad. '
1127                         'Attempting an emergency retry...',
1128                         bundle,
1129                     )
1130                     return self._emergency_retry_nasty_bundle(bundle)
1131                 else:
1132                     # We're a backup and our bundle is cancelled
1133                     # before we even got started.  Do nothing and let
1134                     # the original bundle's thread worry about either
1135                     # finding the results or complaining about it.
1136                     return None
1137
1138         # Send input code / data to worker machine if it's not local.
1139         if controller not in machine:
1140             try:
1141                 cmd = (
1142                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1143                 )
1144                 start_ts = time.time()
1145                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1146                 run_silently(cmd)
1147                 xfer_latency = time.time() - start_ts
1148                 logger.debug(
1149                     "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1150                 )
1151             except Exception:
1152                 self._release_worker(bundle)
1153                 if is_original:
1154                     # Weird.  We tried to copy the code to the worker
1155                     # and it failed...  And we're the original bundle.
1156                     # We have to retry.
1157                     logger.exception(
1158                         "%s: Failed to send instructions to the worker machine?! "
1159                         "This is not expected; we\'re the original bundle so this shouldn\'t "
1160                         "be a race condition.  Attempting an emergency retry...",
1161                         bundle,
1162                     )
1163                     return self._emergency_retry_nasty_bundle(bundle)
1164                 else:
1165                     # This is actually expected; we're a backup.
1166                     # There's a race condition where someone else
1167                     # already finished the work and removed the source
1168                     # code_file before we could copy it.  Ignore.
1169                     logger.warning(
1170                         '%s: Failed to send instructions to the worker machine... '
1171                         'We\'re a backup and this may be caused by the original (or '
1172                         'some other backup) already finishing this work.  Ignoring.',
1173                         bundle,
1174                     )
1175                     return None
1176
1177         # Kick off the work.  Note that if this fails we let
1178         # _wait_for_process deal with it.
1179         self.status.record_processing_began(uuid)
1180         helper_path = config.config['remote_worker_helper_path']
1181         cmd = (
1182             f'{SSH} {bundle.username}@{bundle.machine} '
1183             f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1184         )
1185         logger.debug(
1186             '%s: Executing %s in the background to kick off work...', bundle, cmd
1187         )
1188         p = cmd_in_background(cmd, silent=True)
1189         bundle.pid = p.pid
1190         logger.debug(
1191             '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1192         )
1193         return self._wait_for_process(p, bundle, 0)
1194
1195     def _wait_for_process(
1196         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1197     ) -> Any:
1198         """At this point we've copied the bundle's pickled code to the remote
1199         worker and started an ssh process that should be invoking the
1200         remote worker to have it execute the user's code.  See how
1201         that's going and wait for it to complete or fail.  Note that
1202         this code is recursive: there are codepaths where we decide to
1203         stop waiting for an ssh process (because another backup seems
1204         to have finished) but then fail to fetch or parse the results
1205         from that backup and thus call ourselves to continue waiting
1206         on an active ssh process.  This is the purpose of the depth
1207         argument: to curtail potential infinite recursion by giving up
1208         eventually.
1209
1210         Args:
1211             p: the Popen record of the ssh job
1212             bundle: the bundle of work being executed remotely
1213             depth: how many retries we've made so far.  Starts at zero.
1214
1215         """
1216
1217         machine = bundle.machine
1218         assert p is not None
1219         pid = p.pid  # pid of the ssh process
1220         if depth > 3:
1221             logger.error(
1222                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1223                 pid,
1224             )
1225             p.terminate()
1226             self._release_worker(bundle)
1227             return self._emergency_retry_nasty_bundle(bundle)
1228
1229         # Spin until either the ssh job we scheduled finishes the
1230         # bundle or some backup worker signals that they finished it
1231         # before we could.
1232         while True:
1233             try:
1234                 p.wait(timeout=0.25)
1235             except subprocess.TimeoutExpired:
1236                 if self._check_if_cancelled(bundle):
1237                     logger.info(
1238                         '%s: looks like another worker finished bundle...', bundle
1239                     )
1240                     break
1241             else:
1242                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1243                 p = None
1244                 break
1245
1246         # If we get here we believe the bundle is done; either the ssh
1247         # subprocess finished (hopefully successfully) or we noticed
1248         # that some other worker seems to have completed the bundle
1249         # before us and we're bailing out.
1250         try:
1251             ret = self._process_work_result(bundle)
1252             if ret is not None and p is not None:
1253                 p.terminate()
1254             return ret
1255
1256         # Something went wrong; e.g. we could not copy the results
1257         # back, clean up after ourselves on the remote machine, or
1258         # unpickle the results we got from the remote machine.  If we
1259         # still have an active ssh subprocess, keep waiting on it.
1260         # Otherwise, time for an emergency reschedule.
1261         except Exception:
1262             logger.exception('%s: Something unexpected just happened...', bundle)
1263             if p is not None:
1264                 logger.warning(
1265                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1266                     bundle,
1267                 )
1268                 return self._wait_for_process(p, bundle, depth + 1)
1269             else:
1270                 self._release_worker(bundle)
1271                 return self._emergency_retry_nasty_bundle(bundle)
1272
1273     def _process_work_result(self, bundle: BundleDetails) -> Any:
1274         """A bundle seems to be completed.  Check on the results."""
1275
1276         with self.status.lock:
1277             is_original = bundle.src_bundle is None
1278             was_cancelled = bundle.was_cancelled
1279             username = bundle.username
1280             machine = bundle.machine
1281             result_file = bundle.result_file
1282             code_file = bundle.code_file
1283
1284             # Whether original or backup, if we finished first we must
1285             # fetch the results if the computation happened on a
1286             # remote machine.
1287             bundle.end_ts = time.time()
1288             if not was_cancelled:
1289                 assert bundle.machine is not None
1290                 if bundle.controller not in bundle.machine:
1291                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1292                     logger.info(
1293                         "%s: Fetching results back from %s@%s via %s",
1294                         bundle,
1295                         username,
1296                         machine,
1297                         cmd,
1298                     )
1299
1300                     # If either of these throw they are handled in
1301                     # _wait_for_process.
1302                     attempts = 0
1303                     while True:
1304                         try:
1305                             run_silently(cmd)
1306                         except Exception as e:
1307                             attempts += 1
1308                             if attempts >= 3:
1309                                 raise e
1310                         else:
1311                             break
1312
1313                     # Cleanup remote /tmp files.
1314                     run_silently(
1315                         f'{SSH} {username}@{machine}'
1316                         f' "/bin/rm -f {code_file} {result_file}"'
1317                     )
1318                     logger.debug(
1319                         'Fetching results back took %.2fs', time.time() - bundle.end_ts
1320                     )
1321                 dur = bundle.end_ts - bundle.start_ts
1322                 self.histogram.add_item(dur)
1323
1324         # Only the original worker should unpickle the file contents
1325         # though since it's the only one whose result matters.  The
1326         # original is also the only job that may delete result_file
1327         # from disk.  Note that the original may have been cancelled
1328         # if one of the backups finished first; it must still read the
1329         # result from disk, which it does here even with its
1330         # is_cancelled event set.
1331         if is_original:
1332             logger.debug("%s: Unpickling %s.", bundle, result_file)
1333             try:
1334                 with open(result_file, 'rb') as rb:
1335                     serialized = rb.read()
1336                 result = cloudpickle.loads(serialized)
1337             except Exception as e:
1338                 logger.exception('Failed to load %s... this is bad news.', result_file)
1339                 self._release_worker(bundle)
1340
1341                 # Re-raise the exception; the code in _wait_for_process may
1342                 # decide to _emergency_retry_nasty_bundle here.
1343                 raise e
1344             logger.debug('Removing local (controller) %s and %s.', code_file, result_file)
1345             os.remove(result_file)
1346             os.remove(code_file)
1347
1348             # Notify any backups that the original is done so they
1349             # should stop ASAP.  Do this whether or not we
1350             # finished first since there could be more than one
1351             # backup.
1352             if bundle.backup_bundles is not None:
1353                 for backup in bundle.backup_bundles:
1354                     logger.debug(
1355                         '%s: Notifying backup %s that it\'s cancelled',
1356                         bundle,
1357                         backup.uuid,
1358                     )
1359                     backup.is_cancelled.set()
1360
1361         # This is a backup job and, by now, we have already fetched
1362         # the bundle results.
1363         else:
1364             # Backup results don't matter, they just need to leave the
1365             # result file in the right place for their originals to
1366             # read/unpickle later.
1367             result = None
1368
1369             # Tell the original to stop if we finished first.
1370             if not was_cancelled:
1371                 orig_bundle = bundle.src_bundle
1372                 assert orig_bundle is not None
1373                 logger.debug(
1374                     '%s: Notifying original %s we beat them to it.',
1375                     bundle,
1376                     orig_bundle.uuid,
1377                 )
1378                 orig_bundle.is_cancelled.set()
1379         self._release_worker(bundle, was_cancelled=was_cancelled)
1380         return result
1381
1382     def _create_original_bundle(self, pickle, function_name: str):
1383         """Creates a bundle that is not a backup of any other bundle but
1384         rather represents a user task.
1385         """
1386
1387         uuid = string_utils.generate_uuid(omit_dashes=True)
1388         code_file = f'/tmp/{uuid}.code.bin'
1389         result_file = f'/tmp/{uuid}.result.bin'
1390
1391         logger.debug('Writing pickled code to %s', code_file)
1392         with open(code_file, 'wb') as wb:
1393             wb.write(pickle)
1394
1395         bundle = BundleDetails(
1396             pickled_code=pickle,
1397             uuid=uuid,
1398             function_name=function_name,
1399             worker=None,
1400             username=None,
1401             machine=None,
1402             controller=platform.node(),
1403             code_file=code_file,
1404             result_file=result_file,
1405             pid=0,
1406             start_ts=time.time(),
1407             end_ts=0.0,
1408             slower_than_local_p95=False,
1409             slower_than_global_p95=False,
1410             src_bundle=None,
1411             is_cancelled=threading.Event(),
1412             was_cancelled=False,
1413             backup_bundles=[],
1414             failure_count=0,
1415         )
1416         self.status.record_bundle_details(bundle)
1417         logger.debug('%s: Created an original bundle', bundle)
1418         return bundle
1419
1420     def _create_backup_bundle(self, src_bundle: BundleDetails):
1421         """Creates a bundle that is a backup of another bundle that is
1422         running too slowly."""
1423
1424         assert self.status.lock.locked()
1425         assert src_bundle.backup_bundles is not None
1426         n = len(src_bundle.backup_bundles)
1427         uuid = src_bundle.uuid + f'_backup#{n}'
1428
1429         backup_bundle = BundleDetails(
1430             pickled_code=src_bundle.pickled_code,
1431             uuid=uuid,
1432             function_name=src_bundle.function_name,
1433             worker=None,
1434             username=None,
1435             machine=None,
1436             controller=src_bundle.controller,
1437             code_file=src_bundle.code_file,
1438             result_file=src_bundle.result_file,
1439             pid=0,
1440             start_ts=time.time(),
1441             end_ts=0.0,
1442             slower_than_local_p95=False,
1443             slower_than_global_p95=False,
1444             src_bundle=src_bundle,
1445             is_cancelled=threading.Event(),
1446             was_cancelled=False,
1447             backup_bundles=None,  # backup backups not allowed
1448             failure_count=0,
1449         )
1450         src_bundle.backup_bundles.append(backup_bundle)
1451         self.status.record_bundle_details_already_locked(backup_bundle)
1452         logger.debug('%s: Created a backup bundle', backup_bundle)
1453         return backup_bundle
1454
1455     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1456         """Schedule a backup of src_bundle."""
1457
1458         assert self.status.lock.locked()
1459         assert src_bundle is not None
1460         backup_bundle = self._create_backup_bundle(src_bundle)
1461         logger.debug(
1462             '%s/%s: Scheduling backup for execution...',
1463             backup_bundle.uuid,
1464             backup_bundle.function_name,
1465         )
1466         self._helper_executor.submit(self._launch, backup_bundle)
1467
1468         # Results from backups don't matter; if they finish first
1469         # they will move the result_file to this machine and let
1470         # the original pick them up and unpickle them (and return
1471         # a result).
1472
1473     def _emergency_retry_nasty_bundle(
1474         self, bundle: BundleDetails
1475     ) -> Optional[fut.Future]:
1476         """Something unexpectedly failed with bundle.  Either retry it
1477         from the beginning or throw in the towel and give up on it.
1478
1479         Raises:
1480             RemoteExecutorException: if the bundle fails repeatedly.
1481         """
1482
1483         is_original = bundle.src_bundle is None
1484         bundle.worker = None
1485         avoid_last_machine = bundle.machine
1486         bundle.machine = None
1487         bundle.username = None
1488         bundle.failure_count += 1
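        # Originals get one extra retry compared with backups; a backup
        # that can't be completed is less serious because its results
        # don't matter anyway.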
1489         if is_original:
1490             retry_limit = 3
1491         else:
1492             retry_limit = 2
1493
1494         if bundle.failure_count > retry_limit:
1495             logger.error(
1496                 '%s: Tried this bundle too many times already (%dx); giving up.',
1497                 bundle,
1498                 retry_limit,
1499             )
1500             if is_original:
1501                 raise RemoteExecutorException(
1502                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1503                 )
1504             logger.error(
1505                 '%s: At least it\'s only a backup; better luck with the others.',
1506                 bundle,
1507             )
1508             return None
1509         else:
1510             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1511             logger.warning(msg)
1512             warnings.warn(msg)
1513             return self._launch(bundle, avoid_last_machine)
1514
1515     @overrides
1516     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1517         """Submit work to be done.  This is the user entry point of this
1518         class.
1519
1520         Raises:
1521             Exception: if the executor is already shutting down.
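
        Below is a minimal usage sketch (it assumes a remote worker
        pool has already been configured as described in the module
        docstring; most callers go through
        :mod:`pyutils.parallelize.parallelize` rather than calling
        submit directly)::

            def add(a: int, b: int) -> int:
                return a + b

            executor = DefaultExecutors().remote_pool()
            f = executor.submit(add, 1, 2)

            # f is a concurrent.futures.Future; its result should
            # resolve to the function's return value (3, here) once
            # some worker completes the bundle.
            print(f.result())
            executor.shutdown()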
1522         """
1523         if self.already_shutdown:
1524             raise Exception('Submitted work after shutdown.')
1525         pickle = _make_cloud_pickle(function, *args, **kwargs)
1526         bundle = self._create_original_bundle(pickle, function.__name__)
1527         self.total_bundles_submitted += 1
1528         return self._helper_executor.submit(self._launch, bundle)
1529
1530     @overrides
1531     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1532         """Shutdown the executor."""
1533         if not self.already_shutdown:
1534             logger.debug('Shutting down RemoteExecutor %s', self.title)
1535             self.heartbeat_stop_event.set()
1536             self.heartbeat_thread.join()
1537             self._helper_executor.shutdown(wait)
1538             if not quiet:
1539                 print(self.histogram.__repr__(label_formatter='%ds'))
1540             self.already_shutdown = True
1541
1542
1543 class RemoteWorkerPoolProvider(ABC):
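    """An interface for things that can produce the list of
    :class:`RemoteWorkerRecord` objects describing available remote
    workers."""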
1544     @abstractmethod
1545     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1546         pass
1547
1548
1549 @persistent.persistent_autoloaded_singleton()  # type: ignore
1550 class ConfigRemoteWorkerPoolProvider(
1551     RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1552 ):
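    """Reads the remote worker pool out of a JSON config file (the file
    named by the remote_worker_records_file config setting); each entry
    under 'remote_worker_records' is turned into a
    :class:`RemoteWorkerRecord`."""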
1553     def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1554         self.remote_worker_pool: List[RemoteWorkerRecord] = []
1555         for record in json_remote_worker_pool['remote_worker_records']:
1556             self.remote_worker_pool.append(
1557                 dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
1558             )
1559         assert len(self.remote_worker_pool) > 0
1560
1561     @overrides
1562     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1563         return self.remote_worker_pool
1564
1565     @overrides
1566     def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1567         return self.remote_worker_pool
1568
1569     @staticmethod
1570     @overrides
1571     def get_filename() -> str:
1572         return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
1573
1574     @staticmethod
1575     @overrides
1576     def should_we_load_data(filename: str) -> bool:
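        """Always (re)load the remote worker records from the config file."""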
1577         return True
1578
1579     @staticmethod
1580     @overrides
1581     def should_we_save_data(filename: str) -> bool:
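        """Never write the records back; the config file is treated as read-only."""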
1582         return False
1583
1584
1585 @singleton
1586 class DefaultExecutors(object):
1587     """A container for a default thread, process and remote executor.
1588     These are not created until needed and are automatically cleaned
1589     up before process exit for the caller's convenience.
1590     Instead of creating your own executor, consider using the one
1591     from this pool.  e.g.::
1592
1593         @par.parallelize(method=par.Method.PROCESS)
1594         def do_work(
1595             solutions: List[Work],
1596             shard_num: int,
1597             ...
1598         ):
1599             <do the work>
1600
1601
1602         def start_do_work(all_work: List[Work]):
1603             shards = []
1604             logger.debug('Sharding work into groups of 10.')
1605             for subset in list_utils.shard(all_work, 10):
1606                 shards.append([x for x in subset])
1607
1608             logger.debug('Kicking off helper pool.')
            results = []
1609             try:
1610                 for n, shard in enumerate(shards):
1611                     results.append(
1612                         do_work(
1613                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1614                         )
1615                     )
1616                 smart_future.wait_all(results)
1617             finally:
1618                 # Note: if you forget to do this it will clean itself up
1619                 # during program termination including tearing down any
1620                 # active ssh connections.
1621                 executors.DefaultExecutors().process_pool().shutdown()
1622     """
1623
1624     def __init__(self):
1625         self.thread_executor: Optional[ThreadExecutor] = None
1626         self.process_executor: Optional[ProcessExecutor] = None
1627         self.remote_executor: Optional[RemoteExecutor] = None
1628
1629     @staticmethod
1630     def _ping(host) -> bool:
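        """Return True if host answers a single ping within about a second."""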
1631         logger.debug('RUN> ping -c 1 %s', host)
1632         try:
1633             x = cmd_exitcode(
1634                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1635             )
1636             return x == 0
1637         except Exception:
1638             return False
1639
1640     def thread_pool(self) -> ThreadExecutor:
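        """Return the default ThreadExecutor, creating it lazily on first use."""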
1641         if self.thread_executor is None:
1642             self.thread_executor = ThreadExecutor()
1643         return self.thread_executor
1644
1645     def process_pool(self) -> ProcessExecutor:
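        """Return the default ProcessExecutor, creating it lazily on first use."""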
1646         if self.process_executor is None:
1647             self.process_executor = ProcessExecutor()
1648         return self.process_executor
1649
1650     def remote_pool(self) -> RemoteExecutor:
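        """Return the default RemoteExecutor, creating it lazily on first
        use: read the configured remote worker records, keep only the
        machines that answer a ping, and lighten the load assigned to
        the controller machine."""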
1651         if self.remote_executor is None:
1652             logger.info('Looking for some helper machines...')
1653             provider = ConfigRemoteWorkerPoolProvider()
1654             all_machines = provider.get_remote_workers()
1655             pool = []
1656
1657             # Make sure we can ping each machine.
1658             for record in all_machines:
1659                 if self._ping(record.machine):
1660                     logger.info('%s is alive / responding to pings', record.machine)
1661                     pool.append(record)
1662
1663             # The controller machine has a lot to do; go easy on it.
1664             for record in pool:
1665                 if record.machine == platform.node() and record.count > 1:
1666                     logger.info('Reducing workload for %s.', record.machine)
1667                     record.count = max(int(record.count / 2), 1)
1668
1669             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1670             policy.register_worker_pool(pool)
1671             self.remote_executor = RemoteExecutor(pool, policy)
1672         return self.remote_executor
1673
1674     def shutdown(self) -> None:
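        """Shut down (and discard) any default executors that have been created."""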
1675         if self.thread_executor is not None:
1676             self.thread_executor.shutdown(wait=True, quiet=True)
1677             self.thread_executor = None
1678         if self.process_executor is not None:
1679             self.process_executor.shutdown(wait=True, quiet=True)
1680             self.process_executor = None
1681         if self.remote_executor is not None:
1682             self.remote_executor.shutdown(wait=True, quiet=True)
1683             self.remote_executor = None