Bug and readability fix.
[pyutils.git] / src / pyutils / parallelize / executors.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """
7 This module defines a :class:`BaseExecutor` interface and three
8 implementations:
9
10     - :class:`ThreadExecutor`
11     - :class:`ProcessExecutor`
12     - :class:`RemoteExecutor`
13
14 The :class:`ThreadExecutor` is used to dispatch work to background
15 threads in the same Python process for parallelized work.  Of course,
16 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
17 is not terribly useful for compute-bound code.  But it's good for
18 work that is mostly I/O bound.
19
20 The :class:`ProcessExecutor` is used to dispatch work to other
21 processes on the same machine and is more useful for compute-bound
22 workloads.
23
24 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
25 the `cloudpickle` dependency, and the `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ script
26 to dispatch work to a set of remote worker machines on your
27 network.  You can configure this pool via a JSON configuration file,
28 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
29
30 Finally, this file defines a :class:`DefaultExecutors` pool that
31 contains a pre-created and ready instance of each of the three
32 executors discussed.  It has the added benefit of being automatically
33 cleaned up at process termination time.
34
35 See instructions in :mod:`pyutils.parallelize.parallelize` for
36 setting up and using the framework.
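
A minimal sketch of direct executor use (assuming the pyutils config
machinery has been initialized as described there)::

    from pyutils.parallelize.executors import ThreadExecutor

    executor = ThreadExecutor()              # or ProcessExecutor()
    future = executor.submit(sum, [1, 2, 3])
    print(future.result())                   # prints 6
    executor.shutdown(wait=True, quiet=True)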
37 """
38
39 from __future__ import annotations
40
41 import concurrent.futures as fut
42 import logging
43 import os
44 import platform
45 import random
46 import subprocess
47 import threading
48 import time
49 import warnings
50 from abc import ABC, abstractmethod
51 from collections import defaultdict
52 from dataclasses import dataclass
53 from typing import Any, Callable, Dict, List, Optional, Set
54
55 import cloudpickle  # type: ignore
56 from overrides import overrides
57
58 import pyutils.types.histogram as hist
59 from pyutils import (
60     argparse_utils,
61     config,
62     dataclass_utils,
63     math_utils,
64     persistent,
65     string_utils,
66 )
67 from pyutils.ansi import bg, fg, reset, underline
68 from pyutils.decorator_utils import singleton
69 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
70 from pyutils.parallelize.thread_utils import background_thread
71 from pyutils.types import type_utils
72
73 logger = logging.getLogger(__name__)
74
75 parser = config.add_commandline_args(
76     f"Executors ({__file__})", "Args related to processing executors."
77 )
78 parser.add_argument(
79     '--executors_threadpool_size',
80     type=int,
81     metavar='#THREADS',
82     help='Number of threads in the default threadpool, leave unset for default',
83     default=None,
84 )
85 parser.add_argument(
86     '--executors_processpool_size',
87     type=int,
88     metavar='#PROCESSES',
89     help='Number of processes in the default processpool, leave unset for default',
90     default=None,
91 )
92 parser.add_argument(
93     '--executors_schedule_remote_backups',
94     default=True,
95     action=argparse_utils.ActionNoYes,
96     help='Should we schedule duplicative backup work if a remote bundle is slow',
97 )
98 parser.add_argument(
99     '--executors_max_bundle_failures',
100     type=int,
101     default=3,
102     metavar='#FAILURES',
103     help='Maximum number of failures before giving up on a bundle',
104 )
105 parser.add_argument(
106     '--remote_worker_records_file',
107     type=str,
108     metavar='FILENAME',
109     help='Path of the remote worker records file (JSON)',
110     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
111 )
112 parser.add_argument(
113     '--remote_worker_helper_path',
114     type=str,
115     metavar='PATH_TO_REMOTE_WORKER_PY',
116     help='Path to remote_worker.py on remote machines',
117     default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
118 )
119
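# These flags are registered with the pyutils config machinery, so a
# program that uses this module might be launched with something like
# the following (program name illustrative; every flag is optional):
#
#   ./my_program.py --executors_threadpool_size 24 \
#       --executors_processpool_size 8 \
#       --remote_worker_records_file ~/.remote_worker_records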
120
121 SSH = '/usr/bin/ssh -oForwardX11=no'
122 SCP = '/usr/bin/scp -C'
123
124
125 def _make_cloud_pickle(fun, *args, **kwargs):
126     """Internal helper to create cloud pickles."""
127     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
128     return cloudpickle.dumps((fun, args, kwargs))
129
130
131 class BaseExecutor(ABC):
132     """The base executor interface definition.  The interface for
133     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
134     :class:`ThreadExecutor`.
135     """
136
137     def __init__(self, *, title=''):
138         """
139         Args:
140             title: the name of this executor.
141         """
142         self.title = title
143         self.histogram = hist.SimpleHistogram(
144             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
145         )
146         self.task_count = 0
147
148     @abstractmethod
149     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
150         """Submit work for the executor to do.
151
152         Args:
153             function: the Callable to be executed.
154             *args: the arguments to function
155             **kwargs: the keyword arguments to function
156
157         Returns:
158             A concurrent :class:`Future` representing the result of the
159             work.
160         """
161         pass
162
163     @abstractmethod
164     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
165         """Shutdown the executor.
166
167         Args:
168             wait: wait for the shutdown to complete before returning?
169             quiet: keep it quiet, please.
170         """
171         pass
172
173     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
174         """Shutdown the executor if it is idle (i.e. there are no
175         pending or active tasks) and return True.  Otherwise do
176         nothing and return False.  Note: this should only be called
177         by the launcher process.
178
179         Args:
180             quiet: keep it quiet, please.
181
182         Returns:
183             True if the executor could be shut down because it has no
184             pending work, False otherwise.
185         """
186         if self.task_count == 0:
187             self.shutdown(wait=True, quiet=quiet)
188             return True
189         return False
190
191     def adjust_task_count(self, delta: int) -> None:
192         """Change the task count.  Note: do not call this method from a
193         worker, it should only be called by the launcher process /
194         thread / machine.
195
196         Args:
197             delta: the delta value by which to adjust task count.
198         """
199         self.task_count += delta
200         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
201
202     def get_task_count(self) -> int:
203         """Change the task count.  Note: do not call this method from a
204         worker, it should only be called by the launcher process /
205         thread / machine.
206
207         Returns:
208             The executor's current task count.
209         """
210         return self.task_count
211
212
213 class ThreadExecutor(BaseExecutor):
214     """A threadpool executor.  This executor uses Python threads to
215     schedule tasks.  Note that, at least as of Python 3.10, because of
216     the Global Interpreter Lock (GIL), threads do not parallelize
217     CPU-bound work very well, so this class is most useful for
218     I/O-bound or otherwise non-CPU-intensive tasks.
219
220     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
221     """
222
223     def __init__(self, max_workers: Optional[int] = None):
224         """
225         Args:
226             max_workers: maximum number of threads to create in the pool.
227         """
228         super().__init__()
229         workers = None
230         if max_workers is not None:
231             workers = max_workers
232         elif 'executors_threadpool_size' in config.config:
233             workers = config.config['executors_threadpool_size']
234         if workers is not None:
235             logger.debug('Creating threadpool executor with %d workers', workers)
236         else:
237             logger.debug('Creating a default sized threadpool executor')
238         self._thread_pool_executor = fut.ThreadPoolExecutor(
239             max_workers=workers, thread_name_prefix="thread_executor_helper"
240         )
241         self.already_shutdown = False
242
243     # This is run on a different thread; do not adjust task count here.
244     @staticmethod
245     def _run_local_bundle(fun, *args, **kwargs):
246         logger.debug("Running local bundle at %s", fun.__name__)
247         result = fun(*args, **kwargs)
248         return result
249
250     @overrides
251     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
252         if self.already_shutdown:
253             raise Exception('Submitted work after shutdown.')
254         self.adjust_task_count(+1)
255         newargs = []
256         newargs.append(function)
257         for arg in args:
258             newargs.append(arg)
259         start = time.time()
260         result = self._thread_pool_executor.submit(
261             ThreadExecutor._run_local_bundle, *newargs, **kwargs
262         )
263         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
264         result.add_done_callback(lambda _: self.adjust_task_count(-1))
265         return result
266
267     @overrides
268     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
269         if not self.already_shutdown:
270             logger.debug('Shutting down threadpool executor %s', self.title)
271             self._thread_pool_executor.shutdown(wait)
272             if not quiet:
273                 print(self.histogram.__repr__(label_formatter='%ds'))
274             self.already_shutdown = True
275
276
277 class ProcessExecutor(BaseExecutor):
278     """An executor which runs tasks in child processes.
279
280     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
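
    A rough usage sketch; note that the submitted function and its
    arguments must be serializable by `cloudpickle` (they are shipped
    to the child process as a pickled bundle) and that, on platforms
    that spawn child processes, this should run under an
    ``if __name__ == '__main__':`` guard::

        executor = ProcessExecutor(max_workers=4)
        future = executor.submit(pow, 2, 32)    # compute-bound work
        assert future.result() == 2**32
        executor.shutdown(wait=True, quiet=True)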
281     """
282
283     def __init__(self, max_workers=None):
284         """
285         Args:
286             max_workers: the max number of worker processes to create.
287         """
288         super().__init__()
289         workers = None
290         if max_workers is not None:
291             workers = max_workers
292         elif 'executors_processpool_size' in config.config:
293             workers = config.config['executors_processpool_size']
294         if workers is not None:
295             logger.debug('Creating processpool executor with %d workers.', workers)
296         else:
297             logger.debug('Creating a default sized processpool executor')
298         self._process_executor = fut.ProcessPoolExecutor(
299             max_workers=workers,
300         )
301         self.already_shutdown = False
302
303     # This is run in another process; do not adjust task count here.
304     @staticmethod
305     def _run_cloud_pickle(pickle):
306         fun, args, kwargs = cloudpickle.loads(pickle)
307         logger.debug("Running pickled bundle at %s", fun.__name__)
308         result = fun(*args, **kwargs)
309         return result
310
311     @overrides
312     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
313         if self.already_shutdown:
314             raise Exception('Submitted work after shutdown.')
315         start = time.time()
316         self.adjust_task_count(+1)
317         pickle = _make_cloud_pickle(function, *args, **kwargs)
318         result = self._process_executor.submit(
319             ProcessExecutor._run_cloud_pickle, pickle
320         )
321         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
322         result.add_done_callback(lambda _: self.adjust_task_count(-1))
323         return result
324
325     @overrides
326     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
327         if not self.already_shutdown:
328             logger.debug('Shutting down processpool executor %s', self.title)
329             self._process_executor.shutdown(wait)
330             if not quiet:
331                 print(self.histogram.__repr__(label_formatter='%ds'))
332             self.already_shutdown = True
333
334     def __getstate__(self):
335         state = self.__dict__.copy()
336         state['_process_executor'] = None
337         return state
338
339
340 class RemoteExecutorException(Exception):
341     """Thrown when a bundle cannot be executed despite several retries."""
342
343     pass
344
345
346 @dataclass
347 class RemoteWorkerRecord:
348     """A record of info about a remote worker."""
349
350     username: str
351     """Username we can ssh into on this machine to run work."""
352
353     machine: str
354     """Machine address / name."""
355
356     weight: int
357     """Relative probability for the weighted policy to select this
358     machine for scheduling work."""
359
360     count: int
361     """If this machine is selected, what is the maximum number of tasks
362     that it can handle?"""
363
364     def __hash__(self):
365         return hash((self.username, self.machine))
366
367     def __repr__(self):
368         return f'{self.username}@{self.machine}'
369
370
371 @dataclass
372 class BundleDetails:
373     """All info necessary to define some unit of work that needs to be
374     done, where it is being run, its state, whether it is an original
375     bundle or a backup bundle, how many times it has failed, etc...
376     """
377
378     pickled_code: bytes
379     """The code to run, cloud pickled"""
380
381     uuid: str
382     """A unique identifier"""
383
384     function_name: str
385     """The name of the function we pickled"""
386
387     worker: Optional[RemoteWorkerRecord]
388     """The remote worker running this bundle or None if none (yet)"""
389
390     username: Optional[str]
391     """The remote username running this bundle or None if none (yet)"""
392
393     machine: Optional[str]
394     """The remote machine running this bundle or None if none (yet)"""
395
396     controller: str
397     """The controller machine"""
398
399     code_file: str
400     """A unique filename to hold the work to be done"""
401
402     result_file: str
403     """Where the results should be placed / read from"""
404
405     pid: int
406     """The process id of the local subprocess watching the ssh connection
407     to the remote machine"""
408
409     start_ts: float
410     """Starting time"""
411
412     end_ts: float
413     """Ending time"""
414
415     slower_than_local_p95: bool
416     """Currently slower than 95% of other bundles on remote host"""
417
418     slower_than_global_p95: bool
419     """Currently slower than 95% of other bundles globally"""
420
421     src_bundle: Optional[BundleDetails]
422     """If this is a backup bundle, this points to the original bundle
423     that it's backing up.  None otherwise."""
424
425     is_cancelled: threading.Event
426     """An event that can be signaled to indicate this bundle is cancelled.
427     This is set when another copy (backup or original) of this work has
428     completed successfully elsewhere."""
429
430     was_cancelled: bool
431     """True if this bundle was cancelled, False if it finished normally"""
432
433     backup_bundles: Optional[List[BundleDetails]]
434     """If we've created backups of this bundle, this is the list of them"""
435
436     failure_count: int
437     """How many times has this bundle failed already?"""
438
439     def __repr__(self):
440         uuid = self.uuid
441         if uuid[-9:-2] == '_backup':
442             uuid = uuid[:-9]
443             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
444         else:
445             suffix = uuid[-6:]
446
447         # We colorize the uuid based on some bits from it to make them
448         # stand out in the logging and help a reader correlate log messages
449         # related to the same bundle.
450         colorz = [
451             fg('violet red'),
452             fg('red'),
453             fg('orange'),
454             fg('peach orange'),
455             fg('yellow'),
456             fg('marigold yellow'),
457             fg('green yellow'),
458             fg('tea green'),
459             fg('cornflower blue'),
460             fg('turquoise blue'),
461             fg('tropical blue'),
462             fg('lavender purple'),
463             fg('medium purple'),
464         ]
465         c = colorz[int(uuid[-2:], 16) % len(colorz)]
466         function_name = (
467             self.function_name if self.function_name is not None else 'nofname'
468         )
469         machine = self.machine if self.machine is not None else 'nomachine'
470         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
471
472
473 class RemoteExecutorStatus:
474     """A status 'scoreboard' for a remote executor tracking various
475     metrics and able to render a periodic dump of global state.
476     """
477
478     def __init__(self, total_worker_count: int) -> None:
479         """
480         Args:
481             total_worker_count: number of workers in the pool
482         """
483         self.worker_count: int = total_worker_count
484         self.known_workers: Set[RemoteWorkerRecord] = set()
485         self.start_time: float = time.time()
486         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
487         self.end_per_bundle: Dict[str, float] = defaultdict(float)
488         self.finished_bundle_timings_per_worker: Dict[
489             RemoteWorkerRecord, math_utils.NumericPopulation
490         ] = {}
491         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
492         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
493         self.finished_bundle_timings: math_utils.NumericPopulation = (
494             math_utils.NumericPopulation()
495         )
496         self.last_periodic_dump: Optional[float] = None
497         self.total_bundles_submitted: int = 0
498
499         # Protects reads and modification using self.  Also used
500         # as a memory fence for modifications to bundle.
501         self.lock: threading.Lock = threading.Lock()
502
503     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
504         """Record that bundle with uuid is assigned to a particular worker.
505
506         Args:
507             worker: the record of the worker to which uuid is assigned
508             uuid: the uuid of a bundle that has been assigned to a worker
509         """
510         with self.lock:
511             self.record_acquire_worker_already_locked(worker, uuid)
512
513     def record_acquire_worker_already_locked(
514         self, worker: RemoteWorkerRecord, uuid: str
515     ) -> None:
516         """Same as above but an entry point that doesn't acquire the lock
517         for codepaths where it's already held."""
518         assert self.lock.locked()
519         self.known_workers.add(worker)
520         self.start_per_bundle[uuid] = None
521         x = self.in_flight_bundles_by_worker.get(worker, set())
522         x.add(uuid)
523         self.in_flight_bundles_by_worker[worker] = x
524
525     def record_bundle_details(self, details: BundleDetails) -> None:
526         """Register the details about a bundle of work."""
527         with self.lock:
528             self.record_bundle_details_already_locked(details)
529
530     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
531         """Same as above but for codepaths that already hold the lock."""
532         assert self.lock.locked()
533         self.bundle_details_by_uuid[details.uuid] = details
534
535     def record_release_worker(
536         self,
537         worker: RemoteWorkerRecord,
538         uuid: str,
539         was_cancelled: bool,
540     ) -> None:
541         """Record that a bundle has released a worker."""
542         with self.lock:
543             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
544
545     def record_release_worker_already_locked(
546         self,
547         worker: RemoteWorkerRecord,
548         uuid: str,
549         was_cancelled: bool,
550     ) -> None:
551         """Same as above but for codepaths that already hold the lock."""
552         assert self.lock.locked()
553         ts = time.time()
554         self.end_per_bundle[uuid] = ts
555         self.in_flight_bundles_by_worker[worker].remove(uuid)
556         if not was_cancelled:
557             start = self.start_per_bundle[uuid]
558             assert start is not None
559             bundle_latency = ts - start
560             x = self.finished_bundle_timings_per_worker.get(
561                 worker, math_utils.NumericPopulation()
562             )
563             x.add_number(bundle_latency)
564             self.finished_bundle_timings_per_worker[worker] = x
565             self.finished_bundle_timings.add_number(bundle_latency)
566
567     def record_processing_began(self, uuid: str):
568         """Record when work on a bundle begins."""
569         with self.lock:
570             self.start_per_bundle[uuid] = time.time()
571
572     def total_in_flight(self) -> int:
573         """How many bundles are in flight currently?"""
574         assert self.lock.locked()
575         total_in_flight = 0
576         for worker in self.known_workers:
577             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
578         return total_in_flight
579
580     def total_idle(self) -> int:
581         """How many idle workers are there currently?"""
582         assert self.lock.locked()
583         return self.worker_count - self.total_in_flight()
584
585     def __repr__(self):
586         assert self.lock.locked()
587         ts = time.time()
588         total_finished = len(self.finished_bundle_timings)
589         total_in_flight = self.total_in_flight()
590         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
591         qall_median = None
592         qall_p95 = None
593         if len(self.finished_bundle_timings) > 1:
594             qall_median = self.finished_bundle_timings.get_median()
595             qall_p95 = self.finished_bundle_timings.get_percentile(95)
596             ret += (
597                 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
598                 f'✅={total_finished}/{self.total_bundles_submitted}, '
599                 f'💻n={total_in_flight}/{self.worker_count}\n'
600             )
601         else:
602             ret += (
603                 f'⏱={ts-self.start_time:.1f}s, '
604                 f'✅={total_finished}/{self.total_bundles_submitted}, '
605                 f'💻n={total_in_flight}/{self.worker_count}\n'
606             )
607
608         for worker in self.known_workers:
609             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
610             timings = self.finished_bundle_timings_per_worker.get(
611                 worker, math_utils.NumericPopulation()
612             )
613             count = len(timings)
614             qworker_median = None
615             qworker_p95 = None
616             if count > 1:
617                 qworker_median = timings.get_median()
618                 qworker_p95 = timings.get_percentile(95)
619                 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
620             else:
621                 ret += '\n'
622             if count > 0:
623                 ret += f'    ...finished {count} total bundle(s) so far\n'
624             in_flight = len(self.in_flight_bundles_by_worker[worker])
625             if in_flight > 0:
626                 ret += f'    ...{in_flight} bundles currently in flight:\n'
627                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
628                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
629                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
630                     if self.start_per_bundle[bundle_uuid] is not None:
631                         sec = ts - self.start_per_bundle[bundle_uuid]
632                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
633                     else:
634                         ret += f'       {details} setting up / copying data...'
635                         sec = 0.0
636
637                     if qworker_p95 is not None:
638                         if sec > qworker_p95:
639                             ret += f'{bg("red")}>💻p95{reset()} '
640                             if details is not None:
641                                 details.slower_than_local_p95 = True
642                         else:
643                             if details is not None:
644                                 details.slower_than_local_p95 = False
645
646                     if qall_p95 is not None:
647                         if sec > qall_p95:
648                             ret += f'{bg("red")}>∀p95{reset()} '
649                             if details is not None:
650                                 details.slower_than_global_p95 = True
651                         elif details is not None:
652                             details.slower_than_global_p95 = False
653                     ret += '\n'
654         return ret
655
656     def periodic_dump(self, total_bundles_submitted: int) -> None:
657         assert self.lock.locked()
658         self.total_bundles_submitted = total_bundles_submitted
659         ts = time.time()
660         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
661             print(self)
662             self.last_periodic_dump = ts
663
664
665 class RemoteWorkerSelectionPolicy(ABC):
666     """An interface definition of a policy for selecting a remote worker."""
667
668     def __init__(self):
669         self.workers: Optional[List[RemoteWorkerRecord]] = None
670
671     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
672         self.workers = workers
673
674     @abstractmethod
675     def is_worker_available(self) -> bool:
676         pass
677
678     @abstractmethod
679     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
680         pass
681
682
683 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
684     """A remote worker selector that uses weighted RNG."""
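    # Selection builds a "grabbag" containing (count * weight) entries
    # per eligible worker and then samples one entry uniformly.  For
    # example, a worker with weight=2 and count=4 contributes 8 entries
    # while a worker with weight=1 and count=2 contributes 2, making
    # the former four times as likely to be selected.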
685
686     @overrides
687     def is_worker_available(self) -> bool:
688         if self.workers:
689             for worker in self.workers:
690                 if worker.count > 0:
691                     return True
692         return False
693
694     @overrides
695     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
696         grabbag = []
697         if self.workers:
698             for worker in self.workers:
699                 if worker.machine != machine_to_avoid:
700                     if worker.count > 0:
701                         for _ in range(worker.count * worker.weight):
702                             grabbag.append(worker)
703
704         if len(grabbag) == 0:
705             logger.debug(
706                 'There are no available workers that avoid %s', machine_to_avoid
707             )
708             if self.workers:
709                 for worker in self.workers:
710                     if worker.count > 0:
711                         for _ in range(worker.count * worker.weight):
712                             grabbag.append(worker)
713
714         if len(grabbag) == 0:
715             logger.warning('There are no available workers?!')
716             return None
717
718         worker = random.sample(grabbag, 1)[0]
719         assert worker.count > 0
720         worker.count -= 1
721         logger.debug('Selected worker %s', worker)
722         return worker
723
724
725 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
726     """A remote worker selector that just round robins."""
727
728     def __init__(self) -> None:
729         super().__init__()
730         self.index = 0
731
732     @overrides
733     def is_worker_available(self) -> bool:
734         if self.workers:
735             for worker in self.workers:
736                 if worker.count > 0:
737                     return True
738         return False
739
740     @overrides
741     def acquire_worker(
742         self, machine_to_avoid: Optional[str] = None
743     ) -> Optional[RemoteWorkerRecord]:
744         if self.workers:
745             x = self.index
746             while True:
747                 worker = self.workers[x]
748                 if worker.count > 0:
749                     worker.count -= 1
750                     x += 1
751                     if x >= len(self.workers):
752                         x = 0
753                     self.index = x
754                     logger.debug('Selected worker %s', worker)
755                     return worker
756                 x += 1
757                 if x >= len(self.workers):
758                     x = 0
759                 if x == self.index:
760                     logger.warning('Unexpectedly could not find a worker, retrying...')
761                     return None
762         return None
763
764
765 class RemoteExecutor(BaseExecutor):
766     """An executor that uses processes on remote machines to do work.
767     To do so, it requires that a pool of remote workers be properly
768     configured.  See instructions in
769     :mod:`pyutils.parallelize.parallelize`.
770
771     Each machine in a worker pool has a *weight* and a *count*.  A
772     *weight* captures the relative speed of a processor on that worker
773     and a *count* captures the number of synchronous tasks the worker
774     and a *count* captures the number of simultaneous tasks the worker
775
776     To dispatch work to a remote machine, this class pickles the code
777     to be executed remotely using `cloudpickle`.  For that to work,
778     the remote machine should be running the same version of Python as
779     this machine, ideally in a virtual environment with the same
780     import libraries installed.  Differences in operating system
781     and/or processor architecture don't seem to matter for most code,
782     though.
783
784     .. warning::
785
786         Mismatches in Python version or in the version numbers of
787         third-party libraries between machines can cause problems
788         when trying to unpickle and run code remotely.
789
790     Work to be dispatched is represented in this code by creating a
791     "bundle".  Each bundle is assigned to a remote worker based on
792     heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
793     general, it attempts to load all workers in the pool and maximize
794     throughput.  Once assigned to a remote worker, pickled code is
795     copied to that worker via `scp` and a remote command is issued via
796     `ssh` to execute a :file:`remote_worker.py` process on the remote
797     machine.  This process unpickles the code, runs it, and produces a
798     result which is then copied back to the local machine (again via
799     `scp`) where it can be processed by local code.
800
801     You can and probably must override the path of
802     :file:`remote_worker.py` on your pool machines using the
803     `--remote_worker_helper_path` commandline argument (or by just
804     changing the default in this file's code, see above).
805
806     During remote work execution, this local machine acts as a
807     controller dispatching all work to the network, copying pickled
808     tasks out, and copying results back in.  It may also be a worker
809     in the pool but do not underestimate the cost of being a
810     controller -- it takes some cpu and a lot of network bandwidth.
811     The work dispatcher logic attempts to detect when a controller is
812     also a worker and reduce its load.
813
814     Some redundancy and safety provisions are made when scheduling
815     tasks to the worker pool; e.g. slower-than-expected tasks have
816     redundant backup tasks created, especially if there are otherwise
817     idle workers.  If a task fails repeatedly, the dispatcher considers
818     it poisoned and gives up on it.
819
820     .. warning::
821
822         This executor probably only makes sense to use with
823         computationally expensive tasks such as jobs that will execute
824         for ~30 seconds or longer.
825
826         The network overhead and latency of copying work from the
827         controller (local) machine to the remote workers and copying
828         results back again is relatively high.  Especially at startup,
829         the network can become a bottleneck.  Future versions of this
830         code may attempt to split the responsibility of being a
831         controller (distributing work to pool machines).
832
833     Instructions for how to set this up are provided in
834     :mod:`pyutils.parallelize.parallelize`.
835
836     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
837
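    A rough construction sketch (hostnames and usernames here are
    purely illustrative; normally the worker pool is loaded from the
    JSON file named by `--remote_worker_records_file` rather than
    built by hand)::

        workers = [
            RemoteWorkerRecord(username='me', machine='host1', weight=2, count=4),
            RemoteWorkerRecord(username='me', machine='host2', weight=1, count=2),
        ]
        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        executor = RemoteExecutor(workers, policy)
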
838     """
839
840     def __init__(
841         self,
842         workers: List[RemoteWorkerRecord],
843         policy: RemoteWorkerSelectionPolicy,
844     ) -> None:
845         """
846         Args:
847             workers: A list of remote workers we can call on to do tasks.
848             policy: A policy for selecting remote workers for tasks.
849         """
850
851         super().__init__()
852         self.workers = workers
853         self.policy = policy
854         self.worker_count = 0
855         for worker in self.workers:
856             self.worker_count += worker.count
857         if self.worker_count <= 0:
858             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
859             logger.critical(msg)
860             raise RemoteExecutorException(msg)
861         self.policy.register_worker_pool(self.workers)
862         self.cv = threading.Condition()
863         logger.debug(
864             'Creating %d local threads, one per remote worker.', self.worker_count
865         )
866         self._helper_executor = fut.ThreadPoolExecutor(
867             thread_name_prefix="remote_executor_helper",
868             max_workers=self.worker_count,
869         )
870         self.status = RemoteExecutorStatus(self.worker_count)
871         self.total_bundles_submitted = 0
872         self.backup_lock = threading.Lock()
873         self.last_backup = None
874         (
875             self.heartbeat_thread,
876             self.heartbeat_stop_event,
877         ) = self._run_periodic_heartbeat()
878         self.already_shutdown = False
879
880     @background_thread
881     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
882         """
883         We create a background thread to invoke :meth:`_heartbeat` regularly
884         while we are scheduling work.  It does some accounting such as
885         looking for slow bundles to tag for backup creation, checking for
886         unexpected failures, and printing a fancy message on stdout.
887         """
888         while not stop_event.is_set():
889             time.sleep(5.0)
890             logger.debug('Running periodic heartbeat code...')
891             self._heartbeat()
892         logger.debug('Periodic heartbeat thread shutting down.')
893
894     def _heartbeat(self) -> None:
895         # Note: this is invoked on a background thread, not an
896         # executor thread.  Be careful what you do with it b/c it
897         # needs to get back and dump status again periodically.
898         with self.status.lock:
899             self.status.periodic_dump(self.total_bundles_submitted)
900
901             # Look for bundles to reschedule via executor.submit
902             if config.config['executors_schedule_remote_backups']:
903                 self._maybe_schedule_backup_bundles()
904
905     def _maybe_schedule_backup_bundles(self):
906         """Maybe schedule backup bundles if we see a very slow bundle."""
907
908         assert self.status.lock.locked()
909         num_done = len(self.status.finished_bundle_timings)
910         num_idle_workers = self.worker_count - self.task_count
911         now = time.time()
912         if (
913             num_done >= 2
914             and num_idle_workers > 0
915             and (self.last_backup is None or (now - self.last_backup > 9.0))
916             and self.backup_lock.acquire(blocking=False)
917         ):
918             try:
919                 assert self.backup_lock.locked()
920
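                # An illustrative walk-through of the scoring below: a
                # bundle on a worker with weight=4 starts with
                # base_score = int(200 / 4) = 50.  If it has been
                # running for 30s the score becomes 80; if it is also
                # slower than that worker's p95 it gains 30 / 2 = 15
                # more (95).  With no existing backups the score is
                # then doubled to 190.  The highest nonzero score wins
                # a backup.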
921                 bundle_to_backup = None
922                 best_score = None
923                 for (
924                     worker,
925                     bundle_uuids,
926                 ) in self.status.in_flight_bundles_by_worker.items():
927
928                     # Prefer to schedule backups of bundles running on
929                     # slower machines.
930                     base_score = 0
931                     for record in self.workers:
932                         if worker.machine == record.machine:
933                             base_score = float(record.weight)
934                             base_score = 1.0 / base_score
935                             base_score *= 200.0
936                             base_score = int(base_score)
937                             break
938
939                     for uuid in bundle_uuids:
940                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
941                         if (
942                             bundle is not None
943                             and bundle.src_bundle is None
944                             and bundle.backup_bundles is not None
945                         ):
946                             score = base_score
947
948                             # Schedule backups of bundles running
949                             # longer; especially those that are
950                             # unexpectedly slow.
951                             start_ts = self.status.start_per_bundle[uuid]
952                             if start_ts is not None:
953                                 runtime = now - start_ts
954                                 score += runtime
955                                 logger.debug(
956                                     'score[%s] => %.1f  # latency boost', bundle, score
957                                 )
958
959                                 if bundle.slower_than_local_p95:
960                                     score += runtime / 2
961                                     logger.debug(
962                                         'score[%s] => %.1f  # >worker p95',
963                                         bundle,
964                                         score,
965                                     )
966
967                                 if bundle.slower_than_global_p95:
968                                     score += runtime / 4
969                                     logger.debug(
970                                         'score[%s] => %.1f  # >global p95',
971                                         bundle,
972                                         score,
973                                     )
974
975                             # Prefer backups of bundles that don't
976                             # have backups already.
977                             backup_count = len(bundle.backup_bundles)
978                             if backup_count == 0:
979                                 score *= 2
980                             elif backup_count == 1:
981                                 score /= 2
982                             elif backup_count == 2:
983                                 score /= 8
984                             else:
985                                 score = 0
986                             logger.debug(
987                                 'score[%s] => %.1f  # %d dup backup factor',
988                                 bundle,
989                                 score, backup_count,
990                             )
991
992                             if score != 0 and (
993                                 best_score is None or score > best_score
994                             ):
995                                 bundle_to_backup = bundle
996                                 assert bundle is not None
997                                 assert bundle.backup_bundles is not None
998                                 assert bundle.src_bundle is None
999                                 best_score = score
1000
1001                 # Note: this is all still happening on the heartbeat
1002                 # runner thread.  That's ok because
1003                 # _schedule_backup_for_bundle uses the executor to
1004                 # submit the bundle again which will cause it to be
1005                 # picked up by a worker thread and allow this thread
1006                 # to return to run future heartbeats.
1007                 if bundle_to_backup is not None:
1008                     self.last_backup = now
1009                     logger.info(
1010                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1011                         bundle_to_backup,
1012                         best_score,
1013                     )
1014                     self._schedule_backup_for_bundle(bundle_to_backup)
1015             finally:
1016                 self.backup_lock.release()
1017
1018     def _is_worker_available(self) -> bool:
1019         """Is there a worker available currently?"""
1020         return self.policy.is_worker_available()
1021
1022     def _acquire_worker(
1023         self, machine_to_avoid: Optional[str] = None
1024     ) -> Optional[RemoteWorkerRecord]:
1025         """Try to acquire a worker."""
1026         return self.policy.acquire_worker(machine_to_avoid)
1027
1028     def _find_available_worker_or_block(
1029         self, machine_to_avoid: Optional[str] = None
1030     ) -> RemoteWorkerRecord:
1031         """Find a worker or block until one becomes available."""
1032         with self.cv:
1033             while not self._is_worker_available():
1034                 self.cv.wait()
1035             worker = self._acquire_worker(machine_to_avoid)
1036             if worker is not None:
1037                 return worker
1038         msg = "We should never reach this point in the code"
1039         logger.critical(msg)
1040         raise Exception(msg)
1041
1042     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1043         """Release a previously acquired worker."""
1044         worker = bundle.worker
1045         assert worker is not None
1046         logger.debug('Released worker %s', worker)
1047         self.status.record_release_worker(
1048             worker,
1049             bundle.uuid,
1050             was_cancelled,
1051         )
1052         with self.cv:
1053             worker.count += 1
1054             self.cv.notify()
1055         self.adjust_task_count(-1)
1056
1057     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1058         """See if a particular bundle is cancelled.  Do not block."""
1059         with self.status.lock:
1060             if bundle.is_cancelled.wait(timeout=0.0):
1061                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1062                 bundle.was_cancelled = True
1063                 return True
1064         return False
1065
1066     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1067         """Find a worker for bundle or block until one is available."""
1068
1069         self.adjust_task_count(+1)
1070         uuid = bundle.uuid
1071         controller = bundle.controller
1072         avoid_machine = override_avoid_machine
1073         is_original = bundle.src_bundle is None
1074
1075         # Try not to schedule a backup on the same host as the original.
1076         if avoid_machine is None and bundle.src_bundle is not None:
1077             avoid_machine = bundle.src_bundle.machine
1078         worker = None
1079         while worker is None:
1080             worker = self._find_available_worker_or_block(avoid_machine)
1081         assert worker is not None
1082
1083         # Ok, found a worker.
1084         bundle.worker = worker
1085         machine = bundle.machine = worker.machine
1086         username = bundle.username = worker.username
1087         self.status.record_acquire_worker(worker, uuid)
1088         logger.debug('%s: Running bundle on %s...', bundle, worker)
1089
1090         # Before we do any work, make sure the bundle is still viable.
1091         # It may have been some time between when it was submitted and
1092         # now due to lack of worker availability and someone else may
1093         # have already finished it.
1094         if self._check_if_cancelled(bundle):
1095             try:
1096                 return self._process_work_result(bundle)
1097             except Exception as e:
1098                 logger.warning(
1099                     '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1100                 )
1101                 self._release_worker(bundle)
1102                 if is_original:
1103                     # Weird.  We are the original owner of this
1104                     # bundle.  For it to have been cancelled, a backup
1105                     # must have already started and completed before
1106                     # we even for started.  Moreover, the backup says
1107                     # we even got started.  Moreover, the backup says
1108                     # should have copied over.  Reschedule the whole
1109                     # thing.
1110                     logger.exception(e)
1111                     logger.error(
1112                         '%s: We are the original owner thread and yet there are '
1113                         'no results for this bundle.  This is unexpected and bad.',
1114                         bundle,
1115                     )
1116                     return self._emergency_retry_nasty_bundle(bundle)
1117                 else:
1118                     # We're a backup and our bundle is cancelled
1119                     # before we even got started.  Do nothing and let
1120                     # the original bundle's thread worry about either
1121                     # finding the results or complaining about it.
1122                     return None
1123
1124         # Send input code / data to worker machine if it's not local.
1125         if controller not in machine:
1126             try:
1127                 cmd = (
1128                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1129                 )
1130                 start_ts = time.time()
1131                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1132                 run_silently(cmd)
1133                 xfer_latency = time.time() - start_ts
1134                 logger.debug(
1135                     "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1136                 )
1137             except Exception as e:
1138                 self._release_worker(bundle)
1139                 if is_original:
1140                     # Weird.  We tried to copy the code to the worker
1141                     # and it failed...  And we're the original bundle.
1142                     # We have to retry.
1143                     logger.exception(e)
1144                     logger.error(
1145                         "%s: Failed to send instructions to the worker machine?! "
1146                         "This is not expected; we\'re the original bundle so this shouldn\'t "
1147                         "be a race condition.  Attempting an emergency retry...",
1148                         bundle,
1149                     )
1150                     return self._emergency_retry_nasty_bundle(bundle)
1151                 else:
1152                     # This is actually expected; we're a backup.
1153                     # There's a race condition where someone else
1154                     # already finished the work and removed the source
1155                     # code_file before we could copy it.  Ignore.
1156                     logger.warning(
1157                         '%s: Failed to send instructions to the worker machine... '
1158                         'We\'re a backup and this may be caused by the original (or '
1159                         'some other backup) already finishing this work.  Ignoring.',
1160                         bundle,
1161                     )
1162                     return None
1163
1164         # Kick off the work.  Note that if this fails we let
1165         # _wait_for_process deal with it.
1166         self.status.record_processing_began(uuid)
1167         helper_path = config.config['remote_worker_helper_path']
1168         cmd = (
1169             f'{SSH} {bundle.username}@{bundle.machine} '
1170             f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1171         )
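        # With the default --remote_worker_helper_path, cmd ends up
        # looking something like this (user, host, uuid and venv path
        # illustrative):
        #
        #   /usr/bin/ssh -oForwardX11=no me@host1 "source py39-venv/bin/activate &&
        #       $HOME/pyutils/src/pyutils/remote_worker.py
        #       --code_file /tmp/XYZ.code.bin --result_file /tmp/XYZ.result.bin"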
1172         logger.debug(
1173             '%s: Executing %s in the background to kick off work...', bundle, cmd
1174         )
1175         p = cmd_in_background(cmd, silent=True)
1176         bundle.pid = p.pid
1177         logger.debug(
1178             '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1179         )
1180         return self._wait_for_process(p, bundle, 0)
1181
1182     def _wait_for_process(
1183         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1184     ) -> Any:
1185         """At this point we've copied the bundle's pickled code to the remote
1186         worker and started an ssh process that should be invoking the
1187         remote worker to have it execute the user's code.  See how
1188         that's going and wait for it to complete or fail.  Note that
1189         this code is recursive: there are codepaths where we decide to
1190         stop waiting for an ssh process (because another backup seems
1191         to have finished) but then fail to fetch or parse the results
1192         from that backup and thus call ourselves to continue waiting
1193         on an active ssh process.  This is the purpose of the depth
1194         argument: to curtail potential infinite recursion by giving up
1195         eventually.
1196
1197         Args:
1198             p: the Popen record of the ssh job
1199             bundle: the bundle of work being executed remotely
1200             depth: how many retries we've made so far.  Starts at zero.
1201
1202         """
1203
1204         machine = bundle.machine
1205         assert p is not None
1206         pid = p.pid  # pid of the ssh process
1207         if depth > 3:
1208             logger.error(
1209                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1210                 pid,
1211             )
1212             p.terminate()
1213             self._release_worker(bundle)
1214             return self._emergency_retry_nasty_bundle(bundle)
1215
1216         # Spin until either the ssh job we scheduled finishes the
1217         # bundle or some backup worker signals that they finished it
1218         # before we could.
1219         while True:
1220             try:
1221                 p.wait(timeout=0.25)
1222             except subprocess.TimeoutExpired:
1223                 if self._check_if_cancelled(bundle):
1224                     logger.info(
1225                         '%s: looks like another worker finished bundle...', bundle
1226                     )
1227                     break
1228             else:
1229                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1230                 p = None
1231                 break
1232
1233         # If we get here we believe the bundle is done; either the ssh
1234         # subprocess finished (hopefully successfully) or we noticed
1235         # that some other worker seems to have completed the bundle
1236         # before us and we're bailing out.
1237         try:
1238             ret = self._process_work_result(bundle)
1239             if ret is not None and p is not None:
1240                 p.terminate()
1241             return ret
1242
1243         # Something went wrong; e.g. we could not copy the results
1244         # back, cleanup after ourselves on the remote machine, or
1245         # unpickle the results we got from the remote machine.  If we
1246         # still have an active ssh subprocess, keep waiting on it.
1247         # Otherwise, time for an emergency reschedule.
1248         except Exception as e:
1249             logger.exception(e)
1250             logger.error('%s: Something unexpected just happened...', bundle)
1251             if p is not None:
1252                 logger.warning(
1253                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1254                     bundle,
1255                 )
1256                 return self._wait_for_process(p, bundle, depth + 1)
1257             else:
1258                 self._release_worker(bundle)
1259                 return self._emergency_retry_nasty_bundle(bundle)
1260
1261     def _process_work_result(self, bundle: BundleDetails) -> Any:
1262         """A bundle seems to be completed.  Check on the results."""
1263
1264         with self.status.lock:
1265             is_original = bundle.src_bundle is None
1266             was_cancelled = bundle.was_cancelled
1267             username = bundle.username
1268             machine = bundle.machine
1269             result_file = bundle.result_file
1270             code_file = bundle.code_file
1271
1272             # Whether original or backup, if we finished first we must
1273             # fetch the results if the computation happened on a
1274             # remote machine.
1275             bundle.end_ts = time.time()
1276             if not was_cancelled:
1277                 assert bundle.machine is not None
1278                 if bundle.controller not in bundle.machine:
1279                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1280                     logger.info(
1281                         "%s: Fetching results back from %s@%s via %s",
1282                         bundle,
1283                         username,
1284                         machine,
1285                         cmd,
1286                     )
1287
1288                     # If either of these throw they are handled in
1289                     # _wait_for_process.
1290                     attempts = 0
1291                     while True:
1292                         try:
1293                             run_silently(cmd)
1294                         except Exception as e:
1295                             attempts += 1
1296                             if attempts >= 3:
1297                                 raise e
1298                         else:
1299                             break
1300
1301                     # Cleanup remote /tmp files.
1302                     run_silently(
1303                         f'{SSH} {username}@{machine}'
1304                         f' "/bin/rm -f {code_file} {result_file}"'
1305                     )
1306                     logger.debug(
1307                         'Fetching results back took %.2fs', time.time() - bundle.end_ts
1308                     )
1309                 dur = bundle.end_ts - bundle.start_ts
1310                 self.histogram.add_item(dur)
1311
1312         # Only the original worker should unpickle the file contents
1313         # though since it's the only one whose result matters.  The
1314         # original is also the only job that may delete result_file
1315         # from disk.  Note that the original may have been cancelled
1316         # if one of the backups finished first; it still must read the
1317         # result from disk.  It still does that here with is_cancelled
1318         # set.
1319         if is_original:
1320             logger.debug("%s: Unpickling %s.", bundle, result_file)
1321             try:
1322                 with open(result_file, 'rb') as rb:
1323                     serialized = rb.read()
1324                 result = cloudpickle.loads(serialized)
1325             except Exception as e:
1326                 logger.exception(e)
1327                 logger.error('Failed to load %s... this is bad news.', result_file)
1328                 self._release_worker(bundle)
1329
1330                 # Re-raise the exception; the code in _wait_for_process may
1331                 # decide to _emergency_retry_nasty_bundle here.
1332                 raise e
1333             logger.debug('Removing local (controller) %s and %s.', code_file, result_file)
1334             os.remove(result_file)
1335             os.remove(code_file)
1336
1337             # Notify any backups that the original is done so they
1338             # should stop ASAP.  Do this whether or not we
1339             # finished first since there could be more than one
1340             # backup.
1341             if bundle.backup_bundles is not None:
1342                 for backup in bundle.backup_bundles:
1343                     logger.debug(
1344                         '%s: Notifying backup %s that it\'s cancelled',
1345                         bundle,
1346                         backup.uuid,
1347                     )
1348                     backup.is_cancelled.set()
1349
1350         # This is a backup job and, by now, we have already fetched
1351         # the bundle results.
1352         else:
1353             # Backup results don't matter, they just need to leave the
1354             # result file in the right place for their originals to
1355             # read/unpickle later.
1356             result = None
1357
1358             # Tell the original to stop if we finished first.
1359             if not was_cancelled:
1360                 orig_bundle = bundle.src_bundle
1361                 assert orig_bundle is not None
1362                 logger.debug(
1363                     '%s: Notifying original %s we beat them to it.',
1364                     bundle,
1365                     orig_bundle.uuid,
1366                 )
1367                 orig_bundle.is_cancelled.set()
1368         self._release_worker(bundle, was_cancelled=was_cancelled)
1369         return result
1370
1371     def _create_original_bundle(self, pickle, function_name: str):
1372         """Creates a bundle that is not a backup of any other bundle but
1373         rather represents a user task.
1374         """
1375
1376         uuid = string_utils.generate_uuid(omit_dashes=True)
1377         code_file = f'/tmp/{uuid}.code.bin'
1378         result_file = f'/tmp/{uuid}.result.bin'
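             # Both files live in /tmp on the controller: <uuid>.code.bin holds
             # the cloudpickled callable and <uuid>.result.bin will receive its
             # pickled return value (copied back if the work ran remotely).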
1379
1380         logger.debug('Writing pickled code to %s', code_file)
1381         with open(code_file, 'wb') as wb:
1382             wb.write(pickle)
1383
1384         bundle = BundleDetails(
1385             pickled_code=pickle,
1386             uuid=uuid,
1387             function_name=function_name,
1388             worker=None,
1389             username=None,
1390             machine=None,
1391             controller=platform.node(),
1392             code_file=code_file,
1393             result_file=result_file,
1394             pid=0,
1395             start_ts=time.time(),
1396             end_ts=0.0,
1397             slower_than_local_p95=False,
1398             slower_than_global_p95=False,
1399             src_bundle=None,
1400             is_cancelled=threading.Event(),
1401             was_cancelled=False,
1402             backup_bundles=[],
1403             failure_count=0,
1404         )
1405         self.status.record_bundle_details(bundle)
1406         logger.debug('%s: Created an original bundle', bundle)
1407         return bundle
1408
1409     def _create_backup_bundle(self, src_bundle: BundleDetails):
1410         """Creates a bundle that is a backup of another bundle that is
1411         running too slowly."""
1412
1413         assert self.status.lock.locked()
1414         assert src_bundle.backup_bundles is not None
1415         n = len(src_bundle.backup_bundles)
1416         uuid = src_bundle.uuid + f'_backup#{n}'
1417
1418         backup_bundle = BundleDetails(
1419             pickled_code=src_bundle.pickled_code,
1420             uuid=uuid,
1421             function_name=src_bundle.function_name,
1422             worker=None,
1423             username=None,
1424             machine=None,
1425             controller=src_bundle.controller,
1426             code_file=src_bundle.code_file,
1427             result_file=src_bundle.result_file,
1428             pid=0,
1429             start_ts=time.time(),
1430             end_ts=0.0,
1431             slower_than_local_p95=False,
1432             slower_than_global_p95=False,
1433             src_bundle=src_bundle,
1434             is_cancelled=threading.Event(),
1435             was_cancelled=False,
1436             backup_bundles=None,  # backup backups not allowed
1437             failure_count=0,
1438         )
1439         src_bundle.backup_bundles.append(backup_bundle)
1440         self.status.record_bundle_details_already_locked(backup_bundle)
1441         logger.debug('%s: Created a backup bundle', backup_bundle)
1442         return backup_bundle
1443
1444     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1445         """Schedule a backup of src_bundle."""
1446
1447         assert self.status.lock.locked()
1448         assert src_bundle is not None
1449         backup_bundle = self._create_backup_bundle(src_bundle)
1450         logger.debug(
1451             '%s/%s: Scheduling backup for execution...',
1452             backup_bundle.uuid,
1453             backup_bundle.function_name,
1454         )
1455         self._helper_executor.submit(self._launch, backup_bundle)
1456
1457         # Results from backups don't matter; if a backup finishes
1458         # first it just moves result_file onto this machine and lets
1459         # the original bundle pick it up, unpickle it, and return
1460         # the result.
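         # A rough timeline sketch of the backup protocol (illustrative):
         #   t0: the original bundle starts on machine A.
         #   t1: the original looks slow (e.g. slower than a p95 threshold), so
         #       a backup bundle with the same code_file is launched on machine B.
         #   t2: whichever bundle finishes first copies result_file back to the
         #       controller and sets the other bundle's is_cancelled event.
         #   t3: only the original unpickles result_file and returns the result.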
1461
1462     def _emergency_retry_nasty_bundle(
1463         self, bundle: BundleDetails
1464     ) -> Optional[fut.Future]:
1465         """Something unexpectedly failed with bundle.  Either retry it
1466         from the beginning or throw in the towel and give up on it."""
1467
1468         is_original = bundle.src_bundle is None
1469         bundle.worker = None
1470         avoid_last_machine = bundle.machine
1471         bundle.machine = None
1472         bundle.username = None
1473         bundle.failure_count += 1
1474         if is_original:
1475             retry_limit = 3
1476         else:
1477             retry_limit = 2
1478
1479         if bundle.failure_count > retry_limit:
1480             logger.error(
1481                 '%s: Tried this bundle too many times already (%dx); giving up.',
1482                 bundle,
1483                 bundle.failure_count,
1484             )
1485             if is_original:
1486                 raise RemoteExecutorException(
1487                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1488                 )
1489             logger.error(
1490                 '%s: At least it\'s only a backup; better luck with the others.',
1491                 bundle,
1492             )
1493             return None
1494         else:
1495             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1496             logger.warning(msg)
1497             warnings.warn(msg)
1498             return self._launch(bundle, avoid_last_machine)
1499
1500     @overrides
1501     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1502         """Submit work to be done.  This is the user entry point of this
1503         class."""
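         # A minimal usage sketch (the function and argument names are
         # hypothetical); most callers should prefer the @parallelize
         # decorator in pyutils.parallelize.parallelize to calling
         # submit() directly:
         #
         #     executor = DefaultExecutors().remote_pool()
         #     future = executor.submit(my_function, arg1, arg2)
         #     result = future.result()   # blocks until the bundle completes
         #     executor.shutdown()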
1504         if self.already_shutdown:
1505             raise Exception('Submitted work after shutdown.')
1506         pickle = _make_cloud_pickle(function, *args, **kwargs)
1507         bundle = self._create_original_bundle(pickle, function.__name__)
1508         self.total_bundles_submitted += 1
1509         return self._helper_executor.submit(self._launch, bundle)
1510
1511     @overrides
1512     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1513         """Shutdown the executor."""
1514         if not self.already_shutdown:
1515             logger.debug('Shutting down RemoteExecutor %s', self.title)
1516             self.heartbeat_stop_event.set()
1517             self.heartbeat_thread.join()
1518             self._helper_executor.shutdown(wait)
1519             if not quiet:
1520                 print(self.histogram.__repr__(label_formatter='%ds'))
1521             self.already_shutdown = True
1522
1523
1524 class RemoteWorkerPoolProvider(ABC):
1525     @abstractmethod
1526     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1527         pass
1528
1529
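     # A hedged sketch of the JSON worker-pool file this class reads; the real
     # schema is whatever RemoteWorkerRecord's fields are ("machine" and "count"
     # are used below, the other values here are purely illustrative):
     #
     #     {
     #         "remote_worker_records": [
     #             {"username": "me", "machine": "worker1.example.com", "count": 4}
     #         ]
     #     }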
1530 @persistent.persistent_autoloaded_singleton()  # type: ignore
1531 class ConfigRemoteWorkerPoolProvider(
1532     RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1533 ):
1534     def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1535         self.remote_worker_pool: List[RemoteWorkerRecord] = []
1536         for record in json_remote_worker_pool['remote_worker_records']:
1537             self.remote_worker_pool.append(
1538                 dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
1539             )
1540         assert len(self.remote_worker_pool) > 0
1541
1542     @overrides
1543     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1544         return self.remote_worker_pool
1545
1546     @overrides
1547     def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1548         return self.remote_worker_pool
1549
1550     @staticmethod
1551     @overrides
1552     def get_filename() -> str:
1553         return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
1554
1555     @staticmethod
1556     @overrides
1557     def should_we_load_data(filename: str) -> bool:
1558         return True
1559
1560     @staticmethod
1561     @overrides
1562     def should_we_save_data(filename: str) -> bool:
1563         return False
1564
1565
1566 @singleton
1567 class DefaultExecutors(object):
1568     """A container for a default thread, process and remote executor.
1569     These are not created until needed and are cleaned up automatically
1570     before process exit for the caller's convenience.
1571     Instead of creating your own executor, consider using the one
1572     from this pool.  e.g.::
1573
1574         @par.parallelize(method=par.Method.PROCESS)
1575         def do_work(
1576             solutions: List[Work],
1577             shard_num: int,
1578             ...
1579         ):
1580             <do the work>
1581
1582
1583         def start_do_work(all_work: List[Work]):
1584             shards = []
1585             logger.debug('Sharding work into groups of 10.')
1586             for subset in list_utils.shard(all_work, 10):
1587                 shards.append([x for x in subset])
1588             results = []
1589             logger.debug('Kicking off helper pool.')
1590             try:
1591                 for n, shard in enumerate(shards):
1592                     results.append(
1593                         do_work(
1594                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1595                         )
1596                     )
1597                 smart_future.wait_all(results)
1598             finally:
1599                 # Note: if you forget to do this, the pool will clean
1600                 # itself up at program termination, including tearing
1601                 # down any active ssh connections.
1602                 executors.DefaultExecutors().process_pool().shutdown()
1603     """
1604
1605     def __init__(self):
1606         self.thread_executor: Optional[ThreadExecutor] = None
1607         self.process_executor: Optional[ProcessExecutor] = None
1608         self.remote_executor: Optional[RemoteExecutor] = None
1609
1610     @staticmethod
1611     def _ping(host: str) -> bool:
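         # Assumes a Unix-style ping: "-c 1" sends a single echo request, the
         # shell redirects silence its output, and any non-zero exit code or
         # timeout causes the host to be treated as unreachable.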
1612         logger.debug('RUN> ping -c 1 %s', host)
1613         try:
1614             x = cmd_exitcode(
1615                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1616             )
1617             return x == 0
1618         except Exception:
1619             return False
1620
1621     def thread_pool(self) -> ThreadExecutor:
1622         if self.thread_executor is None:
1623             self.thread_executor = ThreadExecutor()
1624         return self.thread_executor
1625
1626     def process_pool(self) -> ProcessExecutor:
1627         if self.process_executor is None:
1628             self.process_executor = ProcessExecutor()
1629         return self.process_executor
1630
1631     def remote_pool(self) -> RemoteExecutor:
1632         if self.remote_executor is None:
1633             logger.info('Looking for some helper machines...')
1634             provider = ConfigRemoteWorkerPoolProvider()
1635             all_machines = provider.get_remote_workers()
1636             pool = []
1637
1638             # Only pool machines that respond to pings.
1639             for record in all_machines:
1640                 if self._ping(record.machine):
1641                     logger.info('%s is alive / responding to pings', record.machine)
1642                     pool.append(record)
1643
1644             # The controller machine has a lot to do; go easy on it.
1645             for record in pool:
1646                 if record.machine == platform.node() and record.count > 1:
1647                     logger.info('Reducing workload for %s.', record.machine)
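                         # e.g. a controller record with count=24 is halved to 12;
                         # one with count=1 is skipped by the check above.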
1648                     record.count = max(int(record.count / 2), 1)
1649
1650             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1651             policy.register_worker_pool(pool)
1652             self.remote_executor = RemoteExecutor(pool, policy)
1653         return self.remote_executor
1654
1655     def shutdown(self) -> None:
1656         if self.thread_executor is not None:
1657             self.thread_executor.shutdown(wait=True, quiet=True)
1658             self.thread_executor = None
1659         if self.process_executor is not None:
1660             self.process_executor.shutdown(wait=True, quiet=True)
1661             self.process_executor = None
1662         if self.remote_executor is not None:
1663             self.remote_executor.shutdown(wait=True, quiet=True)
1664             self.remote_executor = None