1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # pylint: disable=too-many-instance-attributes
4 # pylint: disable=too-many-nested-blocks
5
6 # © Copyright 2021-2023, Scott Gasch
7
8 """
9 This module defines a :class:`BaseExecutor` interface and three
10 implementations:
11
12     - :class:`ThreadExecutor`
13     - :class:`ProcessExecutor`
14     - :class:`RemoteExecutor`
15
16 The :class:`ThreadExecutor` is used to dispatch work to background
17 threads in the same Python process for parallelized work.  Of course,
18 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
19 is not terribly useful for compute-bound code.  But it's good for
20 work that is mostly I/O bound.
21
22 The :class:`ProcessExecutor` is used to dispatch work to other
23 processes on the same machine and is more useful for compute-bound
24 workloads.
25
26 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
27 the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
28 to dispatch work to a set of remote worker machines on your
29 network.  You can configure this pool via a JSON configuration file,
30 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
31
32 Finally, this file defines a :class:`DefaultExecutors` pool that
33 contains a pre-created and ready instance of each of the three
34 executors discussed.  It has the added benefit of being automatically
35 cleaned up at process termination time.
36
37 See instructions in :mod:`pyutils.parallelize.parallelize` for
38 setting up and using the framework.
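
A minimal usage sketch (most callers should prefer the decorator described
in :mod:`pyutils.parallelize.parallelize` rather than using an executor
directly)::

    from pyutils.parallelize.executors import ThreadExecutor

    # Dispatch a trivial piece of work to a background thread and wait
    # for its result.
    executor = ThreadExecutor(max_workers=4)
    future = executor.submit(lambda x: x * 2, 21)
    assert future.result() == 42
    executor.shutdown(wait=True, quiet=True)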
39 """
40
41 from __future__ import annotations
42
43 import concurrent.futures as fut
44 import logging
45 import os
46 import platform
47 import random
48 import subprocess
49 import threading
50 import time
51 import warnings
52 from abc import ABC, abstractmethod
53 from collections import defaultdict
54 from dataclasses import dataclass
55 from typing import Any, Callable, Dict, List, Optional, Set
56
57 import cloudpickle  # type: ignore
58 from overrides import overrides
59
60 import pyutils.typez.histogram as hist
61 from pyutils import argparse_utils, config, dataclass_utils, math_utils, string_utils
62 from pyutils.ansi import bg, fg, reset, underline
63 from pyutils.decorator_utils import singleton
64 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
65 from pyutils.parallelize.thread_utils import background_thread
66 from pyutils.typez import persistent, type_utils
67
68 logger = logging.getLogger(__name__)
69
70 parser = config.add_commandline_args(
71     f"Executors ({__file__})", "Args related to processing executors."
72 )
73 parser.add_argument(
74     '--executors_threadpool_size',
75     type=int,
76     metavar='#THREADS',
77     help='Number of threads in the default threadpool, leave unset for default',
78     default=None,
79 )
80 parser.add_argument(
81     '--executors_processpool_size',
82     type=int,
83     metavar='#PROCESSES',
84     help='Number of processes in the default processpool, leave unset for default',
85     default=None,
86 )
87 parser.add_argument(
88     '--executors_schedule_remote_backups',
89     default=True,
90     action=argparse_utils.ActionNoYes,
91     help='Should we schedule duplicative backup work if a remote bundle is slow',
92 )
93 parser.add_argument(
94     '--executors_max_bundle_failures',
95     type=int,
96     default=3,
97     metavar='#FAILURES',
98     help='Maximum number of failures before giving up on a bundle',
99 )
100 parser.add_argument(
101     '--remote_worker_records_file',
102     type=str,
103     metavar='FILENAME',
104     help='Path of the remote worker records file (JSON)',
105     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
106 )
107 parser.add_argument(
108     '--remote_worker_helper_path',
109     type=str,
110     metavar='PATH_TO_REMOTE_WORKER_PY',
111     help='Path to remote_worker.py on remote machines',
112     default=f'source py39-venv/bin/activate && {os.environ.get("HOME", ".")}/pyutils/src/pyutils/remote_worker.py',
113 )
114
115
116 SSH = '/usr/bin/ssh -oForwardX11=no'
117 SCP = '/usr/bin/scp -C'
118
119
120 def _make_cloud_pickle(fun, *args, **kwargs):
121     """Internal helper to create cloud pickles."""
122     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
123     return cloudpickle.dumps((fun, args, kwargs))
124
125
126 class BaseExecutor(ABC):
127     """The base executor interface definition.  The interface for
128     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
129     :class:`ThreadExecutor`.
130     """
131
132     def __init__(self, *, title=''):
133         """
134         Args:
135             title: the name of this executor.
136         """
137         self.title = title
138         self.histogram = hist.SimpleHistogram(
139             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
140         )
141         self.task_count = 0
142
143     @abstractmethod
144     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
145         """Submit work for the executor to do.
146
147         Args:
148             function: the Callable to be executed.
149             *args: the arguments to function
150             **kwargs: the keyword arguments to function
151
152         Returns:
153             A concurrent :class:`Future` representing the result of the
154             work.
155         """
156         pass
157
158     @abstractmethod
159     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
160         """Shutdown the executor.
161
162         Args:
163             wait: wait for the shutdown to complete before returning?
164             quiet: keep it quiet, please.
165         """
166         pass
167
168     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
169         """Shutdown the executor and return True if the executor is idle
170         (i.e. there are no pending or active tasks).  Return False
171         otherwise.  Note: this should only be called by the launcher
172         process.
173
174         Args:
175             quiet: keep it quiet, please.
176
177         Returns:
178             True if the executor could be shut down because it has no
179             pending work, False otherwise.
180         """
181         if self.task_count == 0:
182             self.shutdown(wait=True, quiet=quiet)
183             return True
184         return False
185
186     def adjust_task_count(self, delta: int) -> None:
187         """Change the task count.  Note: do not call this method from a
188         worker, it should only be called by the launcher process /
189         thread / machine.
190
191         Args:
192             delta: the delta value by which to adjust task count.
193         """
194         self.task_count += delta
195         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
196
197     def get_task_count(self) -> int:
198         """Change the task count.  Note: do not call this method from a
199         worker, it should only be called by the launcher process /
200         thread / machine.
201
202         Returns:
203             The executor's current task count.
204         """
205         return self.task_count
206
207
208 class ThreadExecutor(BaseExecutor):
209     """A threadpool executor.  This executor uses Python threads to
210     schedule tasks.  Note that, at least as of python3.10, because of
211     the global lock in the interpreter itself, these do not
212     parallelize very well so this class is useful mostly for non-CPU
213     intensive tasks.
214
215     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
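
    A minimal sketch of fanning out I/O-bound work (`fetch_url` and `urls`
    are hypothetical stand-ins for real I/O code, and the pyutils config is
    assumed to have been parsed already)::

        executor = ThreadExecutor()
        futures = [executor.submit(fetch_url, url) for url in urls]
        results = [f.result() for f in futures]
        executor.shutdown(wait=True, quiet=True)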
216     """
217
218     def __init__(self, max_workers: Optional[int] = None):
219         """
220         Args:
221             max_workers: maximum number of threads to create in the pool.
222         """
223         super().__init__()
224         workers = None
225         if max_workers is not None:
226             workers = max_workers
227         elif 'executors_threadpool_size' in config.config:
228             workers = config.config['executors_threadpool_size']
229         if workers is not None:
230             logger.debug('Creating threadpool executor with %d workers', workers)
231         else:
232             logger.debug('Creating a default sized threadpool executor')
233         self._thread_pool_executor = fut.ThreadPoolExecutor(
234             max_workers=workers, thread_name_prefix="thread_executor_helper"
235         )
236         self.already_shutdown = False
237
238     # This is run on a different thread; do not adjust task count here.
239     @staticmethod
240     def _run_local_bundle(fun, *args, **kwargs):
241         logger.debug("Running local bundle at %s", fun.__name__)
242         result = fun(*args, **kwargs)
243         return result
244
245     @overrides
246     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
247         if self.already_shutdown:
248             raise Exception('Submitted work after shutdown.')
249         self.adjust_task_count(+1)
250         newargs = []
251         newargs.append(function)
252         for arg in args:
253             newargs.append(arg)
254         start = time.time()
255         result = self._thread_pool_executor.submit(
256             ThreadExecutor._run_local_bundle, *newargs, **kwargs
257         )
258         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
259         result.add_done_callback(lambda _: self.adjust_task_count(-1))
260         return result
261
262     @overrides
263     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
264         if not self.already_shutdown:
265             logger.debug('Shutting down threadpool executor %s', self.title)
266             self._thread_pool_executor.shutdown(wait)
267             if not quiet:
268                 print(self.histogram.__repr__(label_formatter='%ds'))
269             self.already_shutdown = True
270
271
272 class ProcessExecutor(BaseExecutor):
273     """An executor which runs tasks in child processes.
274
275     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
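
    A minimal sketch for compute-bound work; whatever is submitted
    (function, args, kwargs) must be serializable with `cloudpickle`::

        def sum_of_squares(n: int) -> int:
            return sum(i * i for i in range(n))

        executor = ProcessExecutor(max_workers=2)
        future = executor.submit(sum_of_squares, 10_000_000)
        print(future.result())
        executor.shutdown(wait=True, quiet=True)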
276     """
277
278     def __init__(self, max_workers=None):
279         """
280         Args:
281             max_workers: the max number of worker processes to create.
282         """
283         super().__init__()
284         workers = None
285         if max_workers is not None:
286             workers = max_workers
287         elif 'executors_processpool_size' in config.config:
288             workers = config.config['executors_processpool_size']
289         if workers is not None:
290             logger.debug('Creating processpool executor with %d workers.', workers)
291         else:
292             logger.debug('Creating a default sized processpool executor')
293         self._process_executor = fut.ProcessPoolExecutor(
294             max_workers=workers,
295         )
296         self.already_shutdown = False
297
298     # This is run in another process; do not adjust task count here.
299     @staticmethod
300     def _run_cloud_pickle(pickle):
301         fun, args, kwargs = cloudpickle.loads(pickle)
302         logger.debug("Running pickled bundle at %s", fun.__name__)
303         result = fun(*args, **kwargs)
304         return result
305
306     @overrides
307     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
308         if self.already_shutdown:
309             raise Exception('Submitted work after shutdown.')
310         start = time.time()
311         self.adjust_task_count(+1)
312         pickle = _make_cloud_pickle(function, *args, **kwargs)
313         result = self._process_executor.submit(
314             ProcessExecutor._run_cloud_pickle, pickle
315         )
316         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
317         result.add_done_callback(lambda _: self.adjust_task_count(-1))
318         return result
319
320     @overrides
321     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
322         if not self.already_shutdown:
323             logger.debug('Shutting down processpool executor %s', self.title)
324             self._process_executor.shutdown(wait)
325             if not quiet:
326                 print(self.histogram.__repr__(label_formatter='%ds'))
327             self.already_shutdown = True
328
329     def __getstate__(self):
330         state = self.__dict__.copy()
331         state['_process_executor'] = None
332         return state
333
334
335 class RemoteExecutorException(Exception):
336     """Thrown when a bundle cannot be executed despite several retries."""
337
338     pass
339
340
341 @dataclass
342 class RemoteWorkerRecord:
343     """A record of info about a remote worker."""
344
345     username: str
346     """Username we can ssh into on this machine to run work."""
347
348     machine: str
349     """Machine address / name."""
350
351     weight: int
352     """Relative probability for the weighted policy to select this
353     machine for scheduling work."""
354
355     count: int
356     """If this machine is selected, what is the maximum number of task
357     that it can handle?"""
358
359     def __hash__(self):
360         return hash((self.username, self.machine))
361
362     def __repr__(self):
363         return f'{self.username}@{self.machine}'
364
365
366 @dataclass
367 class BundleDetails:
368     """All info necessary to define some unit of work that needs to be
369     done, where it is being run, its state, whether it is an original
370     bundle or a backup bundle, how many times it has failed, etc.
371     """
372
373     pickled_code: bytes
374     """The code to run, cloud pickled"""
375
376     uuid: str
377     """A unique identifier"""
378
379     function_name: str
380     """The name of the function we pickled"""
381
382     worker: Optional[RemoteWorkerRecord]
383     """The remote worker running this bundle or None if none (yet)"""
384
385     username: Optional[str]
386     """The remote username running this bundle or None if none (yet)"""
387
388     machine: Optional[str]
389     """The remote machine running this bundle or None if none (yet)"""
390
391     controller: str
392     """The controller machine"""
393
394     code_file: str
395     """A unique filename to hold the work to be done"""
396
397     result_file: str
398     """Where the results should be placed / read from"""
399
400     pid: int
401     """The process id of the local subprocess watching the ssh connection
402     to the remote machine"""
403
404     start_ts: float
405     """Starting time"""
406
407     end_ts: float
408     """Ending time"""
409
410     slower_than_local_p95: bool
411     """Currently slower then 95% of other bundles on remote host"""
412
413     slower_than_global_p95: bool
414     """Currently slower than 95% of other bundles globally"""
415
416     src_bundle: Optional[BundleDetails]
417     """If this is a backup bundle, this points to the original bundle
418     that it's backing up.  None otherwise."""
419
420     is_cancelled: threading.Event
421     """An event that can be signaled to indicate this bundle is cancelled.
422     This is set when another copy (backup or original) of this work has
423     completed successfully elsewhere."""
424
425     was_cancelled: bool
426     """True if this bundle was cancelled, False if it finished normally"""
427
428     backup_bundles: Optional[List[BundleDetails]]
429     """If we've created backups of this bundle, this is the list of them"""
430
431     failure_count: int
432     """How many times has this bundle failed already?"""
433
434     def __repr__(self):
435         uuid = self.uuid
436         if uuid[-9:-2] == '_backup':
437             uuid = uuid[:-9]
438             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
439         else:
440             suffix = uuid[-6:]
441
442         # We colorize the uuid based on some bits from it to make them
443         # stand out in the logging and help a reader correlate log messages
444         # related to the same bundle.
445         colorz = [
446             fg('violet red'),
447             fg('red'),
448             fg('orange'),
449             fg('peach orange'),
450             fg('yellow'),
451             fg('marigold yellow'),
452             fg('green yellow'),
453             fg('tea green'),
454             fg('cornflower blue'),
455             fg('turquoise blue'),
456             fg('tropical blue'),
457             fg('lavender purple'),
458             fg('medium purple'),
459         ]
460         c = colorz[int(uuid[-2:], 16) % len(colorz)]
461         function_name = (
462             self.function_name if self.function_name is not None else 'nofname'
463         )
464         machine = self.machine if self.machine is not None else 'nomachine'
465         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
466
467
468 class RemoteExecutorStatus:
469     """A status 'scoreboard' for a remote executor tracking various
470     metrics and able to render a periodic dump of global state.
471     """
472
473     def __init__(self, total_worker_count: int) -> None:
474         """
475         Args:
476             total_worker_count: number of workers in the pool
477         """
478         self.worker_count: int = total_worker_count
479         self.known_workers: Set[RemoteWorkerRecord] = set()
480         self.start_time: float = time.time()
481         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
482         self.end_per_bundle: Dict[str, float] = defaultdict(float)
483         self.finished_bundle_timings_per_worker: Dict[
484             RemoteWorkerRecord, math_utils.NumericPopulation
485         ] = {}
486         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
487         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
488         self.finished_bundle_timings: math_utils.NumericPopulation = (
489             math_utils.NumericPopulation()
490         )
491         self.last_periodic_dump: Optional[float] = None
492         self.total_bundles_submitted: int = 0
493
494         # Protects reads and modification using self.  Also used
495         # as a memory fence for modifications to bundle.
496         self.lock: threading.Lock = threading.Lock()
497
498     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
499         """Record that bundle with uuid is assigned to a particular worker.
500
501         Args:
502             worker: the record of the worker to which uuid is assigned
503             uuid: the uuid of a bundle that has been assigned to a worker
504         """
505         with self.lock:
506             self.record_acquire_worker_already_locked(worker, uuid)
507
508     def record_acquire_worker_already_locked(
509         self, worker: RemoteWorkerRecord, uuid: str
510     ) -> None:
511         """Same as above but an entry point that doesn't acquire the lock
512         for codepaths where it's already held."""
513         assert self.lock.locked()
514         self.known_workers.add(worker)
515         self.start_per_bundle[uuid] = None
516         x = self.in_flight_bundles_by_worker.get(worker, set())
517         x.add(uuid)
518         self.in_flight_bundles_by_worker[worker] = x
519
520     def record_bundle_details(self, details: BundleDetails) -> None:
521         """Register the details about a bundle of work."""
522         with self.lock:
523             self.record_bundle_details_already_locked(details)
524
525     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
526         """Same as above but for codepaths that already hold the lock."""
527         assert self.lock.locked()
528         self.bundle_details_by_uuid[details.uuid] = details
529
530     def record_release_worker(
531         self,
532         worker: RemoteWorkerRecord,
533         uuid: str,
534         was_cancelled: bool,
535     ) -> None:
536         """Record that a bundle has released a worker."""
537         with self.lock:
538             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
539
540     def record_release_worker_already_locked(
541         self,
542         worker: RemoteWorkerRecord,
543         uuid: str,
544         was_cancelled: bool,
545     ) -> None:
546         """Same as above but for codepaths that already hold the lock."""
547         assert self.lock.locked()
548         ts = time.time()
549         self.end_per_bundle[uuid] = ts
550         self.in_flight_bundles_by_worker[worker].remove(uuid)
551         if not was_cancelled:
552             start = self.start_per_bundle[uuid]
553             assert start is not None
554             bundle_latency = ts - start
555             x = self.finished_bundle_timings_per_worker.get(
556                 worker, math_utils.NumericPopulation()
557             )
558             x.add_number(bundle_latency)
559             self.finished_bundle_timings_per_worker[worker] = x
560             self.finished_bundle_timings.add_number(bundle_latency)
561
562     def record_processing_began(self, uuid: str):
563         """Record when work on a bundle begins."""
564         with self.lock:
565             self.start_per_bundle[uuid] = time.time()
566
567     def total_in_flight(self) -> int:
568         """How many bundles are in flight currently?"""
569         assert self.lock.locked()
570         total_in_flight = 0
571         for worker in self.known_workers:
572             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
573         return total_in_flight
574
575     def total_idle(self) -> int:
576         """How many idle workers are there currently?"""
577         assert self.lock.locked()
578         return self.worker_count - self.total_in_flight()
579
580     def __repr__(self):
581         assert self.lock.locked()
582         ts = time.time()
583         total_finished = len(self.finished_bundle_timings)
584         total_in_flight = self.total_in_flight()
585         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
586         qall_median = None
587         qall_p95 = None
588         if len(self.finished_bundle_timings) > 1:
589             qall_median = self.finished_bundle_timings.get_median()
590             qall_p95 = self.finished_bundle_timings.get_percentile(95)
591             ret += (
592                 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
593                 f'✅={total_finished}/{self.total_bundles_submitted}, '
594                 f'💻n={total_in_flight}/{self.worker_count}\n'
595             )
596         else:
597             ret += (
598                 f'⏱={ts-self.start_time:.1f}s, '
599                 f'✅={total_finished}/{self.total_bundles_submitted}, '
600                 f'💻n={total_in_flight}/{self.worker_count}\n'
601             )
602
603         for worker in self.known_workers:
604             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
605             timings = self.finished_bundle_timings_per_worker.get(
606                 worker, math_utils.NumericPopulation()
607             )
608             count = len(timings)
609             qworker_median = None
610             qworker_p95 = None
611             if count > 1:
612                 qworker_median = timings.get_median()
613                 qworker_p95 = timings.get_percentile(95)
614                 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
615             else:
616                 ret += '\n'
617             if count > 0:
618                 ret += f'    ...finished {count} total bundle(s) so far\n'
619             in_flight = len(self.in_flight_bundles_by_worker[worker])
620             if in_flight > 0:
621                 ret += f'    ...{in_flight} bundles currently in flight:\n'
622                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
623                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
624                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
625                     if self.start_per_bundle[bundle_uuid] is not None:
626                         sec = ts - self.start_per_bundle[bundle_uuid]
627                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
628                     else:
629                         ret += f'       {details} setting up / copying data...'
630                         sec = 0.0
631
632                     if qworker_p95 is not None:
633                         if sec > qworker_p95:
634                             ret += f'{bg("red")}>💻p95{reset()} '
635                             if details is not None:
636                                 details.slower_than_local_p95 = True
637                         else:
638                             if details is not None:
639                                 details.slower_than_local_p95 = False
640
641                     if qall_p95 is not None:
642                         if sec > qall_p95:
643                             ret += f'{bg("red")}>∀p95{reset()} '
644                             if details is not None:
645                                 details.slower_than_global_p95 = True
646                         elif details is not None:
647                             details.slower_than_global_p95 = False
648                     ret += '\n'
649         return ret
650
651     def periodic_dump(self, total_bundles_submitted: int) -> None:
652         assert self.lock.locked()
653         self.total_bundles_submitted = total_bundles_submitted
654         ts = time.time()
655         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
656             print(self)
657             self.last_periodic_dump = ts
658
659
660 class RemoteWorkerSelectionPolicy(ABC):
661     """An interface definition of a policy for selecting a remote worker."""
662
663     def __init__(self):
664         self.workers: Optional[List[RemoteWorkerRecord]] = None
665
666     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
667         self.workers = workers
668
669     @abstractmethod
670     def is_worker_available(self) -> bool:
671         pass
672
673     @abstractmethod
674     def acquire_worker(
675         self, machine_to_avoid: Optional[str] = None
676     ) -> Optional[RemoteWorkerRecord]:
677         pass
678
679
680 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
681     """A remote worker selector that uses weighted RNG."""
682
683     @overrides
684     def is_worker_available(self) -> bool:
685         if self.workers:
686             for worker in self.workers:
687                 if worker.count > 0:
688                     return True
689         return False
690
691     @overrides
692     def acquire_worker(
693         self, machine_to_avoid: Optional[str] = None
694     ) -> Optional[RemoteWorkerRecord]:
695         grabbag = []
696         if self.workers:
697             for worker in self.workers:
698                 if worker.machine != machine_to_avoid:
699                     if worker.count > 0:
700                         for _ in range(worker.count * worker.weight):
701                             grabbag.append(worker)
702
703         if len(grabbag) == 0:
704             logger.debug(
705                 'There are no available workers that avoid %s', machine_to_avoid
706             )
707             if self.workers:
708                 for worker in self.workers:
709                     if worker.count > 0:
710                         for _ in range(worker.count * worker.weight):
711                             grabbag.append(worker)
712
713         if len(grabbag) == 0:
714             logger.warning('There are no available workers?!')
715             return None
716
717         worker = random.sample(grabbag, 1)[0]
718         assert worker.count > 0
719         worker.count -= 1
720         logger.debug('Selected worker %s', worker)
721         return worker
722
723
724 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
725     """A remote worker selector that just round robins."""
726
727     def __init__(self) -> None:
728         super().__init__()
729         self.index = 0
730
731     @overrides
732     def is_worker_available(self) -> bool:
733         if self.workers:
734             for worker in self.workers:
735                 if worker.count > 0:
736                     return True
737         return False
738
739     def _increment_index(self, index: int) -> None:
740         if self.workers:
741             index += 1
742             if index >= len(self.workers):
743                 index = 0
744             self.index = index
745
746     @overrides
747     def acquire_worker(
748         self, machine_to_avoid: Optional[str] = None
749     ) -> Optional[RemoteWorkerRecord]:
750         if self.workers:
751             x = self.index
752             while True:
753                 worker = self.workers[x]
754                 if worker.machine != machine_to_avoid and worker.count > 0:
755                     worker.count -= 1
756                     self._increment_index(x)
757                     logger.debug('Selected worker %s', worker)
758                     return worker
759                 x += 1
760                 if x >= len(self.workers):
761                     x = 0
762                 if x == self.index:
763                     logger.warning('Unexpectedly could not find a worker, retrying...')
764                     return None
765         return None
766
767
768 class RemoteExecutor(BaseExecutor):
769     """An executor that uses processes on remote machines to do work.
770     To do so, it requires that a pool of remote workers be properly
771     configured.  See instructions in
772     :mod:`pyutils.parallelize.parallelize`.
773
774     Each machine in a worker pool has a *weight* and a *count*.  A
775     *weight* captures the relative speed of a processor on that worker
776     and a *count* captures the number of simultaneous tasks the worker
777     can accept (i.e. the number of cpus on the machine).
778
779     To dispatch work to a remote machine, this class pickles the code
780     to be executed remotely using `cloudpickle`.  For that to work,
781     the remote machine should be running the same version of Python as
782     this machine, ideally in a virtual environment with the same
783     import libraries installed.  Differences in operating system
784     and/or processor architecture don't seem to matter for most code,
785     though.
786
787     .. warning::
788
789         Mismatches in Python version or in the version numbers of
790         third-party libraries between machines can cause problems
791         when trying to unpickle and run code remotely.
792
793     Work to be dispatched is represented in this code by creating a
794     "bundle".  Each bundle is assigned to a remote worker based on
795     heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
796     general, it attempts to load all workers in the pool and maximize
797     throughput.  Once assigned to a remote worker, pickled code is
798     copied to that worker via `scp` and a remote command is issued via
799     `ssh` to execute a :file:`remote_worker.py` process on the remote
800     machine.  This process unpickles the code, runs it, and produces a
801     result which is then copied back to the local machine (again via
802     `scp`) where it can be processed by local code.
803
804     You can and probably must override the path of
805     :file:`remote_worker.py` on your pool machines using the
806     `--remote_worker_helper_path` commandline argument (or by just
807     changing the default in code, see above in this file's code).
808
809     During remote work execution, this local machine acts as a
810     controller dispatching all work to the network, copying pickled
811     tasks out, and copying results back in.  It may also be a worker
812     in the pool but do not underestimate the cost of being a
813     controller -- it takes some cpu and a lot of network bandwidth.
814     The work dispatcher logic attempts to detect when a controller is
815     also a worker and reduce its load.
816
817     Some redundancy and safety provisions are made when scheduling
818     tasks to the worker pool; e.g. slower than expected tasks have
819     redundant backup tasks created, especially if there are otherwise
820     idle workers.  If a task fails repeatedly, the dispatcher considers
821     it poisoned and gives up on it.
822
823     .. warning::
824
825         This executor probably only makes sense to use with
826         computationally expensive tasks such as jobs that will execute
827         for ~30 seconds or longer.
828
829         The network overhead and latency of copying work from the
830         controller (local) machine to the remote workers and copying
831         results back again is relatively high.  Especially at startup,
832         the network can become a bottleneck.  Future versions of this
833         code may attempt to split the responsibility of being a
834         controller (distributing work to pool machines).
835
836     Instructions for how to set this up are provided in
837     :mod:`pyutils.parallelize.parallelize`.
838
839     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
840
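    A minimal construction sketch (the usernames and hostnames below are
    hypothetical; most callers instead use the :class:`DefaultExecutors`
    pool, configured via the `--remote_worker_records_file` argument)::

        workers = [
            RemoteWorkerRecord(
                username='me', machine='worker1.example.com', weight=2, count=4
            ),
            RemoteWorkerRecord(
                username='me', machine='worker2.example.com', weight=1, count=8
            ),
        ]
        executor = RemoteExecutor(
            workers, WeightedRandomRemoteWorkerSelectionPolicy()
        )
        future = executor.submit(some_function, some_arg)  # hypothetical work
        print(future.result())
        executor.shutdown()
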
841     """
842
843     def __init__(
844         self,
845         workers: List[RemoteWorkerRecord],
846         policy: RemoteWorkerSelectionPolicy,
847     ) -> None:
848         """
849         Args:
850             workers: A list of remote workers we can call on to do tasks.
851             policy: A policy for selecting remote workers for tasks.
852         """
853
854         super().__init__()
855         self.workers = workers
856         self.policy = policy
857         self.worker_count = 0
858         for worker in self.workers:
859             self.worker_count += worker.count
860         if self.worker_count <= 0:
861             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
862             logger.critical(msg)
863             raise RemoteExecutorException(msg)
864         self.policy.register_worker_pool(self.workers)
865         self.cv = threading.Condition()
866         logger.debug(
867             'Creating %d local threads, one per remote worker.', self.worker_count
868         )
869         self._helper_executor = fut.ThreadPoolExecutor(
870             thread_name_prefix="remote_executor_helper",
871             max_workers=self.worker_count,
872         )
873         self.status = RemoteExecutorStatus(self.worker_count)
874         self.total_bundles_submitted = 0
875         self.backup_lock = threading.Lock()
876         self.last_backup = None
877         (
878             self.heartbeat_thread,
879             self.heartbeat_stop_event,
880         ) = self._run_periodic_heartbeat()
881         self.already_shutdown = False
882
883     @background_thread
884     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
885         """
886         We create a background thread to invoke :meth:`_heartbeat` regularly
887         while we are scheduling work.  It does some accounting such as
888         looking for slow bundles to tag for backup creation, checking for
889         unexpected failures, and printing a fancy message on stdout.
890         """
891         while not stop_event.is_set():
892             time.sleep(5.0)
893             logger.debug('Running periodic heartbeat code...')
894             self._heartbeat()
895         logger.debug('Periodic heartbeat thread shutting down.')
896
897     def _heartbeat(self) -> None:
898         # Note: this is invoked on a background thread, not an
899         # executor thread.  Be careful what you do with it b/c it
900         # needs to get back and dump status again periodically.
901         with self.status.lock:
902             self.status.periodic_dump(self.total_bundles_submitted)
903
904             # Look for bundles to reschedule via executor.submit
905             if config.config['executors_schedule_remote_backups']:
906                 self._maybe_schedule_backup_bundles()
907
908     def _maybe_schedule_backup_bundles(self):
909         """Maybe schedule backup bundles if we see a very slow bundle."""
910
911         assert self.status.lock.locked()
912         num_done = len(self.status.finished_bundle_timings)
913         num_idle_workers = self.worker_count - self.task_count
914         now = time.time()
915         if (
916             num_done >= 2
917             and num_idle_workers > 0
918             and (self.last_backup is None or (now - self.last_backup > 9.0))
919             and self.backup_lock.acquire(blocking=False)
920         ):
921             try:
922                 assert self.backup_lock.locked()
923
924                 bundle_to_backup = None
925                 best_score = None
926                 for (
927                     worker,
928                     bundle_uuids,
929                 ) in self.status.in_flight_bundles_by_worker.items():
930
931                     # Prefer to schedule backups of bundles running on
932                     # slower machines.
933                     base_score = 0
934                     for record in self.workers:
935                         if worker.machine == record.machine:
936                             temp_score = float(record.weight)
937                             temp_score = 1.0 / temp_score
938                             temp_score *= 200.0
939                             base_score = int(temp_score)
940                             break
941
942                     for uuid in bundle_uuids:
943                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
944                         if (
945                             bundle is not None
946                             and bundle.src_bundle is None
947                             and bundle.backup_bundles is not None
948                         ):
949                             score = base_score
950
951                             # Schedule backups of bundles running
952                             # longer; especially those that are
953                             # unexpectedly slow.
954                             start_ts = self.status.start_per_bundle[uuid]
955                             if start_ts is not None:
956                                 runtime = now - start_ts
957                                 score += runtime
958                                 logger.debug(
959                                     'score[%s] => %.1f  # latency boost', bundle, score
960                                 )
961
962                                 if bundle.slower_than_local_p95:
963                                     score += runtime / 2
964                                     logger.debug(
965                                         'score[%s] => %.1f  # >worker p95',
966                                         bundle,
967                                         score,
968                                     )
969
970                                 if bundle.slower_than_global_p95:
971                                     score += runtime / 4
972                                     logger.debug(
973                                         'score[%s] => %.1f  # >global p95',
974                                         bundle,
975                                         score,
976                                     )
977
978                             # Prefer backups of bundles that don't
979                             # have backups already.
980                             backup_count = len(bundle.backup_bundles)
981                             if backup_count == 0:
982                                 score *= 2
983                             elif backup_count == 1:
984                                 score /= 2
985                             elif backup_count == 2:
986                                 score /= 8
987                             else:
988                                 score = 0
989                             logger.debug(
990                                 'score[%s] => %.1f  # %d dup backup factor',
991                                 bundle,
992                                 score, backup_count,
993                             )
994
995                             if score != 0 and (
996                                 best_score is None or score > best_score
997                             ):
998                                 bundle_to_backup = bundle
999                                 assert bundle is not None
1000                                 assert bundle.backup_bundles is not None
1001                                 assert bundle.src_bundle is None
1002                                 best_score = score
1003
1004                 # Note: this is all still happening on the heartbeat
1005                 # runner thread.  That's ok because
1006                 # _schedule_backup_for_bundle uses the executor to
1007                 # submit the bundle again which will cause it to be
1008                 # picked up by a worker thread and allow this thread
1009                 # to return to run future heartbeats.
1010                 if bundle_to_backup is not None:
1011                     self.last_backup = now
1012                     logger.info(
1013                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1014                         bundle_to_backup,
1015                         best_score,
1016                     )
1017                     self._schedule_backup_for_bundle(bundle_to_backup)
1018             finally:
1019                 self.backup_lock.release()
1020
1021     def _is_worker_available(self) -> bool:
1022         """Is there a worker available currently?"""
1023         return self.policy.is_worker_available()
1024
1025     def _acquire_worker(
1026         self, machine_to_avoid: Optional[str] = None
1027     ) -> Optional[RemoteWorkerRecord]:
1028         """Try to acquire a worker."""
1029         return self.policy.acquire_worker(machine_to_avoid)
1030
1031     def _find_available_worker_or_block(
1032         self, machine_to_avoid: Optional[str] = None
1033     ) -> RemoteWorkerRecord:
1034         """Find a worker or block until one becomes available."""
1035         with self.cv:
1036             while not self._is_worker_available():
1037                 self.cv.wait()
1038             worker = self._acquire_worker(machine_to_avoid)
1039             if worker is not None:
1040                 return worker
1041         msg = "We should never reach this point in the code"
1042         logger.critical(msg)
1043         raise Exception(msg)
1044
1045     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1046         """Release a previously acquired worker."""
1047         worker = bundle.worker
1048         assert worker is not None
1049         logger.debug('Released worker %s', worker)
1050         self.status.record_release_worker(
1051             worker,
1052             bundle.uuid,
1053             was_cancelled,
1054         )
1055         with self.cv:
1056             worker.count += 1
1057             self.cv.notify()
1058         self.adjust_task_count(-1)
1059
1060     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1061         """See if a particular bundle is cancelled.  Do not block."""
1062         with self.status.lock:
1063             if bundle.is_cancelled.wait(timeout=0.0):
1064                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1065                 bundle.was_cancelled = True
1066                 return True
1067         return False
1068
1069     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1070         """Find a worker for bundle or block until one is available."""
1071
1072         self.adjust_task_count(+1)
1073         uuid = bundle.uuid
1074         controller = bundle.controller
1075         avoid_machine = override_avoid_machine
1076         is_original = bundle.src_bundle is None
1077
1078         # Try not to schedule a backup on the same host as the original.
1079         if avoid_machine is None and bundle.src_bundle is not None:
1080             avoid_machine = bundle.src_bundle.machine
1081         worker = None
1082         while worker is None:
1083             worker = self._find_available_worker_or_block(avoid_machine)
1084         assert worker is not None
1085
1086         # Ok, found a worker.
1087         bundle.worker = worker
1088         machine = bundle.machine = worker.machine
1089         username = bundle.username = worker.username
1090         self.status.record_acquire_worker(worker, uuid)
1091         logger.debug('%s: Running bundle on %s...', bundle, worker)
1092
1093         # Before we do any work, make sure the bundle is still viable.
1094         # It may have been some time between when it was submitted and
1095         # now due to lack of worker availability and someone else may
1096         # have already finished it.
1097         if self._check_if_cancelled(bundle):
1098             try:
1099                 return self._process_work_result(bundle)
1100             except Exception:
1101                 logger.warning(
1102                     '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1103                 )
1104                 self._release_worker(bundle)
1105                 if is_original:
1106                     # Weird.  We are the original owner of this
1107                     # bundle.  For it to have been cancelled, a backup
1108                     # must have already started and completed before
1109                     # we even for started.  Moreover, the backup says
1110                     # it is done but we can't find the results it
1111                     # should have copied over.  Reschedule the whole
1112                     # thing.
1113                     logger.exception(
1114                         '%s: We are the original owner thread and yet there are '
1115                         'no results for this bundle.  This is unexpected and bad. '
1116                         'Attempting an emergency retry...',
1117                         bundle,
1118                     )
1119                     return self._emergency_retry_nasty_bundle(bundle)
1120                 else:
1121                     # We're a backup and our bundle is cancelled
1122                     # before we even got started.  Do nothing and let
1123                     # the original bundle's thread worry about either
1124                     # finding the results or complaining about it.
1125                     return None
1126
1127         # Send input code / data to worker machine if it's not local.
1128         if controller not in machine:
1129             try:
1130                 cmd = (
1131                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1132                 )
1133                 start_ts = time.time()
1134                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1135                 run_silently(cmd)
1136                 xfer_latency = time.time() - start_ts
1137                 logger.debug(
1138                     "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1139                 )
1140             except Exception:
1141                 self._release_worker(bundle)
1142                 if is_original:
1143                     # Weird.  We tried to copy the code to the worker
1144                     # and it failed...  And we're the original bundle.
1145                     # We have to retry.
1146                     logger.exception(
1147                         "%s: Failed to send instructions to the worker machine?! "
1148                         "This is not expected; we\'re the original bundle so this shouldn\'t "
1149                         "be a race condition.  Attempting an emergency retry...",
1150                         bundle,
1151                     )
1152                     return self._emergency_retry_nasty_bundle(bundle)
1153                 else:
1154                     # This is actually expected; we're a backup.
1155                     # There's a race condition where someone else
1156                     # already finished the work and removed the source
1157                     # code_file before we could copy it.  Ignore.
1158                     logger.warning(
1159                         '%s: Failed to send instructions to the worker machine... '
1160                         'We\'re a backup and this may be caused by the original (or '
1161                         'some other backup) already finishing this work.  Ignoring.',
1162                         bundle,
1163                     )
1164                     return None
1165
1166         # Kick off the work.  Note that if this fails we let
1167         # _wait_for_process deal with it.
1168         self.status.record_processing_began(uuid)
1169         helper_path = config.config['remote_worker_helper_path']
1170         cmd = (
1171             f'{SSH} {bundle.username}@{bundle.machine} '
1172             f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1173         )
1174         logger.debug(
1175             '%s: Executing %s in the background to kick off work...', bundle, cmd
1176         )
1177         p = cmd_in_background(cmd, silent=True)
1178         bundle.pid = p.pid
1179         logger.debug(
1180             '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1181         )
1182         return self._wait_for_process(p, bundle, 0)
1183
1184     def _wait_for_process(
1185         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1186     ) -> Any:
1187         """At this point we've copied the bundle's pickled code to the remote
1188         worker and started an ssh process that should be invoking the
1189         remote worker to have it execute the user's code.  See how
1190         that's going and wait for it to complete or fail.  Note that
1191         this code is recursive: there are codepaths where we decide to
1192         stop waiting for an ssh process (because another backup seems
1193         to have finished) but then fail to fetch or parse the results
1194         from that backup and thus call ourselves to continue waiting
1195         on an active ssh process.  This is the purpose of the depth
1196         argument: to curtail potential infinite recursion by giving up
1197         eventually.
1198
1199         Args:
1200             p: the Popen record of the ssh job
1201             bundle: the bundle of work being executed remotely
1202             depth: how many retries we've made so far.  Starts at zero.
1203
1204         """
1205
1206         machine = bundle.machine
1207         assert p is not None
1208         pid = p.pid  # pid of the ssh process
1209         if depth > 3:
1210             logger.error(
1211                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1212                 pid,
1213             )
1214             p.terminate()
1215             self._release_worker(bundle)
1216             return self._emergency_retry_nasty_bundle(bundle)
1217
1218         # Spin until either the ssh job we scheduled finishes the
1219         # bundle or some backup worker signals that they finished it
1220         # before we could.
1221         while True:
1222             try:
1223                 p.wait(timeout=0.25)
1224             except subprocess.TimeoutExpired:
1225                 if self._check_if_cancelled(bundle):
1226                     logger.info(
1227                         '%s: looks like another worker finished bundle...', bundle
1228                     )
1229                     break
1230             else:
1231                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1232                 p = None
1233                 break
1234
1235         # If we get here we believe the bundle is done; either the ssh
1236         # subprocess finished (hopefully successfully) or we noticed
1237         # that some other worker seems to have completed the bundle
1238         # before us and we're bailing out.
1239         try:
1240             ret = self._process_work_result(bundle)
1241             if ret is not None and p is not None:
1242                 p.terminate()
1243             return ret
1244
1245         # Something went wrong; e.g. we could not copy the results
1246         # back, cleanup after ourselves on the remote machine, or
1247         # unpickle the results we got from the remote machine.  If we
1248         # still have an active ssh subprocess, keep waiting on it.
1249         # Otherwise, time for an emergency reschedule.
1250         except Exception:
1251             logger.exception('%s: Something unexpected just happened...', bundle)
1252             if p is not None:
1253                 logger.warning(
1254                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1255                     bundle,
1256                 )
1257                 return self._wait_for_process(p, bundle, depth + 1)
1258             else:
1259                 self._release_worker(bundle)
1260                 return self._emergency_retry_nasty_bundle(bundle)
1261
1262     def _process_work_result(self, bundle: BundleDetails) -> Any:
1263         """A bundle seems to be completed.  Check on the results."""
1264
1265         with self.status.lock:
1266             is_original = bundle.src_bundle is None
1267             was_cancelled = bundle.was_cancelled
1268             username = bundle.username
1269             machine = bundle.machine
1270             result_file = bundle.result_file
1271             code_file = bundle.code_file
1272
1273             # Whether original or backup, if we finished first we must
1274             # fetch the results if the computation happened on a
1275             # remote machine.
1276             bundle.end_ts = time.time()
1277             if not was_cancelled:
1278                 assert bundle.machine is not None
1279                 if bundle.controller not in bundle.machine:
1280                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1281                     logger.info(
1282                         "%s: Fetching results back from %s@%s via %s",
1283                         bundle,
1284                         username,
1285                         machine,
1286                         cmd,
1287                     )
1288
1289                     # If either of these throw they are handled in
1290                     # _wait_for_process.
1291                     attempts = 0
1292                     while True:
1293                         try:
1294                             run_silently(cmd)
1295                         except Exception as e:
1296                             attempts += 1
1297                             if attempts >= 3:
1298                                 raise e
1299                         else:
1300                             break
1301
1302                     # Cleanup remote /tmp files.
1303                     run_silently(
1304                         f'{SSH} {username}@{machine}'
1305                         f' "/bin/rm -f {code_file} {result_file}"'
1306                     )
1307                     logger.debug(
1308                         'Fetching results back took %.2fs', time.time() - bundle.end_ts
1309                     )
1310                 dur = bundle.end_ts - bundle.start_ts
1311                 self.histogram.add_item(dur)
1312
1313         # Only the original bundle should unpickle the file contents,
1314         # though, since it's the only one whose result matters.  The
1315         # original is also the only job that may delete result_file
1316         # from disk.  Note that the original may have been cancelled
1317         # if one of its backups finished first; it must still read
1318         # the result from disk, which it does here even with
1319         # is_cancelled set.
1320         if is_original:
1321             logger.debug("%s: Unpickling %s.", bundle, result_file)
1322             try:
1323                 with open(result_file, 'rb') as rb:
1324                     serialized = rb.read()
1325                 result = cloudpickle.loads(serialized)
1326             except Exception as e:
1327                 logger.exception('Failed to load %s... this is bad news.', result_file)
1328                 self._release_worker(bundle)
1329
1330                 # Re-raise the exception; the code in _wait_for_process may
1331                 # decide to _emergency_retry_nasty_bundle here.
1332                 raise e
1333             logger.debug('Removing local (controller) %s and %s.', code_file, result_file)
1334             os.remove(result_file)
1335             os.remove(code_file)
1336
1337             # Notify any backups that the original is done so they
1338             # should stop ASAP.  Do this whether or not we
1339             # finished first since there could be more than one
1340             # backup.
1341             if bundle.backup_bundles is not None:
1342                 for backup in bundle.backup_bundles:
1343                     logger.debug(
1344                         '%s: Notifying backup %s that it\'s cancelled',
1345                         bundle,
1346                         backup.uuid,
1347                     )
1348                     backup.is_cancelled.set()
1349
1350         # This is a backup job and, by now, we have already fetched
1351         # the bundle results.
1352         else:
1353             # Backup results don't matter, they just need to leave the
1354             # result file in the right place for their originals to
1355             # read/unpickle later.
1356             result = None
1357
1358             # Tell the original to stop if we finished first.
1359             if not was_cancelled:
1360                 orig_bundle = bundle.src_bundle
1361                 assert orig_bundle is not None
1362                 logger.debug(
1363                     '%s: Notifying original %s we beat them to it.',
1364                     bundle,
1365                     orig_bundle.uuid,
1366                 )
1367                 orig_bundle.is_cancelled.set()
1368         self._release_worker(bundle, was_cancelled=was_cancelled)
1369         return result
1370
1371     def _create_original_bundle(self, pickle, function_name: str):
1372         """Creates a bundle that is not a backup of any other bundle but
1373         rather represents a user task.
1374         """
1375
1376         uuid = string_utils.generate_uuid(omit_dashes=True)
1377         code_file = f'/tmp/{uuid}.code.bin'
1378         result_file = f'/tmp/{uuid}.result.bin'
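        # code_file will hold the cloudpickled callable + arguments; the
        # worker that runs it is expected to leave its cloudpickled result
        # in result_file (see _process_work_result above).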
1379
1380         logger.debug('Writing pickled code to %s', code_file)
1381         with open(code_file, 'wb') as wb:
1382             wb.write(pickle)
1383
1384         bundle = BundleDetails(
1385             pickled_code=pickle,
1386             uuid=uuid,
1387             function_name=function_name,
1388             worker=None,
1389             username=None,
1390             machine=None,
1391             controller=platform.node(),
1392             code_file=code_file,
1393             result_file=result_file,
1394             pid=0,
1395             start_ts=time.time(),
1396             end_ts=0.0,
1397             slower_than_local_p95=False,
1398             slower_than_global_p95=False,
1399             src_bundle=None,
1400             is_cancelled=threading.Event(),
1401             was_cancelled=False,
1402             backup_bundles=[],
1403             failure_count=0,
1404         )
1405         self.status.record_bundle_details(bundle)
1406         logger.debug('%s: Created an original bundle', bundle)
1407         return bundle
1408
1409     def _create_backup_bundle(self, src_bundle: BundleDetails):
1410         """Creates a bundle that is a backup of another bundle that is
1411         running too slowly."""
1412
1413         assert self.status.lock.locked()
1414         assert src_bundle.backup_bundles is not None
1415         n = len(src_bundle.backup_bundles)
1416         uuid = src_bundle.uuid + f'_backup#{n}'
1417
1418         backup_bundle = BundleDetails(
1419             pickled_code=src_bundle.pickled_code,
1420             uuid=uuid,
1421             function_name=src_bundle.function_name,
1422             worker=None,
1423             username=None,
1424             machine=None,
1425             controller=src_bundle.controller,
1426             code_file=src_bundle.code_file,
1427             result_file=src_bundle.result_file,
1428             pid=0,
1429             start_ts=time.time(),
1430             end_ts=0.0,
1431             slower_than_local_p95=False,
1432             slower_than_global_p95=False,
1433             src_bundle=src_bundle,
1434             is_cancelled=threading.Event(),
1435             was_cancelled=False,
1436             backup_bundles=None,  # backup backups not allowed
1437             failure_count=0,
1438         )
1439         src_bundle.backup_bundles.append(backup_bundle)
1440         self.status.record_bundle_details_already_locked(backup_bundle)
1441         logger.debug('%s: Created a backup bundle', backup_bundle)
1442         return backup_bundle
1443
1444     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1445         """Schedule a backup of src_bundle."""
1446
1447         assert self.status.lock.locked()
1448         assert src_bundle is not None
1449         backup_bundle = self._create_backup_bundle(src_bundle)
1450         logger.debug(
1451             '%s/%s: Scheduling backup for execution...',
1452             backup_bundle.uuid,
1453             backup_bundle.function_name,
1454         )
1455         self._helper_executor.submit(self._launch, backup_bundle)
1456
1457         # Results from backups don't matter; if they finish first
1458         # they will move the result_file to this machine and let
1459         # the original pick them up and unpickle them (and return
1460         # a result).
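        #
        # Roughly speaking, backups like this get scheduled (elsewhere in
        # this class) when a bundle appears to be running slower than its
        # peers; cf. the slower_than_local_p95 / slower_than_global_p95
        # flags on BundleDetails.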
1461
1462     def _emergency_retry_nasty_bundle(
1463         self, bundle: BundleDetails
1464     ) -> Optional[fut.Future]:
1465         """Something unexpectedly failed with bundle.  Either retry it
1466         from the beginning or throw in the towel and give up on it."""
1467
1468         is_original = bundle.src_bundle is None
1469         bundle.worker = None
1470         avoid_last_machine = bundle.machine
1471         bundle.machine = None
1472         bundle.username = None
1473         bundle.failure_count += 1
1474         if is_original:
1475             retry_limit = 3
1476         else:
1477             retry_limit = 2
1478
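        # i.e. an original bundle is relaunched until it has failed more
        # than three times; a backup gets only two chances since its
        # original (or another backup) may still succeed without it.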
1479         if bundle.failure_count > retry_limit:
1480             logger.error(
1481                 '%s: Tried this bundle too many times already (%dx); giving up.',
1482                 bundle,
1483                 retry_limit,
1484             )
1485             if is_original:
1486                 raise RemoteExecutorException(
1487                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1488                 )
1489             logger.error(
1490                 '%s: At least it\'s only a backup; better luck with the others.',
1491                 bundle,
1492             )
1493             return None
1494         else:
1495             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1496             logger.warning(msg)
1497             warnings.warn(msg)
1498             return self._launch(bundle, avoid_last_machine)
1499
1500     @overrides
1501     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1502         """Submit work to be done.  This is the user entry point of this
1503         class."""
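        # A minimal usage sketch (assumes a remote worker pool is
        # configured; the workload is illustrative and most callers use
        # the @parallelize decorator rather than calling this directly):
        #
        #   executor = DefaultExecutors().remote_pool()
        #   future = executor.submit(sum, [1, 2, 3])
        #   assert future.result() == 6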
1504         if self.already_shutdown:
1505             raise Exception('Submitted work after shutdown.')
1506         pickle = _make_cloud_pickle(function, *args, **kwargs)
1507         bundle = self._create_original_bundle(pickle, function.__name__)
1508         self.total_bundles_submitted += 1
1509         return self._helper_executor.submit(self._launch, bundle)
1510
1511     @overrides
1512     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1513         """Shutdown the executor."""
1514         if not self.already_shutdown:
1515             logger.debug('Shutting down RemoteExecutor %s', self.title)
1516             self.heartbeat_stop_event.set()
1517             self.heartbeat_thread.join()
1518             self._helper_executor.shutdown(wait)
1519             if not quiet:
1520                 print(self.histogram.__repr__(label_formatter='%ds'))
1521             self.already_shutdown = True
1522
1523
1524 class RemoteWorkerPoolProvider(ABC):
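    """Something that can return the set of RemoteWorkerRecords describing
    the machines available to do remote work."""
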
1525     @abstractmethod
1526     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1527         pass
1528
1529
1530 @persistent.persistent_autoloaded_singleton()  # type: ignore
1531 class ConfigRemoteWorkerPoolProvider(
1532     RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1533 ):
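    """Loads the remote worker pool from the JSON file named by the
    remote_worker_records_file config setting.  The file is expected to
    look roughly like this (field names follow the RemoteWorkerRecord
    dataclass; the values below are purely illustrative)::

        {
            "remote_worker_records": [
                {"username": "me", "machine": "machine1", "weight": 2, "count": 4},
                {"username": "me", "machine": "machine2", "weight": 1, "count": 2}
            ]
        }
    """
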
1534     def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1535         self.remote_worker_pool: List[RemoteWorkerRecord] = []
1536         for record in json_remote_worker_pool['remote_worker_records']:
1537             self.remote_worker_pool.append(
1538                 dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
1539             )
1540         assert len(self.remote_worker_pool) > 0
1541
1542     @overrides
1543     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1544         return self.remote_worker_pool
1545
1546     @overrides
1547     def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1548         return self.remote_worker_pool
1549
1550     @staticmethod
1551     @overrides
1552     def get_filename() -> str:
1553         return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
1554
1555     @staticmethod
1556     @overrides
1557     def should_we_load_data(filename: str) -> bool:
1558         return True
1559
1560     @staticmethod
1561     @overrides
1562     def should_we_save_data(filename: str) -> bool:
1563         return False
1564
1565
1566 @singleton
1567 class DefaultExecutors(object):
1568     """A container for a default thread, process and remote executor.
1569     These are not created until needed and are cleaned up automatically
1570     before process exit for the caller's convenience.  Instead of
1571     creating your own executor, consider using one from this pool,
1572     e.g.::
1573
1574         @par.parallelize(method=par.Method.PROCESS)
1575         def do_work(
1576             solutions: List[Work],
1577             shard_num: int,
1578             ...
1579         ):
1580             <do the work>
1581
1582
1583         def start_do_work(all_work: List[Work]):
1584             shards, results = [], []
1585             logger.debug('Sharding work into groups of 10.')
1586             for subset in list_utils.shard(all_work, 10):
1587                 shards.append([x for x in subset])
1588
1589             logger.debug('Kicking off helper pool.')
1590             try:
1591                 for n, shard in enumerate(shards):
1592                     results.append(
1593                         do_work(
1594                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1595                         )
1596                     )
1597                 smart_future.wait_all(results)
1598             finally:
1599                 # Note: if you forget to do this it will clean itself up
1600                 # during program termination including tearing down any
1601                 # active ssh connections.
1602                 executors.DefaultExecutors().process_pool().shutdown()
1603     """
1604
1605     def __init__(self):
1606         self.thread_executor: Optional[ThreadExecutor] = None
1607         self.process_executor: Optional[ProcessExecutor] = None
1608         self.remote_executor: Optional[RemoteExecutor] = None
1609
1610     @staticmethod
1611     def _ping(host) -> bool:
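        """Return True if host responds to a single ping within about a second."""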
1612         logger.debug('RUN> ping -c 1 %s', host)
1613         try:
1614             x = cmd_exitcode(
1615                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1616             )
1617             return x == 0
1618         except Exception:
1619             return False
1620
1621     def thread_pool(self) -> ThreadExecutor:
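        """Return the shared ThreadExecutor, creating it on first use."""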
1622         if self.thread_executor is None:
1623             self.thread_executor = ThreadExecutor()
1624         return self.thread_executor
1625
1626     def process_pool(self) -> ProcessExecutor:
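        """Return the shared ProcessExecutor, creating it on first use."""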
1627         if self.process_executor is None:
1628             self.process_executor = ProcessExecutor()
1629         return self.process_executor
1630
1631     def remote_pool(self) -> RemoteExecutor:
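        """Return the shared RemoteExecutor, creating it on first use from
        the subset of configured remote workers that respond to ping."""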
1632         if self.remote_executor is None:
1633             logger.info('Looking for some helper machines...')
1634             provider = ConfigRemoteWorkerPoolProvider()
1635             all_machines = provider.get_remote_workers()
1636             pool = []
1637
1638             # Make sure we can ping each machine.
1639             for record in all_machines:
1640                 if self._ping(record.machine):
1641                     logger.info('%s is alive / responding to pings', record.machine)
1642                     pool.append(record)
1643
1644             # The controller machine has a lot to do; go easy on it.
1645             for record in pool:
1646                 if record.machine == platform.node() and record.count > 1:
1647                     logger.info('Reducing workload for %s.', record.machine)
1648                     record.count = max(int(record.count / 2), 1)
1649
1650             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1651             policy.register_worker_pool(pool)
1652             self.remote_executor = RemoteExecutor(pool, policy)
1653         return self.remote_executor
1654
1655     def shutdown(self) -> None:
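        """Shut down (and discard) any executors this pool has created."""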
1656         if self.thread_executor is not None:
1657             self.thread_executor.shutdown(wait=True, quiet=True)
1658             self.thread_executor = None
1659         if self.process_executor is not None:
1660             self.process_executor.shutdown(wait=True, quiet=True)
1661             self.process_executor = None
1662         if self.remote_executor is not None:
1663             self.remote_executor.shutdown(wait=True, quiet=True)
1664             self.remote_executor = None