1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
30
31
32 logger = logging.getLogger(__name__)
33
34 parser = config.add_commandline_args(
35     f"Executors ({__file__})", "Args related to processing executors."
36 )
37 parser.add_argument(
38     '--executors_threadpool_size',
39     type=int,
40     metavar='#THREADS',
41     help='Number of threads in the default threadpool, leave unset for default',
42     default=None,
43 )
44 parser.add_argument(
45     '--executors_processpool_size',
46     type=int,
47     metavar='#PROCESSES',
48     help='Number of processes in the default processpool, leave unset for default',
49     default=None,
50 )
51 parser.add_argument(
52     '--executors_schedule_remote_backups',
53     default=True,
54     action=argparse_utils.ActionNoYes,
55     help='Should we schedule duplicative backup work if a remote bundle is slow',
56 )
57 parser.add_argument(
58     '--executors_max_bundle_failures',
59     type=int,
60     default=3,
61     metavar='#FAILURES',
62     help='Maximum number of failures before giving up on a bundle',
63 )
64
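# Example invocation (sketch; "my_program.py" stands in for any hypothetical
# program that imports this module and parses these config flags):
#
#     ./my_program.py --executors_threadpool_size 10 --executors_max_bundle_failures 5
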
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp -C'
67
68
69 def make_cloud_pickle(fun, *args, **kwargs):
70     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71     return cloudpickle.dumps((fun, args, kwargs))
72
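# The payload built by make_cloud_pickle is unpacked and executed later (see
# ProcessExecutor.run_cloud_pickle and the remote_worker.py invocation below),
# roughly:
#
#     import math
#     payload = make_cloud_pickle(math.pow, 2, 8)
#     fun, args, kwargs = cloudpickle.loads(payload)
#     assert fun(*args, **kwargs) == 256.0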
73
74 class BaseExecutor(ABC):
75     def __init__(self, *, title=''):
76         self.title = title
77         self.histogram = hist.SimpleHistogram(
78             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
79         )
80         self.task_count = 0
81
82     @abstractmethod
83     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
84         pass
85
86     @abstractmethod
87     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
88         pass
89
90     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
91         """If the executor is idle (i.e. there are no pending or active
92         tasks), shut it down and return True.  Otherwise do nothing and
93         return False.  Note: this should only be called by the launcher
94         process.
95
96         """
97         if self.task_count == 0:
98             self.shutdown(wait=True, quiet=quiet)
99             return True
100         return False
101
102     def adjust_task_count(self, delta: int) -> None:
103         """Change the task count.  Note: do not call this method from a
104         worker, it should only be called by the launcher process /
105         thread / machine.
106
107         """
108         self.task_count += delta
109         logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
110
111     def get_task_count(self) -> int:
112         """Return the current task count.  Note: do not call this method
113         from a worker, it should only be called by the launcher process /
114         thread / machine.
115
116         """
117         return self.task_count
118
119
120 class ThreadExecutor(BaseExecutor):
121     def __init__(self, max_workers: Optional[int] = None):
122         super().__init__()
123         workers = None
124         if max_workers is not None:
125             workers = max_workers
126         elif 'executors_threadpool_size' in config.config:
127             workers = config.config['executors_threadpool_size']
128         logger.debug(f'Creating threadpool executor with {workers} workers')
129         self._thread_pool_executor = fut.ThreadPoolExecutor(
130             max_workers=workers, thread_name_prefix="thread_executor_helper"
131         )
132         self.already_shutdown = False
133
134     # This is run on a different thread; do not adjust task count here.
135     def run_local_bundle(self, fun, *args, **kwargs):
136         logger.debug(f"Running local bundle at {fun.__name__}")
137         result = fun(*args, **kwargs)
138         return result
139
140     @overrides
141     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
142         if self.already_shutdown:
143             raise Exception('Submitted work after shutdown.')
144         self.adjust_task_count(+1)
145         newargs = [function, *args]
149         start = time.time()
150         result = self._thread_pool_executor.submit(
151             self.run_local_bundle, *newargs, **kwargs
152         )
153         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
154         result.add_done_callback(lambda _: self.adjust_task_count(-1))
155         return result
156
157     @overrides
158     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
159         if not self.already_shutdown:
160             logger.debug(f'Shutting down threadpool executor {self.title}')
161             self._thread_pool_executor.shutdown(wait)
162             if not quiet:
163                 print(self.histogram.__repr__(label_formatter='%ds'))
164             self.already_shutdown = True
165
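# Example usage (sketch; any BaseExecutor subclass follows the same
# submit / result / shutdown pattern):
#
#     executor = ThreadExecutor(max_workers=4)
#     future = executor.submit(sum, [1, 2, 3])
#     assert future.result() == 6
#     executor.shutdown(wait=True, quiet=True)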
166
167 class ProcessExecutor(BaseExecutor):
168     def __init__(self, max_workers: Optional[int] = None):
169         super().__init__()
170         workers = None
171         if max_workers is not None:
172             workers = max_workers
173         elif 'executors_processpool_size' in config.config:
174             workers = config.config['executors_processpool_size']
175         logger.debug(f'Creating processpool executor with {workers} workers.')
176         self._process_executor = fut.ProcessPoolExecutor(
177             max_workers=workers,
178         )
179         self.already_shutdown = False
180
181     # This is run in another process; do not adjust task count here.
182     def run_cloud_pickle(self, pickle):
183         fun, args, kwargs = cloudpickle.loads(pickle)
184         logger.debug(f"Running pickled bundle at {fun.__name__}")
185         result = fun(*args, **kwargs)
186         return result
187
188     @overrides
189     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
190         if self.already_shutdown:
191             raise Exception('Submitted work after shutdown.')
192         start = time.time()
193         self.adjust_task_count(+1)
194         pickle = make_cloud_pickle(function, *args, **kwargs)
195         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
196         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
197         result.add_done_callback(lambda _: self.adjust_task_count(-1))
198         return result
199
200     @overrides
201     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
202         if not self.already_shutdown:
203             logger.debug(f'Shutting down processpool executor {self.title}')
204             self._process_executor.shutdown(wait)
205             if not quiet:
206                 print(self.histogram.__repr__(label_formatter='%ds'))
207             self.already_shutdown = True
208
209     def __getstate__(self):
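        # Presumably here so the executor instance itself can be pickled
        # (e.g. if it is captured in a cloudpickled bundle): drop the live
        # pool, which holds threads/locks and cannot be pickled.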
210         state = self.__dict__.copy()
211         state['_process_executor'] = None
212         return state
213
214
215 class RemoteExecutorException(Exception):
216     """Thrown when a bundle cannot be executed despite several retries."""
217
218     pass
219
220
221 @dataclass
222 class RemoteWorkerRecord:
223     username: str
224     machine: str
225     weight: int
226     count: int
227
228     def __hash__(self):
229         return hash((self.username, self.machine))
230
231     def __repr__(self):
232         return f'{self.username}@{self.machine}'
233
234
235 @dataclass
236 class BundleDetails:
237     pickled_code: bytes
238     uuid: str
239     fname: str
240     worker: Optional[RemoteWorkerRecord]
241     username: Optional[str]
242     machine: Optional[str]
243     hostname: str
244     code_file: str
245     result_file: str
246     pid: int
247     start_ts: float
248     end_ts: float
249     slower_than_local_p95: bool
250     slower_than_global_p95: bool
251     src_bundle: Optional[BundleDetails]
252     is_cancelled: threading.Event
253     was_cancelled: bool
254     backup_bundles: Optional[List[BundleDetails]]
255     failure_count: int
256
257     def __repr__(self):
258         uuid = self.uuid
259         if uuid[-9:-2] == '_backup':
260             uuid = uuid[:-9]
261             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
262         else:
263             suffix = uuid[-6:]
264
265         colorz = [
266             fg('violet red'),
267             fg('red'),
268             fg('orange'),
269             fg('peach orange'),
270             fg('yellow'),
271             fg('marigold yellow'),
272             fg('green yellow'),
273             fg('tea green'),
274             fg('cornflower blue'),
275             fg('turquoise blue'),
276             fg('tropical blue'),
277             fg('lavender purple'),
278             fg('medium purple'),
279         ]
280         c = colorz[int(uuid[-2:], 16) % len(colorz)]
281         fname = self.fname if self.fname is not None else 'nofname'
282         machine = self.machine if self.machine is not None else 'nomachine'
283         return f'{c}{suffix}/{fname}/{machine}{reset()}'
284
285
286 class RemoteExecutorStatus:
287     def __init__(self, total_worker_count: int) -> None:
288         self.worker_count: int = total_worker_count
289         self.known_workers: Set[RemoteWorkerRecord] = set()
290         self.start_time: float = time.time()
291         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
292         self.end_per_bundle: Dict[str, float] = defaultdict(float)
293         self.finished_bundle_timings_per_worker: Dict[
294             RemoteWorkerRecord, List[float]
295         ] = {}
296         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
297         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
298         self.finished_bundle_timings: List[float] = []
299         self.last_periodic_dump: Optional[float] = None
300         self.total_bundles_submitted: int = 0
301
302         # Protects reads and modifications of self.  Also used as a
303         # memory fence for modifications to bundle.
304         self.lock: threading.Lock = threading.Lock()
305
306     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
307         with self.lock:
308             self.record_acquire_worker_already_locked(worker, uuid)
309
310     def record_acquire_worker_already_locked(
311         self, worker: RemoteWorkerRecord, uuid: str
312     ) -> None:
313         assert self.lock.locked()
314         self.known_workers.add(worker)
315         self.start_per_bundle[uuid] = None
316         x = self.in_flight_bundles_by_worker.get(worker, set())
317         x.add(uuid)
318         self.in_flight_bundles_by_worker[worker] = x
319
320     def record_bundle_details(self, details: BundleDetails) -> None:
321         with self.lock:
322             self.record_bundle_details_already_locked(details)
323
324     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
325         assert self.lock.locked()
326         self.bundle_details_by_uuid[details.uuid] = details
327
328     def record_release_worker(
329         self,
330         worker: RemoteWorkerRecord,
331         uuid: str,
332         was_cancelled: bool,
333     ) -> None:
334         with self.lock:
335             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
336
337     def record_release_worker_already_locked(
338         self,
339         worker: RemoteWorkerRecord,
340         uuid: str,
341         was_cancelled: bool,
342     ) -> None:
343         assert self.lock.locked()
344         ts = time.time()
345         self.end_per_bundle[uuid] = ts
346         self.in_flight_bundles_by_worker[worker].remove(uuid)
347         if not was_cancelled:
348             start = self.start_per_bundle[uuid]
349             assert start
350             bundle_latency = ts - start
351             x = self.finished_bundle_timings_per_worker.get(worker, list())
352             x.append(bundle_latency)
353             self.finished_bundle_timings_per_worker[worker] = x
354             self.finished_bundle_timings.append(bundle_latency)
355
356     def record_processing_began(self, uuid: str) -> None:
357         with self.lock:
358             self.start_per_bundle[uuid] = time.time()
359
360     def total_in_flight(self) -> int:
361         assert self.lock.locked()
362         total_in_flight = 0
363         for worker in self.known_workers:
364             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
365         return total_in_flight
366
367     def total_idle(self) -> int:
368         assert self.lock.locked()
369         return self.worker_count - self.total_in_flight()
370
371     def __repr__(self):
372         assert self.lock.locked()
373         ts = time.time()
374         total_finished = len(self.finished_bundle_timings)
375         total_in_flight = self.total_in_flight()
376         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
377         qall = None
378         if len(self.finished_bundle_timings) > 1:
379             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
380             ret += (
381                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
382                 f'✅={total_finished}/{self.total_bundles_submitted}, '
383                 f'💻n={total_in_flight}/{self.worker_count}\n'
384             )
385         else:
386             ret += (
387                 f'⏱={ts-self.start_time:.1f}s, '
388                 f'✅={total_finished}/{self.total_bundles_submitted}, '
389                 f'💻n={total_in_flight}/{self.worker_count}\n'
390             )
391
392         for worker in self.known_workers:
393             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
394             timings = self.finished_bundle_timings_per_worker.get(worker, [])
395             count = len(timings)
396             qworker = None
397             if count > 1:
398                 qworker = numpy.quantile(timings, [0.5, 0.95])
399                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
400             else:
401                 ret += '\n'
402             if count > 0:
403                 ret += f'    ...finished {count} total bundle(s) so far\n'
404             in_flight = len(self.in_flight_bundles_by_worker[worker])
405             if in_flight > 0:
406                 ret += f'    ...{in_flight} bundles currently in flight:\n'
407                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
408                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
409                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
410                     if self.start_per_bundle[bundle_uuid] is not None:
411                         sec = ts - self.start_per_bundle[bundle_uuid]
412                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
413                     else:
414                         ret += f'       {details} setting up / copying data...'
415                         sec = 0.0
416
417                     if qworker is not None:
418                         if sec > qworker[1]:
419                             ret += f'{bg("red")}>💻p95{reset()} '
420                             if details is not None:
421                                 details.slower_than_local_p95 = True
422                         else:
423                             if details is not None:
424                                 details.slower_than_local_p95 = False
425
426                     if qall is not None:
427                         if sec > qall[1]:
428                             ret += f'{bg("red")}>∀p95{reset()} '
429                             if details is not None:
430                                 details.slower_than_global_p95 = True
431                         else:
432                             if details is not None:
433                                 details.slower_than_global_p95 = False
433                     ret += '\n'
434         return ret
435
436     def periodic_dump(self, total_bundles_submitted: int) -> None:
437         assert self.lock.locked()
438         self.total_bundles_submitted = total_bundles_submitted
439         ts = time.time()
440         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
441             print(self)
442             self.last_periodic_dump = ts
443
444
445 class RemoteWorkerSelectionPolicy(ABC):
446     def register_worker_pool(self, workers: List[RemoteWorkerRecord]) -> None:
447         self.workers = workers
448
449     @abstractmethod
450     def is_worker_available(self) -> bool:
451         pass
452
453     @abstractmethod
454     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
455         pass
456
457
458 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
459     @overrides
460     def is_worker_available(self) -> bool:
461         for worker in self.workers:
462             if worker.count > 0:
463                 return True
464         return False
465
466     @overrides
467     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
468         grabbag = []
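        # Each candidate is added count * weight times, so (for example) a
        # worker with weight=30 and six free slots gets 180 entries versus 12
        # for one with weight=12 and one free slot: ~15x the odds of selection.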
469         for worker in self.workers:
470             if worker.machine != machine_to_avoid:
471                 if worker.count > 0:
472                     for _ in range(worker.count * worker.weight):
473                         grabbag.append(worker)
474
475         if len(grabbag) == 0:
476             logger.debug(
477                 f'There are no available workers that avoid {machine_to_avoid}...'
478             )
479             for worker in self.workers:
480                 if worker.count > 0:
481                     for _ in range(worker.count * worker.weight):
482                         grabbag.append(worker)
483
484         if len(grabbag) == 0:
485             logger.warning('There are no available workers?!')
486             return None
487
488         worker = random.sample(grabbag, 1)[0]
489         assert worker.count > 0
490         worker.count -= 1
491         logger.debug(f'Chose worker {worker}')
492         return worker
493
494
495 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
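    # Note: unlike the weighted policy above, this policy ignores
    # RemoteWorkerRecord.weight and only honors the per-worker count.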
496     def __init__(self) -> None:
497         self.index = 0
498
499     @overrides
500     def is_worker_available(self) -> bool:
501         for worker in self.workers:
502             if worker.count > 0:
503                 return True
504         return False
505
506     @overrides
507     def acquire_worker(
508         self, machine_to_avoid: Optional[str] = None
509     ) -> Optional[RemoteWorkerRecord]:
510         x = self.index
511         while True:
512             worker = self.workers[x]
513             if worker.count > 0:
514                 worker.count -= 1
515                 x += 1
516                 if x >= len(self.workers):
517                     x = 0
518                 self.index = x
519                 logger.debug(f'Selected worker {worker}')
520                 return worker
521             x += 1
522             if x >= len(self.workers):
523                 x = 0
524             if x == self.index:
525                 msg = 'Unexpectedly could not find a worker, giving up.'
526                 logger.warning(msg)
527                 return None
528
529
530 class RemoteExecutor(BaseExecutor):
531     def __init__(
532         self,
533         workers: List[RemoteWorkerRecord],
534         policy: RemoteWorkerSelectionPolicy,
535     ) -> None:
536         super().__init__()
537         self.workers = workers
538         self.policy = policy
539         self.worker_count = 0
540         for worker in self.workers:
541             self.worker_count += worker.count
542         if self.worker_count <= 0:
543             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
544             logger.critical(msg)
545             raise RemoteExecutorException(msg)
546         self.policy.register_worker_pool(self.workers)
547         self.cv = threading.Condition()
548         logger.debug(
549             f'Creating {self.worker_count} local threads, one per remote worker.'
550         )
551         self._helper_executor = fut.ThreadPoolExecutor(
552             thread_name_prefix="remote_executor_helper",
553             max_workers=self.worker_count,
554         )
555         self.status = RemoteExecutorStatus(self.worker_count)
556         self.total_bundles_submitted = 0
557         self.backup_lock = threading.Lock()
558         self.last_backup: Optional[float] = None
559         (
560             self.heartbeat_thread,
561             self.heartbeat_stop_event,
562         ) = self.run_periodic_heartbeat()
563         self.already_shutdown = False
564
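    # Note: judging from its use in __init__ above, the @background_thread
    # decorator starts this method on its own thread and returns a
    # (thread, stop_event) tuple rather than running it inline.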
565     @background_thread
566     def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
567         while not stop_event.is_set():
568             time.sleep(5.0)
569             logger.debug('Running periodic heartbeat code...')
570             self.heartbeat()
571         logger.debug('Periodic heartbeat thread shutting down.')
572
573     def heartbeat(self) -> None:
574         # Note: this is invoked on a background thread, not an
575         # executor thread.  Be careful what you do with it b/c it
576         # needs to get back and dump status again periodically.
577         with self.status.lock:
578             self.status.periodic_dump(self.total_bundles_submitted)
579
580             # Look for bundles to reschedule via executor.submit
581             if config.config['executors_schedule_remote_backups']:
582                 self.maybe_schedule_backup_bundles()
583
584     def maybe_schedule_backup_bundles(self):
585         assert self.status.lock.locked()
586         num_done = len(self.status.finished_bundle_timings)
587         num_idle_workers = self.worker_count - self.task_count
588         now = time.time()
589         if (
590             num_done > 2
591             and num_idle_workers > 1
592             and (self.last_backup is None or (now - self.last_backup > 9.0))
593             and self.backup_lock.acquire(blocking=False)
594         ):
595             try:
596                 assert self.backup_lock.locked()
597
598                 bundle_to_backup = None
599                 best_score = None
600                 for (
601                     worker,
602                     bundle_uuids,
603                 ) in self.status.in_flight_bundles_by_worker.items():
604
605                     # Prefer to schedule backups of bundles running on
606                     # slower machines.
607                     base_score = 0
608                     for record in self.workers:
609                         if worker.machine == record.machine:
610                             base_score = float(record.weight)
611                             base_score = 1.0 / base_score
612                             base_score *= 200.0
613                             base_score = int(base_score)
614                             break
615
616                     for uuid in bundle_uuids:
617                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
618                         if (
619                             bundle is not None
620                             and bundle.src_bundle is None
621                             and bundle.backup_bundles is not None
622                         ):
623                             score = base_score
624
625                             # Schedule backups of bundles running
626                             # longer; especially those that are
627                             # unexpectedly slow.
628                             start_ts = self.status.start_per_bundle[uuid]
629                             if start_ts is not None:
630                                 runtime = now - start_ts
631                                 score += runtime
632                                 logger.debug(
633                                     f'score[{bundle}] => {score}  # latency boost'
634                                 )
635
636                                 if bundle.slower_than_local_p95:
637                                     score += runtime / 2
638                                     logger.debug(
639                                         f'score[{bundle}] => {score}  # >worker p95'
640                                     )
641
642                                 if bundle.slower_than_global_p95:
643                                     score += runtime / 4
644                                     logger.debug(
645                                         f'score[{bundle}] => {score}  # >global p95'
646                                     )
647
648                             # Prefer backups of bundles that don't
649                             # have backups already.
650                             backup_count = len(bundle.backup_bundles)
651                             if backup_count == 0:
652                                 score *= 2
653                             elif backup_count == 1:
654                                 score /= 2
655                             elif backup_count == 2:
656                                 score /= 8
657                             else:
658                                 score = 0
659                             logger.debug(
660                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
661                             )
662
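                            # Worked example (sketch): a bundle on a weight=8
                            # machine (base 200/8 = 25) that has run for 60s
                            # and is over both the worker and global p95s
                            # (+60, +30, +15) with no backups yet (x2) scores
                            # (25 + 60 + 30 + 15) * 2 = 260.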
663                             if score != 0 and (
664                                 best_score is None or score > best_score
665                             ):
666                                 bundle_to_backup = bundle
667                                 assert bundle is not None
668                                 assert bundle.backup_bundles is not None
669                                 assert bundle.src_bundle is None
670                                 best_score = score
671
672                 # Note: this is all still happening on the heartbeat
673                 # runner thread.  That's ok because
674                 # schedule_backup_for_bundle uses the executor to
675                 # submit the bundle again which will cause it to be
676                 # picked up by a worker thread and allow this thread
677                 # to return to run future heartbeats.
678                 if bundle_to_backup is not None:
679                     self.last_backup = now
680                     logger.info(
681                         f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
682                     )
683                     self.schedule_backup_for_bundle(bundle_to_backup)
684             finally:
685                 self.backup_lock.release()
686
687     def is_worker_available(self) -> bool:
688         return self.policy.is_worker_available()
689
690     def acquire_worker(
691         self, machine_to_avoid: Optional[str] = None
692     ) -> Optional[RemoteWorkerRecord]:
693         return self.policy.acquire_worker(machine_to_avoid)
694
695     def find_available_worker_or_block(
696         self, machine_to_avoid: Optional[str] = None
697     ) -> RemoteWorkerRecord:
698         with self.cv:
699             while not self.is_worker_available():
700                 self.cv.wait()
701             worker = self.acquire_worker(machine_to_avoid)
702             if worker is not None:
703                 return worker
704         msg = "We should never reach this point in the code"
705         logger.critical(msg)
706         raise Exception(msg)
707
708     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
709         worker = bundle.worker
710         assert worker is not None
711         logger.debug(f'Released worker {worker}')
712         self.status.record_release_worker(
713             worker,
714             bundle.uuid,
715             was_cancelled,
716         )
717         with self.cv:
718             worker.count += 1
719             self.cv.notify()
720         self.adjust_task_count(-1)
721
722     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
723         with self.status.lock:
724             if bundle.is_cancelled.wait(timeout=0.0):
725                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
726                 bundle.was_cancelled = True
727                 return True
728         return False
729
730     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
731         """Find a worker for bundle or block until one is available."""
732         self.adjust_task_count(+1)
733         uuid = bundle.uuid
734         hostname = bundle.hostname
735         avoid_machine = override_avoid_machine
736         is_original = bundle.src_bundle is None
737
738         # Try not to schedule a backup on the same host as the original.
739         if avoid_machine is None and bundle.src_bundle is not None:
740             avoid_machine = bundle.src_bundle.machine
741         worker = None
742         while worker is None:
743             worker = self.find_available_worker_or_block(avoid_machine)
744         assert worker
745
746         # Ok, found a worker.
747         bundle.worker = worker
748         machine = bundle.machine = worker.machine
749         username = bundle.username = worker.username
750         self.status.record_acquire_worker(worker, uuid)
751         logger.debug(f'{bundle}: Running bundle on {worker}...')
752
753         # Before we do any work, make sure the bundle is still viable.
754         # It may have been some time between when it was submitted and
755         # now due to lack of worker availability and someone else may
756         # have already finished it.
757         if self.check_if_cancelled(bundle):
758             try:
759                 return self.process_work_result(bundle)
760             except Exception as e:
761                 logger.warning(
762                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
763                 )
764                 self.release_worker(bundle)
765                 if is_original:
766                     # Weird.  We are the original owner of this
767                     # bundle.  For it to have been cancelled, a backup
768                     # must have already started and completed before
769                     # we even got started.  Moreover, the backup says
770                     # it is done but we can't find the results it
771                     # should have copied over.  Reschedule the whole
772                     # thing.
773                     logger.exception(e)
774                     logger.error(
775                         f'{bundle}: We are the original owner thread and yet there are '
776                         + 'no results for this bundle.  This is unexpected and bad.'
777                     )
778                     return self.emergency_retry_nasty_bundle(bundle)
779                 else:
780                     # Expected(?).  We're a backup and our bundle is
781                     # cancelled before we even got started.  Something
782                     # went bad in process_work_result (I actually don't
783                     # see what?) but probably not worth worrying
784                     # about.  Let the original thread worry about
785                     # either finding the results or complaining about
786                     # it.
787                     return None
788
789         # Send input code / data to worker machine if it's not local.
790         if hostname not in machine:
791             try:
792                 cmd = (
793                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
794                 )
795                 start_ts = time.time()
796                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
797                 run_silently(cmd)
798                 xfer_latency = time.time() - start_ts
799                 logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
800             except Exception as e:
801                 self.release_worker(bundle)
802                 if is_original:
803                     # Weird.  We tried to copy the code to the worker and it failed...
804                     # And we're the original bundle.  We have to retry.
805                     logger.exception(e)
806                     logger.error(
807                         f"{bundle}: Failed to send instructions to the worker machine?! "
808                         + "This is not expected; we\'re the original bundle so this shouldn\'t "
809                         + "be a race condition.  Attempting an emergency retry..."
810                     )
811                     return self.emergency_retry_nasty_bundle(bundle)
812                 else:
813                     # This is actually expected; we're a backup.
814                     # There's a race condition where someone else
815                     # already finished the work and removed the source
816                     # code file before we could copy it.  No biggie.
817                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
818                     msg += 'We\'re a backup and this may be caused by the original (or some '
819                     msg += 'other backup) already finishing this work.  Ignoring this.'
820                     logger.warning(msg)
821                     return None
822
823         # Kick off the work.  Note that if this fails we let
824         # wait_for_process deal with it.
825         self.status.record_processing_began(uuid)
826         cmd = (
827             f'{SSH} {bundle.username}@{bundle.machine} '
828             f'"source py38-venv/bin/activate &&'
829             f' /home/scott/lib/python_modules/remote_worker.py'
830             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
831         )
832         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
833         p = cmd_in_background(cmd, silent=True)
834         bundle.pid = p.pid
835         logger.debug(
836             f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
837         )
838         return self.wait_for_process(p, bundle, 0)
839
840     def wait_for_process(
841         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
842     ) -> Any:
843         machine = bundle.machine
844         assert p
845         pid = p.pid
846         if depth > 3:
847             logger.error(
848                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
849             )
850             p.terminate()
851             self.release_worker(bundle)
852             return self.emergency_retry_nasty_bundle(bundle)
853
854         # Spin until either the ssh job we scheduled finishes the
855         # bundle or some backup worker signals that they finished it
856         # before we could.
857         while True:
858             try:
859                 p.wait(timeout=0.25)
860             except subprocess.TimeoutExpired:
861                 if self.check_if_cancelled(bundle):
862                     logger.info(
863                         f'{bundle}: looks like another worker finished bundle...'
864                     )
865                     break
866             else:
867                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
868                 p = None
869                 break
870
871         # If we get here we believe the bundle is done; either the ssh
872         # subprocess finished (hopefully successfully) or we noticed
873         # that some other worker seems to have completed the bundle
874         # and we're bailing out.
875         try:
876             ret = self.process_work_result(bundle)
877             if ret is not None and p is not None:
878                 p.terminate()
879             return ret
880
881         # Something went wrong; e.g. we could not copy the results
882         # back, clean up after ourselves on the remote machine, or
883         # unpickle the results we got from the remote machine.  If we
884         # still have an active ssh subprocess, keep waiting on it.
885         # Otherwise, time for an emergency reschedule.
886         except Exception as e:
887             logger.exception(e)
888             logger.error(f'{bundle}: Something unexpected just happened...')
889             if p is not None:
890                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
891                 logger.warning(msg)
892                 return self.wait_for_process(p, bundle, depth + 1)
893             else:
894                 self.release_worker(bundle)
895                 return self.emergency_retry_nasty_bundle(bundle)
896
897     def process_work_result(self, bundle: BundleDetails) -> Any:
898         with self.status.lock:
899             is_original = bundle.src_bundle is None
900             was_cancelled = bundle.was_cancelled
901             username = bundle.username
902             machine = bundle.machine
903             result_file = bundle.result_file
904             code_file = bundle.code_file
905
906             # Whether original or backup, if we finished first we must
907             # fetch the results if the computation happened on a
908             # remote machine.
909             bundle.end_ts = time.time()
910             if not was_cancelled:
911                 assert bundle.machine is not None
912                 if bundle.hostname not in bundle.machine:
913                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
914                     logger.info(
915                         f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
916                     )
917
918                     # If either of these throw they are handled in
919                     # wait_for_process.
920                     attempts = 0
921                     while True:
922                         try:
923                             run_silently(cmd)
924                         except Exception as e:
925                             attempts += 1
926                             if attempts >= 3:
927                                 raise e
928                         else:
929                             break
930
931                     run_silently(
932                         f'{SSH} {username}@{machine}'
933                         f' "/bin/rm -f {code_file} {result_file}"'
934                     )
935                     logger.debug(
936                         f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
937                     )
938                 dur = bundle.end_ts - bundle.start_ts
939                 self.histogram.add_item(dur)
940
941         # Only the original worker should unpickle the file contents
942         # though since it's the only one whose result matters.  The
943         # original is also the only job that may delete result_file
944         # from disk.  Note that the original may have been cancelled
945         # if one of the backups finished first; it still must read the
946         # result from disk.
947         if is_original:
948             logger.debug(f"{bundle}: Unpickling {result_file}.")
949             try:
950                 with open(result_file, 'rb') as rb:
951                     serialized = rb.read()
952                 result = cloudpickle.loads(serialized)
953             except Exception as e:
954                 logger.exception(e)
955                 msg = f'Failed to load {result_file}... this is bad news.'
956                 logger.critical(msg)
957                 self.release_worker(bundle)
958
959                 # Re-raise the exception; the code in wait_for_process may
960                 # decide to emergency_retry_nasty_bundle here.
961                 raise Exception(e)
962             logger.debug(f'Removing local (master) {code_file} and {result_file}.')
963             os.remove(f'{result_file}')
964             os.remove(f'{code_file}')
965
966             # Notify any backups that the original is done so they
967             # should stop ASAP.  Do this whether or not we
968             # finished first since there could be more than one
969             # backup.
970             if bundle.backup_bundles is not None:
971                 for backup in bundle.backup_bundles:
972                     logger.debug(
973                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
974                     )
975                     backup.is_cancelled.set()
976
977         # This is a backup job and, by now, we have already fetched
978         # the bundle results.
979         else:
980             # Backup results don't matter, they just need to leave the
981             # result file in the right place for their originals to
982             # read/unpickle later.
983             result = None
984
985             # Tell the original to stop if we finished first.
986             if not was_cancelled:
987                 orig_bundle = bundle.src_bundle
988                 assert orig_bundle
989                 logger.debug(
990                     f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
991                 )
992                 orig_bundle.is_cancelled.set()
993         self.release_worker(bundle, was_cancelled=was_cancelled)
994         return result
995
996     def create_original_bundle(self, pickle, fname: str):
997         from string_utils import generate_uuid
998
999         uuid = generate_uuid(omit_dashes=True)
1000         code_file = f'/tmp/{uuid}.code.bin'
1001         result_file = f'/tmp/{uuid}.result.bin'
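        # Protocol sketch: code_file is copied to the worker (see launch),
        # remote_worker.py writes its output to result_file there, and
        # process_work_result copies result_file back here to be unpickled.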
1002
1003         logger.debug(f'Writing pickled code to {code_file}')
1004         with open(f'{code_file}', 'wb') as wb:
1005             wb.write(pickle)
1006
1007         bundle = BundleDetails(
1008             pickled_code=pickle,
1009             uuid=uuid,
1010             fname=fname,
1011             worker=None,
1012             username=None,
1013             machine=None,
1014             hostname=platform.node(),
1015             code_file=code_file,
1016             result_file=result_file,
1017             pid=0,
1018             start_ts=time.time(),
1019             end_ts=0.0,
1020             slower_than_local_p95=False,
1021             slower_than_global_p95=False,
1022             src_bundle=None,
1023             is_cancelled=threading.Event(),
1024             was_cancelled=False,
1025             backup_bundles=[],
1026             failure_count=0,
1027         )
1028         self.status.record_bundle_details(bundle)
1029         logger.debug(f'{bundle}: Created an original bundle')
1030         return bundle
1031
1032     def create_backup_bundle(self, src_bundle: BundleDetails):
1033         assert src_bundle.backup_bundles is not None
1034         n = len(src_bundle.backup_bundles)
1035         uuid = src_bundle.uuid + f'_backup#{n}'
1036
1037         backup_bundle = BundleDetails(
1038             pickled_code=src_bundle.pickled_code,
1039             uuid=uuid,
1040             fname=src_bundle.fname,
1041             worker=None,
1042             username=None,
1043             machine=None,
1044             hostname=src_bundle.hostname,
1045             code_file=src_bundle.code_file,
1046             result_file=src_bundle.result_file,
1047             pid=0,
1048             start_ts=time.time(),
1049             end_ts=0.0,
1050             slower_than_local_p95=False,
1051             slower_than_global_p95=False,
1052             src_bundle=src_bundle,
1053             is_cancelled=threading.Event(),
1054             was_cancelled=False,
1055             backup_bundles=None,  # backup backups not allowed
1056             failure_count=0,
1057         )
1058         src_bundle.backup_bundles.append(backup_bundle)
1059         self.status.record_bundle_details_already_locked(backup_bundle)
1060         logger.debug(f'{backup_bundle}: Created a backup bundle')
1061         return backup_bundle
1062
1063     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1064         assert self.status.lock.locked()
1065         assert src_bundle is not None
1066         backup_bundle = self.create_backup_bundle(src_bundle)
1067         logger.debug(
1068             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1069         )
1070         self._helper_executor.submit(self.launch, backup_bundle)
1071
1072         # Results from backups don't matter; if they finish first
1073         # they will move the result_file to this machine and let
1074         # the original pick them up and unpickle them.
1075
1076     def emergency_retry_nasty_bundle(
1077         self, bundle: BundleDetails
1078     ) -> Optional[fut.Future]:
1079         is_original = bundle.src_bundle is None
1080         bundle.worker = None
1081         avoid_last_machine = bundle.machine
1082         bundle.machine = None
1083         bundle.username = None
1084         bundle.failure_count += 1
1085         if is_original:
1086             retry_limit = 3
1087         else:
1088             retry_limit = 2
1089
1090         if bundle.failure_count > retry_limit:
1091             logger.error(
1092                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1093             )
1094             if is_original:
1095                 raise RemoteExecutorException(
1096                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1097                 )
1098             else:
1099                 logger.error(
1100                     f'{bundle}: At least it\'s only a backup; better luck with the others.'
1101                 )
1102             return None
1103         else:
1104             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1105             logger.warning(msg)
1106             warnings.warn(msg)
1107             return self.launch(bundle, avoid_last_machine)
1108
1109     @overrides
1110     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1111         if self.already_shutdown:
1112             raise Exception('Submitted work after shutdown.')
1113         pickle = make_cloud_pickle(function, *args, **kwargs)
1114         bundle = self.create_original_bundle(pickle, function.__name__)
1115         self.total_bundles_submitted += 1
1116         return self._helper_executor.submit(self.launch, bundle)
1117
1118     @overrides
1119     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1120         if not self.already_shutdown:
1121             logger.debug(f'Shutting down RemoteExecutor {self.title}')
1122             self.heartbeat_stop_event.set()
1123             self.heartbeat_thread.join()
1124             self._helper_executor.shutdown(wait)
1125             if not quiet:
1126                 print(self.histogram.__repr__(label_formatter='%ds'))
1127             self.already_shutdown = True
1128
1129
1130 @singleton
1131 class DefaultExecutors(object):
1132     def __init__(self):
1133         self.thread_executor: Optional[ThreadExecutor] = None
1134         self.process_executor: Optional[ProcessExecutor] = None
1135         self.remote_executor: Optional[RemoteExecutor] = None
1136
1137     def ping(self, host: str) -> bool:
1138         logger.debug(f'RUN> ping -c 1 {host}')
1139         try:
1140             x = cmd_with_timeout(
1141                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1142             )
1143             return x == 0
1144         except Exception:
1145             return False
1146
1147     def thread_pool(self) -> ThreadExecutor:
1148         if self.thread_executor is None:
1149             self.thread_executor = ThreadExecutor()
1150         return self.thread_executor
1151
1152     def process_pool(self) -> ProcessExecutor:
1153         if self.process_executor is None:
1154             self.process_executor = ProcessExecutor()
1155         return self.process_executor
1156
1157     def remote_pool(self) -> RemoteExecutor:
1158         if self.remote_executor is None:
1159             logger.info('Looking for some helper machines...')
1160             pool: List[RemoteWorkerRecord] = []
1161             if self.ping('cheetah.house'):
1162                 logger.info('Found cheetah.house')
1163                 pool.append(
1164                     RemoteWorkerRecord(
1165                         username='scott',
1166                         machine='cheetah.house',
1167                         weight=30,
1168                         count=6,
1169                     ),
1170                 )
1171             if self.ping('meerkat.cabin'):
1172                 logger.info('Found meerkat.cabin')
1173                 pool.append(
1174                     RemoteWorkerRecord(
1175                         username='scott',
1176                         machine='meerkat.cabin',
1177                         weight=12,
1178                         count=2,
1179                     ),
1180                 )
1181             if self.ping('wannabe.house'):
1182                 logger.info('Found wannabe.house')
1183                 pool.append(
1184                     RemoteWorkerRecord(
1185                         username='scott',
1186                         machine='wannabe.house',
1187                         weight=25,
1188                         count=10,
1189                     ),
1190                 )
1191             if self.ping('puma.cabin'):
1192                 logger.info('Found puma.cabin')
1193                 pool.append(
1194                     RemoteWorkerRecord(
1195                         username='scott',
1196                         machine='puma.cabin',
1197                         weight=30,
1198                         count=6,
1199                     ),
1200                 )
1201             if self.ping('backup.house'):
1202                 logger.info('Found backup.house')
1203                 pool.append(
1204                     RemoteWorkerRecord(
1205                         username='scott',
1206                         machine='backup.house',
1207                         weight=8,
1208                         count=2,
1209                     ),
1210                 )
1211
1212             # The controller machine has a lot to do; go easy on it.
1213             for record in pool:
1214                 if record.machine == platform.node() and record.count > 1:
1215                     logger.info(f'Reducing workload for {record.machine}.')
1216                     record.count = 1
1217
1218             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1219             policy.register_worker_pool(pool)
1220             self.remote_executor = RemoteExecutor(pool, policy)
1221         return self.remote_executor
1222
1223     def shutdown(self) -> None:
1224         if self.thread_executor is not None:
1225             self.thread_executor.shutdown(wait=True, quiet=True)
1226             self.thread_executor = None
1227         if self.process_executor is not None:
1228             self.process_executor.shutdown(wait=True, quiet=True)
1229             self.process_executor = None
1230         if self.remote_executor is not None:
1231             self.remote_executor.shutdown(wait=True, quiet=True)
1232             self.remote_executor = None
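

# Typical entry point (sketch; assumes this module is importable as
# "executors" and that the @singleton decorator makes DefaultExecutors()
# return one shared instance):
#
#     from executors import DefaultExecutors
#
#     executors = DefaultExecutors()
#     pool = executors.thread_pool()        # or process_pool() / remote_pool()
#     future = pool.submit(some_work, 123)  # some_work: your own callable
#     print(future.result())
#     executors.shutdown()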