Adds a quiet option to executor shutdown.
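A minimal usage sketch of the new option (illustrative only; assumes the
DefaultExecutors singleton defined below in this file):

    from executors import DefaultExecutors
    pool = DefaultExecutors().thread_pool()
    f = pool.submit(str.upper, "hello")
    assert f.result() == "HELLO"
    pool.shutdown(wait=True, quiet=True)  # quiet=True suppresses the histogram dump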
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
30
31
32 logger = logging.getLogger(__name__)
33
34 parser = config.add_commandline_args(
35     f"Executors ({__file__})", "Args related to processing executors."
36 )
37 parser.add_argument(
38     '--executors_threadpool_size',
39     type=int,
40     metavar='#THREADS',
41     help='Number of threads in the default threadpool, leave unset for default',
42     default=None,
43 )
44 parser.add_argument(
45     '--executors_processpool_size',
46     type=int,
47     metavar='#PROCESSES',
48     help='Number of processes in the default processpool, leave unset for default',
49     default=None,
50 )
51 parser.add_argument(
52     '--executors_schedule_remote_backups',
53     default=True,
54     action=argparse_utils.ActionNoYes,
55     help='Should we schedule duplicative backup work if a remote bundle is slow',
56 )
57 parser.add_argument(
58     '--executors_max_bundle_failures',
59     type=int,
60     default=3,
61     metavar='#FAILURES',
62     help='Maximum number of failures before giving up on a bundle',
63 )
64
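# A program that imports this module can tune these settings from its command
# line; an illustrative invocation using only the flags defined above:
#
#     ./my_program.py --executors_threadpool_size 16 --executors_max_bundle_failures 5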
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp -C'
67
68
69 def make_cloud_pickle(fun, *args, **kwargs):
70     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71     return cloudpickle.dumps((fun, args, kwargs))
72
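# Illustrative round trip (sketch): the bytes produced above unpickle back
# into (fun, args, kwargs), which is how run_cloud_pickle consumes them below.
#
#     fun, args, kwargs = cloudpickle.loads(make_cloud_pickle(max, 1, 2))
#     assert fun(*args, **kwargs) == 2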
73
74 class BaseExecutor(ABC):
75     def __init__(self, *, title=''):
76         self.title = title
77         self.histogram = hist.SimpleHistogram(
78             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
79         )
80         self.task_count = 0
81
82     @abstractmethod
83     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
84         pass
85
86     @abstractmethod
87     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
88         pass
89
90     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
91         """If the executor is idle (i.e. there are no pending or active
92         tasks), shut it down and return True.  Otherwise return False
93         without shutting down.  Note: this should only be called by the
94         launcher process.
95
96         """
97         if self.task_count == 0:
98             self.shutdown(wait=True, quiet=quiet)
99             return True
100         return False
101
102     def adjust_task_count(self, delta: int) -> None:
103         """Change the task count.  Note: do not call this method from a
104         worker; it should only be called by the launcher process /
105         thread / machine.
106
107         """
108         self.task_count += delta
109         logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
110
111     def get_task_count(self) -> int:
112         """Return the current task count.  Note: do not call this method
113         from a worker; it should only be called by the launcher process /
114         thread / machine.
115
116         """
117         return self.task_count
118
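# A hedged sketch of the BaseExecutor contract defined above (illustrative
# only; `square` is a hypothetical caller-supplied function, not part of
# this module):
#
#     executor = ThreadExecutor(max_workers=4)
#     f = executor.submit(square, 7)        # returns a concurrent.futures.Future
#     print(f.result())                     # blocks until the task completes
#     if not executor.shutdown_if_idle(quiet=True):
#         executor.shutdown(wait=True, quiet=True)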
119
120 class ThreadExecutor(BaseExecutor):
121     def __init__(self, max_workers: Optional[int] = None):
122         super().__init__()
123         workers = None
124         if max_workers is not None:
125             workers = max_workers
126         elif 'executors_threadpool_size' in config.config:
127             workers = config.config['executors_threadpool_size']
128         logger.debug(f'Creating threadpool executor with {workers} workers')
129         self._thread_pool_executor = fut.ThreadPoolExecutor(
130             max_workers=workers, thread_name_prefix="thread_executor_helper"
131         )
132         self.already_shutdown = False
133
134     # This is run on a different thread; do not adjust task count here.
135     def run_local_bundle(self, fun, *args, **kwargs):
136         logger.debug(f"Running local bundle at {fun.__name__}")
137         result = fun(*args, **kwargs)
138         return result
139
140     @overrides
141     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
142         if self.already_shutdown:
143             raise Exception('Submitted work after shutdown.')
144         self.adjust_task_count(+1)
145         # Pass the function through as the first positional argument to
146         # run_local_bundle, followed by the caller's args.
147         newargs = [function, *args]
149         start = time.time()
150         result = self._thread_pool_executor.submit(
151             self.run_local_bundle, *newargs, **kwargs
152         )
153         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
154         result.add_done_callback(lambda _: self.adjust_task_count(-1))
155         return result
156
157     @overrides
158     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
159         if not self.already_shutdown:
160             logger.debug(f'Shutting down threadpool executor {self.title}')
161             self._thread_pool_executor.shutdown(wait)
162             if not quiet:
163                 print(self.histogram.__repr__(label_formatter='%ds'))
164             self.already_shutdown = True
165
166
167 class ProcessExecutor(BaseExecutor):
168     def __init__(self, max_workers=None):
169         super().__init__()
170         workers = None
171         if max_workers is not None:
172             workers = max_workers
173         elif 'executors_processpool_size' in config.config:
174             workers = config.config['executors_processpool_size']
175         logger.debug(f'Creating processpool executor with {workers} workers.')
176         self._process_executor = fut.ProcessPoolExecutor(
177             max_workers=workers,
178         )
179         self.already_shutdown = False
180
181     # This is run in another process; do not adjust task count here.
182     def run_cloud_pickle(self, pickle):
183         fun, args, kwargs = cloudpickle.loads(pickle)
184         logger.debug(f"Running pickled bundle at {fun.__name__}")
185         result = fun(*args, **kwargs)
186         return result
187
188     @overrides
189     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
190         if self.already_shutdown:
191             raise Exception('Submitted work after shutdown.')
192         start = time.time()
193         self.adjust_task_count(+1)
194         pickle = make_cloud_pickle(function, *args, **kwargs)
195         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
196         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
197         result.add_done_callback(lambda _: self.adjust_task_count(-1))
198         return result
199
200     @overrides
201     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
202         if not self.already_shutdown:
203             logger.debug(f'Shutting down processpool executor {self.title}')
204             self._process_executor.shutdown(wait)
205             if not quiet:
206                 print(self.histogram.__repr__(label_formatter='%ds'))
207             self.already_shutdown = True
208
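    # Make ProcessExecutor instances picklable by dropping the live
    # ProcessPoolExecutor (which holds processes and locks and cannot be
    # pickled) from the serialized state; the restored copy gets
    # _process_executor = None.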
209     def __getstate__(self):
210         state = self.__dict__.copy()
211         state['_process_executor'] = None
212         return state
213
214
215 class RemoteExecutorException(Exception):
216     """Thrown when a bundle cannot be executed despite several retries."""
217
218     pass
219
220
221 @dataclass
222 class RemoteWorkerRecord:
223     username: str
224     machine: str
225     weight: int
226     count: int
227
228     def __hash__(self):
229         return hash((self.username, self.machine))
230
231     def __repr__(self):
232         return f'{self.username}@{self.machine}'
233
234
235 @dataclass
236 class BundleDetails:
237     pickled_code: bytes
238     uuid: str
239     fname: str
240     worker: Optional[RemoteWorkerRecord]
241     username: Optional[str]
242     machine: Optional[str]
243     hostname: str
244     code_file: str
245     result_file: str
246     pid: int
247     start_ts: float
248     end_ts: float
249     slower_than_local_p95: bool
250     slower_than_global_p95: bool
251     src_bundle: BundleDetails
252     is_cancelled: threading.Event
253     was_cancelled: bool
254     backup_bundles: Optional[List[BundleDetails]]
255     failure_count: int
256
257     def __repr__(self):
258         uuid = self.uuid
259         if uuid[-9:-2] == '_backup':
260             uuid = uuid[:-9]
261             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
262         else:
263             suffix = uuid[-6:]
264
265         colorz = [
266             fg('violet red'),
267             fg('red'),
268             fg('orange'),
269             fg('peach orange'),
270             fg('yellow'),
271             fg('marigold yellow'),
272             fg('green yellow'),
273             fg('tea green'),
274             fg('cornflower blue'),
275             fg('turquoise blue'),
276             fg('tropical blue'),
277             fg('lavender purple'),
278             fg('medium purple'),
279         ]
280         c = colorz[int(uuid[-2:], 16) % len(colorz)]
281         fname = self.fname if self.fname is not None else 'nofname'
282         machine = self.machine if self.machine is not None else 'nomachine'
283         return f'{c}{suffix}/{fname}/{machine}{reset()}'
284
285
286 class RemoteExecutorStatus:
287     def __init__(self, total_worker_count: int) -> None:
288         self.worker_count: int = total_worker_count
289         self.known_workers: Set[RemoteWorkerRecord] = set()
290         self.start_time: float = time.time()
291         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
292         self.end_per_bundle: Dict[str, float] = defaultdict(float)
293         self.finished_bundle_timings_per_worker: Dict[
294             RemoteWorkerRecord, List[float]
295         ] = {}
296         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
297         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
298         self.finished_bundle_timings: List[float] = []
299         self.last_periodic_dump: Optional[float] = None
300         self.total_bundles_submitted: int = 0
301
302         # Protects reads and modification using self.  Also used
303         # as a memory fence for modifications to bundle.
304         self.lock: threading.Lock = threading.Lock()
305
306     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
307         with self.lock:
308             self.record_acquire_worker_already_locked(worker, uuid)
309
310     def record_acquire_worker_already_locked(
311         self, worker: RemoteWorkerRecord, uuid: str
312     ) -> None:
313         assert self.lock.locked()
314         self.known_workers.add(worker)
315         self.start_per_bundle[uuid] = None
316         x = self.in_flight_bundles_by_worker.get(worker, set())
317         x.add(uuid)
318         self.in_flight_bundles_by_worker[worker] = x
319
320     def record_bundle_details(self, details: BundleDetails) -> None:
321         with self.lock:
322             self.record_bundle_details_already_locked(details)
323
324     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
325         assert self.lock.locked()
326         self.bundle_details_by_uuid[details.uuid] = details
327
328     def record_release_worker(
329         self,
330         worker: RemoteWorkerRecord,
331         uuid: str,
332         was_cancelled: bool,
333     ) -> None:
334         with self.lock:
335             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
336
337     def record_release_worker_already_locked(
338         self,
339         worker: RemoteWorkerRecord,
340         uuid: str,
341         was_cancelled: bool,
342     ) -> None:
343         assert self.lock.locked()
344         ts = time.time()
345         self.end_per_bundle[uuid] = ts
346         self.in_flight_bundles_by_worker[worker].remove(uuid)
347         if not was_cancelled:
348             bundle_latency = ts - self.start_per_bundle[uuid]
349             x = self.finished_bundle_timings_per_worker.get(worker, list())
350             x.append(bundle_latency)
351             self.finished_bundle_timings_per_worker[worker] = x
352             self.finished_bundle_timings.append(bundle_latency)
353
354     def record_processing_began(self, uuid: str):
355         with self.lock:
356             self.start_per_bundle[uuid] = time.time()
357
358     def total_in_flight(self) -> int:
359         assert self.lock.locked()
360         total_in_flight = 0
361         for worker in self.known_workers:
362             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
363         return total_in_flight
364
365     def total_idle(self) -> int:
366         assert self.lock.locked()
367         return self.worker_count - self.total_in_flight()
368
369     def __repr__(self):
370         assert self.lock.locked()
371         ts = time.time()
372         total_finished = len(self.finished_bundle_timings)
373         total_in_flight = self.total_in_flight()
374         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
375         qall = None
376         if len(self.finished_bundle_timings) > 1:
377             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
378             ret += (
379                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
380                 f'✅={total_finished}/{self.total_bundles_submitted}, '
381                 f'💻n={total_in_flight}/{self.worker_count}\n'
382             )
383         else:
384             ret += (
385                 f'⏱={ts-self.start_time:.1f}s, '
386                 f'✅={total_finished}/{self.total_bundles_submitted}, '
387                 f'💻n={total_in_flight}/{self.worker_count}\n'
388             )
389
390         for worker in self.known_workers:
391             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
392             timings = self.finished_bundle_timings_per_worker.get(worker, [])
393             count = len(timings)
394             qworker = None
395             if count > 1:
396                 qworker = numpy.quantile(timings, [0.5, 0.95])
397                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
398             else:
399                 ret += '\n'
400             if count > 0:
401                 ret += f'    ...finished {count} total bundle(s) so far\n'
402             in_flight = len(self.in_flight_bundles_by_worker[worker])
403             if in_flight > 0:
404                 ret += f'    ...{in_flight} bundles currently in flight:\n'
405                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
406                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
407                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
408                     if self.start_per_bundle[bundle_uuid] is not None:
409                         sec = ts - self.start_per_bundle[bundle_uuid]
410                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
411                     else:
412                         ret += f'       {details} setting up / copying data...'
413                         sec = 0.0
414
415                     if qworker is not None:
416                         if sec > qworker[1]:
417                             ret += f'{bg("red")}>💻p95{reset()} '
418                             if details is not None:
419                                 details.slower_than_local_p95 = True
420                         else:
421                             if details is not None:
422                                 details.slower_than_local_p95 = False
423
424                     if qall is not None:
425                         if sec > qall[1]:
426                             ret += f'{bg("red")}>∀p95{reset()} '
427                             if details is not None:
428                                 details.slower_than_global_p95 = True
429                         elif details is not None:
430                             details.slower_than_global_p95 = False
431                     ret += '\n'
432         return ret
433
434     def periodic_dump(self, total_bundles_submitted: int) -> None:
435         assert self.lock.locked()
436         self.total_bundles_submitted = total_bundles_submitted
437         ts = time.time()
438         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
439             print(self)
440             self.last_periodic_dump = ts
441
442
443 class RemoteWorkerSelectionPolicy(ABC):
444     def register_worker_pool(self, workers):
445         self.workers = workers
446
447     @abstractmethod
448     def is_worker_available(self) -> bool:
449         pass
450
451     @abstractmethod
452     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
453         pass
454
455
456 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
457     @overrides
458     def is_worker_available(self) -> bool:
459         for worker in self.workers:
460             if worker.count > 0:
461                 return True
462         return False
463
464     @overrides
465     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
466         grabbag = []
467         for worker in self.workers:
468             if worker.machine != machine_to_avoid:
469                 if worker.count > 0:
470                     for _ in range(worker.count * worker.weight):
471                         grabbag.append(worker)
472
473         if len(grabbag) == 0:
474             logger.debug(
475                 f'There are no available workers that avoid {machine_to_avoid}...'
476             )
477             for worker in self.workers:
478                 if worker.count > 0:
479                     for _ in range(worker.count * worker.weight):
480                         grabbag.append(worker)
481
482         if len(grabbag) == 0:
483             logger.warning('There are no available workers?!')
484             return None
485
486         worker = random.sample(grabbag, 1)[0]
487         assert worker.count > 0
488         worker.count -= 1
489         logger.debug(f'Chose worker {worker}')
490         return worker
491
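# Illustrative sketch of the selection policy API (hypothetical single-entry
# pool; the real pool is assembled in DefaultExecutors.remote_pool below):
#
#     policy = WeightedRandomRemoteWorkerSelectionPolicy()
#     policy.register_worker_pool(
#         [RemoteWorkerRecord('scott', 'cheetah.house', weight=30, count=6)]
#     )
#     if policy.is_worker_available():
#         worker = policy.acquire_worker()  # decrements worker.count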
492
493 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
494     def __init__(self) -> None:
495         self.index = 0
496
497     @overrides
498     def is_worker_available(self) -> bool:
499         for worker in self.workers:
500             if worker.count > 0:
501                 return True
502         return False
503
504     @overrides
505     def acquire_worker(
506         self, machine_to_avoid: Optional[str] = None
507     ) -> Optional[RemoteWorkerRecord]:
508         x = self.index
509         while True:
510             worker = self.workers[x]
511             if worker.count > 0:
512                 worker.count -= 1
513                 x += 1
514                 if x >= len(self.workers):
515                     x = 0
516                 self.index = x
517                 logger.debug(f'Selected worker {worker}')
518                 return worker
519             x += 1
520             if x >= len(self.workers):
521                 x = 0
522             if x == self.index:
523                 msg = 'Unexpectedly could not find an available worker; giving up.'
524                 logger.warning(msg)
525                 return None
526
527
528 class RemoteExecutor(BaseExecutor):
529     def __init__(
530         self,
531         workers: List[RemoteWorkerRecord],
532         policy: RemoteWorkerSelectionPolicy,
533     ) -> None:
534         super().__init__()
535         self.workers = workers
536         self.policy = policy
537         self.worker_count = 0
538         for worker in self.workers:
539             self.worker_count += worker.count
540         if self.worker_count <= 0:
541             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
542             logger.critical(msg)
543             raise RemoteExecutorException(msg)
544         self.policy.register_worker_pool(self.workers)
545         self.cv = threading.Condition()
546         logger.debug(
547             f'Creating {self.worker_count} local threads, one per remote worker.'
548         )
549         self._helper_executor = fut.ThreadPoolExecutor(
550             thread_name_prefix="remote_executor_helper",
551             max_workers=self.worker_count,
552         )
553         self.status = RemoteExecutorStatus(self.worker_count)
554         self.total_bundles_submitted = 0
555         self.backup_lock = threading.Lock()
556         self.last_backup = None
557         (
558             self.heartbeat_thread,
559             self.heartbeat_stop_event,
560         ) = self.run_periodic_heartbeat()
561         self.already_shutdown = False
562
563     @background_thread
564     def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
565         while not stop_event.is_set():
566             time.sleep(5.0)
567             logger.debug('Running periodic heartbeat code...')
568             self.heartbeat()
569         logger.debug('Periodic heartbeat thread shutting down.')
570
571     def heartbeat(self) -> None:
572         # Note: this is invoked on a background thread, not an
573         # executor thread.  Be careful what you do with it b/c it
574         # needs to get back and dump status again periodically.
575         with self.status.lock:
576             self.status.periodic_dump(self.total_bundles_submitted)
577
578             # Look for bundles to reschedule via executor.submit
579             if config.config['executors_schedule_remote_backups']:
580                 self.maybe_schedule_backup_bundles()
581
582     def maybe_schedule_backup_bundles(self):
583         assert self.status.lock.locked()
584         num_done = len(self.status.finished_bundle_timings)
585         num_idle_workers = self.worker_count - self.task_count
586         now = time.time()
587         if (
588             num_done > 2
589             and num_idle_workers > 1
590             and (self.last_backup is None or (now - self.last_backup > 9.0))
591             and self.backup_lock.acquire(blocking=False)
592         ):
593             try:
594                 assert self.backup_lock.locked()
595
596                 bundle_to_backup = None
597                 best_score = None
598                 for (
599                     worker,
600                     bundle_uuids,
601                 ) in self.status.in_flight_bundles_by_worker.items():
602
603                     # Prefer to schedule backups of bundles running on
604                     # slower machines.
605                     base_score = 0
606                     for record in self.workers:
607                         if worker.machine == record.machine:
608                             base_score = float(record.weight)
609                             base_score = 1.0 / base_score
610                             base_score *= 200.0
611                             base_score = int(base_score)
612                             break
613
614                     for uuid in bundle_uuids:
615                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
616                         if (
617                             bundle is not None
618                             and bundle.src_bundle is None
619                             and bundle.backup_bundles is not None
620                         ):
621                             score = base_score
622
623                             # Schedule backups of bundles running
624                             # longer; especially those that are
625                             # unexpectedly slow.
626                             start_ts = self.status.start_per_bundle[uuid]
627                             if start_ts is not None:
628                                 runtime = now - start_ts
629                                 score += runtime
630                                 logger.debug(
631                                     f'score[{bundle}] => {score}  # latency boost'
632                                 )
633
634                                 if bundle.slower_than_local_p95:
635                                     score += runtime / 2
636                                     logger.debug(
637                                         f'score[{bundle}] => {score}  # >worker p95'
638                                     )
639
640                                 if bundle.slower_than_global_p95:
641                                     score += runtime / 4
642                                     logger.debug(
643                                         f'score[{bundle}] => {score}  # >global p95'
644                                     )
645
646                             # Prefer backups of bundles that don't
647                             # have backups already.
648                             backup_count = len(bundle.backup_bundles)
649                             if backup_count == 0:
650                                 score *= 2
651                             elif backup_count == 1:
652                                 score /= 2
653                             elif backup_count == 2:
654                                 score /= 8
655                             else:
656                                 score = 0
657                             logger.debug(
658                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
659                             )
660
661                             if score != 0 and (
662                                 best_score is None or score > best_score
663                             ):
664                                 bundle_to_backup = bundle
665                                 assert bundle is not None
666                                 assert bundle.backup_bundles is not None
667                                 assert bundle.src_bundle is None
668                                 best_score = score
669
670                 # Note: this is all still happening on the heartbeat
671                 # runner thread.  That's ok because
672                 # schedule_backup_for_bundle uses the executor to
673                 # submit the bundle again which will cause it to be
674                 # picked up by a worker thread and allow this thread
675                 # to return to run future heartbeats.
676                 if bundle_to_backup is not None:
677                     self.last_backup = now
678                     logger.info(
679                         f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
680                     )
681                     self.schedule_backup_for_bundle(bundle_to_backup)
682             finally:
683                 self.backup_lock.release()
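    # Worked example of the scoring in maybe_schedule_backup_bundles above
    # (illustrative numbers): a bundle on a weight-25 worker gets
    # base_score = int(200 / 25) = 8.  After running for 30s while slower
    # than its worker's p95 (but not the global p95), score = 8 + 30 + 15
    # = 53; with no existing backups that doubles to 106.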
684
685     def is_worker_available(self) -> bool:
686         return self.policy.is_worker_available()
687
688     def acquire_worker(
689         self, machine_to_avoid: Optional[str] = None
690     ) -> Optional[RemoteWorkerRecord]:
691         return self.policy.acquire_worker(machine_to_avoid)
692
693     def find_available_worker_or_block(
694         self, machine_to_avoid: Optional[str] = None
695     ) -> RemoteWorkerRecord:
696         with self.cv:
697             while not self.is_worker_available():
698                 self.cv.wait()
699             worker = self.acquire_worker(machine_to_avoid)
700             if worker is not None:
701                 return worker
702         msg = "We should never reach this point in the code"
703         logger.critical(msg)
704         raise Exception(msg)
705
706     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
707         worker = bundle.worker
708         assert worker is not None
709         logger.debug(f'Released worker {worker}')
710         self.status.record_release_worker(
711             worker,
712             bundle.uuid,
713             was_cancelled,
714         )
715         with self.cv:
716             worker.count += 1
717             self.cv.notify()
718         self.adjust_task_count(-1)
719
720     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
721         with self.status.lock:
722             if bundle.is_cancelled.wait(timeout=0.0):
723                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
724                 bundle.was_cancelled = True
725                 return True
726         return False
727
728     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
729         """Find a worker for bundle or block until one is available."""
730         self.adjust_task_count(+1)
731         uuid = bundle.uuid
732         hostname = bundle.hostname
733         avoid_machine = override_avoid_machine
734         is_original = bundle.src_bundle is None
735
736         # Try not to schedule a backup on the same host as the original.
737         if avoid_machine is None and bundle.src_bundle is not None:
738             avoid_machine = bundle.src_bundle.machine
739         worker = None
740         while worker is None:
741             worker = self.find_available_worker_or_block(avoid_machine)
742         assert worker
743
744         # Ok, found a worker.
745         bundle.worker = worker
746         machine = bundle.machine = worker.machine
747         username = bundle.username = worker.username
748         self.status.record_acquire_worker(worker, uuid)
749         logger.debug(f'{bundle}: Running bundle on {worker}...')
750
751         # Before we do any work, make sure the bundle is still viable.
752         # It may have been some time between when it was submitted and
753         # now due to lack of worker availability and someone else may
754         # have already finished it.
755         if self.check_if_cancelled(bundle):
756             try:
757                 return self.process_work_result(bundle)
758             except Exception as e:
759                 logger.warning(
760                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
761                 )
762                 self.release_worker(bundle)
763                 if is_original:
764                     # Weird.  We are the original owner of this
765                     # bundle.  For it to have been cancelled, a backup
766                     # must have already started and completed before
767                     # we ever got started.  Moreover, the backup says
768                     # it is done but we can't find the results it
769                     # should have copied over.  Reschedule the whole
770                     # thing.
771                     logger.exception(e)
772                     logger.error(
773                         f'{bundle}: We are the original owner thread and yet there are '
774                         + 'no results for this bundle.  This is unexpected and bad.'
775                     )
776                     return self.emergency_retry_nasty_bundle(bundle)
777                 else:
778                     # Expected(?).  We're a backup and our bundle is
779                     # cancelled before we even got started.  Something
780                     # went bad in process_work_result (I actually don't
781                     # see what?) but probably not worth worrying
782                     # about.  Let the original thread worry about
783                     # either finding the results or complaining about
784                     # it.
785                     return None
786
787         # Send input code / data to worker machine if it's not local.
788         if hostname not in machine:
789             try:
790                 cmd = (
791                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
792                 )
793                 start_ts = time.time()
794                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
795                 run_silently(cmd)
796                 xfer_latency = time.time() - start_ts
797                 logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
798             except Exception as e:
799                 self.release_worker(bundle)
800                 if is_original:
801                     # Weird.  We tried to copy the code to the worker and it failed...
802                     # And we're the original bundle.  We have to retry.
803                     logger.exception(e)
804                     logger.error(
805                         f"{bundle}: Failed to send instructions to the worker machine?! "
806                         + "This is not expected; we\'re the original bundle so this shouldn\'t "
807                         + "be a race condition.  Attempting an emergency retry..."
808                     )
809                     return self.emergency_retry_nasty_bundle(bundle)
810                 else:
811                     # This is actually expected; we're a backup.
812                     # There's a race condition where someone else
813                     # already finished the work and removed the source
814                     # code file before we could copy it.  No biggie.
815                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
816                     msg += 'We\'re a backup and this may be caused by the original (or some '
817                     msg += 'other backup) already finishing this work.  Ignoring this.'
818                     logger.warning(msg)
819                     return None
820
821         # Kick off the work.  Note that if this fails we let
822         # wait_for_process deal with it.
823         self.status.record_processing_began(uuid)
824         cmd = (
825             f'{SSH} {bundle.username}@{bundle.machine} '
826             f'"source py38-venv/bin/activate &&'
827             f' /home/scott/lib/python_modules/remote_worker.py'
828             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
829         )
830         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
831         p = cmd_in_background(cmd, silent=True)
832         bundle.pid = p.pid
833         logger.debug(
834             f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
835         )
836         return self.wait_for_process(p, bundle, 0)
837
838     def wait_for_process(
839         self, p: subprocess.Popen, bundle: BundleDetails, depth: int
840     ) -> Any:
841         machine = bundle.machine
842         pid = p.pid
843         if depth > 3:
844             logger.error(
845                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
846             )
847             p.terminate()
848             self.release_worker(bundle)
849             return self.emergency_retry_nasty_bundle(bundle)
850
851         # Spin until either the ssh job we scheduled finishes the
852         # bundle or some backup worker signals that they finished it
853         # before we could.
854         while True:
855             try:
856                 p.wait(timeout=0.25)
857             except subprocess.TimeoutExpired:
858                 if self.check_if_cancelled(bundle):
859                     logger.info(
860                         f'{bundle}: looks like another worker finished bundle...'
861                     )
862                     break
863             else:
864                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
865                 p = None
866                 break
867
868         # If we get here we believe the bundle is done; either the ssh
869         # subprocess finished (hopefully successfully) or we noticed
870         # that some other worker seems to have completed the bundle
871         # and we're bailing out.
872         try:
873             ret = self.process_work_result(bundle)
874             if ret is not None and p is not None:
875                 p.terminate()
876             return ret
877
878         # Something went wrong; e.g. we could not copy the results
879         # back, cleanup after ourselves on the remote machine, or
880         # unpickle the results we got from the remote machine.  If we
881         # still have an active ssh subprocess, keep waiting on it.
882         # Otherwise, time for an emergency reschedule.
883         except Exception as e:
884             logger.exception(e)
885             logger.error(f'{bundle}: Something unexpected just happened...')
886             if p is not None:
887                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
888                 logger.warning(msg)
889                 return self.wait_for_process(p, bundle, depth + 1)
890             else:
891                 self.release_worker(bundle)
892                 return self.emergency_retry_nasty_bundle(bundle)
893
894     def process_work_result(self, bundle: BundleDetails) -> Any:
895         with self.status.lock:
896             is_original = bundle.src_bundle is None
897             was_cancelled = bundle.was_cancelled
898             username = bundle.username
899             machine = bundle.machine
900             result_file = bundle.result_file
901             code_file = bundle.code_file
902
903             # Whether original or backup, if we finished first we must
904             # fetch the results if the computation happened on a
905             # remote machine.
906             bundle.end_ts = time.time()
907             if not was_cancelled:
908                 assert bundle.machine is not None
909                 if bundle.hostname not in bundle.machine:
910                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
911                     logger.info(
912                         f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
913                     )
914
915                     # If either of these throw they are handled in
916                     # wait_for_process.
917                     attempts = 0
918                     while True:
919                         try:
920                             run_silently(cmd)
921                         except Exception as e:
922                             attempts += 1
923                             if attempts >= 3:
924                                 raise e
925                         else:
926                             break
927
928                     run_silently(
929                         f'{SSH} {username}@{machine}'
930                         f' "/bin/rm -f {code_file} {result_file}"'
931                     )
932                     logger.debug(
933                         f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
934                     )
935                 dur = bundle.end_ts - bundle.start_ts
936                 self.histogram.add_item(dur)
937
938         # Only the original worker should unpickle the file contents
939         # though since it's the only one whose result matters.  The
940         # original is also the only job that may delete result_file
941         # from disk.  Note that the original may have been cancelled
942         # if one of the backups finished first; it still must read the
943         # result from disk.
944         if is_original:
945             logger.debug(f"{bundle}: Unpickling {result_file}.")
946             try:
947                 with open(result_file, 'rb') as rb:
948                     serialized = rb.read()
949                 result = cloudpickle.loads(serialized)
950             except Exception as e:
951                 logger.exception(e)
952                 msg = f'Failed to load {result_file}... this is bad news.'
953                 logger.critical(msg)
954                 self.release_worker(bundle)
955
956                 # Re-raise the exception; the code in wait_for_process may
957                 # decide to emergency_retry_nasty_bundle here.
958                 raise Exception(e)
959             logger.debug(f'Removing local (master) {code_file} and {result_file}.')
960             os.remove(f'{result_file}')
961             os.remove(f'{code_file}')
962
963             # Notify any backups that the original is done so they
964             # should stop ASAP.  Do this whether or not we
965             # finished first since there could be more than one
966             # backup.
967             if bundle.backup_bundles is not None:
968                 for backup in bundle.backup_bundles:
969                     logger.debug(
970                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
971                     )
972                     backup.is_cancelled.set()
973
974         # This is a backup job and, by now, we have already fetched
975         # the bundle results.
976         else:
977             # Backup results don't matter, they just need to leave the
978             # result file in the right place for their originals to
979             # read/unpickle later.
980             result = None
981
982             # Tell the original to stop if we finished first.
983             if not was_cancelled:
984                 logger.debug(
985                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
986                 )
987                 bundle.src_bundle.is_cancelled.set()
988         self.release_worker(bundle, was_cancelled=was_cancelled)
989         return result
990
991     def create_original_bundle(self, pickle, fname: str):
992         from string_utils import generate_uuid
993
994         uuid = generate_uuid(omit_dashes=True)
995         code_file = f'/tmp/{uuid}.code.bin'
996         result_file = f'/tmp/{uuid}.result.bin'
997
998         logger.debug(f'Writing pickled code to {code_file}')
999         with open(f'{code_file}', 'wb') as wb:
1000             wb.write(pickle)
1001
1002         bundle = BundleDetails(
1003             pickled_code=pickle,
1004             uuid=uuid,
1005             fname=fname,
1006             worker=None,
1007             username=None,
1008             machine=None,
1009             hostname=platform.node(),
1010             code_file=code_file,
1011             result_file=result_file,
1012             pid=0,
1013             start_ts=time.time(),
1014             end_ts=0.0,
1015             slower_than_local_p95=False,
1016             slower_than_global_p95=False,
1017             src_bundle=None,
1018             is_cancelled=threading.Event(),
1019             was_cancelled=False,
1020             backup_bundles=[],
1021             failure_count=0,
1022         )
1023         self.status.record_bundle_details(bundle)
1024         logger.debug(f'{bundle}: Created an original bundle')
1025         return bundle
1026
1027     def create_backup_bundle(self, src_bundle: BundleDetails):
1028         assert src_bundle.backup_bundles is not None
1029         n = len(src_bundle.backup_bundles)
1030         uuid = src_bundle.uuid + f'_backup#{n}'
1031
1032         backup_bundle = BundleDetails(
1033             pickled_code=src_bundle.pickled_code,
1034             uuid=uuid,
1035             fname=src_bundle.fname,
1036             worker=None,
1037             username=None,
1038             machine=None,
1039             hostname=src_bundle.hostname,
1040             code_file=src_bundle.code_file,
1041             result_file=src_bundle.result_file,
1042             pid=0,
1043             start_ts=time.time(),
1044             end_ts=0.0,
1045             slower_than_local_p95=False,
1046             slower_than_global_p95=False,
1047             src_bundle=src_bundle,
1048             is_cancelled=threading.Event(),
1049             was_cancelled=False,
1050             backup_bundles=None,  # backup backups not allowed
1051             failure_count=0,
1052         )
1053         src_bundle.backup_bundles.append(backup_bundle)
1054         self.status.record_bundle_details_already_locked(backup_bundle)
1055         logger.debug(f'{backup_bundle}: Created a backup bundle')
1056         return backup_bundle
1057
1058     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1059         assert self.status.lock.locked()
1060         assert src_bundle is not None
1061         backup_bundle = self.create_backup_bundle(src_bundle)
1062         logger.debug(
1063             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1064         )
1065         self._helper_executor.submit(self.launch, backup_bundle)
1066
1067         # Results from backups don't matter; if they finish first
1068         # they will move the result_file to this machine and let
1069         # the original pick them up and unpickle them.
1070
1071     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
1072         is_original = bundle.src_bundle is None
1073         bundle.worker = None
1074         avoid_last_machine = bundle.machine
1075         bundle.machine = None
1076         bundle.username = None
1077         bundle.failure_count += 1
1078         if is_original:
1079             retry_limit = 3
1080         else:
1081             retry_limit = 2
1082
1083         if bundle.failure_count > retry_limit:
1084             logger.error(
1085                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1086             )
1087             if is_original:
1088                 raise RemoteExecutorException(
1089                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1090                 )
1091             else:
1092                 logger.error(
1093                     f'{bundle}: At least it\'s only a backup; better luck with the others.'
1094                 )
1095             return None
1096         else:
1097             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1098             logger.warning(msg)
1099             warnings.warn(msg)
1100             return self.launch(bundle, avoid_last_machine)
1101
1102     @overrides
1103     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1104         if self.already_shutdown:
1105             raise Exception('Submitted work after shutdown.')
1106         pickle = make_cloud_pickle(function, *args, **kwargs)
1107         bundle = self.create_original_bundle(pickle, function.__name__)
1108         self.total_bundles_submitted += 1
1109         return self._helper_executor.submit(self.launch, bundle)
1110
1111     @overrides
1112     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1113         if not self.already_shutdown:
1114             logger.debug(f'Shutting down RemoteExecutor {self.title}')
1115             self.heartbeat_stop_event.set()
1116             self.heartbeat_thread.join()
1117             self._helper_executor.shutdown(wait)
1118             if not quiet:
1119                 print(self.histogram.__repr__(label_formatter='%ds'))
1120             self.already_shutdown = True
1121
1122
1123 @singleton
1124 class DefaultExecutors(object):
1125     def __init__(self):
1126         self.thread_executor: Optional[ThreadExecutor] = None
1127         self.process_executor: Optional[ProcessExecutor] = None
1128         self.remote_executor: Optional[RemoteExecutor] = None
1129
1130     def ping(self, host) -> bool:
1131         logger.debug(f'RUN> ping -c 1 {host}')
1132         try:
1133             x = cmd_with_timeout(
1134                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1135             )
1136             return x == 0
1137         except Exception:
1138             return False
1139
1140     def thread_pool(self) -> ThreadExecutor:
1141         if self.thread_executor is None:
1142             self.thread_executor = ThreadExecutor()
1143         return self.thread_executor
1144
1145     def process_pool(self) -> ProcessExecutor:
1146         if self.process_executor is None:
1147             self.process_executor = ProcessExecutor()
1148         return self.process_executor
1149
1150     def remote_pool(self) -> RemoteExecutor:
1151         if self.remote_executor is None:
1152             logger.info('Looking for some helper machines...')
1153             pool: List[RemoteWorkerRecord] = []
1154             if self.ping('cheetah.house'):
1155                 logger.info('Found cheetah.house')
1156                 pool.append(
1157                     RemoteWorkerRecord(
1158                         username='scott',
1159                         machine='cheetah.house',
1160                         weight=30,
1161                         count=6,
1162                     ),
1163                 )
1164             if self.ping('meerkat.cabin'):
1165                 logger.info('Found meerkat.cabin')
1166                 pool.append(
1167                     RemoteWorkerRecord(
1168                         username='scott',
1169                         machine='meerkat.cabin',
1170                         weight=12,
1171                         count=2,
1172                     ),
1173                 )
1174             if self.ping('wannabe.house'):
1175                 logger.info('Found wannabe.house')
1176                 pool.append(
1177                     RemoteWorkerRecord(
1178                         username='scott',
1179                         machine='wannabe.house',
1180                         weight=25,
1181                         count=10,
1182                     ),
1183                 )
1184             if self.ping('puma.cabin'):
1185                 logger.info('Found puma.cabin')
1186                 pool.append(
1187                     RemoteWorkerRecord(
1188                         username='scott',
1189                         machine='puma.cabin',
1190                         weight=30,
1191                         count=6,
1192                     ),
1193                 )
1194             if self.ping('backup.house'):
1195                 logger.info('Found backup.house')
1196                 pool.append(
1197                     RemoteWorkerRecord(
1198                         username='scott',
1199                         machine='backup.house',
1200                         weight=8,
1201                         count=2,
1202                     ),
1203                 )
1204
1205             # The controller machine has a lot to do; go easy on it.
1206             for record in pool:
1207                 if record.machine == platform.node() and record.count > 1:
1208                     logger.info(f'Reducing workload for {record.machine}.')
1209                     record.count = 1
1210
1211             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1212             policy.register_worker_pool(pool)
1213             self.remote_executor = RemoteExecutor(pool, policy)
1214         return self.remote_executor
1215
1216     def shutdown(self) -> None:
1217         if self.thread_executor is not None:
1218             self.thread_executor.shutdown(wait=True, quiet=True)
1219             self.thread_executor = None
1220         if self.process_executor is not None:
1221             self.process_executor.shutdown(wait=True, quiet=True)
1222             self.process_executor = None
1223         if self.remote_executor is not None:
1224             self.remote_executor.shutdown(wait=True, quiet=True)
1225             self.remote_executor = None