I guess it's 2023 now...
[pyutils.git] / src / pyutils / parallelize / executors.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2023, Scott Gasch
5
6 """
7 This module defines a :class:`BaseExecutor` interface and three
8 implementations:
9
10     - :class:`ThreadExecutor`
11     - :class:`ProcessExecutor`
12     - :class:`RemoteExecutor`
13
14 The :class:`ThreadExecutor` is used to dispatch work to background
15 threads in the same Python process for parallelized work.  Of course,
16 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
17 is not terribly useful for compute-bound code.  But it's good for
18 work that is mostly I/O bound.
19
20 The :class:`ProcessExecutor` is used to dispatch work to other
21 processes on the same machine and is more useful for compute-bound
22 workloads.
23
24 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
25 the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
26 to dispatch work to a set of remote worker machines on your
27 network.  You can configure this pool via a JSON configuration file,
28 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
29
30 Finally, this file defines a :class:`DefaultExecutors` pool that
31 contains a pre-created and ready instance of each of the three
32 executors discussed.  It has the added benefit of being automatically
33 cleaned up at process termination time.
34
35 See instructions in :mod:`pyutils.parallelize.parallelize` for
36 setting up and using the framework.
37 """
38
39 from __future__ import annotations
40
41 import concurrent.futures as fut
42 import logging
43 import os
44 import platform
45 import random
46 import subprocess
47 import threading
48 import time
49 import warnings
50 from abc import ABC, abstractmethod
51 from collections import defaultdict
52 from dataclasses import dataclass
53 from typing import Any, Callable, Dict, List, Optional, Set
54
55 import cloudpickle  # type: ignore
56 from overrides import overrides
57
58 import pyutils.types.histogram as hist
59 from pyutils import (
60     argparse_utils,
61     config,
62     dataclass_utils,
63     math_utils,
64     persistent,
65     string_utils,
66 )
67 from pyutils.ansi import bg, fg, reset, underline
68 from pyutils.decorator_utils import singleton
69 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
70 from pyutils.parallelize.thread_utils import background_thread
71 from pyutils.types import type_utils
72
logger = logging.getLogger(__name__)

# Register this module's commandline flags with the shared config parser.
# Values are read back elsewhere in this file through config.config[...].
parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    metavar='#THREADS',
    help='Number of threads in the default threadpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    metavar='#PROCESSES',
    help='Number of processes in the default processpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    metavar='#FAILURES',
    help='Maximum number of failures before giving up on a bundle',
)
parser.add_argument(
    '--remote_worker_records_file',
    type=str,
    metavar='FILENAME',
    help='Path of the remote worker records file (JSON)',
    default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
)
# Use the same tolerant HOME lookup as the flag above: indexing os.environ
# directly would raise KeyError at import time when HOME is not set.
parser.add_argument(
    '--remote_worker_helper_path',
    type=str,
    metavar='PATH_TO_REMOTE_WORKER_PY',
    help='Path to remote_worker.py on remote machines',
    default=f'source py39-venv/bin/activate && {os.environ.get("HOME", ".")}/pyutils/src/pyutils/remote_worker.py',
)


# Base command lines for the ssh / scp binaries.
SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'
123
124
def _make_cloud_pickle(fun, *args, **kwargs):
    """Internal helper to create cloud pickles.

    Args:
        fun: the callable to bundle.
        *args: positional arguments to bundle along with fun.
        **kwargs: keyword arguments to bundle along with fun.

    Returns:
        The result of ``cloudpickle.dumps`` applied to the tuple
        ``(fun, args, kwargs)``.
    """
    # Not every callable has a __name__ (e.g. functools.partial objects);
    # fall back to repr so a debug message can never abort the bundling.
    logger.debug(
        "Making cloudpickled bundle at %s", getattr(fun, '__name__', repr(fun))
    )
    return cloudpickle.dumps((fun, args, kwargs))
