[pyutils.git] / src / pyutils / parallelize / executors.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # pylint: disable=too-many-instance-attributes
4 # pylint: disable=too-many-nested-blocks
5
6 # © Copyright 2021-2023, Scott Gasch
7
8 """
9 This module defines a :class:`BaseExecutor` interface and three
10 implementations:
11
12     - :class:`ThreadExecutor`
13     - :class:`ProcessExecutor`
14     - :class:`RemoteExecutor`
15
16 The :class:`ThreadExecutor` is used to dispatch work to background
17 threads in the same Python process for parallelized work.  Of course,
18 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
19 is not terribly useful for compute-bound code.  But it's good for
20 work that is mostly I/O bound.
21
22 The :class:`ProcessExecutor` is used to dispatch work to other
23 processes on the same machine and is more useful for compute-bound
24 workloads.
25
26 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
27 the `cloudpickle` dependency, and the `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
28 to dispatch work to a set of remote worker machines on your
29 network.  You can configure this pool via a JSON configuration file,
30 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
31
32 Finally, this file defines a :class:`DefaultExecutors` pool that
33 contains a pre-created and ready instance of each of the three
34 executors discussed.  It has the added benefit of being automatically
35 cleaned up at process termination time.
36
37 See instructions in :mod:`pyutils.parallelize.parallelize` for
38 setting up and using the framework.
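
A minimal usage sketch (assuming the setup described in
:mod:`pyutils.parallelize.parallelize` has been done; the work function
below is hypothetical):

.. code-block:: python

    from pyutils.parallelize.executors import ThreadExecutor

    def count_bytes(path: str) -> int:
        # I/O-bound work; a good fit for ThreadExecutor.
        with open(path, 'rb') as f:
            return len(f.read())

    executor = ThreadExecutor()
    future = executor.submit(count_bytes, '/etc/hostname')
    print(future.result())    # a normal concurrent.futures.Future
    executor.shutdown(wait=True, quiet=True)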
39 """
40
41 from __future__ import annotations
42
43 import concurrent.futures as fut
44 import logging
45 import os
46 import platform
47 import random
48 import subprocess
49 import threading
50 import time
51 import warnings
52 from abc import ABC, abstractmethod
53 from collections import defaultdict
54 from dataclasses import dataclass
55 from typing import Any, Callable, Dict, List, Optional, Set
56
57 import cloudpickle  # type: ignore
58 from overrides import overrides
59
60 import pyutils.typez.histogram as hist
61 from pyutils import (
62     argparse_utils,
63     config,
64     dataclass_utils,
65     math_utils,
66     persistent,
67     string_utils,
68 )
69 from pyutils.ansi import bg, fg, reset, underline
70 from pyutils.decorator_utils import singleton
71 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
72 from pyutils.parallelize.thread_utils import background_thread
73 from pyutils.typez import type_utils
74
75 logger = logging.getLogger(__name__)
76
77 parser = config.add_commandline_args(
78     f"Executors ({__file__})", "Args related to processing executors."
79 )
80 parser.add_argument(
81     '--executors_threadpool_size',
82     type=int,
83     metavar='#THREADS',
84     help='Number of threads in the default threadpool, leave unset for default',
85     default=None,
86 )
87 parser.add_argument(
88     '--executors_processpool_size',
89     type=int,
90     metavar='#PROCESSES',
91     help='Number of processes in the default processpool, leave unset for default',
92     default=None,
93 )
94 parser.add_argument(
95     '--executors_schedule_remote_backups',
96     default=True,
97     action=argparse_utils.ActionNoYes,
98     help='Should we schedule duplicative backup work if a remote bundle is slow',
99 )
100 parser.add_argument(
101     '--executors_max_bundle_failures',
102     type=int,
103     default=3,
104     metavar='#FAILURES',
105     help='Maximum number of failures before giving up on a bundle',
106 )
107 parser.add_argument(
108     '--remote_worker_records_file',
109     type=str,
110     metavar='FILENAME',
111     help='Path of the remote worker records file (JSON)',
112     default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
113 )
114 parser.add_argument(
115     '--remote_worker_helper_path',
116     type=str,
117     metavar='PATH_TO_REMOTE_WORKER_PY',
118     help='Path to remote_worker.py on remote machines',
119     default=f'source py39-venv/bin/activate && {os.environ.get("HOME", ".")}/pyutils/src/pyutils/remote_worker.py',
120 )
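
# For example, a program built on top of this module might be launched
# with flags like these (an illustrative sketch; the program name is
# hypothetical, the flag names come from the definitions above):
#
#     ./my_program.py --executors_threadpool_size 10 \
#         --executors_max_bundle_failures 5 \
#         --remote_worker_records_file /home/me/.remote_worker_records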
121
122
123 SSH = '/usr/bin/ssh -oForwardX11=no'
124 SCP = '/usr/bin/scp -C'
125
126
127 def _make_cloud_pickle(fun, *args, **kwargs):
128     """Internal helper to create cloud pickles."""
129     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
130     return cloudpickle.dumps((fun, args, kwargs))
131
132
133 class BaseExecutor(ABC):
134     """The base executor interface definition.  The interface for
135     :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
136     :class:`ThreadExecutor`.
137     """
138
139     def __init__(self, *, title=''):
140         """
141         Args:
142             title: the name of this executor.
143         """
144         self.title = title
145         self.histogram = hist.SimpleHistogram(
146             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
147         )
148         self.task_count = 0
149
150     @abstractmethod
151     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
152         """Submit work for the executor to do.
153
154         Args:
155             function: the Callable to be executed.
156             *args: the positional arguments to function
157             **kwargs: the keyword arguments to function
158
159         Returns:
160             A concurrent :class:`Future` representing the result of the
161             work.
162         """
163         pass
164
165     @abstractmethod
166     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
167         """Shutdown the executor.
168
169         Args:
170             wait: wait for the shutdown to complete before returning?
171             quiet: keep it quiet, please.
172         """
173         pass
174
175     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
176         """Shutdown the executor and return True if the executor is idle
177         (i.e. there are no pending or active tasks).  Return False
178         otherwise.  Note: this should only be called by the launcher
179         process.
180
181         Args:
182             quiet: keep it quiet, please.
183
184         Returns:
185             True if the executor could be shut down because it has no
186             pending work, False otherwise.
187         """
188         if self.task_count == 0:
189             self.shutdown(wait=True, quiet=quiet)
190             return True
191         return False
192
193     def adjust_task_count(self, delta: int) -> None:
194         """Change the task count.  Note: do not call this method from a
195         worker, it should only be called by the launcher process /
196         thread / machine.
197
198         Args:
199             delta: the delta value by which to adjust task count.
200         """
201         self.task_count += delta
202         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
203
204     def get_task_count(self) -> int:
205         """Change the task count.  Note: do not call this method from a
206         worker, it should only be called by the launcher process /
207         thread / machine.
208
209         Returns:
210             The executor's current task count.
211         """
212         return self.task_count
213
214
215 class ThreadExecutor(BaseExecutor):
216     """A threadpool executor.  This executor uses Python threads to
217     schedule tasks.  Note that, at least as of python3.10, because of
218     the global lock in the interpreter itself, these do not
219     parallelize very well so this class is useful mostly for non-CPU
220     intensive tasks.
221
222     See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
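
    A sketch of fanning out several I/O-bound tasks (the URLs are
    placeholders)::

        import concurrent.futures as fut
        import urllib.request

        def fetch(url: str) -> bytes:
            with urllib.request.urlopen(url) as response:
                return response.read()

        executor = ThreadExecutor(max_workers=10)
        urls = ['http://example.com/a', 'http://example.com/b']
        futures = [executor.submit(fetch, u) for u in urls]
        for f in fut.as_completed(futures):
            print(len(f.result()))
        executor.shutdown(wait=True)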
223     """
224
225     def __init__(self, max_workers: Optional[int] = None):
226         """
227         Args:
228             max_workers: maximum number of threads to create in the pool.
229         """
230         super().__init__()
231         workers = None
232         if max_workers is not None:
233             workers = max_workers
234         elif 'executors_threadpool_size' in config.config:
235             workers = config.config['executors_threadpool_size']
236         if workers is not None:
237             logger.debug('Creating threadpool executor with %d workers', workers)
238         else:
239             logger.debug('Creating a default sized threadpool executor')
240         self._thread_pool_executor = fut.ThreadPoolExecutor(
241             max_workers=workers, thread_name_prefix="thread_executor_helper"
242         )
243         self.already_shutdown = False
244
245     # This is run on a different thread; do not adjust task count here.
246     @staticmethod
247     def _run_local_bundle(fun, *args, **kwargs):
248         logger.debug("Running local bundle at %s", fun.__name__)
249         result = fun(*args, **kwargs)
250         return result
251
252     @overrides
253     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
254         if self.already_shutdown:
255             raise Exception('Submitted work after shutdown.')
256         self.adjust_task_count(+1)
257         newargs = []
258         newargs.append(function)
259         for arg in args:
260             newargs.append(arg)
261         start = time.time()
262         result = self._thread_pool_executor.submit(
263             ThreadExecutor._run_local_bundle, *newargs, **kwargs
264         )
265         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
266         result.add_done_callback(lambda _: self.adjust_task_count(-1))
267         return result
268
269     @overrides
270     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
271         if not self.already_shutdown:
272             logger.debug('Shutting down threadpool executor %s', self.title)
273             self._thread_pool_executor.shutdown(wait)
274             if not quiet:
275                 print(self.histogram.__repr__(label_formatter='%ds'))
276             self.already_shutdown = True
277
278
279 class ProcessExecutor(BaseExecutor):
280     """An executor which runs tasks in child processes.
281
282     See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
283     """
284
285     def __init__(self, max_workers=None):
286         """
287         Args:
288             max_workers: the max number of worker processes to create.
289         """
290         super().__init__()
291         workers = None
292         if max_workers is not None:
293             workers = max_workers
294         elif 'executors_processpool_size' in config.config:
295             workers = config.config['executors_processpool_size']
296         if workers is not None:
297             logger.debug('Creating processpool executor with %d workers.', workers)
298         else:
299             logger.debug('Creating a default sized processpool executor')
300         self._process_executor = fut.ProcessPoolExecutor(
301             max_workers=workers,
302         )
303         self.already_shutdown = False
304
305     # This is run in another process; do not adjust task count here.
306     @staticmethod
307     def _run_cloud_pickle(pickle):
308         fun, args, kwargs = cloudpickle.loads(pickle)
309         logger.debug("Running pickled bundle at %s", fun.__name__)
310         result = fun(*args, **kwargs)
311         return result
312
313     @overrides
314     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
315         if self.already_shutdown:
316             raise Exception('Submitted work after shutdown.')
317         start = time.time()
318         self.adjust_task_count(+1)
319         pickle = _make_cloud_pickle(function, *args, **kwargs)
320         result = self._process_executor.submit(
321             ProcessExecutor._run_cloud_pickle, pickle
322         )
323         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
324         result.add_done_callback(lambda _: self.adjust_task_count(-1))
325         return result
326
327     @overrides
328     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
329         if not self.already_shutdown:
330             logger.debug('Shutting down processpool executor %s', self.title)
331             self._process_executor.shutdown(wait)
332             if not quiet:
333                 print(self.histogram.__repr__(label_formatter='%ds'))
334             self.already_shutdown = True
335
336     def __getstate__(self):
337         state = self.__dict__.copy()
338         state['_process_executor'] = None
339         return state
340
341
342 class RemoteExecutorException(Exception):
343     """Thrown when a bundle cannot be executed despite several retries."""
344
345     pass
346
347
348 @dataclass
349 class RemoteWorkerRecord:
350     """A record of info about a remote worker."""
351
352     username: str
353     """Username we can ssh into on this machine to run work."""
354
355     machine: str
356     """Machine address / name."""
357
358     weight: int
359     """Relative probability for the weighted policy to select this
360     machine for scheduling work."""
361
362     count: int
363     """If this machine is selected, what is the maximum number of task
364     that it can handle?"""
365
366     def __hash__(self):
367         return hash((self.username, self.machine))
368
369     def __repr__(self):
370         return f'{self.username}@{self.machine}'
371
372
373 @dataclass
374 class BundleDetails:
375     """All info necessary to define some unit of work that needs to be
376     done, where it is being run, its state, whether it is an original
377     bundle or a backup bundle, how many times it has failed, etc...
378     """
379
380     pickled_code: bytes
381     """The code to run, cloud pickled"""
382
383     uuid: str
384     """A unique identifier"""
385
386     function_name: str
387     """The name of the function we pickled"""
388
389     worker: Optional[RemoteWorkerRecord]
390     """The remote worker running this bundle or None if none (yet)"""
391
392     username: Optional[str]
393     """The remote username running this bundle or None if none (yet)"""
394
395     machine: Optional[str]
396     """The remote machine running this bundle or None if none (yet)"""
397
398     controller: str
399     """The controller machine"""
400
401     code_file: str
402     """A unique filename to hold the work to be done"""
403
404     result_file: str
405     """Where the results should be placed / read from"""
406
407     pid: int
408     """The process id of the local subprocess watching the ssh connection
409     to the remote machine"""
410
411     start_ts: float
412     """Starting time"""
413
414     end_ts: float
415     """Ending time"""
416
417     slower_than_local_p95: bool
418     """Currently slower then 95% of other bundles on remote host"""
419
420     slower_than_global_p95: bool
421     """Currently slower than 95% of other bundles globally"""
422
423     src_bundle: Optional[BundleDetails]
424     """If this is a backup bundle, this points to the original bundle
425     that it's backing up.  None otherwise."""
426
427     is_cancelled: threading.Event
428     """An event that can be signaled to indicate this bundle is cancelled.
429     This is set when another copy (backup or original) of this work has
430     completed successfully elsewhere."""
431
432     was_cancelled: bool
433     """True if this bundle was cancelled, False if it finished normally"""
434
435     backup_bundles: Optional[List[BundleDetails]]
436     """If we've created backups of this bundle, this is the list of them"""
437
438     failure_count: int
439     """How many times has this bundle failed already?"""
440
441     def __repr__(self):
442         uuid = self.uuid
443         if uuid[-9:-2] == '_backup':
444             uuid = uuid[:-9]
445             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
446         else:
447             suffix = uuid[-6:]
448
449         # We colorize the uuid based on some bits from it to make them
450         # stand out in the logging and help a reader correlate log messages
451         # related to the same bundle.
452         colorz = [
453             fg('violet red'),
454             fg('red'),
455             fg('orange'),
456             fg('peach orange'),
457             fg('yellow'),
458             fg('marigold yellow'),
459             fg('green yellow'),
460             fg('tea green'),
461             fg('cornflower blue'),
462             fg('turquoise blue'),
463             fg('tropical blue'),
464             fg('lavender purple'),
465             fg('medium purple'),
466         ]
467         c = colorz[int(uuid[-2:], 16) % len(colorz)]
468         function_name = (
469             self.function_name if self.function_name is not None else 'nofname'
470         )
471         machine = self.machine if self.machine is not None else 'nomachine'
472         return f'{c}{suffix}/{function_name}/{machine}{reset()}'
473
474
475 class RemoteExecutorStatus:
476     """A status 'scoreboard' for a remote executor tracking various
477     metrics and able to render a periodic dump of global state.
478     """
479
480     def __init__(self, total_worker_count: int) -> None:
481         """
482         Args:
483             total_worker_count: number of workers in the pool
484         """
485         self.worker_count: int = total_worker_count
486         self.known_workers: Set[RemoteWorkerRecord] = set()
487         self.start_time: float = time.time()
488         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
489         self.end_per_bundle: Dict[str, float] = defaultdict(float)
490         self.finished_bundle_timings_per_worker: Dict[
491             RemoteWorkerRecord, math_utils.NumericPopulation
492         ] = {}
493         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
494         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
495         self.finished_bundle_timings: math_utils.NumericPopulation = (
496             math_utils.NumericPopulation()
497         )
498         self.last_periodic_dump: Optional[float] = None
499         self.total_bundles_submitted: int = 0
500
501         # Protects reads and modification of self.  Also used
502         # as a memory fence for modifications to bundle.
503         self.lock: threading.Lock = threading.Lock()
504
505     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
506         """Record that bundle with uuid is assigned to a particular worker.
507
508         Args:
509             worker: the record of the worker to which uuid is assigned
510             uuid: the uuid of a bundle that has been assigned to a worker
511         """
512         with self.lock:
513             self.record_acquire_worker_already_locked(worker, uuid)
514
515     def record_acquire_worker_already_locked(
516         self, worker: RemoteWorkerRecord, uuid: str
517     ) -> None:
518         """Same as above but an entry point that doesn't acquire the lock
519         for codepaths where it's already held."""
520         assert self.lock.locked()
521         self.known_workers.add(worker)
522         self.start_per_bundle[uuid] = None
523         x = self.in_flight_bundles_by_worker.get(worker, set())
524         x.add(uuid)
525         self.in_flight_bundles_by_worker[worker] = x
526
527     def record_bundle_details(self, details: BundleDetails) -> None:
528         """Register the details about a bundle of work."""
529         with self.lock:
530             self.record_bundle_details_already_locked(details)
531
532     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
533         """Same as above but for codepaths that already hold the lock."""
534         assert self.lock.locked()
535         self.bundle_details_by_uuid[details.uuid] = details
536
537     def record_release_worker(
538         self,
539         worker: RemoteWorkerRecord,
540         uuid: str,
541         was_cancelled: bool,
542     ) -> None:
543         """Record that a bundle has released a worker."""
544         with self.lock:
545             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
546
547     def record_release_worker_already_locked(
548         self,
549         worker: RemoteWorkerRecord,
550         uuid: str,
551         was_cancelled: bool,
552     ) -> None:
553         """Same as above but for codepaths that already hold the lock."""
554         assert self.lock.locked()
555         ts = time.time()
556         self.end_per_bundle[uuid] = ts
557         self.in_flight_bundles_by_worker[worker].remove(uuid)
558         if not was_cancelled:
559             start = self.start_per_bundle[uuid]
560             assert start is not None
561             bundle_latency = ts - start
562             x = self.finished_bundle_timings_per_worker.get(
563                 worker, math_utils.NumericPopulation()
564             )
565             x.add_number(bundle_latency)
566             self.finished_bundle_timings_per_worker[worker] = x
567             self.finished_bundle_timings.add_number(bundle_latency)
568
569     def record_processing_began(self, uuid: str):
570         """Record when work on a bundle begins."""
571         with self.lock:
572             self.start_per_bundle[uuid] = time.time()
573
574     def total_in_flight(self) -> int:
575         """How many bundles are in flight currently?"""
576         assert self.lock.locked()
577         total_in_flight = 0
578         for worker in self.known_workers:
579             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
580         return total_in_flight
581
582     def total_idle(self) -> int:
583         """How many idle workers are there currently?"""
584         assert self.lock.locked()
585         return self.worker_count - self.total_in_flight()
586
587     def __repr__(self):
588         assert self.lock.locked()
589         ts = time.time()
590         total_finished = len(self.finished_bundle_timings)
591         total_in_flight = self.total_in_flight()
592         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
593         qall_median = None
594         qall_p95 = None
595         if len(self.finished_bundle_timings) > 1:
596             qall_median = self.finished_bundle_timings.get_median()
597             qall_p95 = self.finished_bundle_timings.get_percentile(95)
598             ret += (
599                 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
600                 f'✅={total_finished}/{self.total_bundles_submitted}, '
601                 f'💻n={total_in_flight}/{self.worker_count}\n'
602             )
603         else:
604             ret += (
605                 f'⏱={ts-self.start_time:.1f}s, '
606                 f'✅={total_finished}/{self.total_bundles_submitted}, '
607                 f'💻n={total_in_flight}/{self.worker_count}\n'
608             )
609
610         for worker in self.known_workers:
611             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
612             timings = self.finished_bundle_timings_per_worker.get(
613                 worker, math_utils.NumericPopulation()
614             )
615             count = len(timings)
616             qworker_median = None
617             qworker_p95 = None
618             if count > 1:
619                 qworker_median = timings.get_median()
620                 qworker_p95 = timings.get_percentile(95)
621                 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
622             else:
623                 ret += '\n'
624             if count > 0:
625                 ret += f'    ...finished {count} total bundle(s) so far\n'
626             in_flight = len(self.in_flight_bundles_by_worker[worker])
627             if in_flight > 0:
628                 ret += f'    ...{in_flight} bundles currently in flight:\n'
629                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
630                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
631                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
632                     if self.start_per_bundle[bundle_uuid] is not None:
633                         sec = ts - self.start_per_bundle[bundle_uuid]
634                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
635                     else:
636                         ret += f'       {details} setting up / copying data...'
637                         sec = 0.0
638
639                     if qworker_p95 is not None:
640                         if sec > qworker_p95:
641                             ret += f'{bg("red")}>💻p95{reset()} '
642                             if details is not None:
643                                 details.slower_than_local_p95 = True
644                         else:
645                             if details is not None:
646                                 details.slower_than_local_p95 = False
647
648                     if qall_p95 is not None:
649                         if sec > qall_p95:
650                             ret += f'{bg("red")}>∀p95{reset()} '
651                             if details is not None:
652                                 details.slower_than_global_p95 = True
653                         elif details is not None:
654                             details.slower_than_global_p95 = False
655                     ret += '\n'
656         return ret
657
658     def periodic_dump(self, total_bundles_submitted: int) -> None:
659         assert self.lock.locked()
660         self.total_bundles_submitted = total_bundles_submitted
661         ts = time.time()
662         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
663             print(self)
664             self.last_periodic_dump = ts
665
666
667 class RemoteWorkerSelectionPolicy(ABC):
668     """An interface definition of a policy for selecting a remote worker."""
669
670     def __init__(self):
671         self.workers: Optional[List[RemoteWorkerRecord]] = None
672
673     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
674         self.workers = workers
675
676     @abstractmethod
677     def is_worker_available(self) -> bool:
678         pass
679
680     @abstractmethod
681     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
682         pass
683
684
685 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
686     """A remote worker selector that uses weighted RNG."""
687
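    # Selection sketch: acquire_worker() below places worker.count * worker.weight
    # entries for each available worker into a grabbag and draws one uniformly at
    # random.  So a worker with weight=2 and count=4 contributes 8 entries and is
    # 8x more likely to be chosen than a worker with weight=1 and count=1.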
688     @overrides
689     def is_worker_available(self) -> bool:
690         if self.workers:
691             for worker in self.workers:
692                 if worker.count > 0:
693                     return True
694         return False
695
696     @overrides
697     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
698         grabbag = []
699         if self.workers:
700             for worker in self.workers:
701                 if worker.machine != machine_to_avoid:
702                     if worker.count > 0:
703                         for _ in range(worker.count * worker.weight):
704                             grabbag.append(worker)
705
706         if len(grabbag) == 0:
707             logger.debug(
708                 'There are no available workers that avoid %s', machine_to_avoid
709             )
710             if self.workers:
711                 for worker in self.workers:
712                     if worker.count > 0:
713                         for _ in range(worker.count * worker.weight):
714                             grabbag.append(worker)
715
716         if len(grabbag) == 0:
717             logger.warning('There are no available workers?!')
718             return None
719
720         worker = random.sample(grabbag, 1)[0]
721         assert worker.count > 0
722         worker.count -= 1
723         logger.debug('Selected worker %s', worker)
724         return worker
725
726
727 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
728     """A remote worker selector that just round robins."""
729
730     def __init__(self) -> None:
731         super().__init__()
732         self.index = 0
733
734     @overrides
735     def is_worker_available(self) -> bool:
736         if self.workers:
737             for worker in self.workers:
738                 if worker.count > 0:
739                     return True
740         return False
741
742     @overrides
743     def acquire_worker(
744         self, machine_to_avoid: Optional[str] = None
745     ) -> Optional[RemoteWorkerRecord]:
746         if self.workers:
747             x = self.index
748             while True:
749                 worker = self.workers[x]
750                 if worker.count > 0:
751                     worker.count -= 1
752                     x += 1
753                     if x >= len(self.workers):
754                         x = 0
755                     self.index = x
756                     logger.debug('Selected worker %s', worker)
757                     return worker
758                 x += 1
759                 if x >= len(self.workers):
760                     x = 0
761                 if x == self.index:
762                     logger.warning('Unexpectedly could not find an available worker?!')
763                     return None
764         return None
765
766
767 class RemoteExecutor(BaseExecutor):
768     """An executor that uses processes on remote machines to do work.
769     To do so, it requires that a pool of remote workers be properly
770     configured.  See instructions in
771     :class:`pyutils.parallelize.parallelize`.
772
773     Each machine in a worker pool has a *weight* and a *count*.  A
774     *weight* captures the relative speed of a processor on that worker
775     and a *count* captures the number of synchronous tasks the worker
776     can accept (i.e. the number of cpus on the machine).
777
778     To dispatch work to a remote machine, this class pickles the code
779     to be executed remotely using `cloudpickle`.  For that to work,
780     the remote machine should be running the same version of Python as
781     this machine, ideally in a virtual environment with the same
782     import libraries installed.  Differences in operating system
783     and/or processor architecture don't seem to matter for most code,
784     though.
785
786     .. warning::
787
788         Mismatches in Python version or in the version numbers of
789         third-party libraries between machines can cause problems
790         when trying to unpickle and run code remotely.
791
792     Work to be dispatched is represented in this code by creating a
793     "bundle".  Each bundle is assigned to a remote worker based on
794     heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
795     general, it attempts to load all workers in the pool and maximize
796     throughput.  Once assigned to a remote worker, pickled code is
797     copied to that worker via `scp` and a remote command is issued via
798     `ssh` to execute a :file:`remote_worker.py` process on the remote
799     machine.  This process unpickles the code, runs it, and produces a
800     result which is then copied back to the local machine (again via
801     `scp`) where it can be processed by local code.
802
803     You can and probably must override the path of
804     :file:`remote_worker.py` on your pool machines using the
805     `--remote_worker_helper_path` commandline argument (or by just
806     changing the default defined earlier in this file).
807
808     During remote work execution, this local machine acts as a
809     controller dispatching all work to the network, copying pickled
810     tasks out, and copying results back in.  It may also be a worker
811     in the pool but do not underestimate the cost of being a
812     controller -- it takes some cpu and a lot of network bandwidth.
813     The work dispatcher logic attempts to detect when a controller is
814     also a worker and reduce its load.
815
816     Some redundancy and safety provisions are made when scheduling
817     tasks to the worker pool; e.g. slower-than-expected tasks have
818     redundant backup tasks created, especially if there are otherwise
819     idle workers.  If a task fails repeatedly, the dispatcher considers
820     it poisoned and gives up on it.
821
822     .. warning::
823
824         This executor probably only makes sense to use with
825         computationally expensive tasks such as jobs that will execute
826         for ~30 seconds or longer.
827
828         The network overhead and latency of copying work from the
829         controller (local) machine to the remote workers and copying
830         results back again is relatively high.  Especially at startup,
831         the network can become a bottleneck.  Future versions of this
832         code may attempt to split the responsibility of being a
833         controller (distributing work to pool machines).
834
835     Instructions for how to set this up are provided in
836     :class:`pyutils.parallelize.parallelize`.
837
838     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
839
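    A construction sketch (usernames, hostnames, and the submitted
    function are placeholders; the worker pool is normally built from
    the JSON records file described in the module docstring)::

        workers = [
            RemoteWorkerRecord(username='me', machine='machine1', weight=2, count=4),
            RemoteWorkerRecord(username='me', machine='machine2', weight=1, count=8),
        ]
        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        executor = RemoteExecutor(workers, policy)
        future = executor.submit(some_function, some_argument)
        print(future.result())
        executor.shutdown(wait=True)
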
840     """
841
842     def __init__(
843         self,
844         workers: List[RemoteWorkerRecord],
845         policy: RemoteWorkerSelectionPolicy,
846     ) -> None:
847         """
848         Args:
849             workers: A list of remote workers we can call on to do tasks.
850             policy: A policy for selecting remote workers for tasks.
851         """
852
853         super().__init__()
854         self.workers = workers
855         self.policy = policy
856         self.worker_count = 0
857         for worker in self.workers:
858             self.worker_count += worker.count
859         if self.worker_count <= 0:
860             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
861             logger.critical(msg)
862             raise RemoteExecutorException(msg)
863         self.policy.register_worker_pool(self.workers)
864         self.cv = threading.Condition()
865         logger.debug(
866             'Creating %d local threads, one per remote worker.', self.worker_count
867         )
868         self._helper_executor = fut.ThreadPoolExecutor(
869             thread_name_prefix="remote_executor_helper",
870             max_workers=self.worker_count,
871         )
872         self.status = RemoteExecutorStatus(self.worker_count)
873         self.total_bundles_submitted = 0
874         self.backup_lock = threading.Lock()
875         self.last_backup = None
876         (
877             self.heartbeat_thread,
878             self.heartbeat_stop_event,
879         ) = self._run_periodic_heartbeat()
880         self.already_shutdown = False
881
882     @background_thread
883     def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
884         """
885         We create a background thread to invoke :meth:`_heartbeat` regularly
886         while we are scheduling work.  It does some accounting such as
887         looking for slow bundles to tag for backup creation, checking for
888         unexpected failures, and printing a fancy message on stdout.
889         """
890         while not stop_event.is_set():
891             time.sleep(5.0)
892             logger.debug('Running periodic heartbeat code...')
893             self._heartbeat()
894         logger.debug('Periodic heartbeat thread shutting down.')
895
896     def _heartbeat(self) -> None:
897         # Note: this is invoked on a background thread, not an
898         # executor thread.  Be careful what you do with it b/c it
899         # needs to get back and dump status again periodically.
900         with self.status.lock:
901             self.status.periodic_dump(self.total_bundles_submitted)
902
903             # Look for bundles to reschedule via executor.submit
904             if config.config['executors_schedule_remote_backups']:
905                 self._maybe_schedule_backup_bundles()
906
907     def _maybe_schedule_backup_bundles(self):
908         """Maybe schedule backup bundles if we see a very slow bundle."""
909
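        # Scoring sketch (illustrative numbers): an in-flight original bundle
        # on a weight=2 machine starts at base = int(200 / 2) = 100.  If it has
        # been running for 30s, score += 30 -> 130; if it's slower than that
        # worker's p95, score += 30/2 -> 145; and if it has no backups yet the
        # score is doubled -> 290.  The highest-scoring candidate, if any, gets
        # a backup scheduled below.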
910         assert self.status.lock.locked()
911         num_done = len(self.status.finished_bundle_timings)
912         num_idle_workers = self.worker_count - self.task_count
913         now = time.time()
914         if (
915             num_done >= 2
916             and num_idle_workers > 0
917             and (self.last_backup is None or (now - self.last_backup > 9.0))
918             and self.backup_lock.acquire(blocking=False)
919         ):
920             try:
921                 assert self.backup_lock.locked()
922
923                 bundle_to_backup = None
924                 best_score = None
925                 for (
926                     worker,
927                     bundle_uuids,
928                 ) in self.status.in_flight_bundles_by_worker.items():
929
930                     # Prefer to schedule backups of bundles running on
931                     # slower machines.
932                     base_score = 0
933                     for record in self.workers:
934                         if worker.machine == record.machine:
935                             temp_score = float(record.weight)
936                             temp_score = 1.0 / temp_score
937                             temp_score *= 200.0
938                             base_score = int(temp_score)
939                             break
940
941                     for uuid in bundle_uuids:
942                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
943                         if (
944                             bundle is not None
945                             and bundle.src_bundle is None
946                             and bundle.backup_bundles is not None
947                         ):
948                             score = base_score
949
950                             # Schedule backups of bundles running
951                             # longer; especially those that are
952                             # unexpectedly slow.
953                             start_ts = self.status.start_per_bundle[uuid]
954                             if start_ts is not None:
955                                 runtime = now - start_ts
956                                 score += runtime
957                                 logger.debug(
958                                     'score[%s] => %.1f  # latency boost', bundle, score
959                                 )
960
961                                 if bundle.slower_than_local_p95:
962                                     score += runtime / 2
963                                     logger.debug(
964                                         'score[%s] => %.1f  # >worker p95',
965                                         bundle,
966                                         score,
967                                     )
968
969                                 if bundle.slower_than_global_p95:
970                                     score += runtime / 4
971                                     logger.debug(
972                                         'score[%s] => %.1f  # >global p95',
973                                         bundle,
974                                         score,
975                                     )
976
977                             # Prefer backups of bundles that don't
978                             # have backups already.
979                             backup_count = len(bundle.backup_bundles)
980                             if backup_count == 0:
981                                 score *= 2
982                             elif backup_count == 1:
983                                 score /= 2
984                             elif backup_count == 2:
985                                 score /= 8
986                             else:
987                                 score = 0
988                             logger.debug(
989                                 'score[%s] => %.1f  # %d dup backup factor',
990                                 bundle,
991                                 score, backup_count,
992                             )
993
994                             if score != 0 and (
995                                 best_score is None or score > best_score
996                             ):
997                                 bundle_to_backup = bundle
998                                 assert bundle is not None
999                                 assert bundle.backup_bundles is not None
1000                                 assert bundle.src_bundle is None
1001                                 best_score = score
1002
1003                 # Note: this is all still happening on the heartbeat
1004                 # runner thread.  That's ok because
1005                 # _schedule_backup_for_bundle uses the executor to
1006                 # submit the bundle again which will cause it to be
1007                 # picked up by a worker thread and allow this thread
1008                 # to return to run future heartbeats.
1009                 if bundle_to_backup is not None:
1010                     self.last_backup = now
1011                     logger.info(
1012                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1013                         bundle_to_backup,
1014                         best_score,
1015                     )
1016                     self._schedule_backup_for_bundle(bundle_to_backup)
1017             finally:
1018                 self.backup_lock.release()
1019
1020     def _is_worker_available(self) -> bool:
1021         """Is there a worker available currently?"""
1022         return self.policy.is_worker_available()
1023
1024     def _acquire_worker(
1025         self, machine_to_avoid: Optional[str] = None
1026     ) -> Optional[RemoteWorkerRecord]:
1027         """Try to acquire a worker."""
1028         return self.policy.acquire_worker(machine_to_avoid)
1029
1030     def _find_available_worker_or_block(
1031         self, machine_to_avoid: Optional[str] = None
1032     ) -> RemoteWorkerRecord:
1033         """Find a worker or block until one becomes available."""
1034         with self.cv:
1035             while not self._is_worker_available():
1036                 self.cv.wait()
1037             worker = self._acquire_worker(machine_to_avoid)
1038             if worker is not None:
1039                 return worker
1040         msg = "We should never reach this point in the code"
1041         logger.critical(msg)
1042         raise Exception(msg)
1043
1044     def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1045         """Release a previously acquired worker."""
1046         worker = bundle.worker
1047         assert worker is not None
1048         logger.debug('Released worker %s', worker)
1049         self.status.record_release_worker(
1050             worker,
1051             bundle.uuid,
1052             was_cancelled,
1053         )
1054         with self.cv:
1055             worker.count += 1
1056             self.cv.notify()
1057         self.adjust_task_count(-1)
1058
1059     def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1060         """See if a particular bundle is cancelled.  Do not block."""
1061         with self.status.lock:
1062             if bundle.is_cancelled.wait(timeout=0.0):
1063                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1064                 bundle.was_cancelled = True
1065                 return True
1066         return False
1067
1068     def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1069         """Find a worker for bundle or block until one is available."""
1070
1071         self.adjust_task_count(+1)
1072         uuid = bundle.uuid
1073         controller = bundle.controller
1074         avoid_machine = override_avoid_machine
1075         is_original = bundle.src_bundle is None
1076
1077         # Try not to schedule a backup on the same host as the original.
1078         if avoid_machine is None and bundle.src_bundle is not None:
1079             avoid_machine = bundle.src_bundle.machine
1080         worker = None
1081         while worker is None:
1082             worker = self._find_available_worker_or_block(avoid_machine)
1083         assert worker is not None
1084
1085         # Ok, found a worker.
1086         bundle.worker = worker
1087         machine = bundle.machine = worker.machine
1088         username = bundle.username = worker.username
1089         self.status.record_acquire_worker(worker, uuid)
1090         logger.debug('%s: Running bundle on %s...', bundle, worker)
1091
1092         # Before we do any work, make sure the bundle is still viable.
1093         # It may have been some time between when it was submitted and
1094         # now due to lack of worker availability and someone else may
1095         # have already finished it.
1096         if self._check_if_cancelled(bundle):
1097             try:
1098                 return self._process_work_result(bundle)
1099             except Exception:
1100                 logger.warning(
1101                     '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1102                 )
1103                 self._release_worker(bundle)
1104                 if is_original:
1105                     # Weird.  We are the original owner of this
1106                     # bundle.  For it to have been cancelled, a backup
1107                     # must have already started and completed before
1108                     # we even got started.  Moreover, the backup says
1109                     # it is done but we can't find the results it
1110                     # should have copied over.  Reschedule the whole
1111                     # thing.
1112                     logger.exception(
1113                         '%s: We are the original owner thread and yet there are '
1114                         'no results for this bundle.  This is unexpected and bad. '
1115                         'Attempting an emergency retry...',
1116                         bundle,
1117                     )
1118                     return self._emergency_retry_nasty_bundle(bundle)
1119                 else:
1120                     # We're a backup and our bundle is cancelled
1121                     # before we even got started.  Do nothing and let
1122                     # the original bundle's thread worry about either
1123                     # finding the results or complaining about it.
1124                     return None
1125
1126         # Send input code / data to worker machine if it's not local.
1127         if controller not in machine:
1128             try:
1129                 cmd = (
1130                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1131                 )
1132                 start_ts = time.time()
1133                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1134                 run_silently(cmd)
1135                 xfer_latency = time.time() - start_ts
1136                 logger.debug(
1137                     "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1138                 )
1139             except Exception:
1140                 self._release_worker(bundle)
1141                 if is_original:
1142                     # Weird.  We tried to copy the code to the worker
1143                     # and it failed...  And we're the original bundle.
1144                     # We have to retry.
1145                     logger.exception(
1146                         "%s: Failed to send instructions to the worker machine?! "
1147                         "This is not expected; we\'re the original bundle so this shouldn\'t "
1148                         "be a race condition.  Attempting an emergency retry...",
1149                         bundle,
1150                     )
1151                     return self._emergency_retry_nasty_bundle(bundle)
1152                 else:
1153                     # This is actually expected; we're a backup.
1154                     # There's a race condition where someone else
1155                     # already finished the work and removed the source
1156                     # code_file before we could copy it.  Ignore.
1157                     logger.warning(
1158                         '%s: Failed to send instructions to the worker machine... '
1159                         'We\'re a backup and this may be caused by the original (or '
1160                         'some other backup) already finishing this work.  Ignoring.',
1161                         bundle,
1162                     )
1163                     return None
1164
1165         # Kick off the work.  Note that if this fails we let
1166         # _wait_for_process deal with it.
1167         self.status.record_processing_began(uuid)
1168         helper_path = config.config['remote_worker_helper_path']
1169         cmd = (
1170             f'{SSH} {bundle.username}@{bundle.machine} '
1171             f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1172         )
1173         logger.debug(
1174             '%s: Executing %s in the background to kick off work...', bundle, cmd
1175         )
1176         p = cmd_in_background(cmd, silent=True)
1177         bundle.pid = p.pid
1178         logger.debug(
1179             '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1180         )
1181         return self._wait_for_process(p, bundle, 0)
1182
1183     def _wait_for_process(
1184         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1185     ) -> Any:
1186         """At this point we've copied the bundle's pickled code to the remote
1187         worker and started an ssh process that should be invoking the
1188         remote worker to have it execute the user's code.  See how
1189         that's going and wait for it to complete or fail.  Note that
1190         this code is recursive: there are codepaths where we decide to
1191         stop waiting for an ssh process (because another backup seems
1192         to have finished) but then fail to fetch or parse the results
1193         from that backup and thus call ourselves to continue waiting
1194         on an active ssh process.  This is the purpose of the depth
1195         argument: to curtail potential infinite recursion by giving up
1196         eventually.
1197
1198         Args:
1199             p: the Popen record of the ssh job
1200             bundle: the bundle of work being executed remotely
1201             depth: how many retries we've made so far.  Starts at zero.
1202
1203         """
1204
1205         machine = bundle.machine
1206         assert p is not None
1207         pid = p.pid  # pid of the ssh process
1208         if depth > 3:
1209             logger.error(
1210                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1211                 pid,
1212             )
1213             p.terminate()
1214             self._release_worker(bundle)
1215             return self._emergency_retry_nasty_bundle(bundle)
1216
1217         # Spin until either the ssh job we scheduled finishes the
1218         # bundle or some backup worker signals that they finished it
1219         # before we could.
1220         while True:
1221             try:
1222                 p.wait(timeout=0.25)
1223             except subprocess.TimeoutExpired:
1224                 if self._check_if_cancelled(bundle):
1225                     logger.info(
1226                         '%s: looks like another worker finished bundle...', bundle
1227                     )
1228                     break
1229             else:
1230                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1231                 p = None
1232                 break
1233
1234         # If we get here we believe the bundle is done; either the ssh
1235         # subprocess finished (hopefully successfully) or we noticed
1236         # that some other worker seems to have completed the bundle
1237         # before us and we're bailing out.
1238         try:
1239             ret = self._process_work_result(bundle)
1240             if ret is not None and p is not None:
1241                 p.terminate()
1242             return ret
1243
1244         # Something went wrong; e.g. we could not copy the results
1245         # back, cleanup after ourselves on the remote machine, or
1246         # unpickle the results we got from the remote machine.  If we
1247         # still have an active ssh subprocess, keep waiting on it.
1248         # Otherwise, time for an emergency reschedule.
1249         except Exception:
1250             logger.exception('%s: Something unexpected just happened...', bundle)
1251             if p is not None:
1252                 logger.warning(
1253                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1254                     bundle,
1255                 )
1256                 return self._wait_for_process(p, bundle, depth + 1)
1257             else:
1258                 self._release_worker(bundle)
1259                 return self._emergency_retry_nasty_bundle(bundle)
1260
1261     def _process_work_result(self, bundle: BundleDetails) -> Any:
1262         """A bundle seems to be completed.  Check on the results."""
1263
1264         with self.status.lock:
1265             is_original = bundle.src_bundle is None
1266             was_cancelled = bundle.was_cancelled
1267             username = bundle.username
1268             machine = bundle.machine
1269             result_file = bundle.result_file
1270             code_file = bundle.code_file
1271
1272             # Whether original or backup, if we finished first we must
1273             # fetch the results if the computation happened on a
1274             # remote machine.
1275             bundle.end_ts = time.time()
1276             if not was_cancelled:
1277                 assert bundle.machine is not None
1278                 if bundle.controller not in bundle.machine:
1279                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1280                     logger.info(
1281                         "%s: Fetching results back from %s@%s via %s",
1282                         bundle,
1283                         username,
1284                         machine,
1285                         cmd,
1286                     )
1287
1288                     # If either of these throw they are handled in
1289                     # _wait_for_process.
1290                     attempts = 0
1291                     while True:
1292                         try:
1293                             run_silently(cmd)
1294                         except Exception as e:
1295                             attempts += 1
1296                             if attempts >= 3:
1297                                 raise e
1298                         else:
1299                             break
1300
1301                     # Cleanup remote /tmp files.
1302                     run_silently(
1303                         f'{SSH} {username}@{machine}'
1304                         f' "/bin/rm -f {code_file} {result_file}"'
1305                     )
1306                     logger.debug(
1307                         'Fetching results back took %.2fs', time.time() - bundle.end_ts
1308                     )
1309                 dur = bundle.end_ts - bundle.start_ts
1310                 self.histogram.add_item(dur)
1311
1312         # Only the original worker should unpickle the file contents,
1313         # though, since it's the only one whose result matters.  The
1314         # original is also the only job that may delete result_file
1315         # from disk.  Note that the original may have been cancelled
1316         # if one of the backups finished first; it must still read
1317         # the result from disk, which it does here with is_cancelled
1318         # set.
1319         if is_original:
1320             logger.debug("%s: Unpickling %s.", bundle, result_file)
1321             try:
1322                 with open(result_file, 'rb') as rb:
1323                     serialized = rb.read()
1324                 result = cloudpickle.loads(serialized)
1325             except Exception as e:
1326                 logger.exception('Failed to load %s... this is bad news.', result_file)
1327                 self._release_worker(bundle)
1328
1329                 # Re-raise the exception; the code in _wait_for_process may
1330                 # decide to _emergency_retry_nasty_bundle here.
1331                 raise e
1332             logger.debug('Removing local (controller) %s and %s.', code_file, result_file)
1333             os.remove(result_file)
1334             os.remove(code_file)
1335
1336             # Notify any backups that the original is done so they
1337             # should stop ASAP.  Do this whether or not we
1338             # finished first since there could be more than one
1339             # backup.
1340             if bundle.backup_bundles is not None:
1341                 for backup in bundle.backup_bundles:
1342                     logger.debug(
1343                         '%s: Notifying backup %s that it\'s cancelled',
1344                         bundle,
1345                         backup.uuid,
1346                     )
1347                     backup.is_cancelled.set()
1348
1349         # This is a backup job and, by now, we have already fetched
1350         # the bundle results.
1351         else:
1352             # Backup results don't matter, they just need to leave the
1353             # result file in the right place for their originals to
1354             # read/unpickle later.
1355             result = None
1356
1357             # Tell the original to stop if we finished first.
1358             if not was_cancelled:
1359                 orig_bundle = bundle.src_bundle
1360                 assert orig_bundle is not None
1361                 logger.debug(
1362                     '%s: Notifying original %s we beat them to it.',
1363                     bundle,
1364                     orig_bundle.uuid,
1365                 )
1366                 orig_bundle.is_cancelled.set()
1367         self._release_worker(bundle, was_cancelled=was_cancelled)
1368         return result
1369
1370     def _create_original_bundle(self, pickle, function_name: str):
1371         """Creates a bundle that is not a backup of any other bundle but
1372         rather represents a user task.
1373         """
1374
1375         uuid = string_utils.generate_uuid(omit_dashes=True)
1376         code_file = f'/tmp/{uuid}.code.bin'
1377         result_file = f'/tmp/{uuid}.result.bin'
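        # The code and result files use the same /tmp paths on the
        # controller and on remote workers; a remote result is scp'd back
        # to the identical local path (see _process_work_result).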
1378
1379         logger.debug('Writing pickled code to %s', code_file)
1380         with open(code_file, 'wb') as wb:
1381             wb.write(pickle)
1382
1383         bundle = BundleDetails(
1384             pickled_code=pickle,
1385             uuid=uuid,
1386             function_name=function_name,
1387             worker=None,
1388             username=None,
1389             machine=None,
1390             controller=platform.node(),
1391             code_file=code_file,
1392             result_file=result_file,
1393             pid=0,
1394             start_ts=time.time(),
1395             end_ts=0.0,
1396             slower_than_local_p95=False,
1397             slower_than_global_p95=False,
1398             src_bundle=None,
1399             is_cancelled=threading.Event(),
1400             was_cancelled=False,
1401             backup_bundles=[],
1402             failure_count=0,
1403         )
1404         self.status.record_bundle_details(bundle)
1405         logger.debug('%s: Created an original bundle', bundle)
1406         return bundle
1407
1408     def _create_backup_bundle(self, src_bundle: BundleDetails):
1409         """Creates a bundle that is a backup of another bundle that is
1410         running too slowly."""
1411
1412         assert self.status.lock.locked()
1413         assert src_bundle.backup_bundles is not None
1414         n = len(src_bundle.backup_bundles)
1415         uuid = src_bundle.uuid + f'_backup#{n}'
1416
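        # The backup shares the original's code_file and result_file so
        # that whichever bundle finishes first leaves the result where the
        # original expects to find it.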
1417         backup_bundle = BundleDetails(
1418             pickled_code=src_bundle.pickled_code,
1419             uuid=uuid,
1420             function_name=src_bundle.function_name,
1421             worker=None,
1422             username=None,
1423             machine=None,
1424             controller=src_bundle.controller,
1425             code_file=src_bundle.code_file,
1426             result_file=src_bundle.result_file,
1427             pid=0,
1428             start_ts=time.time(),
1429             end_ts=0.0,
1430             slower_than_local_p95=False,
1431             slower_than_global_p95=False,
1432             src_bundle=src_bundle,
1433             is_cancelled=threading.Event(),
1434             was_cancelled=False,
1435             backup_bundles=None,  # backup backups not allowed
1436             failure_count=0,
1437         )
1438         src_bundle.backup_bundles.append(backup_bundle)
1439         self.status.record_bundle_details_already_locked(backup_bundle)
1440         logger.debug('%s: Created a backup bundle', backup_bundle)
1441         return backup_bundle
1442
1443     def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1444         """Schedule a backup of src_bundle."""
1445
1446         assert self.status.lock.locked()
1447         assert src_bundle is not None
1448         backup_bundle = self._create_backup_bundle(src_bundle)
1449         logger.debug(
1450             '%s/%s: Scheduling backup for execution...',
1451             backup_bundle.uuid,
1452             backup_bundle.function_name,
1453         )
1454         self._helper_executor.submit(self._launch, backup_bundle)
1455
1456         # Results from backups don't matter; if a backup finishes
1457         # first it will move the result_file to this machine and let
1458         # the original pick it up and unpickle it (and return
1459         # the result).
1460
1461     def _emergency_retry_nasty_bundle(
1462         self, bundle: BundleDetails
1463     ) -> Optional[fut.Future]:
1464         """Something unexpectedly failed with bundle.  Either retry it
1465         from the beginning or throw in the towel and give up on it."""
1466
1467         is_original = bundle.src_bundle is None
1468         bundle.worker = None
1469         avoid_last_machine = bundle.machine
1470         bundle.machine = None
1471         bundle.username = None
1472         bundle.failure_count += 1
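        # Originals get one more retry than backups: giving up on a backup
        # is cheap since the original (or another backup) can still finish.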
1473         if is_original:
1474             retry_limit = 3
1475         else:
1476             retry_limit = 2
1477
1478         if bundle.failure_count > retry_limit:
1479             logger.error(
1480                 '%s: Tried this bundle too many times already (%dx); giving up.',
1481                 bundle,
1482                 retry_limit,
1483             )
1484             if is_original:
1485                 raise RemoteExecutorException(
1486                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1487                 )
1488             logger.error(
1489                 '%s: At least it\'s only a backup; better luck with the others.',
1490                 bundle,
1491             )
1492             return None
1493         else:
1494             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1495             logger.warning(msg)
1496             warnings.warn(msg)
1497             return self._launch(bundle, avoid_last_machine)
1498
1499     @overrides
1500     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1501         """Submit work to be done.  This is the user entry point of this
1502         class."""
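        # A minimal usage sketch (the function and argument names are
        # hypothetical):
        #
        #     future = remote_executor.submit(my_function, arg1, arg2)
        #     result = future.result()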
1503         if self.already_shutdown:
1504             raise Exception('Submitted work after shutdown.')
1505         pickle = _make_cloud_pickle(function, *args, **kwargs)
1506         bundle = self._create_original_bundle(pickle, function.__name__)
1507         self.total_bundles_submitted += 1
1508         return self._helper_executor.submit(self._launch, bundle)
1509
1510     @overrides
1511     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1512         """Shutdown the executor."""
1513         if not self.already_shutdown:
1514             logger.debug('Shutting down RemoteExecutor %s', self.title)
1515             self.heartbeat_stop_event.set()
1516             self.heartbeat_thread.join()
1517             self._helper_executor.shutdown(wait)
1518             if not quiet:
1519                 print(self.histogram.__repr__(label_formatter='%ds'))
1520             self.already_shutdown = True
1521
1522
1523 class RemoteWorkerPoolProvider(ABC):
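    """Interface for objects that can supply the list of
    :class:`RemoteWorkerRecord` instances that a :class:`RemoteExecutor`
    should use as its worker pool."""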
1524     @abstractmethod
1525     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1526         pass
1527
1528
1529 @persistent.persistent_autoloaded_singleton()  # type: ignore
1530 class ConfigRemoteWorkerPoolProvider(
1531     RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1532 ):
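    """A :class:`RemoteWorkerPoolProvider` that reads the worker pool out
    of the JSON file named by the remote_worker_records_file config
    setting (there's a sample config under examples/parallelize_config)."""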
1533     def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1534         self.remote_worker_pool: List[RemoteWorkerRecord] = []
1535         for record in json_remote_worker_pool['remote_worker_records']:
1536             self.remote_worker_pool.append(
1537                 dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
1538             )
1539         assert len(self.remote_worker_pool) > 0
1540
1541     @overrides
1542     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1543         return self.remote_worker_pool
1544
1545     @overrides
1546     def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1547         return self.remote_worker_pool
1548
1549     @staticmethod
1550     @overrides
1551     def get_filename() -> str:
1552         return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
1553
1554     @staticmethod
1555     @overrides
1556     def should_we_load_data(filename: str) -> bool:
1557         return True
1558
1559     @staticmethod
1560     @overrides
1561     def should_we_save_data(filename: str) -> bool:
1562         return False
1563
1564
1565 @singleton
1566 class DefaultExecutors(object):
1567     """A container for a default thread, process and remote executor.
1568     These are not created until needed, and they are automatically
1569     cleaned up before process exit for the caller's convenience.
1570     Instead of creating your own executor, consider using the one
1571     from this pool.  e.g.::
1572
1573         @par.parallelize(method=par.Method.PROCESS)
1574         def do_work(
1575             solutions: List[Work],
1576             shard_num: int,
1577             ...
1578         ):
1579             <do the work>
1580
1581
1582         def start_do_work(all_work: List[Work]):
1583             shards, results = [], []
1584             logger.debug('Sharding work into groups of 10.')
1585             for subset in list_utils.shard(all_work, 10):
1586                 shards.append([x for x in subset])
1587
1588             logger.debug('Kicking off helper pool.')
1589             try:
1590                 for n, shard in enumerate(shards):
1591                     results.append(
1592                         do_work(
1593                             shard, n, shared_cache.get_name(), max_letter_pop_per_word
1594                         )
1595                     )
1596                 smart_future.wait_all(results)
1597             finally:
1598                 # Note: if you forget to do this, the pool will clean
1599                 # itself up at program termination, including tearing
1600                 # down any active ssh connections.
1601                 executors.DefaultExecutors().process_pool().shutdown()
1602     """
1603
1604     def __init__(self):
1605         self.thread_executor: Optional[ThreadExecutor] = None
1606         self.process_executor: Optional[ProcessExecutor] = None
1607         self.remote_executor: Optional[RemoteExecutor] = None
1608
1609     @staticmethod
1610     def _ping(host) -> bool:
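        # Assumes a Unix-style `ping -c 1` is available; any failure (bad
        # exit code, timeout, or exception) is treated as "not reachable"
        # and the machine is skipped.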
1611         logger.debug('RUN> ping -c 1 %s', host)
1612         try:
1613             x = cmd_exitcode(
1614                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1615             )
1616             return x == 0
1617         except Exception:
1618             return False
1619
1620     def thread_pool(self) -> ThreadExecutor:
1621         if self.thread_executor is None:
1622             self.thread_executor = ThreadExecutor()
1623         return self.thread_executor
1624
1625     def process_pool(self) -> ProcessExecutor:
1626         if self.process_executor is None:
1627             self.process_executor = ProcessExecutor()
1628         return self.process_executor
1629
1630     def remote_pool(self) -> RemoteExecutor:
1631         if self.remote_executor is None:
1632             logger.info('Looking for some helper machines...')
1633             provider = ConfigRemoteWorkerPoolProvider()
1634             all_machines = provider.get_remote_workers()
1635             pool = []
1636
1637             # Make sure we can ping each machine.
1638             for record in all_machines:
1639                 if self._ping(record.machine):
1640                     logger.info('%s is alive / responding to pings', record.machine)
1641                     pool.append(record)
1642
1643             # The controller machine has a lot to do; go easy on it.
1644             for record in pool:
1645                 if record.machine == platform.node() and record.count > 1:
1646                     logger.info('Reducing workload for %s.', record.machine)
1647                     record.count = max(int(record.count / 2), 1)
1648
1649             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1650             policy.register_worker_pool(pool)
1651             self.remote_executor = RemoteExecutor(pool, policy)
1652         return self.remote_executor
1653
1654     def shutdown(self) -> None:
1655         if self.thread_executor is not None:
1656             self.thread_executor.shutdown(wait=True, quiet=True)
1657             self.thread_executor = None
1658         if self.process_executor is not None:
1659             self.process_executor.shutdown(wait=True, quiet=True)
1660             self.process_executor = None
1661         if self.remote_executor is not None:
1662             self.remote_executor.shutdown(wait=True, quiet=True)
1663             self.remote_executor = None