1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 import concurrent.futures as fut
6 import logging
7 import os
8 import platform
9 import random
10 import subprocess
11 import threading
12 import time
13 import warnings
14 from abc import ABC, abstractmethod
15 from collections import defaultdict
16 from dataclasses import dataclass
17 from typing import Any, Callable, Dict, List, Optional, Set
18
19 import cloudpickle  # type: ignore
20 import numpy
21 from overrides import overrides
22
23 import argparse_utils
24 import config
25 import histogram as hist
26 from ansi import bg, fg, reset, underline
27 from decorator_utils import singleton
28 from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
29 from thread_utils import background_thread
30
31 logger = logging.getLogger(__name__)
32
33 parser = config.add_commandline_args(
34     f"Executors ({__file__})", "Args related to processing executors."
35 )
36 parser.add_argument(
37     '--executors_threadpool_size',
38     type=int,
39     metavar='#THREADS',
40     help='Number of threads in the default threadpool, leave unset for default',
41     default=None,
42 )
43 parser.add_argument(
44     '--executors_processpool_size',
45     type=int,
46     metavar='#PROCESSES',
47     help='Number of processes in the default processpool, leave unset for default',
48     default=None,
49 )
50 parser.add_argument(
51     '--executors_schedule_remote_backups',
52     default=True,
53     action=argparse_utils.ActionNoYes,
54     help='Should we schedule duplicative backup work if a remote bundle is slow',
55 )
56 parser.add_argument(
57     '--executors_max_bundle_failures',
58     type=int,
59     default=3,
60     metavar='#FAILURES',
61     help='Maximum number of failures before giving up on a bundle',
62 )
63
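# Illustrative sketch (not part of the original file): these flags are consumed
# through the config module at program startup, so a caller might size the pools
# like this (program name and values are hypothetical):
#
#   python3 your_program.py --executors_threadpool_size 16 --executors_processpool_size 4
#
# argparse_utils.ActionNoYes presumably also registers a negated form of
# --executors_schedule_remote_backups; check that module for its exact spelling.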
64 SSH = '/usr/bin/ssh -oForwardX11=no'
65 SCP = '/usr/bin/scp -C'
66
67
68 def make_cloud_pickle(fun, *args, **kwargs):
69     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
70     return cloudpickle.dumps((fun, args, kwargs))
71
72
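# Illustrative sketch (not part of the original file): the blob produced by
# make_cloud_pickle() is just cloudpickle.dumps of a (function, args, kwargs)
# triple, so consumers reverse it the way ProcessExecutor.run_cloud_pickle below
# does (and, presumably, remote_worker.py on the far end).  The helper name
# _demo_cloud_pickle_roundtrip is hypothetical.
def _demo_cloud_pickle_roundtrip() -> None:
    def add(a: int, b: int) -> int:
        return a + b

    blob = make_cloud_pickle(add, 2, b=3)
    fun, args, kwargs = cloudpickle.loads(blob)
    assert fun(*args, **kwargs) == 5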
73 class BaseExecutor(ABC):
74     def __init__(self, *, title=''):
75         self.title = title
76         self.histogram = hist.SimpleHistogram(
77             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
78         )
79         self.task_count = 0
80
81     @abstractmethod
82     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
83         pass
84
85     @abstractmethod
86     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
87         pass
88
89     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
90         """Shutdown the executor if it is idle (i.e. there are no pending
91         or active tasks) and return True.  Otherwise leave it running and
92         return False.  Note: this should only be called by the launcher
93         process.
94
95         """
96         if self.task_count == 0:
97             self.shutdown(wait=True, quiet=quiet)
98             return True
99         return False
100
101     def adjust_task_count(self, delta: int) -> None:
102         """Change the task count.  Note: do not call this method from a
103         worker, it should only be called by the launcher process /
104         thread / machine.
105
106         """
107         self.task_count += delta
108         logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
109
110     def get_task_count(self) -> int:
111         """Return the current task count.  Note: do not call this method
112         from a worker, it should only be called by the launcher process /
113         thread / machine.
114
115         """
116         return self.task_count
117
118
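# Illustrative sketch (not part of the original file): every concrete executor
# below follows the same contract -- submit() returns a standard
# concurrent.futures.Future and shutdown() / shutdown_if_idle() wind the pool
# down.  _demo_executor_contract is a hypothetical name.
def _demo_executor_contract(executor: BaseExecutor) -> None:
    future = executor.submit(sum, [1, 2, 3])
    print(future.result())                  # blocks until the work finishes -> 6
    executor.shutdown_if_idle(quiet=True)   # shuts down only if nothing is pending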
119 class ThreadExecutor(BaseExecutor):
120     def __init__(self, max_workers: Optional[int] = None):
121         super().__init__()
122         workers = None
123         if max_workers is not None:
124             workers = max_workers
125         elif 'executors_threadpool_size' in config.config:
126             workers = config.config['executors_threadpool_size']
127         logger.debug(f'Creating threadpool executor with {workers} workers')
128         self._thread_pool_executor = fut.ThreadPoolExecutor(
129             max_workers=workers, thread_name_prefix="thread_executor_helper"
130         )
131         self.already_shutdown = False
132
133     # This is run on a different thread; do not adjust task count here.
134     def run_local_bundle(self, fun, *args, **kwargs):
135         logger.debug(f"Running local bundle at {fun.__name__}")
136         result = fun(*args, **kwargs)
137         return result
138
139     @overrides
140     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
141         if self.already_shutdown:
142             raise Exception('Submitted work after shutdown.')
143         self.adjust_task_count(+1)
144         newargs = []
145         newargs.append(function)
146         for arg in args:
147             newargs.append(arg)
148         start = time.time()
149         result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
150         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
151         result.add_done_callback(lambda _: self.adjust_task_count(-1))
152         return result
153
154     @overrides
155     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
156         if not self.already_shutdown:
157             logger.debug(f'Shutting down threadpool executor {self.title}')
158             self._thread_pool_executor.shutdown(wait)
159             if not quiet:
160                 print(self.histogram.__repr__(label_formatter='%ds'))
161             self.already_shutdown = True
162
163
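# Illustrative sketch (not part of the original file): minimal ThreadExecutor
# usage.  _demo_thread_executor is a hypothetical name; passing max_workers
# explicitly sidesteps the --executors_threadpool_size config lookup.
def _demo_thread_executor() -> None:
    executor = ThreadExecutor(max_workers=4)
    futures = [executor.submit(lambda x: x * x, i) for i in range(10)]
    print([f.result() for f in futures])   # [0, 1, 4, ..., 81]
    executor.shutdown(wait=True, quiet=True)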
164 class ProcessExecutor(BaseExecutor):
165     def __init__(self, max_workers=None):
166         super().__init__()
167         workers = None
168         if max_workers is not None:
169             workers = max_workers
170         elif 'executors_processpool_size' in config.config:
171             workers = config.config['executors_processpool_size']
172         logger.debug(f'Creating processpool executor with {workers} workers.')
173         self._process_executor = fut.ProcessPoolExecutor(
174             max_workers=workers,
175         )
176         self.already_shutdown = False
177
178     # This is run in another process; do not adjust task count here.
179     def run_cloud_pickle(self, pickle):
180         fun, args, kwargs = cloudpickle.loads(pickle)
181         logger.debug(f"Running pickled bundle at {fun.__name__}")
182         result = fun(*args, **kwargs)
183         return result
184
185     @overrides
186     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
187         if self.already_shutdown:
188             raise Exception('Submitted work after shutdown.')
189         start = time.time()
190         self.adjust_task_count(+1)
191         pickle = make_cloud_pickle(function, *args, **kwargs)
192         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
193         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
194         result.add_done_callback(lambda _: self.adjust_task_count(-1))
195         return result
196
197     @overrides
198     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
199         if not self.already_shutdown:
200             logger.debug(f'Shutting down processpool executor {self.title}')
201             self._process_executor.shutdown(wait)
202             if not quiet:
203                 print(self.histogram.__repr__(label_formatter='%ds'))
204             self.already_shutdown = True
205
206     def __getstate__(self):
207         state = self.__dict__.copy()
208         state['_process_executor'] = None
209         return state
210
211
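# Illustrative sketch (not part of the original file): ProcessExecutor is used
# the same way, but submitted callables must survive cloudpickle and run in a
# separate process; __getstate__ above drops the live pool so a stray reference
# to the executor can still be pickled.  _square and _demo_process_executor are
# hypothetical names.
def _square(n: int) -> int:
    return n * n


def _demo_process_executor() -> None:
    executor = ProcessExecutor(max_workers=2)
    futures = [executor.submit(_square, i) for i in range(4)]
    print([f.result() for f in futures])   # [0, 1, 4, 9]
    executor.shutdown(wait=True, quiet=True)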
212 class RemoteExecutorException(Exception):
213     """Thrown when a bundle cannot be executed despite several retries."""
214
215     pass
216
217
218 @dataclass
219 class RemoteWorkerRecord:
220     username: str
221     machine: str
222     weight: int
223     count: int
224
225     def __hash__(self):
226         return hash((self.username, self.machine))
227
228     def __repr__(self):
229         return f'{self.username}@{self.machine}'
230
231
232 @dataclass
233 class BundleDetails:
234     pickled_code: bytes
235     uuid: str
236     fname: str
237     worker: Optional[RemoteWorkerRecord]
238     username: Optional[str]
239     machine: Optional[str]
240     hostname: str
241     code_file: str
242     result_file: str
243     pid: int
244     start_ts: float
245     end_ts: float
246     slower_than_local_p95: bool
247     slower_than_global_p95: bool
248     src_bundle: Optional[BundleDetails]
249     is_cancelled: threading.Event
250     was_cancelled: bool
251     backup_bundles: Optional[List[BundleDetails]]
252     failure_count: int
253
254     def __repr__(self):
255         uuid = self.uuid
256         if uuid[-9:-2] == '_backup':
257             uuid = uuid[:-9]
258             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
259         else:
260             suffix = uuid[-6:]
261
262         colorz = [
263             fg('violet red'),
264             fg('red'),
265             fg('orange'),
266             fg('peach orange'),
267             fg('yellow'),
268             fg('marigold yellow'),
269             fg('green yellow'),
270             fg('tea green'),
271             fg('cornflower blue'),
272             fg('turquoise blue'),
273             fg('tropical blue'),
274             fg('lavender purple'),
275             fg('medium purple'),
276         ]
277         c = colorz[int(uuid[-2:], 16) % len(colorz)]
278         fname = self.fname if self.fname is not None else 'nofname'
279         machine = self.machine if self.machine is not None else 'nomachine'
280         return f'{c}{suffix}/{fname}/{machine}{reset()}'
281
282
283 class RemoteExecutorStatus:
284     def __init__(self, total_worker_count: int) -> None:
285         self.worker_count: int = total_worker_count
286         self.known_workers: Set[RemoteWorkerRecord] = set()
287         self.start_time: float = time.time()
288         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
289         self.end_per_bundle: Dict[str, float] = defaultdict(float)
290         self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
291         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
292         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
293         self.finished_bundle_timings: List[float] = []
294         self.last_periodic_dump: Optional[float] = None
295         self.total_bundles_submitted: int = 0
296
297         # Protects reads and modifications of this object.  Also used
298         # as a memory fence for modifications to bundles.
299         self.lock: threading.Lock = threading.Lock()
300
301     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
302         with self.lock:
303             self.record_acquire_worker_already_locked(worker, uuid)
304
305     def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
306         assert self.lock.locked()
307         self.known_workers.add(worker)
308         self.start_per_bundle[uuid] = None
309         x = self.in_flight_bundles_by_worker.get(worker, set())
310         x.add(uuid)
311         self.in_flight_bundles_by_worker[worker] = x
312
313     def record_bundle_details(self, details: BundleDetails) -> None:
314         with self.lock:
315             self.record_bundle_details_already_locked(details)
316
317     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
318         assert self.lock.locked()
319         self.bundle_details_by_uuid[details.uuid] = details
320
321     def record_release_worker(
322         self,
323         worker: RemoteWorkerRecord,
324         uuid: str,
325         was_cancelled: bool,
326     ) -> None:
327         with self.lock:
328             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
329
330     def record_release_worker_already_locked(
331         self,
332         worker: RemoteWorkerRecord,
333         uuid: str,
334         was_cancelled: bool,
335     ) -> None:
336         assert self.lock.locked()
337         ts = time.time()
338         self.end_per_bundle[uuid] = ts
339         self.in_flight_bundles_by_worker[worker].remove(uuid)
340         if not was_cancelled:
341             start = self.start_per_bundle[uuid]
342             assert start is not None
343             bundle_latency = ts - start
344             x = self.finished_bundle_timings_per_worker.get(worker, list())
345             x.append(bundle_latency)
346             self.finished_bundle_timings_per_worker[worker] = x
347             self.finished_bundle_timings.append(bundle_latency)
348
349     def record_processing_began(self, uuid: str):
350         with self.lock:
351             self.start_per_bundle[uuid] = time.time()
352
353     def total_in_flight(self) -> int:
354         assert self.lock.locked()
355         total_in_flight = 0
356         for worker in self.known_workers:
357             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
358         return total_in_flight
359
360     def total_idle(self) -> int:
361         assert self.lock.locked()
362         return self.worker_count - self.total_in_flight()
363
364     def __repr__(self):
365         assert self.lock.locked()
366         ts = time.time()
367         total_finished = len(self.finished_bundle_timings)
368         total_in_flight = self.total_in_flight()
369         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
370         qall = None
371         if len(self.finished_bundle_timings) > 1:
372             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
373             ret += (
374                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
375                 f'✅={total_finished}/{self.total_bundles_submitted}, '
376                 f'💻n={total_in_flight}/{self.worker_count}\n'
377             )
378         else:
379             ret += (
380                 f'⏱={ts-self.start_time:.1f}s, '
381                 f'✅={total_finished}/{self.total_bundles_submitted}, '
382                 f'💻n={total_in_flight}/{self.worker_count}\n'
383             )
384
385         for worker in self.known_workers:
386             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
387             timings = self.finished_bundle_timings_per_worker.get(worker, [])
388             count = len(timings)
389             qworker = None
390             if count > 1:
391                 qworker = numpy.quantile(timings, [0.5, 0.95])
392                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
393             else:
394                 ret += '\n'
395             if count > 0:
396                 ret += f'    ...finished {count} total bundle(s) so far\n'
397             in_flight = len(self.in_flight_bundles_by_worker[worker])
398             if in_flight > 0:
399                 ret += f'    ...{in_flight} bundles currently in flight:\n'
400                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
401                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
402                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
403                     if self.start_per_bundle[bundle_uuid] is not None:
404                         sec = ts - self.start_per_bundle[bundle_uuid]
405                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
406                     else:
407                         ret += f'       {details} setting up / copying data...'
408                         sec = 0.0
409
410                     if qworker is not None:
411                         if sec > qworker[1]:
412                             ret += f'{bg("red")}>💻p95{reset()} '
413                             if details is not None:
414                                 details.slower_than_local_p95 = True
415                         else:
416                             if details is not None:
417                                 details.slower_than_local_p95 = False
418
419                     if qall is not None:
420                         if sec > qall[1]:
421                             ret += f'{bg("red")}>∀p95{reset()} '
422                             if details is not None:
423                                 details.slower_than_global_p95 = True
424                         elif details is not None:
425                             details.slower_than_global_p95 = False
426                     ret += '\n'
427         return ret
428
429     def periodic_dump(self, total_bundles_submitted: int) -> None:
430         assert self.lock.locked()
431         self.total_bundles_submitted = total_bundles_submitted
432         ts = time.time()
433         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
434             print(self)
435             self.last_periodic_dump = ts
436
437
438 class RemoteWorkerSelectionPolicy(ABC):
439     def register_worker_pool(self, workers):
440         self.workers = workers
441
442     @abstractmethod
443     def is_worker_available(self) -> bool:
444         pass
445
446     @abstractmethod
447     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
448         pass
449
450
451 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
452     @overrides
453     def is_worker_available(self) -> bool:
454         for worker in self.workers:
455             if worker.count > 0:
456                 return True
457         return False
458
459     @overrides
460     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
461         grabbag = []
462         for worker in self.workers:
463             if worker.machine != machine_to_avoid:
464                 if worker.count > 0:
465                     for _ in range(worker.count * worker.weight):
466                         grabbag.append(worker)
467
468         if len(grabbag) == 0:
469             logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
470             for worker in self.workers:
471                 if worker.count > 0:
472                     for _ in range(worker.count * worker.weight):
473                         grabbag.append(worker)
474
475         if len(grabbag) == 0:
476             logger.warning('There are no available workers?!')
477             return None
478
479         worker = random.sample(grabbag, 1)[0]
480         assert worker.count > 0
481         worker.count -= 1
482         logger.debug(f'Chose worker {worker}')
483         return worker
484
485
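# Illustrative sketch (not part of the original file) of how the weighted random
# policy biases selection: with the hypothetical pool below, fast.example
# contributes count 4 x weight 30 = 120 grab-bag entries versus 2 x 10 = 20 for
# slow.example, so it is chosen roughly six times as often while it has capacity.
def _demo_weighted_random_policy() -> None:
    pool = [
        RemoteWorkerRecord(username='user', machine='fast.example', weight=30, count=4),
        RemoteWorkerRecord(username='user', machine='slow.example', weight=10, count=2),
    ]
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    policy.register_worker_pool(pool)
    worker = policy.acquire_worker(machine_to_avoid='slow.example')
    print(worker)   # user@fast.example; its count has now dropped to 3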
486 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
487     def __init__(self) -> None:
488         self.index = 0
489
490     @overrides
491     def is_worker_available(self) -> bool:
492         for worker in self.workers:
493             if worker.count > 0:
494                 return True
495         return False
496
497     @overrides
498     def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
499         x = self.index
500         while True:
501             worker = self.workers[x]
502             if worker.count > 0:
503                 worker.count -= 1
504                 x += 1
505                 if x >= len(self.workers):
506                     x = 0
507                 self.index = x
508                 logger.debug(f'Selected worker {worker}')
509                 return worker
510             x += 1
511             if x >= len(self.workers):
512                 x = 0
513             if x == self.index:
514                 msg = 'Unexpectedly could not find a worker, retrying...'
515                 logger.warning(msg)
516                 return None
517
518
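# Illustrative sketch (not part of the original file): the round-robin policy
# walks the worker list in order, wraps at the end, and skips workers whose
# count is exhausted.  Hostnames are hypothetical.
def _demo_round_robin_policy() -> None:
    pool = [
        RemoteWorkerRecord(username='user', machine='a.example', weight=1, count=1),
        RemoteWorkerRecord(username='user', machine='b.example', weight=1, count=2),
    ]
    policy = RoundRobinRemoteWorkerSelectionPolicy()
    policy.register_worker_pool(pool)
    print(policy.acquire_worker())   # user@a.example
    print(policy.acquire_worker())   # user@b.example
    print(policy.acquire_worker())   # user@b.example (a.example is used up)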
519 class RemoteExecutor(BaseExecutor):
520     def __init__(
521         self,
522         workers: List[RemoteWorkerRecord],
523         policy: RemoteWorkerSelectionPolicy,
524     ) -> None:
525         super().__init__()
526         self.workers = workers
527         self.policy = policy
528         self.worker_count = 0
529         for worker in self.workers:
530             self.worker_count += worker.count
531         if self.worker_count <= 0:
532             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
533             logger.critical(msg)
534             raise RemoteExecutorException(msg)
535         self.policy.register_worker_pool(self.workers)
536         self.cv = threading.Condition()
537         logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
538         self._helper_executor = fut.ThreadPoolExecutor(
539             thread_name_prefix="remote_executor_helper",
540             max_workers=self.worker_count,
541         )
542         self.status = RemoteExecutorStatus(self.worker_count)
543         self.total_bundles_submitted = 0
544         self.backup_lock = threading.Lock()
545         self.last_backup = None
546         (
547             self.heartbeat_thread,
548             self.heartbeat_stop_event,
549         ) = self.run_periodic_heartbeat()
550         self.already_shutdown = False
551
552     @background_thread
553     def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
554         while not stop_event.is_set():
555             time.sleep(5.0)
556             logger.debug('Running periodic heartbeat code...')
557             self.heartbeat()
558         logger.debug('Periodic heartbeat thread shutting down.')
559
560     def heartbeat(self) -> None:
561         # Note: this is invoked on a background thread, not an
562         # executor thread.  Be careful what you do with it b/c it
563         # needs to get back and dump status again periodically.
564         with self.status.lock:
565             self.status.periodic_dump(self.total_bundles_submitted)
566
567             # Look for bundles to reschedule via executor.submit
568             if config.config['executors_schedule_remote_backups']:
569                 self.maybe_schedule_backup_bundles()
570
571     def maybe_schedule_backup_bundles(self):
572         assert self.status.lock.locked()
573         num_done = len(self.status.finished_bundle_timings)
574         num_idle_workers = self.worker_count - self.task_count
575         now = time.time()
576         if (
577             num_done > 2
578             and num_idle_workers > 1
579             and (self.last_backup is None or (now - self.last_backup > 9.0))
580             and self.backup_lock.acquire(blocking=False)
581         ):
582             try:
583                 assert self.backup_lock.locked()
584
585                 bundle_to_backup = None
586                 best_score = None
587                 for (
588                     worker,
589                     bundle_uuids,
590                 ) in self.status.in_flight_bundles_by_worker.items():
591
592                     # Prefer to schedule backups of bundles running on
593                     # slower machines.
594                     base_score = 0
595                     for record in self.workers:
596                         if worker.machine == record.machine:
597                             base_score = float(record.weight)
598                             base_score = 1.0 / base_score
599                             base_score *= 200.0
600                             base_score = int(base_score)
601                             break
602
603                     for uuid in bundle_uuids:
604                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
605                         if (
606                             bundle is not None
607                             and bundle.src_bundle is None
608                             and bundle.backup_bundles is not None
609                         ):
610                             score = base_score
611
612                             # Schedule backups of bundles running
613                             # longer; especially those that are
614                             # unexpectedly slow.
615                             start_ts = self.status.start_per_bundle[uuid]
616                             if start_ts is not None:
617                                 runtime = now - start_ts
618                                 score += runtime
619                                 logger.debug(f'score[{bundle}] => {score}  # latency boost')
620
621                                 if bundle.slower_than_local_p95:
622                                     score += runtime / 2
623                                     logger.debug(f'score[{bundle}] => {score}  # >worker p95')
624
625                                 if bundle.slower_than_global_p95:
626                                     score += runtime / 4
627                                     logger.debug(f'score[{bundle}] => {score}  # >global p95')
628
629                             # Prefer backups of bundles that don't
630                             # have backups already.
631                             backup_count = len(bundle.backup_bundles)
632                             if backup_count == 0:
633                                 score *= 2
634                             elif backup_count == 1:
635                                 score /= 2
636                             elif backup_count == 2:
637                                 score /= 8
638                             else:
639                                 score = 0
640                             logger.debug(
641                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
642                             )
643
644                             if score != 0 and (best_score is None or score > best_score):
645                                 bundle_to_backup = bundle
646                                 assert bundle is not None
647                                 assert bundle.backup_bundles is not None
648                                 assert bundle.src_bundle is None
649                                 best_score = score
650
651                 # Note: this is all still happening on the heartbeat
652                 # runner thread.  That's ok because
653                 # schedule_backup_for_bundle uses the executor to
654                 # submit the bundle again which will cause it to be
655                 # picked up by a worker thread and allow this thread
656                 # to return to run future heartbeats.
657                 if bundle_to_backup is not None:
658                     self.last_backup = now
659                     logger.info(
660                         f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
661                     )
662                     self.schedule_backup_for_bundle(bundle_to_backup)
663             finally:
664                 self.backup_lock.release()
665
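    # Illustrative worked example (not part of the original file) of the scoring
    # heuristic above, with made-up numbers: a bundle on a weight-10 worker
    # starts at base_score = int(200 / 10) = 20.  After running for 60s the
    # score is 20 + 60 = 80; if it is also slower than its worker's p95
    # (+60/2 = 30) and the global p95 (+60/4 = 15) it reaches 125.  With no
    # existing backups that doubles to 250; with one backup it would instead be
    # halved, and a bundle that already has three or more backups scores 0 and
    # is never backed up again.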
666     def is_worker_available(self) -> bool:
667         return self.policy.is_worker_available()
668
669     def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
670         return self.policy.acquire_worker(machine_to_avoid)
671
672     def find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
673         with self.cv:
674             while not self.is_worker_available():
675                 self.cv.wait()
676             worker = self.acquire_worker(machine_to_avoid)
677             if worker is not None:
678                 return worker
679         msg = "We should never reach this point in the code"
680         logger.critical(msg)
681         raise Exception(msg)
682
683     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
684         worker = bundle.worker
685         assert worker is not None
686         logger.debug(f'Released worker {worker}')
687         self.status.record_release_worker(
688             worker,
689             bundle.uuid,
690             was_cancelled,
691         )
692         with self.cv:
693             worker.count += 1
694             self.cv.notify()
695         self.adjust_task_count(-1)
696
697     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
698         with self.status.lock:
699             if bundle.is_cancelled.wait(timeout=0.0):
700                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
701                 bundle.was_cancelled = True
702                 return True
703         return False
704
705     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
706         """Find a worker for bundle or block until one is available."""
707         self.adjust_task_count(+1)
708         uuid = bundle.uuid
709         hostname = bundle.hostname
710         avoid_machine = override_avoid_machine
711         is_original = bundle.src_bundle is None
712
713         # Try not to schedule a backup on the same host as the original.
714         if avoid_machine is None and bundle.src_bundle is not None:
715             avoid_machine = bundle.src_bundle.machine
716         worker = None
717         while worker is None:
718             worker = self.find_available_worker_or_block(avoid_machine)
719         assert worker is not None
720
721         # Ok, found a worker.
722         bundle.worker = worker
723         machine = bundle.machine = worker.machine
724         username = bundle.username = worker.username
725         self.status.record_acquire_worker(worker, uuid)
726         logger.debug(f'{bundle}: Running bundle on {worker}...')
727
728         # Before we do any work, make sure the bundle is still viable.
729         # It may have been some time between when it was submitted and
730         # now due to lack of worker availability and someone else may
731         # have already finished it.
732         if self.check_if_cancelled(bundle):
733             try:
734                 return self.process_work_result(bundle)
735             except Exception as e:
736                 logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
737                 self.release_worker(bundle)
738                 if is_original:
739                     # Weird.  We are the original owner of this
740                     # bundle.  For it to have been cancelled, a backup
741                     # must have already started and completed before
742                     # we even got started.  Moreover, the backup says
743                     # it is done but we can't find the results it
744                     # should have copied over.  Reschedule the whole
745                     # thing.
746                     logger.exception(e)
747                     logger.error(
748                         f'{bundle}: We are the original owner thread and yet there are '
749                         + 'no results for this bundle.  This is unexpected and bad.'
750                     )
751                     return self.emergency_retry_nasty_bundle(bundle)
752                 else:
753                     # Expected(?).  We're a backup and our bundle is
754                     # cancelled before we even got started.  Something
755                     # went bad in process_work_result (I actually don't
756                     # see what?) but probably not worth worrying
757                     # about.  Let the original thread worry about
758                     # either finding the results or complaining about
759                     # it.
760                     return None
761
762         # Send input code / data to worker machine if it's not local.
763         if hostname not in machine:
764             try:
765                 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
766                 start_ts = time.time()
767                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
768                 run_silently(cmd)
769                 xfer_latency = time.time() - start_ts
770                 logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
771             except Exception as e:
772                 self.release_worker(bundle)
773                 if is_original:
774                     # Weird.  We tried to copy the code to the worker and it failed...
775                     # And we're the original bundle.  We have to retry.
776                     logger.exception(e)
777                     logger.error(
778                         f"{bundle}: Failed to send instructions to the worker machine?! "
779                         + "This is not expected; we\'re the original bundle so this shouldn\'t "
780                         + "be a race condition.  Attempting an emergency retry..."
781                     )
782                     return self.emergency_retry_nasty_bundle(bundle)
783                 else:
784                     # This is actually expected; we're a backup.
785                     # There's a race condition where someone else
786                     # already finished the work and removed the source
787                     # code file before we could copy it.  No biggie.
788                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
789                     msg += 'We\'re a backup and this may be caused by the original (or some '
790                     msg += 'other backup) already finishing this work.  Ignoring this.'
791                     logger.warning(msg)
792                     return None
793
794         # Kick off the work.  Note that if this fails we let
795         # wait_for_process deal with it.
796         self.status.record_processing_began(uuid)
797         cmd = (
798             f'{SSH} {bundle.username}@{bundle.machine} '
799             f'"source py38-venv/bin/activate &&'
800             f' /home/scott/lib/python_modules/remote_worker.py'
801             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
802         )
803         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
804         p = cmd_in_background(cmd, silent=True)
805         bundle.pid = p.pid
806         logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
807         return self.wait_for_process(p, bundle, 0)
808
809     def wait_for_process(
810         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
811     ) -> Any:
812         machine = bundle.machine
813         assert p is not None
814         pid = p.pid
815         if depth > 3:
816             logger.error(
817                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
818             )
819             p.terminate()
820             self.release_worker(bundle)
821             return self.emergency_retry_nasty_bundle(bundle)
822
823         # Spin until either the ssh job we scheduled finishes the
824         # bundle or some backup worker signals that they finished it
825         # before we could.
826         while True:
827             try:
828                 p.wait(timeout=0.25)
829             except subprocess.TimeoutExpired:
830                 if self.check_if_cancelled(bundle):
831                     logger.info(f'{bundle}: looks like another worker finished bundle...')
832                     break
833             else:
834                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
835                 p = None
836                 break
837
838         # If we get here we believe the bundle is done; either the ssh
839         # subprocess finished (hopefully successfully) or we noticed
840         # that some other worker seems to have completed the bundle
841         # and we're bailing out.
842         try:
843             ret = self.process_work_result(bundle)
844             if ret is not None and p is not None:
845                 p.terminate()
846             return ret
847
848         # Something went wrong; e.g. we could not copy the results
849         # back, cleanup after ourselves on the remote machine, or
850         # unpickle the results we got from the remote machine.  If we
851         # still have an active ssh subprocess, keep waiting on it.
852         # Otherwise, time for an emergency reschedule.
853         except Exception as e:
854             logger.exception(e)
855             logger.error(f'{bundle}: Something unexpected just happened...')
856             if p is not None:
857                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
858                 logger.warning(msg)
859                 return self.wait_for_process(p, bundle, depth + 1)
860             else:
861                 self.release_worker(bundle)
862                 return self.emergency_retry_nasty_bundle(bundle)
863
864     def process_work_result(self, bundle: BundleDetails) -> Any:
865         with self.status.lock:
866             is_original = bundle.src_bundle is None
867             was_cancelled = bundle.was_cancelled
868             username = bundle.username
869             machine = bundle.machine
870             result_file = bundle.result_file
871             code_file = bundle.code_file
872
873             # Whether original or backup, if we finished first we must
874             # fetch the results if the computation happened on a
875             # remote machine.
876             bundle.end_ts = time.time()
877             if not was_cancelled:
878                 assert bundle.machine is not None
879                 if bundle.hostname not in bundle.machine:
880                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
881                     logger.info(
882                         f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
883                     )
884
885                     # If either of these throw they are handled in
886                     # wait_for_process.
887                     attempts = 0
888                     while True:
889                         try:
890                             run_silently(cmd)
891                         except Exception as e:
892                             attempts += 1
893                             if attempts >= 3:
894                                 raise e
895                         else:
896                             break
897
898                     run_silently(
899                         f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
900                     )
901                     logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
902                 dur = bundle.end_ts - bundle.start_ts
903                 self.histogram.add_item(dur)
904
905         # Only the original worker should unpickle the file contents
906         # though since it's the only one whose result matters.  The
907         # original is also the only job that may delete result_file
908         # from disk.  Note that the original may have been cancelled
909         # if one of the backups finished first; it still must read the
910         # result from disk.
911         if is_original:
912             logger.debug(f"{bundle}: Unpickling {result_file}.")
913             try:
914                 with open(result_file, 'rb') as rb:
915                     serialized = rb.read()
916                 result = cloudpickle.loads(serialized)
917             except Exception as e:
918                 logger.exception(e)
919                 msg = f'Failed to load {result_file}... this is bad news.'
920                 logger.critical(msg)
921                 self.release_worker(bundle)
922
923                 # Re-raise the exception; the code in wait_for_process may
924                 # decide to emergency_retry_nasty_bundle here.
925                 raise Exception(e)
926             logger.debug(f'Removing local (master) {code_file} and {result_file}.')
927             os.remove(f'{result_file}')
928             os.remove(f'{code_file}')
929
930             # Notify any backups that the original is done so they
931             # should stop ASAP.  Do this whether or not we
932             # finished first since there could be more than one
933             # backup.
934             if bundle.backup_bundles is not None:
935                 for backup in bundle.backup_bundles:
936                     logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
937                     backup.is_cancelled.set()
938
939         # This is a backup job and, by now, we have already fetched
940         # the bundle results.
941         else:
942             # Backup results don't matter, they just need to leave the
943             # result file in the right place for their originals to
944             # read/unpickle later.
945             result = None
946
947             # Tell the original to stop if we finished first.
948             if not was_cancelled:
949                 orig_bundle = bundle.src_bundle
950                 assert orig_bundle is not None
951                 logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
952                 orig_bundle.is_cancelled.set()
953         self.release_worker(bundle, was_cancelled=was_cancelled)
954         return result
955
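    # Illustrative sketch (not part of the original file): for a hypothetical
    # bundle uuid 'deadbeef' running as scott on cheetah.house, the shell
    # commands issued by launch() and process_work_result() above render
    # roughly as (each on one line):
    #
    #   /usr/bin/scp -C /tmp/deadbeef.code.bin scott@cheetah.house:/tmp/deadbeef.code.bin
    #   /usr/bin/ssh -oForwardX11=no scott@cheetah.house "source py38-venv/bin/activate &&
    #       /home/scott/lib/python_modules/remote_worker.py --code_file /tmp/deadbeef.code.bin
    #       --result_file /tmp/deadbeef.result.bin"
    #   /usr/bin/scp -C scott@cheetah.house:/tmp/deadbeef.result.bin /tmp/deadbeef.result.bin 2>/dev/null
    #   /usr/bin/ssh -oForwardX11=no scott@cheetah.house "/bin/rm -f /tmp/deadbeef.code.bin /tmp/deadbeef.result.bin"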
956     def create_original_bundle(self, pickle, fname: str):
957         from string_utils import generate_uuid
958
959         uuid = generate_uuid(omit_dashes=True)
960         code_file = f'/tmp/{uuid}.code.bin'
961         result_file = f'/tmp/{uuid}.result.bin'
962
963         logger.debug(f'Writing pickled code to {code_file}')
964         with open(f'{code_file}', 'wb') as wb:
965             wb.write(pickle)
966
967         bundle = BundleDetails(
968             pickled_code=pickle,
969             uuid=uuid,
970             fname=fname,
971             worker=None,
972             username=None,
973             machine=None,
974             hostname=platform.node(),
975             code_file=code_file,
976             result_file=result_file,
977             pid=0,
978             start_ts=time.time(),
979             end_ts=0.0,
980             slower_than_local_p95=False,
981             slower_than_global_p95=False,
982             src_bundle=None,
983             is_cancelled=threading.Event(),
984             was_cancelled=False,
985             backup_bundles=[],
986             failure_count=0,
987         )
988         self.status.record_bundle_details(bundle)
989         logger.debug(f'{bundle}: Created an original bundle')
990         return bundle
991
992     def create_backup_bundle(self, src_bundle: BundleDetails):
993         assert src_bundle.backup_bundles is not None
994         n = len(src_bundle.backup_bundles)
995         uuid = src_bundle.uuid + f'_backup#{n}'
996
997         backup_bundle = BundleDetails(
998             pickled_code=src_bundle.pickled_code,
999             uuid=uuid,
1000             fname=src_bundle.fname,
1001             worker=None,
1002             username=None,
1003             machine=None,
1004             hostname=src_bundle.hostname,
1005             code_file=src_bundle.code_file,
1006             result_file=src_bundle.result_file,
1007             pid=0,
1008             start_ts=time.time(),
1009             end_ts=0.0,
1010             slower_than_local_p95=False,
1011             slower_than_global_p95=False,
1012             src_bundle=src_bundle,
1013             is_cancelled=threading.Event(),
1014             was_cancelled=False,
1015             backup_bundles=None,  # backup backups not allowed
1016             failure_count=0,
1017         )
1018         src_bundle.backup_bundles.append(backup_bundle)
1019         self.status.record_bundle_details_already_locked(backup_bundle)
1020         logger.debug(f'{backup_bundle}: Created a backup bundle')
1021         return backup_bundle
1022
1023     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1024         assert self.status.lock.locked()
1025         assert src_bundle is not None
1026         backup_bundle = self.create_backup_bundle(src_bundle)
1027         logger.debug(
1028             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1029         )
1030         self._helper_executor.submit(self.launch, backup_bundle)
1031
1032         # Results from backups don't matter; if they finish first
1033         # they will move the result_file to this machine and let
1034         # the original pick them up and unpickle them.
1035
1036     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
1037         is_original = bundle.src_bundle is None
1038         bundle.worker = None
1039         avoid_last_machine = bundle.machine
1040         bundle.machine = None
1041         bundle.username = None
1042         bundle.failure_count += 1
1043         if is_original:
1044             retry_limit = 3
1045         else:
1046             retry_limit = 2
1047
1048         if bundle.failure_count > retry_limit:
1049             logger.error(
1050                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1051             )
1052             if is_original:
1053                 raise RemoteExecutorException(
1054                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1055                 )
1056             else:
1057                 logger.error(
1058                     f'{bundle}: At least it\'s only a backup; better luck with the others.'
1059                 )
1060             return None
1061         else:
1062             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1063             logger.warning(msg)
1064             warnings.warn(msg)
1065             return self.launch(bundle, avoid_last_machine)
1066
1067     @overrides
1068     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1069         if self.already_shutdown:
1070             raise Exception('Submitted work after shutdown.')
1071         pickle = make_cloud_pickle(function, *args, **kwargs)
1072         bundle = self.create_original_bundle(pickle, function.__name__)
1073         self.total_bundles_submitted += 1
1074         return self._helper_executor.submit(self.launch, bundle)
1075
1076     @overrides
1077     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1078         if not self.already_shutdown:
1079             logger.debug(f'Shutting down RemoteExecutor {self.title}')
1080             self.heartbeat_stop_event.set()
1081             self.heartbeat_thread.join()
1082             self._helper_executor.shutdown(wait)
1083             if not quiet:
1084                 print(self.histogram.__repr__(label_formatter='%ds'))
1085             self.already_shutdown = True
1086
1087
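# Illustrative sketch (not part of the original file): wiring a RemoteExecutor
# up by hand.  Hostnames and the username are hypothetical, the remote machines
# need the companion remote_worker.py installed, and real pools are normally
# built by DefaultExecutors.remote_pool() below.
def _demo_remote_executor() -> None:
    pool = [
        RemoteWorkerRecord(username='user', machine='worker1.example', weight=20, count=4),
        RemoteWorkerRecord(username='user', machine='worker2.example', weight=10, count=2),
    ]
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    policy.register_worker_pool(pool)
    executor = RemoteExecutor(pool, policy)
    future = executor.submit(sum, range(100))
    print(future.result())   # 4950, computed on whichever worker ran the bundle
    executor.shutdown(wait=True, quiet=True)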
1088 @singleton
1089 class DefaultExecutors(object):
1090     def __init__(self):
1091         self.thread_executor: Optional[ThreadExecutor] = None
1092         self.process_executor: Optional[ProcessExecutor] = None
1093         self.remote_executor: Optional[RemoteExecutor] = None
1094
1095     def ping(self, host) -> bool:
1096         logger.debug(f'RUN> ping -c 1 {host}')
1097         try:
1098             x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
1099             return x == 0
1100         except Exception:
1101             return False
1102
1103     def thread_pool(self) -> ThreadExecutor:
1104         if self.thread_executor is None:
1105             self.thread_executor = ThreadExecutor()
1106         return self.thread_executor
1107
1108     def process_pool(self) -> ProcessExecutor:
1109         if self.process_executor is None:
1110             self.process_executor = ProcessExecutor()
1111         return self.process_executor
1112
1113     def remote_pool(self) -> RemoteExecutor:
1114         if self.remote_executor is None:
1115             logger.info('Looking for some helper machines...')
1116             pool: List[RemoteWorkerRecord] = []
1117             if self.ping('cheetah.house'):
1118                 logger.info('Found cheetah.house')
1119                 pool.append(
1120                     RemoteWorkerRecord(
1121                         username='scott',
1122                         machine='cheetah.house',
1123                         weight=30,
1124                         count=6,
1125                     ),
1126                 )
1127             if self.ping('meerkat.cabin'):
1128                 logger.info('Found meerkat.cabin')
1129                 pool.append(
1130                     RemoteWorkerRecord(
1131                         username='scott',
1132                         machine='meerkat.cabin',
1133                         weight=12,
1134                         count=2,
1135                     ),
1136                 )
1137             if self.ping('wannabe.house'):
1138                 logger.info('Found wannabe.house')
1139                 pool.append(
1140                     RemoteWorkerRecord(
1141                         username='scott',
1142                         machine='wannabe.house',
1143                         weight=25,
1144                         count=10,
1145                     ),
1146                 )
1147             if self.ping('puma.cabin'):
1148                 logger.info('Found puma.cabin')
1149                 pool.append(
1150                     RemoteWorkerRecord(
1151                         username='scott',
1152                         machine='puma.cabin',
1153                         weight=30,
1154                         count=6,
1155                     ),
1156                 )
1157             if self.ping('backup.house'):
1158                 logger.info('Found backup.house')
1159                 pool.append(
1160                     RemoteWorkerRecord(
1161                         username='scott',
1162                         machine='backup.house',
1163                         weight=8,
1164                         count=2,
1165                     ),
1166                 )
1167
1168             # The controller machine has a lot to do; go easy on it.
1169             for record in pool:
1170                 if record.machine == platform.node() and record.count > 1:
1171                     logger.info(f'Reducing workload for {record.machine}.')
1172                     record.count = 1
1173
1174             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1175             policy.register_worker_pool(pool)
1176             self.remote_executor = RemoteExecutor(pool, policy)
1177         return self.remote_executor
1178
1179     def shutdown(self) -> None:
1180         if self.thread_executor is not None:
1181             self.thread_executor.shutdown(wait=True, quiet=True)
1182             self.thread_executor = None
1183         if self.process_executor is not None:
1184             self.process_executor.shutdown(wait=True, quiet=True)
1185             self.process_executor = None
1186         if self.remote_executor is not None:
1187             self.remote_executor.shutdown(wait=True, quiet=True)
1188             self.remote_executor = None
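# Illustrative sketch (not part of the original file): typical use of the
# DefaultExecutors singleton from an entry point.  _demo_default_executors is a
# hypothetical name; because of @singleton, repeated DefaultExecutors() calls
# return the same instance and shutdown() tears down whichever pools were built.
def _demo_default_executors() -> None:
    executors = DefaultExecutors()
    pool = executors.thread_pool()
    futures = [pool.submit(len, word) for word in ('alpha', 'beta', 'gamma')]
    print([f.result() for f in futures])   # [5, 4, 5]
    executors.shutdown()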