Start using warnings from stdlib.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
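"""Executor abstractions for running work in local threads, in local
processes, or on a pool of remote machines (code is shipped around as
cloudpickle blobs via rsync/ssh).

Illustrative usage sketch (not part of this module; assumes the enclosing
program has already parsed its commandline config):

    from executors import DefaultExecutors

    def work(n: int) -> int:
        return n * n

    pool = DefaultExecutors().thread_pool()
    future = pool.submit(work, 10)
    print(future.result())   # 100
    pool.shutdown()
"""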
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29
30 logger = logging.getLogger(__name__)
31
32 parser = config.add_commandline_args(
33     f"Executors ({__file__})",
34     "Args related to processing executors."
35 )
36 parser.add_argument(
37     '--executors_threadpool_size',
38     type=int,
39     metavar='#THREADS',
40     help='Number of threads in the default threadpool, leave unset for default',
41     default=None
42 )
43 parser.add_argument(
44     '--executors_processpool_size',
45     type=int,
46     metavar='#PROCESSES',
47     help='Number of processes in the default processpool, leave unset for default',
48     default=None,
49 )
50 parser.add_argument(
51     '--executors_schedule_remote_backups',
52     default=True,
53     action=argparse_utils.ActionNoYes,
54     help='Should we schedule duplicative backup work if a remote bundle is slow',
55 )
56 parser.add_argument(
57     '--executors_max_bundle_failures',
58     type=int,
59     default=3,
60     metavar='#FAILURES',
61     help='Maximum number of failures before giving up on a bundle',
62 )
63
64 RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
65 SSH = 'ssh -oForwardX11=no'
66
67
68 def make_cloud_pickle(fun, *args, **kwargs):
69     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
70     return cloudpickle.dumps((fun, args, kwargs))
71
72
73 class BaseExecutor(ABC):
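    """Abstract base class for executors.  Subclasses implement submit(),
    which returns a concurrent.futures.Future, and shutdown(); the base
    keeps a running task count and a histogram of task durations."""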
74     def __init__(self, *, title=''):
75         self.title = title
76         self.task_count = 0
77         self.histogram = hist.SimpleHistogram(
78             hist.SimpleHistogram.n_evenly_spaced_buckets(
79                 int(0), int(500), 50
80             )
81         )
82
83     @abstractmethod
84     def submit(self,
85                function: Callable,
86                *args,
87                **kwargs) -> fut.Future:
88         pass
89
90     @abstractmethod
91     def shutdown(self,
92                  wait: bool = True) -> None:
93         pass
94
95     def adjust_task_count(self, delta: int) -> None:
96         self.task_count += delta
97         logger.debug(f'Executor current task count is {self.task_count}')
98
99
100 class ThreadExecutor(BaseExecutor):
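    """Executes work in a local pool of threads; the pool size comes from
    the max_workers argument or the --executors_threadpool_size flag."""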
101     def __init__(self,
102                  max_workers: Optional[int] = None):
103         super().__init__()
104         workers = None
105         if max_workers is not None:
106             workers = max_workers
107         elif 'executors_threadpool_size' in config.config:
108             workers = config.config['executors_threadpool_size']
109         logger.debug(f'Creating threadpool executor with {workers} workers')
110         self._thread_pool_executor = fut.ThreadPoolExecutor(
111             max_workers=workers,
112             thread_name_prefix="thread_executor_helper"
113         )
114
115     def run_local_bundle(self, fun, *args, **kwargs):
116         logger.debug(f"Running local bundle at {fun.__name__}")
117         start = time.time()
118         result = fun(*args, **kwargs)
119         end = time.time()
120         self.adjust_task_count(-1)
121         duration = end - start
122         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
123         self.histogram.add_item(duration)
124         return result
125
126     @overrides
127     def submit(self,
128                function: Callable,
129                *args,
130                **kwargs) -> fut.Future:
131         self.adjust_task_count(+1)
132         newargs = []
133         newargs.append(function)
134         for arg in args:
135             newargs.append(arg)
136         return self._thread_pool_executor.submit(
137             self.run_local_bundle,
138             *newargs,
139             **kwargs)
140
141     @overrides
142     def shutdown(self,
143                  wait = True) -> None:
144         logger.debug(f'Shutting down threadpool executor {self.title}')
145         print(self.histogram)
146         self._thread_pool_executor.shutdown(wait)
147
148
149 class ProcessExecutor(BaseExecutor):
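    """Executes work in a local pool of child processes; submitted
    callables are serialized with cloudpickle before being handed to the
    underlying ProcessPoolExecutor."""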
150     def __init__(self,
151                  max_workers=None):
152         super().__init__()
153         workers = None
154         if max_workers is not None:
155             workers = max_workers
156         elif 'executors_processpool_size' in config.config:
157             workers = config.config['executors_processpool_size']
158         logger.debug(f'Creating processpool executor with {workers} workers.')
159         self._process_executor = fut.ProcessPoolExecutor(
160             max_workers=workers,
161         )
162
163     def run_cloud_pickle(self, pickle):
164         fun, args, kwargs = cloudpickle.loads(pickle)
165         logger.debug(f"Running pickled bundle at {fun.__name__}")
166         result = fun(*args, **kwargs)
167         self.adjust_task_count(-1)
168         return result
169
170     @overrides
171     def submit(self,
172                function: Callable,
173                *args,
174                **kwargs) -> fut.Future:
175         start = time.time()
176         self.adjust_task_count(+1)
177         pickle = make_cloud_pickle(function, *args, **kwargs)
178         result = self._process_executor.submit(
179             self.run_cloud_pickle,
180             pickle
181         )
182         result.add_done_callback(
183             lambda _: self.histogram.add_item(
184                 time.time() - start
185             )
186         )
187         return result
188
189     @overrides
190     def shutdown(self, wait=True) -> None:
191         logger.debug(f'Shutting down processpool executor {self.title}')
192         self._process_executor.shutdown(wait)
193         print(self.histogram)
194
195     def __getstate__(self):
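        # The underlying ProcessPoolExecutor can't be pickled; drop it from
        # the serialized state so instances of this class can be pickled.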
196         state = self.__dict__.copy()
197         state['_process_executor'] = None
198         return state
199
200
201 class RemoteExecutorException(Exception):
202     """Thrown when a bundle cannot be executed despite several retries."""
203     pass
204
205
206 @dataclass
207 class RemoteWorkerRecord:
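    """Describes a remote machine that can accept work: weight biases the
    weighted-random selection policy toward this machine and count is how
    many bundles it may run concurrently."""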
208     username: str
209     machine: str
210     weight: int
211     count: int
212
213     def __hash__(self):
214         return hash((self.username, self.machine))
215
216     def __repr__(self):
217         return f'{self.username}@{self.machine}'
218
219
220 @dataclass
221 class BundleDetails:
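    """Everything the RemoteExecutor tracks about a single unit of work:
    where it lives on disk, where (and when) it is running, and any backup
    copies that were scheduled for it."""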
222     pickled_code: bytes
223     uuid: str
224     fname: str
225     worker: Optional[RemoteWorkerRecord]
226     username: Optional[str]
227     machine: Optional[str]
228     hostname: str
229     code_file: str
230     result_file: str
231     pid: int
232     start_ts: float
233     end_ts: float
234     slower_than_local_p95: bool
235     slower_than_global_p95: bool
236     src_bundle: BundleDetails
237     is_cancelled: threading.Event
238     was_cancelled: bool
239     backup_bundles: Optional[List[BundleDetails]]
240     failure_count: int
241
242     def __repr__(self):
243         uuid = self.uuid
244         if uuid[-9:-2] == '_backup':
245             uuid = uuid[:-9]
246             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
247         else:
248             suffix = uuid[-6:]
249
250         colorz = [
251             fg('violet red'),
252             fg('red'),
253             fg('orange'),
254             fg('peach orange'),
255             fg('yellow'),
256             fg('marigold yellow'),
257             fg('green yellow'),
258             fg('tea green'),
259             fg('cornflower blue'),
260             fg('turquoise blue'),
261             fg('tropical blue'),
262             fg('lavender purple'),
263             fg('medium purple'),
264         ]
265         c = colorz[int(uuid[-2:], 16) % len(colorz)]
266         fname = self.fname if self.fname is not None else 'nofname'
267         machine = self.machine if self.machine is not None else 'nomachine'
268         return f'{c}{suffix}/{fname}/{machine}{reset()}'
269
270
271 class RemoteExecutorStatus:
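    """Thread-safe bookkeeping about known workers and in-flight /
    finished bundles; __repr__ renders the periodic status dump."""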
272     def __init__(self, total_worker_count: int) -> None:
273         self.worker_count = total_worker_count
274         self.known_workers: Set[RemoteWorkerRecord] = set()
275         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
276         self.end_per_bundle: Dict[str, float] = defaultdict(float)
277         self.finished_bundle_timings_per_worker: Dict[
278             RemoteWorkerRecord,
279             List[float]
280         ] = {}
281         self.in_flight_bundles_by_worker: Dict[
282             RemoteWorkerRecord,
283             Set[str]
284         ] = {}
285         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
286         self.finished_bundle_timings: List[float] = []
287         self.last_periodic_dump: Optional[float] = None
288         self.total_bundles_submitted = 0
289
290         # Protects reads and modifications of self.  Also used as a
291         # memory fence for modifications to bundles.
292         self.lock = threading.Lock()
293
294     def record_acquire_worker(
295             self,
296             worker: RemoteWorkerRecord,
297             uuid: str
298     ) -> None:
299         with self.lock:
300             self.record_acquire_worker_already_locked(
301                 worker,
302                 uuid
303             )
304
305     def record_acquire_worker_already_locked(
306             self,
307             worker: RemoteWorkerRecord,
308             uuid: str
309     ) -> None:
310         assert self.lock.locked()
311         self.known_workers.add(worker)
312         self.start_per_bundle[uuid] = None
313         x = self.in_flight_bundles_by_worker.get(worker, set())
314         x.add(uuid)
315         self.in_flight_bundles_by_worker[worker] = x
316
317     def record_bundle_details(
318             self,
319             details: BundleDetails) -> None:
320         with self.lock:
321             self.record_bundle_details_already_locked(details)
322
323     def record_bundle_details_already_locked(
324             self,
325             details: BundleDetails) -> None:
326         assert self.lock.locked()
327         self.bundle_details_by_uuid[details.uuid] = details
328
329     def record_release_worker(
330             self,
331             worker: RemoteWorkerRecord,
332             uuid: str,
333             was_cancelled: bool,
334     ) -> None:
335         with self.lock:
336             self.record_release_worker_already_locked(
337                 worker, uuid, was_cancelled
338             )
339
340     def record_release_worker_already_locked(
341             self,
342             worker: RemoteWorkerRecord,
343             uuid: str,
344             was_cancelled: bool,
345     ) -> None:
346         assert self.lock.locked()
347         ts = time.time()
348         self.end_per_bundle[uuid] = ts
349         self.in_flight_bundles_by_worker[worker].remove(uuid)
350         if not was_cancelled:
351             bundle_latency = ts - self.start_per_bundle[uuid]
352             x = self.finished_bundle_timings_per_worker.get(worker, list())
353             x.append(bundle_latency)
354             self.finished_bundle_timings_per_worker[worker] = x
355             self.finished_bundle_timings.append(bundle_latency)
356
357     def record_processing_began(self, uuid: str):
358         with self.lock:
359             self.start_per_bundle[uuid] = time.time()
360
361     def total_in_flight(self) -> int:
362         assert self.lock.locked()
363         total_in_flight = 0
364         for worker in self.known_workers:
365             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
366         return total_in_flight
367
368     def total_idle(self) -> int:
369         assert self.lock.locked()
370         return self.worker_count - self.total_in_flight()
371
372     def __repr__(self):
373         assert self.lock.locked()
374         ts = time.time()
375         total_finished = len(self.finished_bundle_timings)
376         total_in_flight = self.total_in_flight()
377         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
378         qall = None
379         if len(self.finished_bundle_timings) > 1:
380             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
381             ret += (
382                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
383                 f'✅={total_finished}/{self.total_bundles_submitted}, '
384                 f'💻n={total_in_flight}/{self.worker_count}\n'
385             )
386         else:
387             ret += (
388                 f' ✅={total_finished}/{self.total_bundles_submitted}, '
389                 f'💻n={total_in_flight}/{self.worker_count}\n'
390             )
391
392         for worker in self.known_workers:
393             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
394             timings = self.finished_bundle_timings_per_worker.get(worker, [])
395             count = len(timings)
396             qworker = None
397             if count > 1:
398                 qworker = numpy.quantile(timings, [0.5, 0.95])
399                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
400             else:
401                 ret += '\n'
402             if count > 0:
403                 ret += f'    ...finished {count} total bundle(s) so far\n'
404             in_flight = len(self.in_flight_bundles_by_worker[worker])
405             if in_flight > 0:
406                 ret += f'    ...{in_flight} bundles currently in flight:\n'
407                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
408                     details = self.bundle_details_by_uuid.get(
409                         bundle_uuid,
410                         None
411                     )
412                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
413                     if self.start_per_bundle[bundle_uuid] is not None:
414                         sec = ts - self.start_per_bundle[bundle_uuid]
415                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
416                     else:
417                         ret += f'       {details} setting up / copying data...'
418                         sec = 0.0
419
420                     if qworker is not None:
421                         if sec > qworker[1]:
422                             ret += f'{bg("red")}>💻p95{reset()} '
423                             if details is not None:
424                                 details.slower_than_local_p95 = True
425                         else:
426                             if details is not None:
427                                 details.slower_than_local_p95 = False
428
429                     if qall is not None:
430                         if sec > qall[1]:
431                             ret += f'{bg("red")}>∀p95{reset()} '
432                             if details is not None:
433                                 details.slower_than_global_p95 = True
434                         elif details is not None:
435                             details.slower_than_global_p95 = False
436                     ret += '\n'
437         return ret
438
439     def periodic_dump(self, total_bundles_submitted: int) -> None:
440         assert self.lock.locked()
441         self.total_bundles_submitted = total_bundles_submitted
442         ts = time.time()
443         if (
444                 self.last_periodic_dump is None
445                 or ts - self.last_periodic_dump > 5.0
446         ):
447             print(self)
448             self.last_periodic_dump = ts
449
450
451 class RemoteWorkerSelectionPolicy(ABC):
452     def register_worker_pool(self, workers):
453         self.workers = workers
454
455     @abstractmethod
456     def is_worker_available(self) -> bool:
457         pass
458
459     @abstractmethod
460     def acquire_worker(
461             self,
462             machine_to_avoid = None
463     ) -> Optional[RemoteWorkerRecord]:
464         pass
465
466
467 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
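    """Picks an available worker at random, biased by each worker's
    weight, and makes a best effort to avoid machine_to_avoid."""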
468     @overrides
469     def is_worker_available(self) -> bool:
470         for worker in self.workers:
471             if worker.count > 0:
472                 return True
473         return False
474
475     @overrides
476     def acquire_worker(
477             self,
478             machine_to_avoid = None
479     ) -> Optional[RemoteWorkerRecord]:
480         grabbag = []
481         for worker in self.workers:
482             for x in range(0, worker.count):
483                 for y in range(0, worker.weight):
484                     grabbag.append(worker)
485
486         for _ in range(0, 5):
487             random.shuffle(grabbag)
488             worker = grabbag[0]
489             if worker.machine != machine_to_avoid or _ > 2:
490                 if worker.count > 0:
491                     worker.count -= 1
492                     logger.debug(f'Selected worker {worker}')
493                     return worker
494         msg = 'Unexpectedly could not find a worker, retrying...'
495         logger.warning(msg)
496         warnings.warn(msg)
497         return None
498
499
500 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
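    """Hands out available workers in round-robin order; note that
    machine_to_avoid is accepted but not honored by this policy."""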
501     def __init__(self) -> None:
502         self.index = 0
503
504     @overrides
505     def is_worker_available(self) -> bool:
506         for worker in self.workers:
507             if worker.count > 0:
508                 return True
509         return False
510
511     @overrides
512     def acquire_worker(
513             self,
514             machine_to_avoid: Optional[str] = None
515     ) -> Optional[RemoteWorkerRecord]:
516         x = self.index
517         while True:
518             worker = self.workers[x]
519             if worker.count > 0:
520                 worker.count -= 1
521                 x += 1
522                 if x >= len(self.workers):
523                     x = 0
524                 self.index = x
525                 logger.debug(f'Selected worker {worker}')
526                 return worker
527             x += 1
528             if x >= len(self.workers):
529                 x = 0
530             if x == self.index:
531                 msg = 'Unexpectedly could not find a worker, retrying...'
532                 logger.warning(msg)
533                 warnings.warn(msg)
534                 return None
535
536
537 class RemoteExecutor(BaseExecutor):
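    """Distributes work across remote machines: bundles are cloudpickled,
    rsync'ed to a worker, and executed there over ssh by remote_worker.py.
    A heartbeat loop may schedule duplicative backup bundles for work that
    is running unexpectedly slowly (see heartbeat / schedule_backup_for_bundle)."""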
538     def __init__(self,
539                  workers: List[RemoteWorkerRecord],
540                  policy: RemoteWorkerSelectionPolicy) -> None:
541         super().__init__()
542         self.workers = workers
543         self.policy = policy
544         self.worker_count = 0
545         for worker in self.workers:
546             self.worker_count += worker.count
547         if self.worker_count <= 0:
548             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
549             logger.critical(msg)
550             raise RemoteExecutorException(msg)
551         self.policy.register_worker_pool(self.workers)
552         self.cv = threading.Condition()
553         logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
554         self._helper_executor = fut.ThreadPoolExecutor(
555             thread_name_prefix="remote_executor_helper",
556             max_workers=self.worker_count,
557         )
558         self.status = RemoteExecutorStatus(self.worker_count)
559         self.total_bundles_submitted = 0
560         self.backup_lock = threading.Lock()
561         self.last_backup = None
562
563     def is_worker_available(self) -> bool:
564         return self.policy.is_worker_available()
565
566     def acquire_worker(
567             self,
568             machine_to_avoid: Optional[str] = None
569     ) -> Optional[RemoteWorkerRecord]:
570         return self.policy.acquire_worker(machine_to_avoid)
571
572     def find_available_worker_or_block(
573             self,
574             machine_to_avoid: Optional[str] = None
575     ) -> RemoteWorkerRecord:
576         with self.cv:
577             while not self.is_worker_available():
578                 self.cv.wait()
579             worker = self.acquire_worker(machine_to_avoid)
580             if worker is not None:
581                 return worker
582         msg = "We should never reach this point in the code"
583         logger.critical(msg)
584         raise Exception(msg)
585
586     def release_worker(self, worker: RemoteWorkerRecord) -> None:
587         logger.debug(f'Released worker {worker}')
588         with self.cv:
589             worker.count += 1
590             self.cv.notify()
591
592     def heartbeat(self) -> None:
593         with self.status.lock:
594             # Regular progress report
595             self.status.periodic_dump(self.total_bundles_submitted)
596
597             # Look for bundles to reschedule.
598             num_done = len(self.status.finished_bundle_timings)
599             num_idle_workers = self.worker_count - self.task_count
600             now = time.time()
601             if (
602                     config.config['executors_schedule_remote_backups']
603                     and num_done > 2
604                     and num_idle_workers > 1
605                     and (self.last_backup is None or (now - self.last_backup > 1.0))
606                     and self.backup_lock.acquire(blocking=False)
607             ):
608                 try:
609                     assert self.backup_lock.locked()
610
611                     bundle_to_backup = None
612                     best_score = None
613                     for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
614                         # Prefer to schedule backups of bundles on slower machines.
615                         base_score = 0
616                         for record in self.workers:
617                             if worker.machine == record.machine:
618                                 base_score = float(record.weight)
619                                 base_score = 1.0 / base_score
620                                 base_score *= 200.0
621                                 base_score = int(base_score)
622                                 break
623
624                         for uuid in bundle_uuids:
625                             bundle = self.status.bundle_details_by_uuid.get(uuid, None)
626                             if (
627                                     bundle is not None
628                                     and bundle.src_bundle is None
629                                     and bundle.backup_bundles is not None
630                             ):
631                                 score = base_score
632
633                                 # Schedule backups of bundles running longer; especially those
634                                 # that are unexpectedly slow.
635                                 start_ts = self.status.start_per_bundle[uuid]
636                                 if start_ts is not None:
637                                     runtime = now - start_ts
638                                     score += runtime
639                                     logger.debug(f'score[{bundle}] => {score}  # latency boost')
640
641                                     if bundle.slower_than_local_p95:
642                                         score += runtime / 2
643                                         logger.debug(f'score[{bundle}] => {score}  # >worker p95')
644
645                                     if bundle.slower_than_global_p95:
646                                         score += runtime / 2
647                                         logger.debug(f'score[{bundle}] => {score}  # >global p95')
648
649                                 # Prefer backups of bundles that don't have backups already.
650                                 backup_count = len(bundle.backup_bundles)
651                                 if backup_count == 0:
652                                     score *= 2
653                                 elif backup_count == 1:
654                                     score /= 2
655                                 elif backup_count == 2:
656                                     score /= 8
657                                 else:
658                                     score = 0
659                                 logger.debug(f'score[{bundle}] => {score}  # {backup_count} dup backup factor')
660
661                                 if (
662                                         score != 0
663                                         and (best_score is None or score > best_score)
664                                 ):
665                                     bundle_to_backup = bundle
666                                     assert bundle is not None
667                                     assert bundle.backup_bundles is not None
668                                     assert bundle.src_bundle is None
669                                     best_score = score
670
671                     if bundle_to_backup is not None:
672                         self.last_backup = now
673                         logger.info(f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <=====')
674                         self.schedule_backup_for_bundle(bundle_to_backup)
675                 finally:
676                     self.backup_lock.release()
677
678     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
679         with self.status.lock:
680             if bundle.is_cancelled.wait(timeout=0.0):
681                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
682                 bundle.was_cancelled = True
683                 return True
684         return False
685
686     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
687         """Find a worker for bundle or block until one is available."""
688         self.adjust_task_count(+1)
689         uuid = bundle.uuid
690         hostname = bundle.hostname
691         avoid_machine = override_avoid_machine
692         is_original = bundle.src_bundle is None
693
694         # Try not to schedule a backup on the same host as the original.
695         if avoid_machine is None and bundle.src_bundle is not None:
696             avoid_machine = bundle.src_bundle.machine
697         worker = None
698         while worker is None:
699             worker = self.find_available_worker_or_block(avoid_machine)
700
701         # Ok, found a worker.
702         bundle.worker = worker
703         machine = bundle.machine = worker.machine
704         username = bundle.username = worker.username
705
706         self.status.record_acquire_worker(worker, uuid)
707         logger.debug(f'{bundle}: Running bundle on {worker}...')
708
709         # Before we do any work, make sure the bundle is still viable.
710         if self.check_if_cancelled(bundle):
711             try:
712                 return self.post_launch_work(bundle)
713             except Exception as e:
714                 logger.exception(e)
715                 logger.error(
716                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
717                 )
718                 assert bundle.worker is not None
719                 self.status.record_release_worker(
720                     bundle.worker,
721                     bundle.uuid,
722                     True,
723                 )
724                 self.release_worker(bundle.worker)
725                 self.adjust_task_count(-1)
726                 if is_original:
727                     # Weird.  We are the original owner of this
728                     # bundle.  For it to have been cancelled, a backup
729                     # must have already started and completed before
730                     # we even got started.  Moreover, the backup says
731                     # it is done but we can't find the results it
732                     # should have copied over.  Reschedule the whole
733                     # thing.
734                     return self.emergency_retry_nasty_bundle(bundle)
735                 else:
736                     # Expected(?).  We're a backup and our bundle is
737                     # cancelled before we even got started.  Something
738                     # went bad in post_launch_work (I actually don't
739                     # see what?) but probably not worth worrying
740                     # about.
741                     return None
742
743         # Send input code / data to worker machine if it's not local.
744         if hostname not in machine:
745             try:
746                 cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
747                 start_ts = time.time()
748                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
749                 run_silently(cmd)
750                 xfer_latency = time.time() - start_ts
751                 logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.")
752             except Exception as e:
753                 assert bundle.worker is not None
754                 self.status.record_release_worker(
755                     bundle.worker,
756                     bundle.uuid,
757                     True,
758                 )
759                 self.release_worker(bundle.worker)
760                 self.adjust_task_count(-1)
761                 if is_original:
762                     # Weird.  We tried to copy the code to the worker and it failed...
763                     # And we're the original bundle.  We have to retry.
764                     logger.exception(e)
765                     logger.error(
766                         f'{bundle}: Failed to send instructions to the worker machine?! ' +
767                         'This is not expected; we\'re the original bundle so this shouldn\'t ' +
768                         'be a race condition.  Attempting an emergency retry...'
769                     )
770                     return self.emergency_retry_nasty_bundle(bundle)
771                 else:
772                     # This is actually expected; we're a backup.
773                     # There's a race condition where someone else
774                     # already finished the work and removed the source
775                     # code file before we could copy it.  No biggie.
776                     msg = (f'{bundle}: Failed to send instructions to the worker machine... '
777                            'We\'re a backup and this may be caused by the original (or some '
778                            'other backup) already finishing this work.  Ignoring this.')
779                     logger.warning(msg)
780                     warnings.warn(msg)
781                     return None
782
783         # Kick off the work.  Note that if this fails we let
784         # wait_for_process deal with it.
785         self.status.record_processing_began(uuid)
786         cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
787                f'"source py38-venv/bin/activate &&'
788                f' /home/scott/lib/python_modules/remote_worker.py'
789                f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
790         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
791         p = cmd_in_background(cmd, silent=True)
792         bundle.pid = pid = p.pid
793         logger.debug(f'{bundle}: Local ssh process pid={pid}; remote worker is {machine}.')
794         return self.wait_for_process(p, bundle, 0)
795
796     def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
797         machine = bundle.machine
798         pid = p.pid
799         if depth > 3:
800             logger.error(
801                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
802             )
803             p.terminate()
804             self.status.record_release_worker(
805                 bundle.worker,
806                 bundle.uuid,
807                 True,
808             )
809             self.release_worker(bundle.worker)
810             self.adjust_task_count(-1)
811             return self.emergency_retry_nasty_bundle(bundle)
812
813         # Spin until either the ssh job we scheduled finishes the
814         # bundle or some backup worker signals that they finished it
815         # before we could.
816         while True:
817             try:
818                 p.wait(timeout=0.25)
819             except subprocess.TimeoutExpired:
820                 self.heartbeat()
821                 if self.check_if_cancelled(bundle):
822                     logger.info(
823                         f'{bundle}: another worker finished bundle, checking it out...'
824                     )
825                     break
826             else:
827                 logger.info(
828                     f"{bundle}: pid {pid} ({machine}) our ssh finished, checking it out..."
829                 )
830                 p = None
831                 break
832
833         # If we get here we believe the bundle is done; either the ssh
834         # subprocess finished (hopefully successfully) or we noticed
835         # that some other worker seems to have completed the bundle
836         # and we're bailing out.
837         try:
838             ret = self.post_launch_work(bundle)
839             if ret is not None and p is not None:
840                 p.terminate()
841             return ret
842
843         # Something went wrong; e.g. we could not copy the results
844         # back, cleanup after ourselves on the remote machine, or
845         # unpickle the results we got from the remote machine.  If we
846         # still have an active ssh subprocess, keep waiting on it.
847         # Otherwise, time for an emergency reschedule.
848         except Exception as e:
849             logger.exception(e)
850             logger.error(f'{bundle}: Something unexpected just happened...')
851             if p is not None:
852                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
853                 logger.warning(msg)
854                 warnings.warn(msg)
855                 return self.wait_for_process(p, bundle, depth + 1)
856             else:
857                 self.status.record_release_worker(
858                     bundle.worker,
859                     bundle.uuid,
860                     True,
861                 )
862                 self.release_worker(bundle.worker)
863                 self.adjust_task_count(-1)
864                 return self.emergency_retry_nasty_bundle(bundle)
865
866     def post_launch_work(self, bundle: BundleDetails) -> Any:
867         with self.status.lock:
868             is_original = bundle.src_bundle is None
869             was_cancelled = bundle.was_cancelled
870             username = bundle.username
871             machine = bundle.machine
872             result_file = bundle.result_file
873             code_file = bundle.code_file
874
875             # Whether original or backup, if we finished first we must
876             # fetch the results if the computation happened on a
877             # remote machine.
878             bundle.end_ts = time.time()
879             if not was_cancelled:
880                 assert bundle.machine is not None
881                 if bundle.hostname not in bundle.machine:
882                     cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
883                     logger.info(
884                         f"{bundle}: Fetching results from {username}@{machine} via {cmd}"
885                     )
886
887                     # If either of these throw they are handled in
888                     # wait_for_process.
889                     run_silently(cmd)
890                     run_silently(f'{SSH} {username}@{machine}'
891                                  f' "/bin/rm -f {code_file} {result_file}"')
892                 dur = bundle.end_ts - bundle.start_ts
893                 self.histogram.add_item(dur)
894
895         # Only the original worker should unpickle the file contents
896         # though since it's the only one whose result matters.  The
897         # original is also the only job that may delete result_file
898         # from disk.  Note that the original may have been cancelled
899         # if one of the backups finished first; it still must read the
900         # result from disk.
901         if is_original:
902             logger.debug(f"{bundle}: Unpickling {result_file}.")
903             try:
904                 with open(result_file, 'rb') as rb:
905                     serialized = rb.read()
906                 result = cloudpickle.loads(serialized)
907             except Exception as e:
908                 msg = f'Failed to load {result_file}, this is bad news.'
909                 logger.critical(msg)
910                 self.status.record_release_worker(
911                     bundle.worker,
912                     bundle.uuid,
913                     True,
914                 )
915                 self.release_worker(bundle.worker)
916
917                 # Re-raise the exception; the code in wait_for_process may
918                 # decide to emergency_retry_nasty_bundle here.
919                 raise Exception(e)
920
921             logger.debug(
922                 f'Removing local (master) {code_file} and {result_file}.'
923             )
924             os.remove(f'{result_file}')
925             os.remove(f'{code_file}')
926
927             # Notify any backups that the original is done so they
928             # should stop ASAP.  Do this whether or not we
929             # finished first since there could be more than one
930             # backup.
931             if bundle.backup_bundles is not None:
932                 for backup in bundle.backup_bundles:
933                     logger.debug(
934                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
935                     )
936                     backup.is_cancelled.set()
937
938         # This is a backup job and, by now, we have already fetched
939         # the bundle results.
940         else:
941             # Backup results don't matter, they just need to leave the
942             # result file in the right place for their originals to
943             # read/unpickle later.
944             result = None
945
946             # Tell the original to stop if we finished first.
947             if not was_cancelled:
948                 logger.debug(
949                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
950                 )
951                 bundle.src_bundle.is_cancelled.set()
952
953         assert bundle.worker is not None
954         self.status.record_release_worker(
955             bundle.worker,
956             bundle.uuid,
957             was_cancelled,
958         )
959         self.release_worker(bundle.worker)
960         self.adjust_task_count(-1)
961         return result
962
963     def create_original_bundle(self, pickle, fname: str):
964         from string_utils import generate_uuid
965         uuid = generate_uuid(as_hex=True)
966         code_file = f'/tmp/{uuid}.code.bin'
967         result_file = f'/tmp/{uuid}.result.bin'
968
969         logger.debug(f'Writing pickled code to {code_file}')
970         with open(f'{code_file}', 'wb') as wb:
971             wb.write(pickle)
972
973         bundle = BundleDetails(
974             pickled_code = pickle,
975             uuid = uuid,
976             fname = fname,
977             worker = None,
978             username = None,
979             machine = None,
980             hostname = platform.node(),
981             code_file = code_file,
982             result_file = result_file,
983             pid = 0,
984             start_ts = time.time(),
985             end_ts = 0.0,
986             slower_than_local_p95 = False,
987             slower_than_global_p95 = False,
988             src_bundle = None,
989             is_cancelled = threading.Event(),
990             was_cancelled = False,
991             backup_bundles = [],
992             failure_count = 0,
993         )
994         self.status.record_bundle_details(bundle)
995         logger.debug(f'{bundle}: Created an original bundle')
996         return bundle
997
998     def create_backup_bundle(self, src_bundle: BundleDetails):
999         assert src_bundle.backup_bundles is not None
1000         n = len(src_bundle.backup_bundles)
1001         uuid = src_bundle.uuid + f'_backup#{n}'
1002
1003         backup_bundle = BundleDetails(
1004             pickled_code = src_bundle.pickled_code,
1005             uuid = uuid,
1006             fname = src_bundle.fname,
1007             worker = None,
1008             username = None,
1009             machine = None,
1010             hostname = src_bundle.hostname,
1011             code_file = src_bundle.code_file,
1012             result_file = src_bundle.result_file,
1013             pid = 0,
1014             start_ts = time.time(),
1015             end_ts = 0.0,
1016             slower_than_local_p95 = False,
1017             slower_than_global_p95 = False,
1018             src_bundle = src_bundle,
1019             is_cancelled = threading.Event(),
1020             was_cancelled = False,
1021             backup_bundles = None,    # backup backups not allowed
1022             failure_count = 0,
1023         )
1024         src_bundle.backup_bundles.append(backup_bundle)
1025         self.status.record_bundle_details_already_locked(backup_bundle)
1026         logger.debug(f'{backup_bundle}: Created a backup bundle')
1027         return backup_bundle
1028
1029     def schedule_backup_for_bundle(self,
1030                                    src_bundle: BundleDetails):
1031         assert self.status.lock.locked()
1032         assert src_bundle is not None
1033         backup_bundle = self.create_backup_bundle(src_bundle)
1034         logger.debug(
1035             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1036         )
1037         self._helper_executor.submit(self.launch, backup_bundle)
1038
1039         # Results from backups don't matter; if they finish first
1040         # they will move the result_file to this machine and let
1041         # the original pick them up and unpickle them.
1042
1043     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Any:
1044         is_original = bundle.src_bundle is None
1045         bundle.worker = None
1046         avoid_last_machine = bundle.machine
1047         bundle.machine = None
1048         bundle.username = None
1049         bundle.failure_count += 1
1050         if is_original:
1051             retry_limit = 3
1052         else:
1053             retry_limit = 2
1054
1055         if bundle.failure_count > retry_limit:
1056             logger.error(
1057                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1058             )
1059             if is_original:
1060                 raise RemoteExecutorException(
1061                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1062                 )
1063             else:
1064                 logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
1065             return None
1066         else:
1067             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1068             logger.warning(msg)
1069             warnings.warn(msg)
1070             return self.launch(bundle, avoid_last_machine)
1071
1072     @overrides
1073     def submit(self,
1074                function: Callable,
1075                *args,
1076                **kwargs) -> fut.Future:
1077         pickle = make_cloud_pickle(function, *args, **kwargs)
1078         bundle = self.create_original_bundle(pickle, function.__name__)
1079         self.total_bundles_submitted += 1
1080         return self._helper_executor.submit(self.launch, bundle)
1081
1082     @overrides
1083     def shutdown(self, wait=True) -> None:
1084         self._helper_executor.shutdown(wait)
1085         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1086         print(self.histogram)
1087
1088
1089 @singleton
1090 class DefaultExecutors(object):
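    """Lazily constructs and caches one ThreadExecutor, one ProcessExecutor,
    and one RemoteExecutor (the latter built from whichever of the hardcoded
    helper machines answer a ping).

    Illustrative sketch (the workload here is made up):

        executors = DefaultExecutors()
        future = executors.process_pool().submit(sum, [1, 2, 3])
        assert future.result() == 6
    """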
1091     def __init__(self):
1092         self.thread_executor: Optional[ThreadExecutor] = None
1093         self.process_executor: Optional[ProcessExecutor] = None
1094         self.remote_executor: Optional[RemoteExecutor] = None
1095
1096     def ping(self, host) -> bool:
1097         logger.debug(f'RUN> ping -c 1 {host}')
1098         try:
1099             x = cmd_with_timeout(
1100                 f'ping -c 1 {host} >/dev/null 2>/dev/null',
1101                 timeout_seconds=1.0
1102             )
1103             return x == 0
1104         except Exception:
1105             return False
1106
1107     def thread_pool(self) -> ThreadExecutor:
1108         if self.thread_executor is None:
1109             self.thread_executor = ThreadExecutor()
1110         return self.thread_executor
1111
1112     def process_pool(self) -> ProcessExecutor:
1113         if self.process_executor is None:
1114             self.process_executor = ProcessExecutor()
1115         return self.process_executor
1116
1117     def remote_pool(self) -> RemoteExecutor:
1118         if self.remote_executor is None:
1119             logger.info('Looking for some helper machines...')
1120             pool: List[RemoteWorkerRecord] = []
1121             if self.ping('cheetah.house'):
1122                 logger.info('Found cheetah.house')
1123                 pool.append(
1124                     RemoteWorkerRecord(
1125                         username = 'scott',
1126                         machine = 'cheetah.house',
1127                         weight = 25,
1128                         count = 6,
1129                     ),
1130                 )
1131             if self.ping('meerkat.cabin'):
1132                 logger.info('Found meerkat.cabin')
1133                 pool.append(
1134                     RemoteWorkerRecord(
1135                         username = 'scott',
1136                         machine = 'meerkat.cabin',
1137                         weight = 5,
1138                         count = 2,
1139                     ),
1140                 )
1141             # if self.ping('kiosk.house'):
1142             #     logger.info('Found kiosk.house')
1143             #     pool.append(
1144             #         RemoteWorkerRecord(
1145             #             username = 'pi',
1146             #             machine = 'kiosk.house',
1147             #             weight = 1,
1148             #             count = 2,
1149             #         ),
1150             #     )
1151             if self.ping('hero.house'):
1152                 logger.info('Found hero.house')
1153                 pool.append(
1154                     RemoteWorkerRecord(
1155                         username = 'scott',
1156                         machine = 'hero.house',
1157                         weight = 30,
1158                         count = 10,
1159                     ),
1160                 )
1161             if self.ping('puma.cabin'):
1162                 logger.info('Found puma.cabin')
1163                 pool.append(
1164                     RemoteWorkerRecord(
1165                         username = 'scott',
1166                         machine = 'puma.cabin',
1167                         weight = 25,
1168                         count = 6,
1169                     ),
1170                 )
1171             if self.ping('backup.house'):
1172                 logger.info('Found backup.house')
1173                 pool.append(
1174                     RemoteWorkerRecord(
1175                         username = 'scott',
1176                         machine = 'backup.house',
1177                         weight = 3,
1178                         count = 2,
1179                     ),
1180                 )
1181
1182             # The controller machine has a lot to do; go easy on it.
1183             for record in pool:
1184                 if record.machine == platform.node() and record.count > 1:
1185                     logger.info(f'Reducing workload for {record.machine}.')
1186                     record.count = 1
1187
1188             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1189             policy.register_worker_pool(pool)
1190             self.remote_executor = RemoteExecutor(pool, policy)
1191         return self.remote_executor