Change path of remote_worker so the MacBook can work too.
[pyutils.git] / src / pyutils / parallelize / executors.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """
7 This module defines a :class:`BaseExecutor` interface and three
8 implementations:
9
10     - :class:`ThreadExecutor`
11     - :class:`ProcessExecutor`
12     - :class:`RemoteExecutor`
13
14 The :class:`ThreadExecutor` is used to dispatch work to background
15 threads in the same Python process for parallelized work.  Of course,
16 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
17 is not terribly useful for compute-bound code.  But it's good for
18 work that is mostly I/O bound.
19
20 The :class:`ProcessExecutor` is used to dispatch work to other
21 processes on the same machine and is more useful for compute-bound
22 workloads.
23
24 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
25 the `cloudpickle` dependency, and the `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ script
26 to dispatch work to a set of remote worker machines on your
27 network.  You can configure this pool via a JSON configuration file,
28 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
29
30 Finally, this file defines a :class:`DefaultExecutors` pool that
31 contains a pre-created and ready instance of each of the three
32 executors discussed.  It has the added benefit of being automatically
33 cleaned up at process termination time.
34
35 See instructions in :mod:`pyutils.parallelize.parallelize` for
36 setting up and using the framework.
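
A minimal usage sketch (assuming the enclosing program has already
parsed the command-line config as described in :mod:`pyutils.config`;
the workload is trivial and purely illustrative)::

    from pyutils.parallelize.executors import ThreadExecutor

    executor = ThreadExecutor(max_workers=4)
    future = executor.submit(sum, [1, 2, 3])  # returns a concurrent.futures.Future
    assert future.result() == 6
    executor.shutdown(wait=True, quiet=True)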
37 """
38
39 from __future__ import annotations
40
41 import concurrent.futures as fut
42 import logging
43 import os
44 import platform
45 import random
46 import subprocess
47 import threading
48 import time
49 import warnings
50 from abc import ABC, abstractmethod
51 from collections import defaultdict
52 from dataclasses import dataclass
53 from typing import Any, Callable, Dict, List, Optional, Set
54
55 import cloudpickle  # type: ignore
56 from overrides import overrides
57
58 import pyutils.types.histogram as hist
59 from pyutils import (
60     argparse_utils,
61     config,
62     dataclass_utils,
63     math_utils,
64     persistent,
65     string_utils,
66 )
67 from pyutils.ansi import bg, fg, reset, underline
68 from pyutils.decorator_utils import singleton
69 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
70 from pyutils.parallelize.thread_utils import background_thread
71 from pyutils.types import type_utils
72
73 logger = logging.getLogger(__name__)
74
75 parser = config.add_commandline_args(
76     f"Executors ({__file__})", "Args related to processing executors."
77 )
78 parser.add_argument(
79     '--executors_threadpool_size',
80     type=int,
81     metavar='#THREADS',
82     help='Number of threads in the default threadpool, leave unset for default',
83     default=None,
84 )
85 parser.add_argument(
86     '--executors_processpool_size',
87     type=int,
88     metavar='#PROCESSES',
89     help='Number of processes in the default processpool, leave unset for default',
90     default=None,
91 )
92 parser.add_argument(
93     '--executors_schedule_remote_backups',
94     default=True,
95     action=argparse_utils.ActionNoYes,
96     help='Should we schedule duplicative backup work if a remote bundle is slow',
97 )
98 parser.add_argument(
99     '--executors_max_bundle_failures',
100     type=int,
101     default=3,
102     metavar='#FAILURES',
103     help='Maximum number of failures before giving up on a bundle',
104 )
105 parser.add_argument(
106     '--remote_worker_records_file',
107     type=str,
108     metavar='FILENAME',
109     help='Path of the remote worker records file (JSON)',
110     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
111 )
112 parser.add_argument(
113     '--remote_worker_helper_path',
114     type=str,
115     metavar='PATH_TO_REMOTE_WORKER_PY',
116     help='Path to remote_worker.py on remote machines',
117     default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
118 )
119
120
121 SSH = '/usr/bin/ssh -oForwardX11=no'
122 SCP = '/usr/bin/scp -C'
123
124
125 def _make_cloud_pickle(fun, *args, **kwargs):
126     """Internal helper to create cloud pickles."""
127     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
128     return cloudpickle.dumps((fun, args, kwargs))
129
130
131 class BaseExecutor(ABC):
132     """The base executor interface definition implemented by
133     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
134     :class:`ThreadExecutor`.
135     """
136
137     def __init__(self, *, title=''):
138         """
139         Args:
140             title: the name of this executor.
141         """
142         self.title = title
143         self.histogram = hist.SimpleHistogram(
144             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
145         )
146         self.task_count = 0
147
148     @abstractmethod
149     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
150         """Submit work for the executor to do.
151
152         Args:
153             function: the Callable to be executed.
154             *args: the arguments to function
155             **kwargs: the keyword arguments to function
156
157         Returns:
158             A concurrent :class:`Future` representing the result of the
159             work.
160         """
161         pass
162
163     @abstractmethod
164     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
165         """Shutdown the executor.
166
167         Args:
168             wait: wait for the shutdown to complete before returning?
169             quiet: keep it quiet, please.
170         """
171         pass
172
173     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
174         """Shutdown the executor and return True if the executor is idle
175         (i.e. there are no pending or active tasks).  Return False
176         otherwise.  Note: this should only be called by the launcher
177         process.
178
179         Args:
180             quiet: keep it quiet, please.
181
182         Returns:
183             True if the executor could be shut down because it has no
184             pending work, False otherwise.
185         """
186         if self.task_count == 0:
187             self.shutdown(wait=True, quiet=quiet)
188             return True
189         return False
190
191     def adjust_task_count(self, delta: int) -> None:
192         """Change the task count.  Note: do not call this method from a
193         worker, it should only be called by the launcher process /
194         thread / machine.
195
196         Args:
197             delta: the delta value by which to adjust task count.
198         """
199         self.task_count += delta
200         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
201
202     def get_task_count(self) -> int:
203         """Get the current task count.  Note: do not call this method from a
204         worker, it should only be called by the launcher process /
205         thread / machine.
206
207         Returns:
208             The executor's current task count.
209         """
210         return self.task_count
211
212
213 class ThreadExecutor(BaseExecutor):
214     """A threadpool executor.  This executor uses Python threads to
215     schedule tasks.  Note that, at least as of python3.10, because of
216     the global lock in the interpreter itself, threads do not
217     parallelize very well, so this class is mostly useful for
218     non-CPU-intensive (e.g. I/O-bound) tasks.
219
220     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
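
    A hedged sketch of a typical I/O-bound use; `urls` is assumed to be
    a list of URL strings and network access is assumed::

        import urllib.request

        def fetch_length(url: str) -> int:
            # Download a page and report its size in bytes.
            with urllib.request.urlopen(url) as response:
                return len(response.read())

        executor = ThreadExecutor(max_workers=8)
        futures = [executor.submit(fetch_length, url) for url in urls]
        sizes = [future.result() for future in futures]
        executor.shutdown(quiet=True)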
221     """
222
223     def __init__(self, max_workers: Optional[int] = None):
224         """
225         Args:
226             max_workers: maximum number of threads to create in the pool.
227         """
228         super().__init__()
229         workers = None
230         if max_workers is not None:
231             workers = max_workers
232         elif 'executors_threadpool_size' in config.config:
233             workers = config.config['executors_threadpool_size']
234         if workers is not None:
235             logger.debug('Creating threadpool executor with %d workers', workers)
236         else:
237             logger.debug('Creating a default sized threadpool executor')
238         self._thread_pool_executor = fut.ThreadPoolExecutor(
239             max_workers=workers, thread_name_prefix="thread_executor_helper"
240         )
241         self.already_shutdown = False
242
243     # This is run on a different thread; do not adjust task count here.
244     @staticmethod
245     def _run_local_bundle(fun, *args, **kwargs):
246         logger.debug("Running local bundle at %s", fun.__name__)
247         result = fun(*args, **kwargs)
248         return result
249
250     @overrides
251     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
252         if self.already_shutdown:
253             raise Exception('Submitted work after shutdown.')
254         self.adjust_task_count(+1)
255         newargs = []
256         newargs.append(function)
257         for arg in args:
258             newargs.append(arg)
259         start = time.time()
260         result = self._thread_pool_executor.submit(
261             ThreadExecutor._run_local_bundle, *newargs, **kwargs
262         )
263         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
264         result.add_done_callback(lambda _: self.adjust_task_count(-1))
265         return result
266
267     @overrides
268     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
269         if not self.already_shutdown:
270             logger.debug('Shutting down threadpool executor %s', self.title)
271             self._thread_pool_executor.shutdown(wait)
272             if not quiet:
273                 print(self.histogram.__repr__(label_formatter='%ds'))
274             self.already_shutdown = True
275
276
277 class ProcessExecutor(BaseExecutor):
278     """An executor which runs tasks in child processes.
279
280     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
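
    A hedged sketch of a CPU-bound use (the workload function is invented
    for illustration; because submissions are cloudpickled, it does not
    need to be importable by the child processes)::

        def busy_work(n: int) -> int:
            # Burn some CPU computing a sum of squares.
            return sum(i * i for i in range(n))

        executor = ProcessExecutor(max_workers=4)
        futures = [executor.submit(busy_work, 5_000_000) for _ in range(4)]
        totals = [future.result() for future in futures]
        executor.shutdown(quiet=True)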
281     """
282
283     def __init__(self, max_workers=None):
284         """
285         Args:
286             max_workers: the max number of worker processes to create.
287         """
288         super().__init__()
289         workers = None
290         if max_workers is not None:
291             workers = max_workers
292         elif 'executors_processpool_size' in config.config:
293             workers = config.config['executors_processpool_size']
294         if workers is not None:
295             logger.debug('Creating processpool executor with %d workers.', workers)
296         else:
297             logger.debug('Creating a default sized processpool executor')
298         self._process_executor = fut.ProcessPoolExecutor(
299             max_workers=workers,
300         )
301         self.already_shutdown = False
302
303     # This is run in another process; do not adjust task count here.
304     @staticmethod
305     def _run_cloud_pickle(pickle):
306         fun, args, kwargs = cloudpickle.loads(pickle)
307         logger.debug("Running pickled bundle at %s", fun.__name__)
308         result = fun(*args, **kwargs)
309         return result
310
311     @overrides
312     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
313         if self.already_shutdown:
314             raise Exception('Submitted work after shutdown.')
315         start = time.time()
316         self.adjust_task_count(+1)
317         pickle = _make_cloud_pickle(function, *args, **kwargs)
318         result = self._process_executor.submit(
319             ProcessExecutor._run_cloud_pickle, pickle
320         )
321         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
322         result.add_done_callback(lambda _: self.adjust_task_count(-1))
323         return result
324
325     @overrides
326     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
327         if not self.already_shutdown:
328             logger.debug('Shutting down processpool executor %s', self.title)
329             self._process_executor.shutdown(wait)
330             if not quiet:
331                 print(self.histogram.__repr__(label_formatter='%ds'))
332             self.already_shutdown = True
333
334     def __getstate__(self):
335         state = self.__dict__.copy()
336         state['_process_executor'] = None
337         return state
338
339
340 class RemoteExecutorException(Exception):
341     """Thrown when a bundle cannot be executed despite several retries."""
342
343     pass
344
345
346 @dataclass
347 class RemoteWorkerRecord:
348     """A record of info about a remote worker."""
349
350     username: str
351     """Username we can ssh into on this machine to run work."""
352
353     machine: str
354     """Machine address / name."""
355
356     weight: int
357     """Relative probability for the weighted policy to select this
358     machine for scheduling work."""
359
360     count: int
361     """If this machine is selected, what is the maximum number of tasks
362     that it can handle?"""
363
364     def __hash__(self):
365         return hash((self.username, self.machine))
366
367     def __repr__(self):
368         return f'{self.username}@{self.machine}'
369
370
371 @dataclass
372 class BundleDetails:
373     """All info necessary to define some unit of work that needs to be
374     done, where it is being run, its state, whether it is an original
375     bundle or a backup bundle, how many times it has failed, etc.
376     """
377
378     pickled_code: bytes
379     """The code to run, cloud pickled"""
380
381     uuid: str
382     """A unique identifier"""
383
384     function_name: str
385     """The name of the function we pickled"""
386
387     worker: Optional[RemoteWorkerRecord]
388     """The remote worker running this bundle or None if none (yet)"""
389
390     username: Optional[str]
391     """The remote username running this bundle or None if none (yet)"""
392
393     machine: Optional[str]
394     """The remote machine running this bundle or None if none (yet)"""
395
396     hostname: str
397     """The controller machine"""
398
399     code_file: str
400     """A unique filename to hold the work to be done"""
401
402     result_file: str
403     """Where the results should be placed / read from"""
404
405     pid: int
406     """The process id of the local subprocess watching the ssh connection
407     to the remote machine"""
408
409     start_ts: float
410     """Starting time"""
411
412     end_ts: float
413     """Ending time"""
414
415     slower_than_local_p95: bool
416     """Currently slower than 95% of other bundles on the remote host"""
417
418     slower_than_global_p95: bool
419     """Currently slower than 95% of other bundles globally"""
420
421     src_bundle: Optional[BundleDetails]
422     """If this is a backup bundle, this points to the original bundle
423     that it's backing up.  None otherwise."""
424
425     is_cancelled: threading.Event
426     """An event that can be signaled to indicate this bundle is cancelled.
427     This is set when another copy (backup or original) of this work has
428     completed successfully elsewhere."""
429
430     was_cancelled: bool
431     """True if this bundle was cancelled, False if it finished normally"""
432
433     backup_bundles: Optional[List[BundleDetails]]
434     """If we've created backups of this bundle, this is the list of them"""
435
436     failure_count: int
437     """How many times has this bundle failed already?"""
438
439     def __repr__(self):
440         uuid = self.uuid
441         if uuid[-9:-2] == '_backup':
442             uuid = uuid[:-9]
443             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
444         else:
445             suffix = uuid[-6:]
446
447         # We colorize the uuid based on some bits from it to make them
448         # stand out in the logging and help a reader correlate log messages
449         # related to the same bundle.
450         colorz = [
451             fg('violet red'),
452             fg('red'),
453             fg('orange'),
454             fg('peach orange'),
455             fg('yellow'),
456             fg('marigold yellow'),
457             fg('green yellow'),
458             fg('tea green'),
459             fg('cornflower blue'),
460             fg('turquoise blue'),
461             fg('tropical blue'),
462             fg('lavender purple'),
463             fg('medium purple'),
464         ]
465         c = colorz[int(uuid[-2:], 16) % len(colorz)]
466         function_name = (
467             self.function_name if self.function_name is not None else 'nofname'
468         )
469         machine = self.machine if self.machine is not None else 'nomachine'
470         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
471
472
473 class RemoteExecutorStatus:
474     """A status 'scoreboard' for a remote executor tracking various
475     metrics and able to render a periodic dump of global state.
476     """
477
478     def __init__(self, total_worker_count: int) -> None:
479         """
480         Args:
481             total_worker_count: number of workers in the pool
482         """
483         self.worker_count: int = total_worker_count
484         self.known_workers: Set[RemoteWorkerRecord] = set()
485         self.start_time: float = time.time()
486         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
487         self.end_per_bundle: Dict[str, float] = defaultdict(float)
488         self.finished_bundle_timings_per_worker: Dict[
489             RemoteWorkerRecord, math_utils.NumericPopulation
490         ] = {}
491         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
492         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
493         self.finished_bundle_timings: math_utils.NumericPopulation = (
494             math_utils.NumericPopulation()
495         )
496         self.last_periodic_dump: Optional[float] = None
497         self.total_bundles_submitted: int = 0
498
499         # Protects reads and modification using self.  Also used
500         # as a memory fence for modifications to bundle.
501         self.lock: threading.Lock = threading.Lock()
502
503     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
504         """Record that bundle with uuid is assigned to a particular worker.
505
506         Args:
507             worker: the record of the worker to which uuid is assigned
508             uuid: the uuid of a bundle that has been assigned to a worker
509         """
510         with self.lock:
511             self.record_acquire_worker_already_locked(worker, uuid)
512
513     def record_acquire_worker_already_locked(
514         self, worker: RemoteWorkerRecord, uuid: str
515     ) -> None:
516         """Same as above but an entry point that doesn't acquire the lock
517         for codepaths where it's already held."""
518         assert self.lock.locked()
519         self.known_workers.add(worker)
520         self.start_per_bundle[uuid] = None
521         x = self.in_flight_bundles_by_worker.get(worker, set())
522         x.add(uuid)
523         self.in_flight_bundles_by_worker[worker] = x
524
525     def record_bundle_details(self, details: BundleDetails) -> None:
526         """Register the details about a bundle of work."""
527         with self.lock:
528             self.record_bundle_details_already_locked(details)
529
530     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
531         """Same as above but for codepaths that already hold the lock."""
532         assert self.lock.locked()
533         self.bundle_details_by_uuid[details.uuid] = details
534
535     def record_release_worker(
536         self,
537         worker: RemoteWorkerRecord,
538         uuid: str,
539         was_cancelled: bool,
540     ) -> None:
541         """Record that a bundle has released a worker."""
542         with self.lock:
543             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
544
545     def record_release_worker_already_locked(
546         self,
547         worker: RemoteWorkerRecord,
548         uuid: str,
549         was_cancelled: bool,
550     ) -> None:
551         """Same as above but for codepaths that already hold the lock."""
552         assert self.lock.locked()
553         ts = time.time()
554         self.end_per_bundle[uuid] = ts
555         self.in_flight_bundles_by_worker[worker].remove(uuid)
556         if not was_cancelled:
557             start = self.start_per_bundle[uuid]
558             assert start is not None
559             bundle_latency = ts - start
560             x = self.finished_bundle_timings_per_worker.get(
561                 worker, math_utils.NumericPopulation()
562             )
563             x.add_number(bundle_latency)
564             self.finished_bundle_timings_per_worker[worker] = x
565             self.finished_bundle_timings.add_number(bundle_latency)
566
567     def record_processing_began(self, uuid: str):
568         """Record when work on a bundle begins."""
569         with self.lock:
570             self.start_per_bundle[uuid] = time.time()
571
572     def total_in_flight(self) -> int:
573         """How many bundles are in flight currently?"""
574         assert self.lock.locked()
575         total_in_flight = 0
576         for worker in self.known_workers:
577             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
578         return total_in_flight
579
580     def total_idle(self) -> int:
581         """How many idle workers are there currently?"""
582         assert self.lock.locked()
583         return self.worker_count - self.total_in_flight()
584
585     def __repr__(self):
586         assert self.lock.locked()
587         ts = time.time()
588         total_finished = len(self.finished_bundle_timings)
589         total_in_flight = self.total_in_flight()
590         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
591         qall_p95 = None
592         if len(self.finished_bundle_timings) > 1:
593             qall_median = self.finished_bundle_timings.get_median()
594             qall_p95 = self.finished_bundle_timings.get_percentile(95)
595             ret += (
596                 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
597                 f'✅={total_finished}/{self.total_bundles_submitted}, '
598                 f'💻n={total_in_flight}/{self.worker_count}\n'
599             )
600         else:
601             ret += (
602                 f'⏱={ts-self.start_time:.1f}s, '
603                 f'✅={total_finished}/{self.total_bundles_submitted}, '
604                 f'💻n={total_in_flight}/{self.worker_count}\n'
605             )
606
607         for worker in self.known_workers:
608             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
609             timings = self.finished_bundle_timings_per_worker.get(
610                 worker, math_utils.NumericPopulation()
611             )
612             count = len(timings)
613             qworker_median = None
614             qworker_p95 = None
615             if count > 1:
616                 qworker_median = timings.get_median()
617                 qworker_p95 = timings.get_percentile(95)
618                 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
619             else:
620                 ret += '\n'
621             if count > 0:
622                 ret += f'    ...finished {count} total bundle(s) so far\n'
623             in_flight = len(self.in_flight_bundles_by_worker[worker])
624             if in_flight > 0:
625                 ret += f'    ...{in_flight} bundles currently in flight:\n'
626                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
627                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
628                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
629                     if self.start_per_bundle[bundle_uuid] is not None:
630                         sec = ts - self.start_per_bundle[bundle_uuid]
631                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
632                     else:
633                         ret += f'       {details} setting up / copying data...'
634                         sec = 0.0
635
636                     if qworker_p95 is not None:
637                         if sec > qworker_p95:
638                             ret += f'{bg("red")}>💻p95{reset()} '
639                             if details is not None:
640                                 details.slower_than_local_p95 = True
641                         else:
642                             if details is not None:
643                                 details.slower_than_local_p95 = False
644
645                     if qall_p95 is not None:
646                         if sec > qall_p95:
647                             ret += f'{bg("red")}>∀p95{reset()} '
648                             if details is not None:
649                                 details.slower_than_global_p95 = True
650                         elif details is not None:
651                             details.slower_than_global_p95 = False
652                     ret += '\n'
653         return ret
654
655     def periodic_dump(self, total_bundles_submitted: int) -> None:
656         assert self.lock.locked()
657         self.total_bundles_submitted = total_bundles_submitted
658         ts = time.time()
659         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
660             print(self)
661             self.last_periodic_dump = ts
662
663
664 class RemoteWorkerSelectionPolicy(ABC):
665     """An interface definition of a policy for selecting a remote worker."""
666
667     def __init__(self):
668         self.workers: Optional[List[RemoteWorkerRecord]] = None
669
670     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
671         self.workers = workers
672
673     @abstractmethod
674     def is_worker_available(self) -> bool:
675         pass
676
677     @abstractmethod
678     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
679         pass
680
681
682 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
683     """A remote worker selector that uses weighted RNG."""
684
685     @overrides
686     def is_worker_available(self) -> bool:
687         if self.workers:
688             for worker in self.workers:
689                 if worker.count > 0:
690                     return True
691         return False
692
693     @overrides
694     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
695         grabbag = []
696         if self.workers:
697             for worker in self.workers:
698                 if worker.machine != machine_to_avoid:
699                     if worker.count > 0:
700                         for _ in range(worker.count * worker.weight):
701                             grabbag.append(worker)
702
703         if len(grabbag) == 0:
704             logger.debug(
705                 'There are no available workers that avoid %s', machine_to_avoid
706             )
707             if self.workers:
708                 for worker in self.workers:
709                     if worker.count > 0:
710                         for _ in range(worker.count * worker.weight):
711                             grabbag.append(worker)
712
713         if len(grabbag) == 0:
714             logger.warning('There are no available workers?!')
715             return None
716
717         worker = random.sample(grabbag, 1)[0]
718         assert worker.count > 0
719         worker.count -= 1
720         logger.debug('Selected worker %s', worker)
721         return worker
722
723
724 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
725     """A remote worker selector that just round robins."""
726
727     def __init__(self) -> None:
728         super().__init__()
729         self.index = 0
730
731     @overrides
732     def is_worker_available(self) -> bool:
733         if self.workers:
734             for worker in self.workers:
735                 if worker.count > 0:
736                     return True
737         return False
738
739     @overrides
740     def acquire_worker(
741         self, machine_to_avoid: Optional[str] = None
742     ) -> Optional[RemoteWorkerRecord]:
743         if self.workers:
744             x = self.index
745             while True:
746                 worker = self.workers[x]
747                 if worker.count > 0:
748                     worker.count -= 1
749                     x += 1
750                     if x >= len(self.workers):
751                         x = 0
752                     self.index = x
753                     logger.debug('Selected worker %s', worker)
754                     return worker
755                 x += 1
756                 if x >= len(self.workers):
757                     x = 0
758                 if x == self.index:
759                     logger.warning('Unexpectedly could not find a worker, retrying...')
760                     return None
761         return None
762
763
764 class RemoteExecutor(BaseExecutor):
765     """An executor that uses processes on remote machines to do work.
766     To do so, it requires a pool of remote workers to be properly
767     configured.  See instructions in
768     :mod:`pyutils.parallelize.parallelize`.
769
770     Each machine in a worker pool has a *weight* and a *count*.  A
771     *weight* captures the relative speed of a processor on that worker
772     and a *count* captures the number of synchronous tasks the worker
773     and a *count* captures the number of simultaneous tasks the worker
774
775     To dispatch work to a remote machine, this class pickles the code
776     to be executed remotely using `cloudpickle`.  For that to work,
777     the remote machine should be running the same version of Python as
778     this machine, ideally in a virtual environment with the same
779     import libraries installed.  Differences in operating system
780     and/or processor architecture don't seem to matter for most code,
781     though.
782
783     .. warning::
784
785         Mismatches in Python version or in the version numbers of
786         third-party libraries between machines can cause problems
787         when trying to unpickle and run code remotely.
788
789     Work to be dispatched is represented in this code by creating a
790     "bundle".  Each bundle is assigned to a remote worker based on
791     heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
792     general, it attempts to load all workers in the pool and maximize
793     throughput.  Once assigned to a remote worker, pickled code is
794     copied to that worker via `scp` and a remote command is issued via
795     `ssh` to execute a :file:`remote_worker.py` process on the remote
796     machine.  This process unpickles the code, runs it, and produces a
797     result which is then copied back to the local machine (again via
798     `scp`) where it can be processed by local code.
799
800     You can and probably must override the path of
801     :file:`remote_worker.py` on your pool machines using the
802     `--remote_worker_helper_path` commandline argument (or by
803     changing the default defined earlier in this file).
804
805     During remote work execution, this local machine acts as a
806     controller dispatching all work to the network, copying pickled
807     tasks out, and copying results back in.  It may also be a worker
808     in the pool but do not underestimate the cost of being a
809     controller -- it takes some cpu and a lot of network bandwidth.
810     The work dispatcher logic attempts to detect when a controller is
811     also a worker and reduce its load.
812
813     Some redundancy and safety provisions are made when scheduling
814     tasks to the worker pool; e.g. slower-than-expected tasks have
815     redundant backup tasks created for them, especially if there are
816     otherwise idle workers.  If a task fails repeatedly, the dispatcher
817     considers it poisoned and gives up on it.
818
819     .. warning::
820
821         This executor probably only makes sense to use with
822         computationally expensive tasks such as jobs that will execute
823         for ~30 seconds or longer.
824
825         The network overhead and latency of copying work from the
826         controller (local) machine to the remote workers and copying
827         results back again is relatively high.  Especially at startup,
828         the network can become a bottleneck.  Future versions of this
829         code may attempt to split the responsibility of being a
830         controller (distributing work to pool machines).
831
832     Instructions for how to set this up are provided in
833     :mod:`pyutils.parallelize.parallelize`.
834
835     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
836
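    A hedged construction sketch; the usernames, hostnames, weights,
    counts, and the submitted callable below are placeholders (normally
    the worker pool is read from the `--remote_worker_records_file`
    JSON configuration rather than built by hand)::

        workers = [
            RemoteWorkerRecord(username='me', machine='machine1', weight=24, count=8),
            RemoteWorkerRecord(username='me', machine='machine2', weight=10, count=4),
        ]
        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        executor = RemoteExecutor(workers, policy)
        future = executor.submit(some_function, some_argument)
        result = future.result()
        executor.shutdown(quiet=True)
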
837     """
838
839     def __init__(
840         self,
841         workers: List[RemoteWorkerRecord],
842         policy: RemoteWorkerSelectionPolicy,
843     ) -> None:
844         """
845         Args:
846             workers: A list of remote workers we can call on to do tasks.
847             policy: A policy for selecting remote workers for tasks.
848         """
849
850         super().__init__()
851         self.workers = workers
852         self.policy = policy
853         self.worker_count = 0
854         for worker in self.workers:
855             self.worker_count += worker.count
856         if self.worker_count <= 0:
857             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
858             logger.critical(msg)
859             raise RemoteExecutorException(msg)
860         self.policy.register_worker_pool(self.workers)
861         self.cv = threading.Condition()
862         logger.debug(
863             'Creating %d local threads, one per remote worker.', self.worker_count
864         )
865         self._helper_executor = fut.ThreadPoolExecutor(
866             thread_name_prefix="remote_executor_helper",
867             max_workers=self.worker_count,
868         )
869         self.status = RemoteExecutorStatus(self.worker_count)
870         self.total_bundles_submitted = 0
871         self.backup_lock = threading.Lock()
872         self.last_backup = None
873         (
874             self.heartbeat_thread,
875             self.heartbeat_stop_event,
876         ) = self._run_periodic_heartbeat()
877         self.already_shutdown = False
878
879     @background_thread
880     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
881         """
882         We create a background thread to invoke :meth:`_heartbeat` regularly
883         while we are scheduling work.  It does some accounting such as
884         looking for slow bundles to tag for backup creation, checking for
885         unexpected failures, and printing a fancy message on stdout.
886         """
887         while not stop_event.is_set():
888             time.sleep(5.0)
889             logger.debug('Running periodic heartbeat code...')
890             self._heartbeat()
891         logger.debug('Periodic heartbeat thread shutting down.')
892
893     def _heartbeat(self) -> None:
894         # Note: this is invoked on a background thread, not an
895         # executor thread.  Be careful what you do with it b/c it
896         # needs to get back and dump status again periodically.
897         with self.status.lock:
898             self.status.periodic_dump(self.total_bundles_submitted)
899
900             # Look for bundles to reschedule via executor.submit
901             if config.config['executors_schedule_remote_backups']:
902                 self._maybe_schedule_backup_bundles()
903
904     def _maybe_schedule_backup_bundles(self):
905         """Maybe schedule backup bundles if we see a very slow bundle."""
906
907         assert self.status.lock.locked()
908         num_done = len(self.status.finished_bundle_timings)
909         num_idle_workers = self.worker_count - self.task_count
910         now = time.time()
911         if (
912             num_done >= 2
913             and num_idle_workers > 0
914             and (self.last_backup is None or (now - self.last_backup > 9.0))
915             and self.backup_lock.acquire(blocking=False)
916         ):
917             try:
918                 assert self.backup_lock.locked()
919
920                 bundle_to_backup = None
921                 best_score = None
922                 for (
923                     worker,
924                     bundle_uuids,
925                 ) in self.status.in_flight_bundles_by_worker.items():
926
927                     # Prefer to schedule backups of bundles running on
928                     # slower machines.
929                     base_score = 0
930                     for record in self.workers:
931                         if worker.machine == record.machine:
932                             base_score = float(record.weight)
933                             base_score = 1.0 / base_score
934                             base_score *= 200.0
935                             base_score = int(base_score)
936                             break
937
938                     for uuid in bundle_uuids:
939                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
940                         if (
941                             bundle is not None
942                             and bundle.src_bundle is None
943                             and bundle.backup_bundles is not None
944                         ):
945                             score = base_score
946
947                             # Schedule backups of bundles running
948                             # longer; especially those that are
949                             # unexpectedly slow.
950                             start_ts = self.status.start_per_bundle[uuid]
951                             if start_ts is not None:
952                                 runtime = now - start_ts
953                                 score += runtime
954                                 logger.debug(
955                                     'score[%s] => %.1f  # latency boost', bundle, score
956                                 )
957
958                                 if bundle.slower_than_local_p95:
959                                     score += runtime / 2
960                                     logger.debug(
961                                         'score[%s] => %.1f  # >worker p95',
962                                         bundle,
963                                         score,
964                                     )
965
966                                 if bundle.slower_than_global_p95:
967                                     score += runtime / 4
968                                     logger.debug(
969                                         'score[%s] => %.1f  # >global p95',
970                                         bundle,
971                                         score,
972                                     )
973
974                             # Prefer backups of bundles that don't
975                             # have backups already.
976                             backup_count = len(bundle.backup_bundles)
977                             if backup_count == 0:
978                                 score *= 2
979                             elif backup_count == 1:
980                                 score /= 2
981                             elif backup_count == 2:
982                                 score /= 8
983                             else:
984                                 score = 0
985                             logger.debug(
986                                 'score[%s] => %.1f  # %d dup backup factor',
987                                 bundle,
988                                 score, backup_count,
989                             )
990
991                             if score != 0 and (
992                                 best_score is None or score > best_score
993                             ):
994                                 bundle_to_backup = bundle
995                                 assert bundle is not None
996                                 assert bundle.backup_bundles is not None
997                                 assert bundle.src_bundle is None
998                                 best_score = score
999
1000                 # Note: this is all still happening on the heartbeat
1001                 # runner thread.  That's ok because
1002                 # _schedule_backup_for_bundle uses the executor to
1003                 # submit the bundle again which will cause it to be
1004                 # picked up by a worker thread and allow this thread
1005                 # to return to run future heartbeats.
1006                 if bundle_to_backup is not None:
1007                     self.last_backup = now
1008                     logger.info(
1009                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1010                         bundle_to_backup,
1011                         best_score,
1012                     )
1013                     self._schedule_backup_for_bundle(bundle_to_backup)
1014             finally:
1015                 self.backup_lock.release()
1016
1017     def _is_worker_available(self) -> bool:
1018         """Is there a worker available currently?"""
1019         return self.policy.is_worker_available()
1020
1021     def _acquire_worker(
1022         self, machine_to_avoid: Optional[str] = None
1023     ) -> Optional[RemoteWorkerRecord]:
1024         """Try to acquire a worker."""
1025         return self.policy.acquire_worker(machine_to_avoid)
1026
1027     def _find_available_worker_or_block(
1028         self, machine_to_avoid: Optional[str] = None
1029     ) -> RemoteWorkerRecord:
1030         """Find a worker or block until one becomes available."""
1031         with self.cv:
1032             while not self._is_worker_available():
1033                 self.cv.wait()
1034             worker = self._acquire_worker(machine_to_avoid)
1035             if worker is not None:
1036                 return worker
1037         msg = "We should never reach this point in the code"
1038         logger.critical(msg)
1039         raise Exception(msg)
1040
1041     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1042         """Release a previously acquired worker."""
1043         worker = bundle.worker
1044         assert worker is not None
1045         logger.debug('Released worker %s', worker)
1046         self.status.record_release_worker(
1047             worker,
1048             bundle.uuid,
1049             was_cancelled,
1050         )
1051         with self.cv:
1052             worker.count += 1
1053             self.cv.notify()
1054         self.adjust_task_count(-1)
1055
1056     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1057         """See if a particular bundle is cancelled.  Do not block."""
1058         with self.status.lock:
1059             if bundle.is_cancelled.wait(timeout=0.0):
1060                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1061                 bundle.was_cancelled = True
1062                 return True
1063         return False
1064
1065     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1066         """Find a worker for bundle or block until one is available."""
1067
1068         self.adjust_task_count(+1)
1069         uuid = bundle.uuid
1070         hostname = bundle.hostname
1071         avoid_machine = override_avoid_machine
1072         is_original = bundle.src_bundle is None
1073
1074         # Try not to schedule a backup on the same host as the original.
1075         if avoid_machine is None and bundle.src_bundle is not None:
1076             avoid_machine = bundle.src_bundle.machine
1077         worker = None
1078         while worker is None:
1079             worker = self._find_available_worker_or_block(avoid_machine)
1080         assert worker is not None
1081
1082         # Ok, found a worker.
1083         bundle.worker = worker
1084         machine = bundle.machine = worker.machine
1085         username = bundle.username = worker.username
1086         self.status.record_acquire_worker(worker, uuid)
1087         logger.debug('%s: Running bundle on %s...', bundle, worker)
1088
1089         # Before we do any work, make sure the bundle is still viable.
1090         # It may have been some time between when it was submitted and
1091         # now due to lack of worker availability and someone else may
1092         # have already finished it.
1093         if self._check_if_cancelled(bundle):
1094             try:
1095                 return self._process_work_result(bundle)
1096             except Exception as e:
1097                 logger.warning(
1098                     '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1099                 )
1100                 self._release_worker(bundle)
1101                 if is_original:
1102                     # Weird.  We are the original owner of this
1103                     # bundle.  For it to have been cancelled, a backup
1104                     # must have already started and completed before
1105                                     # we even got started.  Moreover, the backup says
1106                     # it is done but we can't find the results it
1107                     # should have copied over.  Reschedule the whole
1108                     # thing.
1109                     logger.exception(e)
1110                     logger.error(
1111                         '%s: We are the original owner thread and yet there are '
1112                         'no results for this bundle.  This is unexpected and bad.',
1113                         bundle,
1114                     )
1115                     return self._emergency_retry_nasty_bundle(bundle)
1116                 else:
1117                     # We're a backup and our bundle is cancelled
1118                     # before we even got started.  Do nothing and let
1119                     # the original bundle's thread worry about either
1120                     # finding the results or complaining about it.
1121                     return None
1122
1123         # Send input code / data to worker machine if it's not local.
1124         if hostname not in machine:
1125             try:
1126                 cmd = (
1127                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1128                 )
1129                 start_ts = time.time()
1130                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1131                 run_silently(cmd)
1132                 xfer_latency = time.time() - start_ts
1133                 logger.debug(
1134                     "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1135                 )
1136             except Exception as e:
1137                 self._release_worker(bundle)
1138                 if is_original:
1139                     # Weird.  We tried to copy the code to the worker
1140                     # and it failed...  And we're the original bundle.
1141                     # We have to retry.
1142                     logger.exception(e)
1143                     logger.error(
1144                         "%s: Failed to send instructions to the worker machine?! "
1145                         "This is not expected; we\'re the original bundle so this shouldn\'t "
1146                         "be a race condition.  Attempting an emergency retry...",
1147                         bundle,
1148                     )
1149                     return self._emergency_retry_nasty_bundle(bundle)
1150                 else:
1151                     # This is actually expected; we're a backup.
1152                     # There's a race condition where someone else
1153                     # already finished the work and removed the source
1154                     # code_file before we could copy it.  Ignore.
1155                     logger.warning(
1156                         '%s: Failed to send instructions to the worker machine... '
1157                         'We\'re a backup and this may be caused by the original (or '
1158                         'some other backup) already finishing this work.  Ignoring.',
1159                         bundle,
1160                     )
1161                     return None
1162
1163         # Kick off the work.  Note that if this fails we let
1164         # _wait_for_process deal with it.
1165         self.status.record_processing_began(uuid)
1166         helper_path = config.config['remote_worker_helper_path']
1167         cmd = (
1168             f'{SSH} {bundle.username}@{bundle.machine} '
1169             f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1170         )
1171         logger.debug(
1172             '%s: Executing %s in the background to kick off work...', bundle, cmd
1173         )
1174         p = cmd_in_background(cmd, silent=True)
1175         bundle.pid = p.pid
1176         logger.debug(
1177             '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1178         )
1179         return self._wait_for_process(p, bundle, 0)
1180
1181     def _wait_for_process(
1182         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1183     ) -> Any:
1184         """At this point we've copied the bundle's pickled code to the remote
1185         worker and started an ssh process that should be invoking the
1186         remote worker to have it execute the user's code.  See how
1187         that's going and wait for it to complete or fail.  Note that
1188         this code is recursive: there are codepaths where we decide to
1189         stop waiting for an ssh process (because another backup seems
1190         to have finished) but then fail to fetch or parse the results
1191         from that backup and thus call ourselves to continue waiting
1192         on an active ssh process.  This is the purpose of the depth
1193         argument: to curtail potential infinite recursion by giving up
1194         eventually.
1195
1196         Args:
1197             p: the Popen record of the ssh job
1198             bundle: the bundle of work being executed remotely
1199             depth: how many retries we've made so far.  Starts at zero.
1200
1201         """
1202
1203         machine = bundle.machine
1204         assert p is not None
1205         pid = p.pid  # pid of the ssh process
1206         if depth > 3:
1207             logger.error(
1208                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1209                 pid,
1210             )
1211             p.terminate()
1212             self._release_worker(bundle)
1213             return self._emergency_retry_nasty_bundle(bundle)
1214
1215         # Spin until either the ssh job we scheduled finishes the
1216         # bundle or some backup worker signals that they finished it
1217         # before we could.
1218         while True:
1219             try:
1220                 p.wait(timeout=0.25)
1221             except subprocess.TimeoutExpired:
1222                 if self._check_if_cancelled(bundle):
1223                     logger.info(
1224                         '%s: looks like another worker finished bundle...', bundle
1225                     )
1226                     break
1227             else:
1228                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1229                 p = None
1230                 break
1231
1232         # If we get here we believe the bundle is done; either the ssh
1233         # subprocess finished (hopefully successfully) or we noticed
1234         # that some other worker seems to have completed the bundle
1235         # before us and we're bailing out.
1236         try:
1237             ret = self._process_work_result(bundle)
1238             if ret is not None and p is not None:
1239                 p.terminate()
1240             return ret
1241
1242         # Something went wrong; e.g. we could not copy the results
1243         # back, cleanup after ourselves on the remote machine, or
1244         # unpickle the results we got from the remote machine.  If we
1245         # still have an active ssh subprocess, keep waiting on it.
1246         # Otherwise, time for an emergency reschedule.
1247         except Exception as e:
1248             logger.exception(e)
1249             logger.error('%s: Something unexpected just happened...', bundle)
1250             if p is not None:
1251                 logger.warning(
1252                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1253                     bundle,
1254                 )
1255                 return self._wait_for_process(p, bundle, depth + 1)
1256             else:
1257                 self._release_worker(bundle)
1258                 return self._emergency_retry_nasty_bundle(bundle)
1259
1260     def _process_work_result(self, bundle: BundleDetails) -> Any:
1261         """A bundle seems to be completed.  Check on the results."""
1262
1263         with self.status.lock:
1264             is_original = bundle.src_bundle is None
1265             was_cancelled = bundle.was_cancelled
1266             username = bundle.username
1267             machine = bundle.machine
1268             result_file = bundle.result_file
1269             code_file = bundle.code_file
1270
1271             # Whether original or backup, if we finished first we must
1272             # fetch the results if the computation happened on a
1273             # remote machine.
1274             bundle.end_ts = time.time()
1275             if not was_cancelled:
1276                 assert bundle.machine is not None
1277                 if bundle.hostname not in bundle.machine:
1278                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1279                     logger.info(
1280                         "%s: Fetching results back from %s@%s via %s",
1281                         bundle,
1282                         username,
1283                         machine,
1284                         cmd,
1285                     )
1286
1287                     # If either of these throw they are handled in
1288                     # _wait_for_process.
1289                     attempts = 0
1290                     while True:
1291                         try:
1292                             run_silently(cmd)
1293                         except Exception as e:
1294                             attempts += 1
1295                             if attempts >= 3:
1296                                 raise e
1297                         else:
1298                             break
1299
1300                     # Cleanup remote /tmp files.
1301                     run_silently(
1302                         f'{SSH} {username}@{machine}'
1303                         f' "/bin/rm -f {code_file} {result_file}"'
1304                     )
1305                     logger.debug(
1306                         'Fetching results back took %.2fs', time.time() - bundle.end_ts
1307                     )
1308                 dur = bundle.end_ts - bundle.start_ts
1309                 self.histogram.add_item(dur)
1310
1311         # Only the original worker should unpickle the file contents
1312         # though since it's the only one whose result matters.  The
1313         # original is also the only job that may delete result_file
1314         # from disk.  Note that the original may have been cancelled
1315         # if one of the backups finished first; it still must read the
1316         # result from disk.  It still does that here with is_cancelled
1317         # set.
1318         if is_original:
1319             logger.debug("%s: Unpickling %s.", bundle, result_file)
1320             try:
1321                 with open(result_file, 'rb') as rb:
1322                     serialized = rb.read()
1323                 result = cloudpickle.loads(serialized)
1324             except Exception as e:
1325                 logger.exception(e)
1326                 logger.error('Failed to load %s... this is bad news.', result_file)
1327                 self._release_worker(bundle)
1328
1329                 # Re-raise the exception; the code in _wait_for_process may
1330                 # decide to _emergency_retry_nasty_bundle here.
1331                 raise e
1332             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1333             os.remove(result_file)
1334             os.remove(code_file)
1335
1336             # Notify any backups that the original is done so they
1337             # should stop ASAP.  Do this whether or not we
1338             # finished first since there could be more than one
1339             # backup.
1340             if bundle.backup_bundles is not None:
1341                 for backup in bundle.backup_bundles:
1342                     logger.debug(
1343                         '%s: Notifying backup %s that it\'s cancelled',
1344                         bundle,
1345                         backup.uuid,
1346                     )
1347                     backup.is_cancelled.set()
1348
1349         # This is a backup job and, by now, we have already fetched
1350         # the bundle results.
1351         else:
1352             # Backup results don't matter, they just need to leave the
1353             # result file in the right place for their originals to
1354             # read/unpickle later.
1355             result = None
1356
1357             # Tell the original to stop if we finished first.
1358             if not was_cancelled:
1359                 orig_bundle = bundle.src_bundle
1360                 assert orig_bundle is not None
1361                 logger.debug(
1362                     '%s: Notifying original %s we beat them to it.',
1363                     bundle,
1364                     orig_bundle.uuid,
1365                 )
1366                 orig_bundle.is_cancelled.set()
1367         self._release_worker(bundle, was_cancelled=was_cancelled)
1368         return result
1369
1370     def _create_original_bundle(self, pickle, function_name: str):
1371         """Creates a bundle that is not a backup of any other bundle but
1372         rather represents a user task.
1373         """
1374
1375         uuid = string_utils.generate_uuid(omit_dashes=True)
1376         code_file = f'/tmp/{uuid}.code.bin'
1377         result_file = f'/tmp/{uuid}.result.bin'
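             # NB: the remote worker uses these same /tmp paths; the remote
             # cleanup in _process_work_result removes both files from the
             # worker once the result has been fetched back.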
1378
1379         logger.debug('Writing pickled code to %s', code_file)
1380         with open(code_file, 'wb') as wb:
1381             wb.write(pickle)
1382
1383         bundle = BundleDetails(
1384             pickled_code=pickle,
1385             uuid=uuid,
1386             function_name=function_name,
1387             worker=None,
1388             username=None,
1389             machine=None,
1390             hostname=platform.node(),
1391             code_file=code_file,
1392             result_file=result_file,
1393             pid=0,
1394             start_ts=time.time(),
1395             end_ts=0.0,
1396             slower_than_local_p95=False,
1397             slower_than_global_p95=False,
1398             src_bundle=None,
1399             is_cancelled=threading.Event(),
1400             was_cancelled=False,
1401             backup_bundles=[],
1402             failure_count=0,
1403         )
1404         self.status.record_bundle_details(bundle)
1405         logger.debug('%s: Created an original bundle', bundle)
1406         return bundle
1407
1408     def _create_backup_bundle(self, src_bundle: BundleDetails):
1409         """Creates a bundle that is a backup of another bundle that is
1410         running too slowly."""
1411
1412         assert self.status.lock.locked()
1413         assert src_bundle.backup_bundles is not None
1414         n = len(src_bundle.backup_bundles)
1415         uuid = src_bundle.uuid + f'_backup#{n}'
1416
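             # The backup deliberately shares the original's code_file and
             # result_file so that whichever bundle finishes first leaves
             # the result where the original expects to find it.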
1417         backup_bundle = BundleDetails(
1418             pickled_code=src_bundle.pickled_code,
1419             uuid=uuid,
1420             function_name=src_bundle.function_name,
1421             worker=None,
1422             username=None,
1423             machine=None,
1424             hostname=src_bundle.hostname,
1425             code_file=src_bundle.code_file,
1426             result_file=src_bundle.result_file,
1427             pid=0,
1428             start_ts=time.time(),
1429             end_ts=0.0,
1430             slower_than_local_p95=False,
1431             slower_than_global_p95=False,
1432             src_bundle=src_bundle,
1433             is_cancelled=threading.Event(),
1434             was_cancelled=False,
1435             backup_bundles=None,  # backup backups not allowed
1436             failure_count=0,
1437         )
1438         src_bundle.backup_bundles.append(backup_bundle)
1439         self.status.record_bundle_details_already_locked(backup_bundle)
1440         logger.debug('%s: Created a backup bundle', backup_bundle)
1441         return backup_bundle
1442
1443     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1444         """Schedule a backup of src_bundle."""
1445
1446         assert self.status.lock.locked()
1447         assert src_bundle is not None
1448         backup_bundle = self._create_backup_bundle(src_bundle)
1449         logger.debug(
1450             '%s/%s: Scheduling backup for execution...',
1451             backup_bundle.uuid,
1452             backup_bundle.function_name,
1453         )
1454         self._helper_executor.submit(self._launch, backup_bundle)
1455
1456         # Results from backups don't matter; if they finish first
1457         # they will move the result_file to this machine and let
1458         # the original pick them up and unpickle them (and return
1459         # a result).
1460
1461     def _emergency_retry_nasty_bundle(
1462         self, bundle: BundleDetails
1463     ) -> Optional[fut.Future]:
1464         """Something unexpectedly failed with bundle.  Either retry it
1465         from the beginning or throw in the towel and give up on it."""
1466
1467         is_original = bundle.src_bundle is None
1468         bundle.worker = None
1469         avoid_last_machine = bundle.machine
1470         bundle.machine = None
1471         bundle.username = None
1472         bundle.failure_count += 1
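         # Originals get one more retry than backups: an original's result
         # actually matters, whereas a failed backup is expendable (see
         # below).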
1473         if is_original:
1474             retry_limit = 3
1475         else:
1476             retry_limit = 2
1477
1478         if bundle.failure_count > retry_limit:
1479             logger.error(
1480                 '%s: Tried this bundle too many times already (%dx); giving up.',
1481                 bundle,
1482                 retry_limit,
1483             )
1484             if is_original:
1485                 raise RemoteExecutorException(
1486                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1487                 )
1488             else:
1489                 logger.error(
1490                     '%s: At least it\'s only a backup; better luck with the others.',
1491                     bundle,
1492                 )
1493             return None
1494         else:
1495             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1496             logger.warning(msg)
1497             warnings.warn(msg)
1498             return self._launch(bundle, avoid_last_machine)
1499
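     # A minimal usage sketch (names below are hypothetical); in practice
     # submit() is usually reached indirectly via the @parallelize
     # decorator described in pyutils.parallelize.parallelize:
     #
     #     executor = DefaultExecutors().remote_pool()
     #     future = executor.submit(my_function, arg1, arg2)
     #     print(future.result())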
1500     @overrides
1501     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1502         """Submit work to be done.  This is the user entry point of this
1503         class."""
1504         if self.already_shutdown:
1505             raise Exception('Submitted work after shutdown.')
1506         pickle = _make_cloud_pickle(function, *args, **kwargs)
1507         bundle = self._create_original_bundle(pickle, function.__name__)
1508         self.total_bundles_submitted += 1
1509         return self._helper_executor.submit(self._launch, bundle)
1510
1511     @overrides
1512     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1513         """Shutdown the executor."""
1514         if not self.already_shutdown:
1515             logger.debug('Shutting down RemoteExecutor %s', self.title)
1516             self.heartbeat_stop_event.set()
1517             self.heartbeat_thread.join()
1518             self._helper_executor.shutdown(wait)
1519             if not quiet:
1520                 print(self.histogram.__repr__(label_formatter='%ds'))
1521             self.already_shutdown = True
1522
1523
1524 class RemoteWorkerPoolProvider(ABC):
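     """Interface for providers of the remote worker pool (a list of
     RemoteWorkerRecords)."""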
1525     @abstractmethod
1526     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1527         pass
1528
1529
1530 @persistent.persistent_autoloaded_singleton()  # type: ignore
1531 class ConfigRemoteWorkerPoolProvider(
1532     RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1533 ):
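     """Reads the remote worker pool from the JSON file named by the
     remote_worker_records_file config setting.  A rough sketch of the
     expected shape (field names other than "machine" and "count" are
     illustrative; see the example config referenced in the module
     docstring for the authoritative format)::

         {
             "remote_worker_records": [
                 {"username": "me", "machine": "worker1", "weight": 2, "count": 4},
                 ...
             ]
         }
     """
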
1534     def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1535         self.remote_worker_pool = []
1536         for record in json_remote_worker_pool['remote_worker_records']:
1537             self.remote_worker_pool.append(
1538                 dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
1539             )
1540         assert len(self.remote_worker_pool) > 0
1541
1542     @overrides
1543     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1544         return self.remote_worker_pool
1545
1546     @overrides
1547     def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1548         return self.remote_worker_pool
1549
1550     @staticmethod
1551     @overrides
1552     def get_filename() -> str:
1553         return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
1554
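     # The worker records file is treated as read-only configuration:
     # always reload it, never write it back out.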
1555     @staticmethod
1556     @overrides
1557     def should_we_load_data(filename: str) -> bool:
1558         return True
1559
1560     @staticmethod
1561     @overrides
1562     def should_we_save_data(filename: str) -> bool:
1563         return False
1564
1565
1566 @singleton
1567 class DefaultExecutors(object):
1568     """A container for a default thread, process and remote executor.
1569     These are created lazily, on first use, and are cleaned up
1570     automatically before process exit for the caller's convenience.
1571     Instead of creating your own executor, consider using the ones
1572     from this pool.  e.g.::
1573
1574         @par.parallelize(method=par.Method.PROCESS)
1575         def do_work(
1576             solutions: List[Work],
1577             shard_num: int,
1578             ...
1579         ):
1580             <do the work>
1581
1582
1583         def start_do_work(all_work: List[Work]):
1584             shards, results = [], []
1585             logger.debug('Sharding work into groups of 10.')
1586             for subset in list_utils.shard(all_work, 10):
1587                 shards.append([x for x in subset])
1588
1589             logger.debug('Kicking off helper pool.')
1590             try:
1591                 for n, shard in enumerate(shards):
1592                     results.append(
1593                         do_work(
1594                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1595                         )
1596                     )
1597                 smart_future.wait_all(results)
1598             finally:
1599                 # Note: even if you forget to do this, the pool cleans
1600                 # itself up at program termination, including tearing
1601                 # down any active ssh connections.
1602                 executors.DefaultExecutors().process_pool().shutdown()
1603     """
1604
1605     def __init__(self):
1606         self.thread_executor: Optional[ThreadExecutor] = None
1607         self.process_executor: Optional[ProcessExecutor] = None
1608         self.remote_executor: Optional[RemoteExecutor] = None
1609
1610     @staticmethod
1611     def _ping(host) -> bool:
1612         logger.debug('RUN> ping -c 1 %s', host)
1613         try:
1614             x = cmd_exitcode(
1615                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1616             )
1617             return x == 0
1618         except Exception:
1619             return False
1620
1621     def thread_pool(self) -> ThreadExecutor:
1622         if self.thread_executor is None:
1623             self.thread_executor = ThreadExecutor()
1624         return self.thread_executor
1625
1626     def process_pool(self) -> ProcessExecutor:
1627         if self.process_executor is None:
1628             self.process_executor = ProcessExecutor()
1629         return self.process_executor
1630
1631     def remote_pool(self) -> RemoteExecutor:
1632         if self.remote_executor is None:
1633             logger.info('Looking for some helper machines...')
1634             provider = ConfigRemoteWorkerPoolProvider()
1635             all_machines = provider.get_remote_workers()
1636             pool = []
1637
1638             # Make sure we can ping each machine.
1639             for record in all_machines:
1640                 if self._ping(record.machine):
1641                     logger.info('%s is alive / responding to pings', record.machine)
1642                     pool.append(record)
1643
1644             # The controller machine has a lot to do; go easy on it.
1645             for record in pool:
1646                 if record.machine == platform.node() and record.count > 1:
1647                     logger.info('Reducing workload for %s.', record.machine)
1648                     record.count = max(int(record.count / 2), 1)
1649
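                 # Hand the surviving workers to a weighted-random selection
                 # policy and build the executor around them.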
1650             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1651             policy.register_worker_pool(pool)
1652             self.remote_executor = RemoteExecutor(pool, policy)
1653         return self.remote_executor
1654
1655     def shutdown(self) -> None:
1656         if self.thread_executor is not None:
1657             self.thread_executor.shutdown(wait=True, quiet=True)
1658             self.thread_executor = None
1659         if self.process_executor is not None:
1660             self.process_executor.shutdown(wait=True, quiet=True)
1661             self.process_executor = None
1662         if self.remote_executor is not None:
1663             self.remote_executor.shutdown(wait=True, quiet=True)
1664             self.remote_executor = None