129
130
class BaseExecutor(ABC):
    """The base executor interface definition.  The interface for
    :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
    :class:`ThreadExecutor`.

    Besides the abstract :meth:`submit` / :meth:`shutdown` pair, this
    base class keeps a latency histogram and a count of outstanding
    tasks that subclasses update as work is submitted and completed.
    """

    def __init__(self, *, title: str = ''):
        """
        Args:
            title: the name of this executor.
        """
        self.title = title
        # 50 evenly spaced buckets covering 0..500; subclasses add the
        # elapsed wall-clock seconds of each finished task.
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(0, 500, 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Submit work for the executor to do.

        Args:
            function: the Callable to be executed.
            *args: the arguments to function
            **kwargs: the arguments to function

        Returns:
            A concurrent :class:`Future` representing the result of the
            work.
        """

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shutdown the executor.

        Args:
            wait: wait for the shutdown to complete before returning?
            quiet: keep it quiet, please.
        """

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shutdown the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks).  Return False
        otherwise.  Note: this should only be called by the launcher
        process.

        Args:
            quiet: keep it quiet, please.

        Returns:
            True if the executor could be shut down because it has no
            pending work, False otherwise.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count.  Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread / machine.

        Args:
            delta: the delta value by which to adjust task count.
        """
        self.task_count += delta
        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)

    def get_task_count(self) -> int:
        """Read the task count.  Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread / machine.

        Returns:
            The executor's current task count.
        """
        return self.task_count
211
212
class ThreadExecutor(BaseExecutor):
    """A threadpool executor.  This executor uses Python threads to
    schedule tasks.  Note that, at least as of python3.10, because of
    the global lock in the interpreter itself, these do not
    parallelize very well so this class is useful mostly for non-CPU
    intensive tasks.

    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
    """

    def __init__(self, max_workers: Optional[int] = None):
        """
        Args:
            max_workers: maximum number of threads to create in the pool.
                When None, the `--executors_threadpool_size` config value
                is used if present; otherwise the pool picks its own
                default size.
        """
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        if workers is not None:
            logger.debug('Creating threadpool executor with %d workers', workers)
        else:
            logger.debug('Creating a default sized threadpool executor')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False

    # This is run on a different thread; do not adjust task count here.
    @staticmethod
    def _run_local_bundle(fun, *args, **kwargs):
        """Invoke fun(*args, **kwargs) on a pool thread and return its result."""
        # Callables such as functools.partial have no __name__; fall back
        # to repr so that logging cannot make the task itself fail.
        logger.debug(
            "Running local bundle at %s", getattr(fun, '__name__', repr(fun))
        )
        return fun(*args, **kwargs)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Schedule function(*args, **kwargs) on the thread pool.

        Args:
            function: the Callable to be executed.
            *args: positional arguments for function.
            **kwargs: keyword arguments for function.

        Returns:
            The Future returned by the underlying thread pool.

        Raises:
            Exception: if this executor has already been shut down.
        """
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        start = time.time()
        try:
            result = self._thread_pool_executor.submit(
                ThreadExecutor._run_local_bundle, function, *args, **kwargs
            )
        except Exception:
            # The task never made it into the pool, so no done-callback
            # will ever undo the increment above; undo it here.
            self.adjust_task_count(-1)
            raise
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shut down the thread pool; later calls are no-ops.

        Args:
            wait: passed through to the pool's shutdown call.
            quiet: when False, print the task latency histogram.
        """
        if not self.already_shutdown:
            logger.debug('Shutting down threadpool executor %s', self.title)
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
275
276
class ProcessExecutor(BaseExecutor):
    """An executor which runs tasks in child processes.

    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
    """

    def __init__(self, max_workers: Optional[int] = None):
        """
        Args:
            max_workers: the max number of worker processes to create.
                When None, the `--executors_processpool_size` config
                value is used if present; otherwise the pool picks its
                own default size.
        """
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        if workers is not None:
            logger.debug('Creating processpool executor with %d workers.', workers)
        else:
            logger.debug('Creating a default sized processpool executor')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False

    # This is run in another process; do not adjust task count here.
    @staticmethod
    def _run_cloud_pickle(pickle):
        """Unpickle a (fun, args, kwargs) bundle, call it, return the result."""
        fun, args, kwargs = cloudpickle.loads(pickle)
        # Callables such as functools.partial have no __name__; fall back
        # to repr so that logging cannot make the task itself fail.
        logger.debug(
            "Running pickled bundle at %s", getattr(fun, '__name__', repr(fun))
        )
        return fun(*args, **kwargs)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Pickle function and its arguments and schedule them on the
        process pool.

        Args:
            function: the Callable to be executed.
            *args: positional arguments for function.
            **kwargs: keyword arguments for function.

        Returns:
            The Future returned by the underlying process pool.

        Raises:
            Exception: if this executor has already been shut down.
        """
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        # Pickle before touching the task count so that a pickling failure
        # cannot leave the count permanently incremented.
        bundle = _make_cloud_pickle(function, *args, **kwargs)
        self.adjust_task_count(+1)
        try:
            result = self._process_executor.submit(
                ProcessExecutor._run_cloud_pickle, bundle
            )
        except Exception:
            # Nothing was scheduled, so no done-callback will ever undo
            # the increment above; undo it here.
            self.adjust_task_count(-1)
            raise
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shut down the process pool; later calls are no-ops.

        Args:
            wait: passed through to the pool's shutdown call.
            quiet: when False, print the task latency histogram.
        """
        if not self.already_shutdown:
            logger.debug('Shutting down processpool executor %s', self.title)
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        """Return a copy of the instance state with the process pool
        reference cleared.
        """
        # NOTE(review): presumably the pool is dropped because it cannot be
        # pickled itself -- confirm against the callers that pickle executors.
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state
336         state['_process_executor'] = None
337         return state
338
339
class RemoteExecutorException(Exception):
    """Raised when a bundle still cannot be executed after several retries."""
344
345
@dataclass
class RemoteWorkerRecord:
    """Describes one remote worker machine in the pool."""

    username: str
    """Account name used when ssh-ing into this machine to run work."""

    machine: str
    """Address or hostname of the machine."""

    weight: int
    """Relative likelihood that the weighted selection policy picks
    this machine when scheduling work."""

    count: int
    """Maximum number of tasks this machine can handle once it has
    been selected."""

    def __hash__(self):
        # Only the login identity contributes to the hash; weight and
        # count are deliberately left out.
        identity = (self.username, self.machine)
        return hash(identity)

    def __repr__(self):
        return '{}@{}'.format(self.username, self.machine)
369
370
@dataclass
class BundleDetails:
    """Everything needed to describe one unit of work: the code to run,
    where it is running, its current state, whether it is an original
    bundle or a backup of another one, how often it has failed, etc...
    """

    pickled_code: bytes
    """The code to run, cloud pickled"""

    uuid: str
    """A unique identifier"""

    function_name: str
    """The name of the function we pickled"""

    worker: Optional[RemoteWorkerRecord]
    """The remote worker running this bundle or None if none (yet)"""

    username: Optional[str]
    """The remote username running this bundle or None if none (yet)"""

    machine: Optional[str]
    """The remote machine running this bundle or None if none (yet)"""

    controller: str
    """The controller machine"""

    code_file: str
    """A unique filename to hold the work to be done"""

    result_file: str
    """Where the results should be placed / read from"""

    pid: int
    """The process id of the local subprocess watching the ssh connection
    to the remote machine"""

    start_ts: float
    """Starting time"""

    end_ts: float
    """Ending time"""

    slower_than_local_p95: bool
    """Currently slower than 95% of other bundles on remote host"""

    slower_than_global_p95: bool
    """Currently slower than 95% of other bundles globally"""

    src_bundle: Optional[BundleDetails]
    """If this is a backup bundle, this points to the original bundle
    that it's backing up.  None otherwise."""

    is_cancelled: threading.Event
    """An event that can be signaled to indicate this bundle is cancelled.
    This is set when another copy (backup or original) of this work has
    completed successfully elsewhere."""

    was_cancelled: bool
    """True if this bundle was cancelled, False if it finished normally"""

    backup_bundles: Optional[List[BundleDetails]]
    """If we've created backups of this bundle, this is the list of them"""

    failure_count: int
    """How many times has this bundle failed already?"""

    def __repr__(self):
        """Render a short, colorized ``suffix/function/machine`` tag."""
        ident = self.uuid
        if ident[-9:-2] == '_backup':
            # Backup uuids carry a nine character '_backup' + two-char
            # tail; strip it and keep only the final character as the
            # backup marker.
            ident = ident[:-9]
            tag = f'{ident[-6:]}_b{self.uuid[-1:]}'
        else:
            tag = ident[-6:]

        # The color is derived from the last two hex digits of the
        # (un-suffixed) uuid so that log lines about the same bundle are
        # easy to correlate by eye.
        palette = [
            fg(color_name)
            for color_name in (
                'violet red',
                'red',
                'orange',
                'peach orange',
                'yellow',
                'marigold yellow',
                'green yellow',
                'tea green',
                'cornflower blue',
                'turquoise blue',
                'tropical blue',
                'lavender purple',
                'medium purple',
            )
        ]
        color = palette[int(ident[-2:], 16) % len(palette)]
        function_name = 'nofname' if self.function_name is None else self.function_name
        machine = 'nomachine' if self.machine is None else self.machine
        return f'{color}{tag}/{function_name}/{machine}{reset()}'
