Make the worker selection heuristics work harder to avoid backup
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
30
31
32 logger = logging.getLogger(__name__)
33
34 parser = config.add_commandline_args(
35     f"Executors ({__file__})", "Args related to processing executors."
36 )
37 parser.add_argument(
38     '--executors_threadpool_size',
39     type=int,
40     metavar='#THREADS',
41     help='Number of threads in the default threadpool, leave unset for default',
42     default=None,
43 )
44 parser.add_argument(
45     '--executors_processpool_size',
46     type=int,
47     metavar='#PROCESSES',
48     help='Number of processes in the default processpool, leave unset for default',
49     default=None,
50 )
51 parser.add_argument(
52     '--executors_schedule_remote_backups',
53     default=True,
54     action=argparse_utils.ActionNoYes,
55     help='Should we schedule duplicative backup work if a remote bundle is slow',
56 )
57 parser.add_argument(
58     '--executors_max_bundle_failures',
59     type=int,
60     default=3,
61     metavar='#FAILURES',
62     help='Maximum number of failures before giving up on a bundle',
63 )
64
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp -C'
67
68
69 def make_cloud_pickle(fun, *args, **kwargs):
70     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71     return cloudpickle.dumps((fun, args, kwargs))
72
73
74 class BaseExecutor(ABC):
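    """Superclass of all executors; tracks an outstanding task count and a
    histogram of task runtimes."""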
75     def __init__(self, *, title=''):
76         self.title = title
77         self.task_count = 0
78         self.histogram = hist.SimpleHistogram(
79             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
80         )
81
82     @abstractmethod
83     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
84         pass
85
86     @abstractmethod
87     def shutdown(self, wait: bool = True) -> None:
88         pass
89
90     def adjust_task_count(self, delta: int) -> None:
91         self.task_count += delta
92         logger.debug(f'Executor current task count is {self.task_count}')
93
94
95 class ThreadExecutor(BaseExecutor):
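    """An executor that runs bundles on a local thread pool."""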
96     def __init__(self, max_workers: Optional[int] = None):
97         super().__init__()
98         workers = None
99         if max_workers is not None:
100             workers = max_workers
101         elif 'executors_threadpool_size' in config.config:
102             workers = config.config['executors_threadpool_size']
103         logger.debug(f'Creating threadpool executor with {workers} workers')
104         self._thread_pool_executor = fut.ThreadPoolExecutor(
105             max_workers=workers, thread_name_prefix="thread_executor_helper"
106         )
107
108     def run_local_bundle(self, fun, *args, **kwargs):
109         logger.debug(f"Running local bundle at {fun.__name__}")
110         start = time.time()
111         result = fun(*args, **kwargs)
112         end = time.time()
113         self.adjust_task_count(-1)
114         duration = end - start
115         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
116         self.histogram.add_item(duration)
117         return result
118
119     @overrides
120     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
121         self.adjust_task_count(+1)
122         newargs = []
123         newargs.append(function)
124         for arg in args:
125             newargs.append(arg)
126         return self._thread_pool_executor.submit(
127             self.run_local_bundle, *newargs, **kwargs
128         )
129
130     @overrides
131     def shutdown(self, wait=True) -> None:
132         logger.debug(f'Shutting down threadpool executor {self.title}')
133         print(self.histogram)
134         self._thread_pool_executor.shutdown(wait)
135
136
137 class ProcessExecutor(BaseExecutor):
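    """An executor that runs cloudpickled bundles on a local process pool."""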
138     def __init__(self, max_workers=None):
139         super().__init__()
140         workers = None
141         if max_workers is not None:
142             workers = max_workers
143         elif 'executors_processpool_size' in config.config:
144             workers = config.config['executors_processpool_size']
145         logger.debug(f'Creating processpool executor with {workers} workers.')
146         self._process_executor = fut.ProcessPoolExecutor(
147             max_workers=workers,
148         )
149
150     def run_cloud_pickle(self, pickle):
151         fun, args, kwargs = cloudpickle.loads(pickle)
152         logger.debug(f"Running pickled bundle at {fun.__name__}")
153         result = fun(*args, **kwargs)
154         self.adjust_task_count(-1)
155         return result
156
157     @overrides
158     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
159         start = time.time()
160         self.adjust_task_count(+1)
161         pickle = make_cloud_pickle(function, *args, **kwargs)
162         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
163         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
164         return result
165
166     @overrides
167     def shutdown(self, wait=True) -> None:
168         logger.debug(f'Shutting down processpool executor {self.title}')
169         self._process_executor.shutdown(wait)
170         print(self.histogram)
171
172     def __getstate__(self):
173         state = self.__dict__.copy()
174         state['_process_executor'] = None
175         return state
176
177
178 class RemoteExecutorException(Exception):
179     """Thrown when a bundle cannot be executed despite several retries."""
180
181     pass
182
183
184 @dataclass
185 class RemoteWorkerRecord:
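    """A remote worker and its capacity: weight is a relative measure of how
    desirable the machine is and count is how many bundles it may run at
    once."""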
186     username: str
187     machine: str
188     weight: int
189     count: int
190
191     def __hash__(self):
192         return hash((self.username, self.machine))
193
194     def __repr__(self):
195         return f'{self.username}@{self.machine}'
196
197
198 @dataclass
199 class BundleDetails:
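    """Everything the RemoteExecutor tracks about a single unit of work (a
    "bundle"), including any backup bundles scheduled for it."""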
200     pickled_code: bytes
201     uuid: str
202     fname: str
203     worker: Optional[RemoteWorkerRecord]
204     username: Optional[str]
205     machine: Optional[str]
206     hostname: str
207     code_file: str
208     result_file: str
209     pid: int
210     start_ts: float
211     end_ts: float
212     slower_than_local_p95: bool
213     slower_than_global_p95: bool
214     src_bundle: BundleDetails
215     is_cancelled: threading.Event
216     was_cancelled: bool
217     backup_bundles: Optional[List[BundleDetails]]
218     failure_count: int
219
220     def __repr__(self):
221         uuid = self.uuid
222         if uuid[-9:-2] == '_backup':
223             uuid = uuid[:-9]
224             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
225         else:
226             suffix = uuid[-6:]
227
228         colorz = [
229             fg('violet red'),
230             fg('red'),
231             fg('orange'),
232             fg('peach orange'),
233             fg('yellow'),
234             fg('marigold yellow'),
235             fg('green yellow'),
236             fg('tea green'),
237             fg('cornflower blue'),
238             fg('turquoise blue'),
239             fg('tropical blue'),
240             fg('lavender purple'),
241             fg('medium purple'),
242         ]
243         c = colorz[int(uuid[-2:], 16) % len(colorz)]
244         fname = self.fname if self.fname is not None else 'nofname'
245         machine = self.machine if self.machine is not None else 'nomachine'
246         return f'{c}{suffix}/{fname}/{machine}{reset()}'
247
248
249 class RemoteExecutorStatus:
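    """Thread-safe bookkeeping for a RemoteExecutor: known workers, in-flight
    bundles, and per-bundle / per-worker timings."""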
250     def __init__(self, total_worker_count: int) -> None:
251         self.worker_count: int = total_worker_count
252         self.known_workers: Set[RemoteWorkerRecord] = set()
253         self.start_time: float = time.time()
254         self.start_per_bundle: Dict[str, float] = defaultdict(float)
255         self.end_per_bundle: Dict[str, float] = defaultdict(float)
256         self.finished_bundle_timings_per_worker: Dict[
257             RemoteWorkerRecord, List[float]
258         ] = {}
259         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
260         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
261         self.finished_bundle_timings: List[float] = []
262         self.last_periodic_dump: Optional[float] = None
263         self.total_bundles_submitted: int = 0
264
265         # Protects reads and modifications of this object's state.  Also
266         # used as a memory fence for modifications to bundle details.
267         self.lock: threading.Lock = threading.Lock()
268
269     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
270         with self.lock:
271             self.record_acquire_worker_already_locked(worker, uuid)
272
273     def record_acquire_worker_already_locked(
274         self, worker: RemoteWorkerRecord, uuid: str
275     ) -> None:
276         assert self.lock.locked()
277         self.known_workers.add(worker)
278         self.start_per_bundle[uuid] = None
279         x = self.in_flight_bundles_by_worker.get(worker, set())
280         x.add(uuid)
281         self.in_flight_bundles_by_worker[worker] = x
282
283     def record_bundle_details(self, details: BundleDetails) -> None:
284         with self.lock:
285             self.record_bundle_details_already_locked(details)
286
287     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
288         assert self.lock.locked()
289         self.bundle_details_by_uuid[details.uuid] = details
290
291     def record_release_worker(
292         self,
293         worker: RemoteWorkerRecord,
294         uuid: str,
295         was_cancelled: bool,
296     ) -> None:
297         with self.lock:
298             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
299
300     def record_release_worker_already_locked(
301         self,
302         worker: RemoteWorkerRecord,
303         uuid: str,
304         was_cancelled: bool,
305     ) -> None:
306         assert self.lock.locked()
307         ts = time.time()
308         self.end_per_bundle[uuid] = ts
309         self.in_flight_bundles_by_worker[worker].remove(uuid)
310         if not was_cancelled:
311             bundle_latency = ts - self.start_per_bundle[uuid]
312             x = self.finished_bundle_timings_per_worker.get(worker, list())
313             x.append(bundle_latency)
314             self.finished_bundle_timings_per_worker[worker] = x
315             self.finished_bundle_timings.append(bundle_latency)
316
317     def record_processing_began(self, uuid: str):
318         with self.lock:
319             self.start_per_bundle[uuid] = time.time()
320
321     def total_in_flight(self) -> int:
322         assert self.lock.locked()
323         total_in_flight = 0
324         for worker in self.known_workers:
325             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
326         return total_in_flight
327
328     def total_idle(self) -> int:
329         assert self.lock.locked()
330         return self.worker_count - self.total_in_flight()
331
332     def __repr__(self):
333         assert self.lock.locked()
334         ts = time.time()
335         total_finished = len(self.finished_bundle_timings)
336         total_in_flight = self.total_in_flight()
337         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
338         qall = None
339         if len(self.finished_bundle_timings) > 1:
340             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
341             ret += (
342                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
343                 f'✅={total_finished}/{self.total_bundles_submitted}, '
344                 f'💻n={total_in_flight}/{self.worker_count}\n'
345             )
346         else:
347             ret += (
348                 f'⏱={ts-self.start_time:.1f}s, '
349                 f'✅={total_finished}/{self.total_bundles_submitted}, '
350                 f'💻n={total_in_flight}/{self.worker_count}\n'
351             )
352
353         for worker in self.known_workers:
354             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
355             timings = self.finished_bundle_timings_per_worker.get(worker, [])
356             count = len(timings)
357             qworker = None
358             if count > 1:
359                 qworker = numpy.quantile(timings, [0.5, 0.95])
360                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
361             else:
362                 ret += '\n'
363             if count > 0:
364                 ret += f'    ...finished {count} total bundle(s) so far\n'
365             in_flight = len(self.in_flight_bundles_by_worker[worker])
366             if in_flight > 0:
367                 ret += f'    ...{in_flight} bundles currently in flight:\n'
368                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
369                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
370                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
371                     if self.start_per_bundle[bundle_uuid] is not None:
372                         sec = ts - self.start_per_bundle[bundle_uuid]
373                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
374                     else:
375                         ret += f'       {details} setting up / copying data...'
376                         sec = 0.0
377
378                     if qworker is not None:
379                         if sec > qworker[1]:
380                             ret += f'{bg("red")}>💻p95{reset()} '
381                             if details is not None:
382                                 details.slower_than_local_p95 = True
383                         else:
384                             if details is not None:
385                                 details.slower_than_local_p95 = False
386
387                     if qall is not None:
388                         if sec > qall[1]:
389                             ret += f'{bg("red")}>∀p95{reset()} '
390                             if details is not None:
391                                 details.slower_than_global_p95 = True
392                         elif details is not None:
393                             details.slower_than_global_p95 = False
394                     ret += '\n'
395         return ret
396
397     def periodic_dump(self, total_bundles_submitted: int) -> None:
398         assert self.lock.locked()
399         self.total_bundles_submitted = total_bundles_submitted
400         ts = time.time()
401         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
402             print(self)
403             self.last_periodic_dump = ts
404
405
406 class RemoteWorkerSelectionPolicy(ABC):
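    """Policy interface for choosing which remote worker runs a bundle."""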
407     def register_worker_pool(self, workers):
408         self.workers = workers
409
410     @abstractmethod
411     def is_worker_available(self) -> bool:
412         pass
413
414     @abstractmethod
415     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
416         pass
417
418
419 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
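    """Selects a worker at random with probability proportional to the
    product of its remaining count and its weight."""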
420     @overrides
421     def is_worker_available(self) -> bool:
422         for worker in self.workers:
423             if worker.count > 0:
424                 return True
425         return False
426
427     @overrides
428     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
429         grabbag = []
430         for worker in self.workers:
431             if worker.machine != machine_to_avoid:
432                 if worker.count > 0:
433                     for _ in range(worker.count * worker.weight):
434                         grabbag.append(worker)
435
436         if len(grabbag) == 0:
437             logger.debug(
438                 f'There are no available workers that avoid {machine_to_avoid}...'
439             )
440             for worker in self.workers:
441                 if worker.count > 0:
442                     for _ in range(worker.count * worker.weight):
443                         grabbag.append(worker)
444
445         if len(grabbag) == 0:
446             logger.warning('There are no available workers?!')
447             return None
448
449         worker = random.sample(grabbag, 1)[0]
450         assert worker.count > 0
451         worker.count -= 1
452         logger.debug(f'Chose worker {worker}')
453         return worker
454
455
456 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
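    """Selects the next worker with available capacity in round-robin order."""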
457     def __init__(self) -> None:
458         self.index = 0
459
460     @overrides
461     def is_worker_available(self) -> bool:
462         for worker in self.workers:
463             if worker.count > 0:
464                 return True
465         return False
466
467     @overrides
468     def acquire_worker(
469         self, machine_to_avoid: Optional[str] = None
470     ) -> Optional[RemoteWorkerRecord]:
471         x = self.index
472         while True:
473             worker = self.workers[x]
474             if worker.count > 0:
475                 worker.count -= 1
476                 x += 1
477                 if x >= len(self.workers):
478                     x = 0
479                 self.index = x
480                 logger.debug(f'Selected worker {worker}')
481                 return worker
482             x += 1
483             if x >= len(self.workers):
484                 x = 0
485             if x == self.index:
486                 msg = 'Unexpectedly could not find an available worker; giving up.'
487                 logger.warning(msg)
488                 return None
489
490
491 class RemoteExecutor(BaseExecutor):
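    """An executor that ships cloudpickled bundles to remote workers over
    ssh/scp and, when a bundle looks slow, optionally schedules duplicative
    backup bundles on other machines."""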
492     def __init__(
493         self,
494         workers: List[RemoteWorkerRecord],
495         policy: RemoteWorkerSelectionPolicy,
496     ) -> None:
497         super().__init__()
498         self.workers = workers
499         self.policy = policy
500         self.worker_count = 0
501         for worker in self.workers:
502             self.worker_count += worker.count
503         if self.worker_count <= 0:
504             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
505             logger.critical(msg)
506             raise RemoteExecutorException(msg)
507         self.policy.register_worker_pool(self.workers)
508         self.cv = threading.Condition()
509         logger.debug(
510             f'Creating {self.worker_count} local threads, one per remote worker.'
511         )
512         self._helper_executor = fut.ThreadPoolExecutor(
513             thread_name_prefix="remote_executor_helper",
514             max_workers=self.worker_count,
515         )
516         self.status = RemoteExecutorStatus(self.worker_count)
517         self.total_bundles_submitted = 0
518         self.backup_lock = threading.Lock()
519         self.last_backup = None
520         (
521             self.heartbeat_thread,
522             self.heartbeat_stop_event,
523         ) = self.run_periodic_heartbeat()
524
525     @background_thread
526     def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
527         while not stop_event.is_set():
528             time.sleep(5.0)
529             logger.debug('Running periodic heartbeat code...')
530             self.heartbeat()
531         logger.debug('Periodic heartbeat thread shutting down.')
532
533     def heartbeat(self) -> None:
534         # Note: this is invoked on a background thread, not an
535         # executor thread.  Be careful what you do here because this
536         # thread needs to come back and dump status again periodically.
537         with self.status.lock:
538             self.status.periodic_dump(self.total_bundles_submitted)
539
540             # Look for bundles to reschedule via executor.submit
541             if config.config['executors_schedule_remote_backups']:
542                 self.maybe_schedule_backup_bundles()
543
544     def maybe_schedule_backup_bundles(self):
545         assert self.status.lock.locked()
546         num_done = len(self.status.finished_bundle_timings)
547         num_idle_workers = self.worker_count - self.task_count
548         now = time.time()
549         if (
550             num_done > 2
551             and num_idle_workers > 1
552             and (self.last_backup is None or (now - self.last_backup > 9.0))
553             and self.backup_lock.acquire(blocking=False)
554         ):
555             try:
556                 assert self.backup_lock.locked()
557
558                 bundle_to_backup = None
559                 best_score = None
560                 for (
561                     worker,
562                     bundle_uuids,
563                 ) in self.status.in_flight_bundles_by_worker.items():
564
565                     # Prefer to schedule backups of bundles running on
566                     # slower machines.
567                     base_score = 0
568                     for record in self.workers:
569                         if worker.machine == record.machine:
570                             base_score = float(record.weight)
571                             base_score = 1.0 / base_score
572                             base_score *= 200.0
573                             base_score = int(base_score)
574                             break
575
576                     for uuid in bundle_uuids:
577                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
578                         if (
579                             bundle is not None
580                             and bundle.src_bundle is None
581                             and bundle.backup_bundles is not None
582                         ):
583                             score = base_score
584
585                             # Schedule backups of bundles running
586                             # longer; especially those that are
587                             # unexpectedly slow.
588                             start_ts = self.status.start_per_bundle[uuid]
589                             if start_ts is not None:
590                                 runtime = now - start_ts
591                                 score += runtime
592                                 logger.debug(
593                                     f'score[{bundle}] => {score}  # latency boost'
594                                 )
595
596                                 if bundle.slower_than_local_p95:
597                                     score += runtime / 2
598                                     logger.debug(
599                                         f'score[{bundle}] => {score}  # >worker p95'
600                                     )
601
602                                 if bundle.slower_than_global_p95:
603                                     score += runtime / 4
604                                     logger.debug(
605                                         f'score[{bundle}] => {score}  # >global p95'
606                                     )
607
608                             # Prefer backups of bundles that don't
609                             # have backups already.
610                             backup_count = len(bundle.backup_bundles)
611                             if backup_count == 0:
612                                 score *= 2
613                             elif backup_count == 1:
614                                 score /= 2
615                             elif backup_count == 2:
616                                 score /= 8
617                             else:
618                                 score = 0
619                             logger.debug(
620                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
621                             )
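                            # Worked example with illustrative numbers: a
                            # bundle on a weight=8 machine starts at
                            # int(200 / 8) == 25.  After 40s of runtime the
                            # score is 65; being slower than the worker p95
                            # adds 20 (85) and slower than the global p95
                            # adds another 10 (95).  With no existing
                            # backups that doubles to 190; with one backup
                            # it would be halved instead.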
622
623                             if score != 0 and (
624                                 best_score is None or score > best_score
625                             ):
626                                 bundle_to_backup = bundle
627                                 assert bundle is not None
628                                 assert bundle.backup_bundles is not None
629                                 assert bundle.src_bundle is None
630                                 best_score = score
631
632                 # Note: this is all still happening on the heartbeat
633                 # runner thread.  That's ok because
634                 # schedule_backup_for_bundle uses the executor to
635                 # submit the bundle again which will cause it to be
636                 # picked up by a worker thread and allow this thread
637                 # to return to run future heartbeats.
638                 if bundle_to_backup is not None:
639                     self.last_backup = now
640                     logger.info(
641                         f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
642                     )
643                     self.schedule_backup_for_bundle(bundle_to_backup)
644             finally:
645                 self.backup_lock.release()
646
647     def is_worker_available(self) -> bool:
648         return self.policy.is_worker_available()
649
650     def acquire_worker(
651         self, machine_to_avoid: Optional[str] = None
652     ) -> Optional[RemoteWorkerRecord]:
653         return self.policy.acquire_worker(machine_to_avoid)
654
655     def find_available_worker_or_block(
656         self, machine_to_avoid: Optional[str] = None
657     ) -> RemoteWorkerRecord:
658         with self.cv:
659             while not self.is_worker_available():
660                 self.cv.wait()
661             worker = self.acquire_worker(machine_to_avoid)
662             if worker is not None:
663                 return worker
664         msg = "We should never reach this point in the code"
665         logger.critical(msg)
666         raise Exception(msg)
667
668     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
669         worker = bundle.worker
670         assert worker is not None
671         logger.debug(f'Released worker {worker}')
672         self.status.record_release_worker(
673             worker,
674             bundle.uuid,
675             was_cancelled,
676         )
677         with self.cv:
678             worker.count += 1
679             self.cv.notify()
680         self.adjust_task_count(-1)
681
682     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
683         with self.status.lock:
684             if bundle.is_cancelled.wait(timeout=0.0):
685                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
686                 bundle.was_cancelled = True
687                 return True
688         return False
689
690     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
691         """Find a worker for bundle or block until one is available."""
692         self.adjust_task_count(+1)
693         uuid = bundle.uuid
694         hostname = bundle.hostname
695         avoid_machine = override_avoid_machine
696         is_original = bundle.src_bundle is None
697
698         # Try not to schedule a backup on the same host as the original.
699         if avoid_machine is None and bundle.src_bundle is not None:
700             avoid_machine = bundle.src_bundle.machine
701         worker = None
702         while worker is None:
703             worker = self.find_available_worker_or_block(avoid_machine)
704         assert worker
705
706         # Ok, found a worker.
707         bundle.worker = worker
708         machine = bundle.machine = worker.machine
709         username = bundle.username = worker.username
710         self.status.record_acquire_worker(worker, uuid)
711         logger.debug(f'{bundle}: Running bundle on {worker}...')
712
713         # Before we do any work, make sure the bundle is still viable.
714         # It may have been some time between when it was submitted and
715         # now due to lack of worker availability and someone else may
716         # have already finished it.
717         if self.check_if_cancelled(bundle):
718             try:
719                 return self.process_work_result(bundle)
720             except Exception as e:
721                 logger.warning(
722                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
723                 )
724                 self.release_worker(bundle)
725                 if is_original:
726                     # Weird.  We are the original owner of this
727                     # bundle.  For it to have been cancelled, a backup
728                     # must have already started and completed before
729                     # we even got started.  Moreover, the backup says
730                     # it is done but we can't find the results it
731                     # should have copied over.  Reschedule the whole
732                     # thing.
733                     logger.exception(e)
734                     logger.error(
735                         f'{bundle}: We are the original owner thread and yet there are '
736                         + 'no results for this bundle.  This is unexpected and bad.'
737                     )
738                     return self.emergency_retry_nasty_bundle(bundle)
739                 else:
740                     # Expected(?).  We're a backup and our bundle is
741                     # cancelled before we even got started.  Something
742                     # went bad in process_work_result (I actually don't
743                     # see what?) but probably not worth worrying
744                     # about.  Let the original thread worry about
745                     # either finding the results or complaining about
746                     # it.
747                     return None
748
749         # Send input code / data to worker machine if it's not local.
750         if hostname not in machine:
751             try:
752                 cmd = (
753                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
754                 )
755                 start_ts = time.time()
756                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
757                 run_silently(cmd)
758                 xfer_latency = time.time() - start_ts
759                 logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
760             except Exception as e:
761                 self.release_worker(bundle)
762                 if is_original:
763                     # Weird.  We tried to copy the code to the worker and it failed...
764                     # And we're the original bundle.  We have to retry.
765                     logger.exception(e)
766                     logger.error(
767                         f"{bundle}: Failed to send instructions to the worker machine?! "
768                         + "This is not expected; we\'re the original bundle so this shouldn\'t "
769                         + "be a race condition.  Attempting an emergency retry..."
770                     )
771                     return self.emergency_retry_nasty_bundle(bundle)
772                 else:
773                     # This is actually expected; we're a backup.
774                     # There's a race condition where someone else
775                     # already finished the work and removed the source
776                     # code file before we could copy it.  No biggie.
777                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
778                     msg += 'We\'re a backup and this may be caused by the original (or some '
779                     msg += 'other backup) already finishing this work.  Ignoring this.'
780                     logger.warning(msg)
781                     return None
782
783         # Kick off the work.  Note that if this fails we let
784         # wait_for_process deal with it.
785         self.status.record_processing_began(uuid)
786         cmd = (
787             f'{SSH} {bundle.username}@{bundle.machine} '
788             f'"source py38-venv/bin/activate &&'
789             f' /home/scott/lib/python_modules/remote_worker.py'
790             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
791         )
792         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
793         p = cmd_in_background(cmd, silent=True)
794         bundle.pid = p.pid
795         logger.debug(
796             f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
797         )
798         return self.wait_for_process(p, bundle, 0)
799
800     def wait_for_process(
801         self, p: subprocess.Popen, bundle: BundleDetails, depth: int
802     ) -> Any:
803         machine = bundle.machine
804         pid = p.pid
805         if depth > 3:
806             logger.error(
807                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
808             )
809             p.terminate()
810             self.release_worker(bundle)
811             return self.emergency_retry_nasty_bundle(bundle)
812
813         # Spin until either the ssh job we scheduled finishes the
814         # bundle or some backup worker signals that they finished it
815         # before we could.
816         while True:
817             try:
818                 p.wait(timeout=0.25)
819             except subprocess.TimeoutExpired:
820                 if self.check_if_cancelled(bundle):
821                     logger.info(
822                         f'{bundle}: looks like another worker finished bundle...'
823                     )
824                     break
825             else:
826                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
827                 p = None
828                 break
829
830         # If we get here we believe the bundle is done; either the ssh
831         # subprocess finished (hopefully successfully) or we noticed
832         # that some other worker seems to have completed the bundle
833         # and we're bailing out.
834         try:
835             ret = self.process_work_result(bundle)
836             if ret is not None and p is not None:
837                 p.terminate()
838             return ret
839
840         # Something went wrong; e.g. we could not copy the results
841         # back, clean up after ourselves on the remote machine, or
842         # unpickle the results we got from the remote machine.  If we
843         # still have an active ssh subprocess, keep waiting on it.
844         # Otherwise, time for an emergency reschedule.
845         except Exception as e:
846             logger.exception(e)
847             logger.error(f'{bundle}: Something unexpected just happened...')
848             if p is not None:
849                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
850                 logger.warning(msg)
851                 return self.wait_for_process(p, bundle, depth + 1)
852             else:
853                 self.release_worker(bundle)
854                 return self.emergency_retry_nasty_bundle(bundle)
855
856     def process_work_result(self, bundle: BundleDetails) -> Any:
857         with self.status.lock:
858             is_original = bundle.src_bundle is None
859             was_cancelled = bundle.was_cancelled
860             username = bundle.username
861             machine = bundle.machine
862             result_file = bundle.result_file
863             code_file = bundle.code_file
864
865             # Whether original or backup, if we finished first we must
866             # fetch the results if the computation happened on a
867             # remote machine.
868             bundle.end_ts = time.time()
869             if not was_cancelled:
870                 assert bundle.machine is not None
871                 if bundle.hostname not in bundle.machine:
872                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
873                     logger.info(
874                         f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
875                     )
876
877                     # If either of these throw they are handled in
878                     # wait_for_process.
879                     attempts = 0
880                     while True:
881                         try:
882                             run_silently(cmd)
883                         except Exception as e:
884                             attempts += 1
885                             if attempts >= 3:
886                                 raise e
887                         else:
888                             break
889
890                     run_silently(
891                         f'{SSH} {username}@{machine}'
892                         f' "/bin/rm -f {code_file} {result_file}"'
893                     )
894                     logger.debug(
895                         f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
896                     )
897                 dur = bundle.end_ts - bundle.start_ts
898                 self.histogram.add_item(dur)
899
900         # Only the original worker should unpickle the file contents
901         # though since it's the only one whose result matters.  The
902         # original is also the only job that may delete result_file
903         # from disk.  Note that the original may have been cancelled
904         # if one of the backups finished first; it still must read the
905         # result from disk.
906         if is_original:
907             logger.debug(f"{bundle}: Unpickling {result_file}.")
908             try:
909                 with open(result_file, 'rb') as rb:
910                     serialized = rb.read()
911                 result = cloudpickle.loads(serialized)
912             except Exception as e:
913                 logger.exception(e)
914                 msg = f'Failed to load {result_file}... this is bad news.'
915                 logger.critical(msg)
916                 self.release_worker(bundle)
917
918                 # Re-raise the exception; the code in wait_for_process may
919                 # decide to emergency_retry_nasty_bundle here.
920                 raise Exception(e)
921             logger.debug(f'Removing local (master) {code_file} and {result_file}.')
922             os.remove(f'{result_file}')
923             os.remove(f'{code_file}')
924
925             # Notify any backups that the original is done so they
926             # should stop ASAP.  Do this whether or not we
927             # finished first since there could be more than one
928             # backup.
929             if bundle.backup_bundles is not None:
930                 for backup in bundle.backup_bundles:
931                     logger.debug(
932                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
933                     )
934                     backup.is_cancelled.set()
935
936         # This is a backup job and, by now, we have already fetched
937         # the bundle results.
938         else:
939             # Backup results don't matter, they just need to leave the
940             # result file in the right place for their originals to
941             # read/unpickle later.
942             result = None
943
944             # Tell the original to stop if we finished first.
945             if not was_cancelled:
946                 logger.debug(
947                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
948                 )
949                 bundle.src_bundle.is_cancelled.set()
950         self.release_worker(bundle, was_cancelled=was_cancelled)
951         return result
952
953     def create_original_bundle(self, pickle, fname: str):
954         from string_utils import generate_uuid
955
956         uuid = generate_uuid(omit_dashes=True)
957         code_file = f'/tmp/{uuid}.code.bin'
958         result_file = f'/tmp/{uuid}.result.bin'
959
960         logger.debug(f'Writing pickled code to {code_file}')
961         with open(f'{code_file}', 'wb') as wb:
962             wb.write(pickle)
963
964         bundle = BundleDetails(
965             pickled_code=pickle,
966             uuid=uuid,
967             fname=fname,
968             worker=None,
969             username=None,
970             machine=None,
971             hostname=platform.node(),
972             code_file=code_file,
973             result_file=result_file,
974             pid=0,
975             start_ts=time.time(),
976             end_ts=0.0,
977             slower_than_local_p95=False,
978             slower_than_global_p95=False,
979             src_bundle=None,
980             is_cancelled=threading.Event(),
981             was_cancelled=False,
982             backup_bundles=[],
983             failure_count=0,
984         )
985         self.status.record_bundle_details(bundle)
986         logger.debug(f'{bundle}: Created an original bundle')
987         return bundle
988
989     def create_backup_bundle(self, src_bundle: BundleDetails):
990         assert src_bundle.backup_bundles is not None
991         n = len(src_bundle.backup_bundles)
992         uuid = src_bundle.uuid + f'_backup#{n}'
993
994         backup_bundle = BundleDetails(
995             pickled_code=src_bundle.pickled_code,
996             uuid=uuid,
997             fname=src_bundle.fname,
998             worker=None,
999             username=None,
1000             machine=None,
1001             hostname=src_bundle.hostname,
1002             code_file=src_bundle.code_file,
1003             result_file=src_bundle.result_file,
1004             pid=0,
1005             start_ts=time.time(),
1006             end_ts=0.0,
1007             slower_than_local_p95=False,
1008             slower_than_global_p95=False,
1009             src_bundle=src_bundle,
1010             is_cancelled=threading.Event(),
1011             was_cancelled=False,
1012             backup_bundles=None,  # backup backups not allowed
1013             failure_count=0,
1014         )
1015         src_bundle.backup_bundles.append(backup_bundle)
1016         self.status.record_bundle_details_already_locked(backup_bundle)
1017         logger.debug(f'{backup_bundle}: Created a backup bundle')
1018         return backup_bundle
1019
1020     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1021         assert self.status.lock.locked()
1022         assert src_bundle is not None
1023         backup_bundle = self.create_backup_bundle(src_bundle)
1024         logger.debug(
1025             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1026         )
1027         self._helper_executor.submit(self.launch, backup_bundle)
1028
1029         # Results from backups don't matter; if they finish first
1030         # they will move the result_file to this machine and let
1031         # the original pick them up and unpickle them.
1032
1033     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
1034         is_original = bundle.src_bundle is None
1035         bundle.worker = None
1036         avoid_last_machine = bundle.machine
1037         bundle.machine = None
1038         bundle.username = None
1039         bundle.failure_count += 1
1040         if is_original:
1041             retry_limit = 3
1042         else:
1043             retry_limit = 2
1044
1045         if bundle.failure_count > retry_limit:
1046             logger.error(
1047                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1048             )
1049             if is_original:
1050                 raise RemoteExecutorException(
1051                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1052                 )
1053             else:
1054                 logger.error(
1055                     f'{bundle}: At least it\'s only a backup; better luck with the others.'
1056                 )
1057             return None
1058         else:
1059             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1060             logger.warning(msg)
1061             warnings.warn(msg)
1062             return self.launch(bundle, avoid_last_machine)
1063
1064     @overrides
1065     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1066         pickle = make_cloud_pickle(function, *args, **kwargs)
1067         bundle = self.create_original_bundle(pickle, function.__name__)
1068         self.total_bundles_submitted += 1
1069         return self._helper_executor.submit(self.launch, bundle)
1070
1071     @overrides
1072     def shutdown(self, wait=True) -> None:
1073         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1074         self.heartbeat_stop_event.set()
1075         self.heartbeat_thread.join()
1076         self._helper_executor.shutdown(wait)
1077         print(self.histogram)
1078
1079
1080 @singleton
1081 class DefaultExecutors(object):
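    """A lazy, singleton factory for one executor of each flavor.

    A minimal usage sketch (assumes the command-line config has been parsed;
    the function and argument names below are placeholders):

        executors = DefaultExecutors()
        future = executors.thread_pool().submit(some_function, some_arg)
        result = future.result()
    """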
1082     def __init__(self):
1083         self.thread_executor: Optional[ThreadExecutor] = None
1084         self.process_executor: Optional[ProcessExecutor] = None
1085         self.remote_executor: Optional[RemoteExecutor] = None
1086
1087     def ping(self, host) -> bool:
1088         logger.debug(f'RUN> ping -c 1 {host}')
1089         try:
1090             x = cmd_with_timeout(
1091                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1092             )
1093             return x == 0
1094         except Exception:
1095             return False
1096
1097     def thread_pool(self) -> ThreadExecutor:
1098         if self.thread_executor is None:
1099             self.thread_executor = ThreadExecutor()
1100         return self.thread_executor
1101
1102     def process_pool(self) -> ProcessExecutor:
1103         if self.process_executor is None:
1104             self.process_executor = ProcessExecutor()
1105         return self.process_executor
1106
1107     def remote_pool(self) -> RemoteExecutor:
1108         if self.remote_executor is None:
1109             logger.info('Looking for some helper machines...')
1110             pool: List[RemoteWorkerRecord] = []
1111             if self.ping('cheetah.house'):
1112                 logger.info('Found cheetah.house')
1113                 pool.append(
1114                     RemoteWorkerRecord(
1115                         username='scott',
1116                         machine='cheetah.house',
1117                         weight=30,
1118                         count=6,
1119                     ),
1120                 )
1121             if self.ping('meerkat.cabin'):
1122                 logger.info('Found meerkat.cabin')
1123                 pool.append(
1124                     RemoteWorkerRecord(
1125                         username='scott',
1126                         machine='meerkat.cabin',
1127                         weight=12,
1128                         count=2,
1129                     ),
1130                 )
1131             if self.ping('wannabe.house'):
1132                 logger.info('Found wannabe.house')
1133                 pool.append(
1134                     RemoteWorkerRecord(
1135                         username='scott',
1136                         machine='wannabe.house',
1137                         weight=25,
1138                         count=10,
1139                     ),
1140                 )
1141             if self.ping('puma.cabin'):
1142                 logger.info('Found puma.cabin')
1143                 pool.append(
1144                     RemoteWorkerRecord(
1145                         username='scott',
1146                         machine='puma.cabin',
1147                         weight=30,
1148                         count=6,
1149                     ),
1150                 )
1151             if self.ping('backup.house'):
1152                 logger.info('Found backup.house')
1153                 pool.append(
1154                     RemoteWorkerRecord(
1155                         username='scott',
1156                         machine='backup.house',
1157                         weight=8,
1158                         count=2,
1159                     ),
1160                 )
1161
1162             # The controller machine has a lot to do; go easy on it.
1163             for record in pool:
1164                 if record.machine == platform.node() and record.count > 1:
1165                     logger.info(f'Reducing workload for {record.machine}.')
1166                     record.count = 1
1167
1168             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1169             policy.register_worker_pool(pool)
1170             self.remote_executor = RemoteExecutor(pool, policy)
1171         return self.remote_executor
1172
1173     def shutdown(self) -> None:
1174         if self.thread_executor is not None:
1175             self.thread_executor.shutdown()
1176             self.thread_executor = None
1177         if self.process_executor is not None:
1178             self.process_executor.shutdown()
1179             self.process_executor = None
1180         if self.remote_executor is not None:
1181             self.remote_executor.shutdown()
1182             self.remote_executor = None