469         machine = self.machine if self.machine is not None else 'nomachine'
470         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
471
472
class RemoteExecutorStatus:
    """A status 'scoreboard' for a remote executor tracking various
    metrics and able to render a periodic dump of global state.

    Methods whose names end in ``_already_locked``, as well as
    :meth:`total_in_flight`, :meth:`total_idle`, :meth:`__repr__` and
    :meth:`periodic_dump`, assert that ``self.lock`` is held and must
    therefore be called with the lock already acquired.
    """

    def __init__(self, total_worker_count: int) -> None:
        """
        Args:
            total_worker_count: number of workers in the pool
        """
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord, math_utils.NumericPopulation
        ] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: math_utils.NumericPopulation = (
            math_utils.NumericPopulation()
        )
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modification using self.  Also used
        # as a memory fence for modifications to bundle.
        self.lock: threading.Lock = threading.Lock()

    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        """Record that bundle with uuid is assigned to a particular worker.

        Args:
            worker: the record of the worker to which uuid is assigned
            uuid: the uuid of a bundle that has been assigned to a worker
        """
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(
        self, worker: RemoteWorkerRecord, uuid: str
    ) -> None:
        """Same as :meth:`record_acquire_worker` but an entry point that
        doesn't acquire the lock, for codepaths where it's already held."""
        assert self.lock.locked()
        self.known_workers.add(worker)
        # None means "assigned but processing has not begun yet".
        self.start_per_bundle[uuid] = None
        self.in_flight_bundles_by_worker.setdefault(worker, set()).add(uuid)

    def record_bundle_details(self, details: BundleDetails) -> None:
        """Register the details about a bundle of work."""
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        """Same as :meth:`record_bundle_details` but for codepaths that
        already hold the lock."""
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Record that a bundle has released a worker.

        Args:
            worker: the worker being released.
            uuid: the uuid of the bundle that was running on worker.
            was_cancelled: True if the bundle was cancelled; cancelled
                bundles do not contribute to the latency statistics.
        """
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Same as :meth:`record_release_worker` but for codepaths that
        already hold the lock."""
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            self.finished_bundle_timings_per_worker.setdefault(
                worker, math_utils.NumericPopulation()
            ).add_number(bundle_latency)
            self.finished_bundle_timings.add_number(bundle_latency)

    def record_processing_began(self, uuid: str):
        """Record when work on a bundle begins."""
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        """How many bundles are in flight currently?"""
        assert self.lock.locked()
        return sum(
            len(self.in_flight_bundles_by_worker[worker])
            for worker in self.known_workers
        )

    def total_idle(self) -> int:
        """How many idle workers are there currently?"""
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        """Render the scoreboard as a multi-line string.

        As a side effect this refreshes the ``slower_than_local_p95`` /
        ``slower_than_global_p95`` flags on the details of every in-flight
        bundle for which the corresponding p95 is available.
        """
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall_median = None
        qall_p95 = None
        # Percentiles are only reported once more than one bundle finished.
        if len(self.finished_bundle_timings) > 1:
            qall_median = self.finished_bundle_timings.get_median()
            qall_p95 = self.finished_bundle_timings.get_percentile(95)
            ret += (
                f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(
                worker, math_utils.NumericPopulation()
            )
            count = len(timings)
            qworker_median = None
            qworker_p95 = None
            if count > 1:
                qworker_median = timings.get_median()
                qworker_p95 = timings.get_percentile(95)
                ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f'       {details} setting up / copying data...'
                        sec = 0.0

                    if qworker_p95 is not None:
                        slow_locally = sec > qworker_p95
                        if slow_locally:
                            ret += f'{bg("red")}>💻p95{reset()} '
                        if details is not None:
                            details.slower_than_local_p95 = slow_locally

                    if qall_p95 is not None:
                        slow_globally = sec > qall_p95
                        if slow_globally:
                            ret += f'{bg("red")}>∀p95{reset()} '
                        # A bundle can be in flight before its details are
                        # registered, so details may legitimately be None
                        # in both the slow and the not-slow case.
                        if details is not None:
                            details.slower_than_global_p95 = slow_globally
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        """Print the scoreboard, at most once every five seconds.

        Args:
            total_bundles_submitted: the number of bundles submitted so
                far; stored and shown in the dump.
        """
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts
661             print(self)
662             self.last_periodic_dump = ts
663
664
class RemoteWorkerSelectionPolicy(ABC):
    """Interface that every remote worker selection policy implements."""

    def __init__(self):
        # No pool is known until register_worker_pool is called.
        self.workers: Optional[List[RemoteWorkerRecord]] = None

    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
        """Remember the pool of workers this policy selects from."""
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        """Tell whether some worker in the pool can take work right now."""

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        """Pick a worker for a task, or return None when none can be found.

        Args:
            machine_to_avoid: a machine the caller would rather not use.
        """
681
682
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Selects remote workers at random, biased by each worker's weight
    and remaining capacity."""

    @overrides
    def is_worker_available(self) -> bool:
        return any(candidate.count > 0 for candidate in (self.workers or []))

    def _fill_grabbag(self, machine_to_avoid, *, honor_avoid: bool):
        """Build the list to draw from: every worker with spare capacity
        appears ``count * weight`` times.  When honor_avoid is set,
        workers on machine_to_avoid are left out."""
        grabbag = []
        for candidate in self.workers or []:
            if honor_avoid and candidate.machine == machine_to_avoid:
                continue
            if candidate.count > 0:
                for _ in range(candidate.count * candidate.weight):
                    grabbag.append(candidate)
        return grabbag

    @overrides
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        # First try to respect the caller's wish to avoid a machine ...
        grabbag = self._fill_grabbag(machine_to_avoid, honor_avoid=True)
        if not grabbag:
            # ... and only fall back to the whole pool if that leaves nothing.
            logger.debug(
                'There are no available workers that avoid %s', machine_to_avoid
            )
            grabbag = self._fill_grabbag(machine_to_avoid, honor_avoid=False)

        if not grabbag:
            logger.warning('There are no available workers?!')
            return None

        chosen = random.sample(grabbag, 1)[0]
        assert chosen.count > 0
        # Claim one slot of capacity on the selected worker.
        chosen.count -= 1
        logger.debug('Selected worker %s', chosen)
        return chosen
721         logger.debug('Selected worker %s', worker)
722         return worker
723
724
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that just round robins."""

    def __init__(self) -> None:
        super().__init__()
        # Position in self.workers at which the next scan starts.
        self.index = 0

    @overrides
    def is_worker_available(self) -> bool:
        """Return True if any worker in the pool has a count above zero."""
        return any(worker.count > 0 for worker in (self.workers or []))

    @overrides
    def acquire_worker(
        self, machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        """Return the next worker (in pool order) that has spare capacity.

        Args:
            machine_to_avoid: accepted for interface compatibility; this
                policy does not consult it.

        Returns:
            The selected worker, with its count already decremented, or
            None if the pool is empty or no worker has spare capacity.
        """
        if not self.workers:
            return None

        pool_size = len(self.workers)
        # Reduce modulo the pool size so an index left over from a larger,
        # previously registered pool cannot run off the end of the list.
        first = self.index % pool_size
        for offset in range(pool_size):
            x = (first + offset) % pool_size
            worker = self.workers[x]
            if worker.count > 0:
                worker.count -= 1
                # The next scan starts right after the worker just used.
                self.index = (x + 1) % pool_size
                logger.debug('Selected worker %s', worker)
                return worker

        # A full lap found nobody with capacity.
        logger.warning('Unexpectedly could not find a worker, retrying...')
        return None
761                     return None
762         return None
763
764
765 class RemoteExecutor(BaseExecutor):
766     """An executor that uses processes on remote machines to do work.
767     To do so, it requires that a pool of remote workers to be properly
768     configured.  See instructions in
769     :class:`pyutils.parallelize.parallelize`.
770
771     Each machine in a worker pool has a *weight* and a *count*.  A
772     *weight* captures the relative speed of a processor on that worker
773     and a *count* captures the number of synchronous tasks the worker
774     can accept (i.e. the number of cpus on the machine).
775
776     To dispatch work to a remote machine, this class pickles the code
777     to be executed remotely using `cloudpickle`.  For that to work,
778     the remote machine should be running the same version of Python as
779     this machine, ideally in a virtual environment with the same
780     import libraries installed.  Differences in operating system
781     and/or processor architecture don't seem to matter for most code,
782     though.
783
784     .. warning::
785
786         Mismatches in Python version or in the version numbers of
787         third-party libraries between machines can cause problems
788         when trying to unpickle and run code remotely.
789
790     Work to be dispatched is represented in this code by creating a
791     "bundle".  Each bundle is assigned to a remote worker based on
792     heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
793     general, it attempts to load all workers in the pool and maximize
794     throughput.  Once assigned to a remote worker, pickled code is
795     copied to that worker via `scp` and a remote command is issued via
796     `ssh` to execute a :file:`remote_worker.py` process on the remote
797     machine.  This process unpickles the code, runs it, and produces a
798     result which is then copied back to the local machine (again via
799     `scp`) where it can be processed by local code.
800
801     You can and probably must override the path of
802     :file:`remote_worker.py` on your pool machines using the
803     `--remote_worker_helper_path` commandline argument (or by just
804     changing the default in code, see above in this file's code).
805
806     During remote work execution, this local machine acts as a
807     controller dispatching all work to the network, copying pickled
808     tasks out, and copying results back in.  It may also be a worker
809     in the pool but do not underestimate the cost of being a
810     controller -- it takes some cpu and a lot of network bandwidth.
811     The work dispatcher logic attempts to detect when a controller is
812     also a worker and reduce its load.
813
814     Some redundancy and safety provisions are made when scheduling
    tasks to the worker pool; e.g. slower than expected tasks have
    redundant backup tasks created, especially if there are otherwise
    idle workers.  If a task fails repeatedly, the dispatcher considers
    it poisoned and gives up on it.
819
820     .. warning::
821
822         This executor probably only makes sense to use with
823         computationally expensive tasks such as jobs that will execute
824         for ~30 seconds or longer.
825
826         The network overhead and latency of copying work from the
827         controller (local) machine to the remote workers and copying
828         results back again is relatively high.  Especially at startup,
829         the network can become a bottleneck.  Future versions of this
830         code may attempt to split the responsibility of being a
831         controller (distributing work to pool machines).
832
833     Instructions for how to set this up are provided in
834     :class:`pyutils.parallelize.parallelize`.
835
836     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
837
838     """
839
840     def __init__(
841         self,
842         workers: List[RemoteWorkerRecord],
843         policy: RemoteWorkerSelectionPolicy,
844     ) -> None:
845         """
846         Args:
847             workers: A list of remote workers we can call on to do tasks.
848             policy: A policy for selecting remote workers for tasks.
849         """
850
851         super().__init__()
852         self.workers = workers
853         self.policy = policy
854         self.worker_count = 0
855         for worker in self.workers:
856             self.worker_count += worker.count
857         if self.worker_count <= 0:
858             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
859             logger.critical(msg)
860             raise RemoteExecutorException(msg)
861         self.policy.register_worker_pool(self.workers)
862         self.cv = threading.Condition()
863         logger.debug(
864             'Creating %d local threads, one per remote worker.', self.worker_count
865         )
866         self._helper_executor = fut.ThreadPoolExecutor(
867             thread_name_prefix="remote_executor_helper",
868             max_workers=self.worker_count,
869         )
870         self.status = RemoteExecutorStatus(self.worker_count)
871         self.total_bundles_submitted = 0
872         self.backup_lock = threading.Lock()
873         self.last_backup = None
874         (
875             self.heartbeat_thread,
876             self.heartbeat_stop_event,
877         ) = self._run_periodic_heartbeat()
878         self.already_shutdown = False
879
880     @background_thread
881     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
882         """
883         We create a background thread to invoke :meth:`_heartbeat` regularly
884         while we are scheduling work.  It does some accounting such as
885         looking for slow bundles to tag for backup creation, checking for
886         unexpected failures, and printing a fancy message on stdout.
887         """
888         while not stop_event.is_set():
889             time.sleep(5.0)
890             logger.debug('Running periodic heartbeat code...')
891             self._heartbeat()
892         logger.debug('Periodic heartbeat thread shutting down.')
893
894     def _heartbeat(self) -> None:
895         # Note: this is invoked on a background thread, not an
896         # executor thread.  Be careful what you do with it b/c it
897         # needs to get back and dump status again periodically.
898         with self.status.lock:
899             self.status.periodic_dump(self.total_bundles_submitted)
900
901             # Look for bundles to reschedule via executor.submit
902             if config.config['executors_schedule_remote_backups']:
903                 self._maybe_schedule_backup_bundles()
904
905     def _maybe_schedule_backup_bundles(self):
906         """Maybe schedule backup bundles if we see a very slow bundle."""
907
908         assert self.status.lock.locked()
909         num_done = len(self.status.finished_bundle_timings)
910         num_idle_workers = self.worker_count - self.task_count
911         now = time.time()
912         if (
913             num_done >= 2
914             and num_idle_workers > 0
915             and (self.last_backup is None or (now - self.last_backup > 9.0))
916             and self.backup_lock.acquire(blocking=False)
917         ):
918             try:
919                 assert self.backup_lock.locked()
920
921                 bundle_to_backup = None
922                 best_score = None
923                 for (
924                     worker,
925                     bundle_uuids,
926                 ) in self.status.in_flight_bundles_by_worker.items():
927
928                     # Prefer to schedule backups of bundles running on
929                     # slower machines.
930                     base_score = 0
931                     for record in self.workers:
932                         if worker.machine == record.machine:
933                             base_score = float(record.weight)
934                             base_score = 1.0 / base_score
935                             base_score *= 200.0
936                             base_score = int(base_score)
937                             break
938
939                     for uuid in bundle_uuids:
940                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
941                         if (
942                             bundle is not None
943                             and bundle.src_bundle is None
944                             and bundle.backup_bundles is not None
945                         ):
946                             score = base_score
947
948                             # Schedule backups of bundles running
949                             # longer; especially those that are
950                             # unexpectedly slow.
951                             start_ts = self.status.start_per_bundle[uuid]
952                             if start_ts is not None:
953                                 runtime = now - start_ts
954                                 score += runtime
955                                 logger.debug(
956                                     'score[%s] => %.1f  # latency boost', bundle, score
957                                 )
958
959                                 if bundle.slower_than_local_p95:
960                                     score += runtime / 2
961                                     logger.debug(
962                                         'score[%s] => %.1f  # >worker p95',
963                                         bundle,
964                                         score,
965                                     )
966
967                                 if bundle.slower_than_global_p95:
968                                     score += runtime / 4
969                                     logger.debug(
970                                         'score[%s] => %.1f  # >global p95',
971                                         bundle,
972                                         score,
973                                     )
974
975                             # Prefer backups of bundles that don't
976                             # have backups already.
977                             backup_count = len(bundle.backup_bundles)
978                             if backup_count == 0:
979                                 score *= 2
980                             elif backup_count == 1:
981                                 score /= 2
982                             elif backup_count == 2:
983                                 score /= 8
984                             else:
985                                 score = 0
986                             logger.debug(
987                                 'score[%s] => %.1f  # {backup_count} dup backup factor',
988                                 bundle,
989                                 score,
990                             )
991
992                             if score != 0 and (
993                                 best_score is None or score > best_score
994                             ):
995                                 bundle_to_backup = bundle
996                                 assert bundle is not None
997                                 assert bundle.backup_bundles is not None
998                                 assert bundle.src_bundle is None
999                                 best_score = score
1000
1001                 # Note: this is all still happening on the heartbeat
1002                 # runner thread.  That's ok because
1003                 # _schedule_backup_for_bundle uses the executor to
1004                 # submit the bundle again which will cause it to be
1005                 # picked up by a worker thread and allow this thread
1006                 # to return to run future heartbeats.
1007                 if bundle_to_backup is not None:
1008                     self.last_backup = now
1009                     logger.info(
1010                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1011                         bundle_to_backup,
1012                         best_score,
1013                     )
1014                     self._schedule_backup_for_bundle(bundle_to_backup)
1015             finally:
1016                 self.backup_lock.release()
1017
1018     def _is_worker_available(self) -> bool:
1019         """Is there a worker available currently?"""
1020         return self.policy.is_worker_available()
1021
1022     def _acquire_worker(
1023         self, machine_to_avoid: str = None
1024     ) -> Optional[RemoteWorkerRecord]:
1025         """Try to acquire a worker."""
1026         return self.policy.acquire_worker(machine_to_avoid)
1027
1028     def _find_available_worker_or_block(
1029         self, machine_to_avoid: str = None
1030     ) -> RemoteWorkerRecord:
1031         """Find a worker or block until one becomes available."""
1032         with self.cv:
1033             while not self._is_worker_available():
1034                 self.cv.wait()
1035             worker = self._acquire_worker(machine_to_avoid)
1036             if worker is not None:
1037                 return worker
1038         msg = "We should never reach this point in the code"
1039         logger.critical(msg)
1040         raise Exception(msg)
1041
1042     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1043         """Release a previously acquired worker."""
1044         worker = bundle.worker
1045         assert worker is not None
1046         logger.debug('Released worker %s', worker)
1047         self.status.record_release_worker(
1048             worker,
1049             bundle.uuid,
1050             was_cancelled,
1051         )
1052         with self.cv:
1053             worker.count += 1
1054             self.cv.notify()
1055         self.adjust_task_count(-1)
1056
1057     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1058         """See if a particular bundle is cancelled.  Do not block."""
1059         with self.status.lock:
1060             if bundle.is_cancelled.wait(timeout=0.0):
1061                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1062                 bundle.was_cancelled = True
1063                 return True
1064         return False
1065
    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle (blocking until one is available),
        ship the bundle's code to it and start the remote work.

        Args:
            bundle: the original or backup bundle to run.
            override_avoid_machine: a machine to steer away from when
                picking a worker.  When None and the bundle is a backup,
                the machine of its source bundle is avoided instead.

        Returns:
            Whatever :meth:`_process_work_result`,
            :meth:`_wait_for_process` or
            :meth:`_emergency_retry_nasty_bundle` returns on the path
            taken, or None when a backup gives up quietly.
        """

        self.adjust_task_count(+1)
        uuid = bundle.uuid
        controller = bundle.controller
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self._find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.  Remember where this bundle is running.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug('%s: Running bundle on %s...', bundle, worker)

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self._check_if_cancelled(bundle):
            try:
                return self._process_work_result(bundle)
            except Exception:
                logger.warning(
                    '%s: bundle says it\'s cancelled upfront but no results?!', bundle
                )
                self._release_worker(bundle)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started.  Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over.  Reschedule the whole
                    # thing.
                    logger.exception(
                        '%s: We are the original owner thread and yet there are '
                        'no results for this bundle.  This is unexpected and bad. '
                        'Attempting an emergency retry...',
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # We're a backup and our bundle is cancelled
                    # before we even got started.  Do nothing and let
                    # the original bundle's thread worry about either
                    # finding the results or complaining about it.
                    return None

        # Send input code / data to worker machine if it's not local.
        # Note this is a substring test: the controller's name only has
        # to appear somewhere in the worker's machine name to count as
        # local.
        if controller not in machine:
            try:
                cmd = (
                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                )
                start_ts = time.time()
                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug(
                    "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
                )
            except Exception:
                self._release_worker(bundle)
                if is_original:
                    # Weird.  We tried to copy the code to the worker
                    # and it failed...  And we're the original bundle.
                    # We have to retry.
                    logger.exception(
                        "%s: Failed to send instructions to the worker machine?! "
                        "This is not expected; we\'re the original bundle so this shouldn\'t "
                        "be a race condition.  Attempting an emergency retry...",
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code_file before we could copy it.  Ignore.
                    logger.warning(
                        '%s: Failed to send instructions to the worker machine... '
                        'We\'re a backup and this may be caused by the original (or '
                        'some other backup) already finishing this work.  Ignoring.',
                        bundle,
                    )
                    return None

        # Kick off the work.  Note that if this fails we let
        # _wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        helper_path = config.config['remote_worker_helper_path']
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug(
            '%s: Executing %s in the background to kick off work...', bundle, cmd
        )
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug(
            '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
        )
        return self._wait_for_process(p, bundle, 0)
1180
1181     def _wait_for_process(
1182         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1183     ) -> Any:
1184         """At this point we've copied the bundle's pickled code to the remote
1185         worker and started an ssh process that should be invoking the
1186         remote worker to have it execute the user's code.  See how
1187         that's going and wait for it to complete or fail.  Note that
1188         this code is recursive: there are codepaths where we decide to
1189         stop waiting for an ssh process (because another backup seems
1190         to have finished) but then fail to fetch or parse the results
1191         from that backup and thus call ourselves to continue waiting
1192         on an active ssh process.  This is the purpose of the depth
1193         argument: to curtail potential infinite recursion by giving up
1194         eventually.
1195
1196         Args:
1197             p: the Popen record of the ssh job
1198             bundle: the bundle of work being executed remotely
1199             depth: how many retries we've made so far.  Starts at zero.
1200
1201         """
1202
1203         machine = bundle.machine
1204         assert p is not None
1205         pid = p.pid  # pid of the ssh process
1206         if depth > 3:
1207             logger.error(
1208                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1209                 pid,
1210             )
1211             p.terminate()
1212             self._release_worker(bundle)
1213             return self._emergency_retry_nasty_bundle(bundle)
1214
1215         # Spin until either the ssh job we scheduled finishes the
1216         # bundle or some backup worker signals that they finished it
1217         # before we could.
1218         while True:
1219             try:
1220                 p.wait(timeout=0.25)
1221             except subprocess.TimeoutExpired:
1222                 if self._check_if_cancelled(bundle):
1223                     logger.info(
1224                         '%s: looks like another worker finished bundle...', bundle
1225                     )
1226                     break
1227             else:
1228                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1229                 p = None
1230                 break
1231
1232         # If we get here we believe the bundle is done; either the ssh
1233         # subprocess finished (hopefully successfully) or we noticed
1234         # that some other worker seems to have completed the bundle
1235         # before us and we're bailing out.
1236         try:
1237             ret = self._process_work_result(bundle)
1238             if ret is not None and p is not None:
1239                 p.terminate()
1240             return ret
1241
1242         # Something went wrong; e.g. we could not copy the results
1243         # back, cleanup after ourselves on the remote machine, or
1244         # unpickle the results we got from the remove machine.  If we
1245         # still have an active ssh subprocess, keep waiting on it.
1246         # Otherwise, time for an emergency reschedule.
1247         except Exception:
1248             logger.exception('%s: Something unexpected just happened...', bundle)
1249             if p is not None:
1250                 logger.warning(
1251                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1252                     bundle,
1253                 )
1254                 return self._wait_for_process(p, bundle, depth + 1)
1255             else:
1256                 self._release_worker(bundle)
1257                 return self._emergency_retry_nasty_bundle(bundle)
1258
1259     def _process_work_result(self, bundle: BundleDetails) -> Any:
1260         """A bundle seems to be completed.  Check on the results."""
1261
1262         with self.status.lock:
1263             is_original = bundle.src_bundle is None
1264             was_cancelled = bundle.was_cancelled
1265             username = bundle.username
1266             machine = bundle.machine
1267             result_file = bundle.result_file
1268             code_file = bundle.code_file
1269
1270             # Whether original or backup, if we finished first we must
1271             # fetch the results if the computation happened on a
1272             # remote machine.
1273             bundle.end_ts = time.time()
1274             if not was_cancelled:
1275                 assert bundle.machine is not None
1276                 if bundle.controller not in bundle.machine:
1277                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1278                     logger.info(
1279                         "%s: Fetching results back from %s@%s via %s",
1280                         bundle,
1281                         username,
1282                         machine,
1283                         cmd,
1284                     )
1285
1286                     # If either of these throw they are handled in
1287                     # _wait_for_process.
1288                     attempts = 0
1289                     while True:
1290                         try:
1291                             run_silently(cmd)
1292                         except Exception as e:
1293                             attempts += 1
1294                             if attempts >= 3:
1295                                 raise e
1296                         else:
1297                             break
1298
1299                     # Cleanup remote /tmp files.
1300                     run_silently(
1301                         f'{SSH} {username}@{machine}'
1302                         f' "/bin/rm -f {code_file} {result_file}"'
1303                     )
1304                     logger.debug(
1305                         'Fetching results back took %.2fs', time.time() - bundle.end_ts
1306                     )
1307                 dur = bundle.end_ts - bundle.start_ts
1308                 self.histogram.add_item(dur)
1309
1310         # Only the original worker should unpickle the file contents
1311         # though since it's the only one whose result matters.  The
1312         # original is also the only job that may delete result_file
1313         # from disk.  Note that the original may have been cancelled
1314         # if one of the backups finished first; it still must read the
1315         # result from disk.  It still does that here with is_cancelled
1316         # set.
1317         if is_original:
1318             logger.debug("%s: Unpickling %s.", bundle, result_file)
1319             try:
1320                 with open(result_file, 'rb') as rb:
1321                     serialized = rb.read()
1322                 result = cloudpickle.loads(serialized)
1323             except Exception as e:
1324                 logger.exception('Failed to load %s... this is bad news.', result_file)
1325                 self._release_worker(bundle)
1326
1327                 # Re-raise the exception; the code in _wait_for_process may
1328                 # decide to _emergency_retry_nasty_bundle here.
1329                 raise e
1330             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1331             os.remove(result_file)
1332             os.remove(code_file)
1333
1334             # Notify any backups that the original is done so they
1335             # should stop ASAP.  Do this whether or not we
1336             # finished first since there could be more than one
1337             # backup.
1338             if bundle.backup_bundles is not None:
1339                 for backup in bundle.backup_bundles:
1340                     logger.debug(
1341                         '%s: Notifying backup %s that it\'s cancelled',
1342                         bundle,
1343                         backup.uuid,
1344                     )
1345                     backup.is_cancelled.set()
1346
1347         # This is a backup job and, by now, we have already fetched
1348         # the bundle results.
1349         else:
1350             # Backup results don't matter, they just need to leave the
1351             # result file in the right place for their originals to
1352             # read/unpickle later.
1353             result = None
1354
1355             # Tell the original to stop if we finished first.
1356             if not was_cancelled:
1357                 orig_bundle = bundle.src_bundle
1358                 assert orig_bundle is not None
1359                 logger.debug(
1360                     '%s: Notifying original %s we beat them to it.',
1361                     bundle,
1362                     orig_bundle.uuid,
1363                 )
1364                 orig_bundle.is_cancelled.set()
1365         self._release_worker(bundle, was_cancelled=was_cancelled)
1366         return result
1367
1368     def _create_original_bundle(self, pickle, function_name: str):
1369         """Creates a bundle that is not a backup of any other bundle but
1370         rather represents a user task.
1371         """
1372
1373         uuid = string_utils.generate_uuid(omit_dashes=True)
1374         code_file = f'/tmp/{uuid}.code.bin'
1375         result_file = f'/tmp/{uuid}.result.bin'
1376
1377         logger.debug('Writing pickled code to %s', code_file)
1378         with open(code_file, 'wb') as wb:
1379             wb.write(pickle)
1380
1381         bundle = BundleDetails(
1382             pickled_code=pickle,
1383             uuid=uuid,
1384             function_name=function_name,
1385             worker=None,
1386             username=None,
1387             machine=None,
1388             controller=platform.node(),
1389             code_file=code_file,
1390             result_file=result_file,
1391             pid=0,
1392             start_ts=time.time(),
1393             end_ts=0.0,
1394             slower_than_local_p95=False,
1395             slower_than_global_p95=False,
1396             src_bundle=None,
1397             is_cancelled=threading.Event(),
1398             was_cancelled=False,
1399             backup_bundles=[],
1400             failure_count=0,
1401         )
1402         self.status.record_bundle_details(bundle)
1403         logger.debug('%s: Created an original bundle', bundle)
1404         return bundle
1405
1406     def _create_backup_bundle(self, src_bundle: BundleDetails):
1407         """Creates a bundle that is a backup of another bundle that is
1408         running too slowly."""
1409
1410         assert self.status.lock.locked()
1411         assert src_bundle.backup_bundles is not None
1412         n = len(src_bundle.backup_bundles)
1413         uuid = src_bundle.uuid + f'_backup#{n}'
1414
1415         backup_bundle = BundleDetails(
1416             pickled_code=src_bundle.pickled_code,
1417             uuid=uuid,
1418             function_name=src_bundle.function_name,
1419             worker=None,
1420             username=None,
1421             machine=None,
1422             controller=src_bundle.controller,
1423             code_file=src_bundle.code_file,
1424             result_file=src_bundle.result_file,
1425             pid=0,
1426             start_ts=time.time(),
1427             end_ts=0.0,
1428             slower_than_local_p95=False,
1429             slower_than_global_p95=False,
1430             src_bundle=src_bundle,
1431             is_cancelled=threading.Event(),
1432             was_cancelled=False,
1433             backup_bundles=None,  # backup backups not allowed
1434             failure_count=0,
1435         )
1436         src_bundle.backup_bundles.append(backup_bundle)
1437         self.status.record_bundle_details_already_locked(backup_bundle)
1438         logger.debug('%s: Created a backup bundle', backup_bundle)
1439         return backup_bundle
1440
1441     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1442         """Schedule a backup of src_bundle."""
1443
1444         assert self.status.lock.locked()
1445         assert src_bundle is not None
1446         backup_bundle = self._create_backup_bundle(src_bundle)
1447         logger.debug(
1448             '%s/%s: Scheduling backup for execution...',
1449             backup_bundle.uuid,
1450             backup_bundle.function_name,
1451         )
1452         self._helper_executor.submit(self._launch, backup_bundle)
1453
1454         # Results from backups don't matter; if they finish first
1455         # they will move the result_file to this machine and let
1456         # the original pick them up and unpickle them (and return
1457         # a result).
1458
1459     def _emergency_retry_nasty_bundle(
1460         self, bundle: BundleDetails
1461     ) -> Optional[fut.Future]:
1462         """Something unexpectedly failed with bundle.  Either retry it
1463         from the beginning or throw in the towel and give up on it."""
1464
1465         is_original = bundle.src_bundle is None
1466         bundle.worker = None
1467         avoid_last_machine = bundle.machine
1468         bundle.machine = None
1469         bundle.username = None
1470         bundle.failure_count += 1
1471         if is_original:
1472             retry_limit = 3
1473         else:
1474             retry_limit = 2
1475
1476         if bundle.failure_count > retry_limit:
1477             logger.error(
1478                 '%s: Tried this bundle too many times already (%dx); giving up.',
1479                 bundle,
1480                 retry_limit,
1481             )
1482             if is_original:
1483                 raise RemoteExecutorException(
1484                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1485                 )
1486             logger.error(
1487                 '%s: At least it\'s only a backup; better luck with the others.',
1488                 bundle,
1489             )
1490             return None
1491         else:
1492             msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
1493             logger.warning(msg)
1494             warnings.warn(msg)
1495             return self._launch(bundle, avoid_last_machine)
1496
1497     @overrides
1498     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1499         """Submit work to be done.  This is the user entry point of this
1500         class."""
1501         if self.already_shutdown:
1502             raise Exception('Submitted work after shutdown.')
1503         pickle = _make_cloud_pickle(function, *args, **kwargs)
1504         bundle = self._create_original_bundle(pickle, function.__name__)
1505         self.total_bundles_submitted += 1
1506         return self._helper_executor.submit(self._launch, bundle)
1507
1508     @overrides
1509     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1510         """Shutdown the executor."""
1511         if not self.already_shutdown:
1512             logging.debug('Shutting down RemoteExecutor %s', self.title)
1513             self.heartbeat_stop_event.set()
1514             self.heartbeat_thread.join()
1515             self._helper_executor.shutdown(wait)
1516             if not quiet:
1517                 print(self.histogram.__repr__(label_formatter='%ds'))
1518             self.already_shutdown = True
1519
1520
class RemoteWorkerPoolProvider(ABC):
    """Interface for objects that can supply a list of remote workers.

    Derives from :class:`abc.ABC` so that the ``@abstractmethod``
    below is actually enforced: the interface itself, and subclasses
    that fail to override :meth:`get_remote_workers`, cannot be
    instantiated.
    """

    @abstractmethod
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        """Return the :class:`RemoteWorkerRecord` list of this provider."""
1525
1526
@persistent.persistent_autoloaded_singleton()  # type: ignore
class ConfigRemoteWorkerPoolProvider(
    RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
):
    """A :class:`RemoteWorkerPoolProvider` whose records are built from
    a JSON dict.  The backing file is named by the
    'remote_worker_records_file' config value; loading is always
    requested and saving never is.
    """

    def __init__(self, json_remote_worker_pool: Dict[str, Any]):
        """Build the worker pool from json_remote_worker_pool.

        Args:
            json_remote_worker_pool: a dict whose
                'remote_worker_records' entry is iterated; each item
                is converted into a :class:`RemoteWorkerRecord`.
        """
        self.remote_worker_pool = []
        self.remote_worker_pool.extend(
            dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, raw_record)
            for raw_record in json_remote_worker_pool['remote_worker_records']
        )
        # An empty pool is not acceptable.
        assert self.remote_worker_pool

    @overrides
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        """Return the records built at construction time."""
        return self.remote_worker_pool

    @overrides
    def get_persistent_data(self) -> List[RemoteWorkerRecord]:
        """Return the same record list as :meth:`get_remote_workers`."""
        return self.remote_worker_pool

    @staticmethod
    @overrides
    def get_filename() -> str:
        """Return the configured 'remote_worker_records_file' value."""
        records_file = config.config['remote_worker_records_file']
        return type_utils.unwrap_optional(records_file)

    @staticmethod
    @overrides
    def should_we_load_data(filename: str) -> bool:
        """Always True, whatever filename is."""
        return True

    @staticmethod
    @overrides
    def should_we_save_data(filename: str) -> bool:
        """Always False, whatever filename is."""
        return False
1561
1562
@singleton
class DefaultExecutors(object):
    """A container for a default thread, process and remote executor.
    These are not created until needed and we take care to clean up
    before process exit automatically for the caller's convenience.
    Instead of creating your own executor, consider using the one
    from this pool.  e.g.::

        @par.parallelize(method=par.Method.PROCESS)
        def do_work(
            solutions: List[Work],
            shard_num: int,
            ...
        ):
            <do the work>


        def start_do_work(all_work: List[Work]):
            shards = []
            logger.debug('Sharding work into groups of 10.')
            for subset in list_utils.shard(all_work, 10):
                shards.append([x for x in subset])

            logger.debug('Kicking off helper pool.')
            results = []
            try:
                for n, shard in enumerate(shards):
                    results.append(
                        do_work(
                            shard, n, shared_cache.get_name(), max_letter_pop_per_word
                        )
                    )
                smart_future.wait_all(results)
            finally:
                # Note: if you forget to do this it will clean itself up
                # during program termination including tearing down any
                # active ssh connections.
                executors.DefaultExecutors().process_pool().shutdown()
    """

    def __init__(self):
        # Each executor is created lazily by its accessor below.
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def _ping(host) -> bool:
        """Return True iff a single `ping` of host exits with status 0.

        Any exception raised while running the command (the command is
        given a one second timeout) is treated as "not reachable".
        """
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            x = cmd_exitcode(
                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
            )
            return x == 0
        except Exception:
            # Deliberate best-effort: a failed probe just means "skip it".
            return False

    def thread_pool(self) -> ThreadExecutor:
        """Return the shared :class:`ThreadExecutor`, creating it if needed."""
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        """Return the shared :class:`ProcessExecutor`, creating it if needed."""
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        """Return the shared :class:`RemoteExecutor`, creating it if needed.

        On creation, every worker record from the
        :class:`ConfigRemoteWorkerPoolProvider` is pinged and only the
        responsive ones are kept.  A record whose machine is this host
        (platform.node()) and whose count is above 1 has its count
        halved in the pool handed to the executor.
        """
        if self.remote_executor is None:
            import copy

            logger.info('Looking for some helper machines...')
            provider = ConfigRemoteWorkerPoolProvider()
            all_machines = provider.get_remote_workers()
            pool = []

            # Make sure we can ping each machine.
            for record in all_machines:
                if self._ping(record.machine):
                    logger.info('%s is alive / responding to pings', record.machine)
                    pool.append(record)

            # The controller machine has a lot to do; go easy on it.
            # Adjust a copy: the records belong to the provider and are
            # handed out again the next time this pool is rebuilt (e.g.
            # after shutdown()), so halving them in place would shrink
            # the controller's count a little more on every rebuild.
            for index, record in enumerate(pool):
                if record.machine == platform.node() and record.count > 1:
                    logger.info('Reducing workload for %s.', record.machine)
                    reduced = copy.copy(record)
                    reduced.count = max(int(record.count / 2), 1)
                    pool[index] = reduced

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        """Quietly shut down (and forget) every executor created so far.

        A later call to one of the accessors creates a fresh executor.
        """
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None