Scale back warnings.warn and add stacklevels= where appropriate.
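For context, a minimal illustration of the stacklevel= idea (hypothetical helper, not code from this repo): by default warnings.warn() attributes the warning to the library line that raises it; passing stacklevel=2 attributes it to the caller, which is usually the line the user actually needs to change.

    import warnings

    def old_helper():
        # With stacklevel=2 the warning points at whoever called old_helper(),
        # not at this line inside the library.
        warnings.warn(
            'old_helper() is deprecated; prefer new_helper()',
            DeprecationWarning,
            stacklevel=2,
        )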
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29
30 logger = logging.getLogger(__name__)
31
32 parser = config.add_commandline_args(
33     f"Executors ({__file__})",
34     "Args related to processing executors."
35 )
36 parser.add_argument(
37     '--executors_threadpool_size',
38     type=int,
39     metavar='#THREADS',
40     help='Number of threads in the default threadpool, leave unset for default',
41     default=None
42 )
43 parser.add_argument(
44     '--executors_processpool_size',
45     type=int,
46     metavar='#PROCESSES',
47     help='Number of processes in the default processpool, leave unset for default',
48     default=None,
49 )
50 parser.add_argument(
51     '--executors_schedule_remote_backups',
52     default=True,
53     action=argparse_utils.ActionNoYes,
54     help='Should we schedule duplicative backup work if a remote bundle is slow',
55 )
56 parser.add_argument(
57     '--executors_max_bundle_failures',
58     type=int,
59     default=3,
60     metavar='#FAILURES',
61     help='Maximum number of failures before giving up on a bundle',
62 )
63
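# Flags used below: rsync runs quietly (-q, --no-motd), copies whole files (-W),
# never overwrites files that already exist on the receiver (--ignore-existing),
# gives up after a 60s I/O stall (--timeout), compares by size only (--size-only)
# and compresses in transit (-z).  ssh disables X11 forwarding.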
64 RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
65 SSH = 'ssh -oForwardX11=no'
66
67
68 def make_cloud_pickle(fun, *args, **kwargs):
69     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
70     return cloudpickle.dumps((fun, args, kwargs))
71
72
73 class BaseExecutor(ABC):
74     def __init__(self, *, title=''):
75         self.title = title
76         self.task_count = 0
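        # Keep a latency histogram of completed task runtimes; the buckets appear
        # to span 0..500 seconds in 50 evenly spaced steps.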
77         self.histogram = hist.SimpleHistogram(
78             hist.SimpleHistogram.n_evenly_spaced_buckets(
79                 int(0), int(500), 50
80             )
81         )
82
83     @abstractmethod
84     def submit(self,
85                function: Callable,
86                *args,
87                **kwargs) -> fut.Future:
88         pass
89
90     @abstractmethod
91     def shutdown(self,
92                  wait: bool = True) -> None:
93         pass
94
95     def adjust_task_count(self, delta: int) -> None:
96         self.task_count += delta
97         logger.debug(f'Executor current task count is {self.task_count}')
98
99
100 class ThreadExecutor(BaseExecutor):
101     def __init__(self,
102                  max_workers: Optional[int] = None):
103         super().__init__()
104         workers = None
105         if max_workers is not None:
106             workers = max_workers
107         elif 'executors_threadpool_size' in config.config:
108             workers = config.config['executors_threadpool_size']
109         logger.debug(f'Creating threadpool executor with {workers} workers')
110         self._thread_pool_executor = fut.ThreadPoolExecutor(
111             max_workers=workers,
112             thread_name_prefix="thread_executor_helper"
113         )
114
115     def run_local_bundle(self, fun, *args, **kwargs):
116         logger.debug(f"Running local bundle at {fun.__name__}")
117         start = time.time()
118         result = fun(*args, **kwargs)
119         end = time.time()
120         self.adjust_task_count(-1)
121         duration = end - start
122         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
123         self.histogram.add_item(duration)
124         return result
125
126     @overrides
127     def submit(self,
128                function: Callable,
129                *args,
130                **kwargs) -> fut.Future:
131         self.adjust_task_count(+1)
132         newargs = []
133         newargs.append(function)
134         for arg in args:
135             newargs.append(arg)
136         return self._thread_pool_executor.submit(
137             self.run_local_bundle,
138             *newargs,
139             **kwargs)
140
141     @overrides
142     def shutdown(self,
143                  wait: bool = True) -> None:
144         logger.debug(f'Shutting down threadpool executor {self.title}')
145         print(self.histogram)
146         self._thread_pool_executor.shutdown(wait)
147
148
149 class ProcessExecutor(BaseExecutor):
150     def __init__(self,
151                  max_workers: Optional[int] = None):
152         super().__init__()
153         workers = None
154         if max_workers is not None:
155             workers = max_workers
156         elif 'executors_processpool_size' in config.config:
157             workers = config.config['executors_processpool_size']
158         logger.debug(f'Creating processpool executor with {workers} workers.')
159         self._process_executor = fut.ProcessPoolExecutor(
160             max_workers=workers,
161         )
162
163     def run_cloud_pickle(self, pickle):
164         fun, args, kwargs = cloudpickle.loads(pickle)
165         logger.debug(f"Running pickled bundle at {fun.__name__}")
166         result = fun(*args, **kwargs)
167         self.adjust_task_count(-1)
168         return result
169
170     @overrides
171     def submit(self,
172                function: Callable,
173                *args,
174                **kwargs) -> fut.Future:
175         start = time.time()
176         self.adjust_task_count(+1)
177         pickle = make_cloud_pickle(function, *args, **kwargs)
178         result = self._process_executor.submit(
179             self.run_cloud_pickle,
180             pickle
181         )
182         result.add_done_callback(
183             lambda _: self.histogram.add_item(
184                 time.time() - start
185             )
186         )
187         return result
188
189     @overrides
190     def shutdown(self, wait=True) -> None:
191         logger.debug(f'Shutting down processpool executor {self.title}')
192         self._process_executor.shutdown(wait)
193         print(self.histogram)
194
195     def __getstate__(self):
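        # A live ProcessPoolExecutor holds locks and OS handles that cannot be
        # pickled; drop it so instances of this class can themselves be pickled.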
196         state = self.__dict__.copy()
197         state['_process_executor'] = None
198         return state
199
200
201 class RemoteExecutorException(Exception):
202     """Thrown when a bundle cannot be executed despite several retries."""
203     pass
204
205
206 @dataclass
207 class RemoteWorkerRecord:
208     username: str
209     machine: str
210     weight: int
211     count: int
212
213     def __hash__(self):
214         return hash((self.username, self.machine))
215
216     def __repr__(self):
217         return f'{self.username}@{self.machine}'
218
219
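# A bundle is one unit of remote work: the cloudpickled (function, args, kwargs)
# plus the bookkeeping needed to track where it runs, whether it was cancelled,
# and any duplicate "backup" bundles scheduled to race it.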
220 @dataclass
221 class BundleDetails:
222     pickled_code: bytes
223     uuid: str
224     fname: str
225     worker: Optional[RemoteWorkerRecord]
226     username: Optional[str]
227     machine: Optional[str]
228     hostname: str
229     code_file: str
230     result_file: str
231     pid: int
232     start_ts: float
233     end_ts: float
234     slower_than_local_p95: bool
235     slower_than_global_p95: bool
236     src_bundle: BundleDetails
237     is_cancelled: threading.Event
238     was_cancelled: bool
239     backup_bundles: Optional[List[BundleDetails]]
240     failure_count: int
241
242     def __repr__(self):
243         uuid = self.uuid
244         if uuid[-9:-2] == '_backup':
245             uuid = uuid[:-9]
246             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
247         else:
248             suffix = uuid[-6:]
249
250         colorz = [
251             fg('violet red'),
252             fg('red'),
253             fg('orange'),
254             fg('peach orange'),
255             fg('yellow'),
256             fg('marigold yellow'),
257             fg('green yellow'),
258             fg('tea green'),
259             fg('cornflower blue'),
260             fg('turquoise blue'),
261             fg('tropical blue'),
262             fg('lavender purple'),
263             fg('medium purple'),
264         ]
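        # Derive a stable color from the uuid so every log line about the same
        # bundle renders in the same color.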
265         c = colorz[int(uuid[-2:], 16) % len(colorz)]
266         fname = self.fname if self.fname is not None else 'nofname'
267         machine = self.machine if self.machine is not None else 'nomachine'
268         return f'{c}{suffix}/{fname}/{machine}{reset()}'
269
270
271 class RemoteExecutorStatus:
272     def __init__(self, total_worker_count: int) -> None:
273         self.worker_count = total_worker_count
274         self.known_workers: Set[RemoteWorkerRecord] = set()
275         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
276         self.end_per_bundle: Dict[str, float] = defaultdict(float)
277         self.finished_bundle_timings_per_worker: Dict[
278             RemoteWorkerRecord,
279             List[float]
280         ] = {}
281         self.in_flight_bundles_by_worker: Dict[
282             RemoteWorkerRecord,
283             Set[str]
284         ] = {}
285         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
286         self.finished_bundle_timings: List[float] = []
287         self.last_periodic_dump: Optional[float] = None
288         self.total_bundles_submitted = 0
289
290         # Protects reads and modification using self.  Also used
291         # as a memory fence for modifications to bundle.
292         self.lock = threading.Lock()
293
294     def record_acquire_worker(
295             self,
296             worker: RemoteWorkerRecord,
297             uuid: str
298     ) -> None:
299         with self.lock:
300             self.record_acquire_worker_already_locked(
301                 worker,
302                 uuid
303             )
304
305     def record_acquire_worker_already_locked(
306             self,
307             worker: RemoteWorkerRecord,
308             uuid: str
309     ) -> None:
310         assert self.lock.locked()
311         self.known_workers.add(worker)
312         self.start_per_bundle[uuid] = None
313         x = self.in_flight_bundles_by_worker.get(worker, set())
314         x.add(uuid)
315         self.in_flight_bundles_by_worker[worker] = x
316
317     def record_bundle_details(
318             self,
319             details: BundleDetails) -> None:
320         with self.lock:
321             self.record_bundle_details_already_locked(details)
322
323     def record_bundle_details_already_locked(
324             self,
325             details: BundleDetails) -> None:
326         assert self.lock.locked()
327         self.bundle_details_by_uuid[details.uuid] = details
328
329     def record_release_worker(
330             self,
331             worker: RemoteWorkerRecord,
332             uuid: str,
333             was_cancelled: bool,
334     ) -> None:
335         with self.lock:
336             self.record_release_worker_already_locked(
337                 worker, uuid, was_cancelled
338             )
339
340     def record_release_worker_already_locked(
341             self,
342             worker: RemoteWorkerRecord,
343             uuid: str,
344             was_cancelled: bool,
345     ) -> None:
346         assert self.lock.locked()
347         ts = time.time()
348         self.end_per_bundle[uuid] = ts
349         self.in_flight_bundles_by_worker[worker].remove(uuid)
350         if not was_cancelled:
351             bundle_latency = ts - self.start_per_bundle[uuid]
352             x = self.finished_bundle_timings_per_worker.get(worker, list())
353             x.append(bundle_latency)
354             self.finished_bundle_timings_per_worker[worker] = x
355             self.finished_bundle_timings.append(bundle_latency)
356
357     def record_processing_began(self, uuid: str):
358         with self.lock:
359             self.start_per_bundle[uuid] = time.time()
360
361     def total_in_flight(self) -> int:
362         assert self.lock.locked()
363         total_in_flight = 0
364         for worker in self.known_workers:
365             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
366         return total_in_flight
367
368     def total_idle(self) -> int:
369         assert self.lock.locked()
370         return self.worker_count - self.total_in_flight()
371
372     def __repr__(self):
373         assert self.lock.locked()
374         ts = time.time()
375         total_finished = len(self.finished_bundle_timings)
376         total_in_flight = self.total_in_flight()
377         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
378         qall = None
379         if len(self.finished_bundle_timings) > 1:
380             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
381             ret += (
382                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
383                 f'✅={total_finished}/{self.total_bundles_submitted}, '
384                 f'💻n={total_in_flight}/{self.worker_count}\n'
385             )
386         else:
387             ret += (
388                 f' ✅={total_finished}/{self.total_bundles_submitted}, '
389                 f'💻n={total_in_flight}/{self.worker_count}\n'
390             )
391
392         for worker in self.known_workers:
393             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
394             timings = self.finished_bundle_timings_per_worker.get(worker, [])
395             count = len(timings)
396             qworker = None
397             if count > 1:
398                 qworker = numpy.quantile(timings, [0.5, 0.95])
399                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
400             else:
401                 ret += '\n'
402             if count > 0:
403                 ret += f'    ...finished {count} total bundle(s) so far\n'
404             in_flight = len(self.in_flight_bundles_by_worker[worker])
405             if in_flight > 0:
406                 ret += f'    ...{in_flight} bundles currently in flight:\n'
407                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
408                     details = self.bundle_details_by_uuid.get(
409                         bundle_uuid,
410                         None
411                     )
412                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
413                     if self.start_per_bundle[bundle_uuid] is not None:
414                         sec = ts - self.start_per_bundle[bundle_uuid]
415                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
416                     else:
417                         ret += f'       {details} setting up / copying data...'
418                         sec = 0.0
419
420                     if qworker is not None:
421                         if sec > qworker[1]:
422                             ret += f'{bg("red")}>💻p95{reset()} '
423                             if details is not None:
424                                 details.slower_than_local_p95 = True
425                         else:
426                             if details is not None:
427                                 details.slower_than_local_p95 = False
428
429                     if qall is not None:
430                         if sec > qall[1]:
431                             ret += f'{bg("red")}>∀p95{reset()} '
432                             if details is not None:
433                                 details.slower_than_global_p95 = True
434                         elif details is not None:
435                             details.slower_than_global_p95 = False
436                     ret += '\n'
437         return ret
438
439     def periodic_dump(self, total_bundles_submitted: int) -> None:
440         assert self.lock.locked()
441         self.total_bundles_submitted = total_bundles_submitted
442         ts = time.time()
443         if (
444                 self.last_periodic_dump is None
445                 or ts - self.last_periodic_dump > 5.0
446         ):
447             print(self)
448             self.last_periodic_dump = ts
449
450
451 class RemoteWorkerSelectionPolicy(ABC):
452     def register_worker_pool(self, workers):
453         self.workers = workers
454
455     @abstractmethod
456     def is_worker_available(self) -> bool:
457         pass
458
459     @abstractmethod
460     def acquire_worker(
461             self,
462             machine_to_avoid = None
463     ) -> Optional[RemoteWorkerRecord]:
464         pass
465
466
467 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
468     @overrides
469     def is_worker_available(self) -> bool:
470         for worker in self.workers:
471             if worker.count > 0:
472                 return True
473         return False
474
475     @overrides
476     def acquire_worker(
477             self,
478             machine_to_avoid = None
479     ) -> Optional[RemoteWorkerRecord]:
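        # Build a bag with one entry per available slot on each worker, repeated
        # weight times; a uniform draw from the shuffled bag is then a weighted
        # random choice across workers.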
480         grabbag = []
481         for worker in self.workers:
482             for x in range(0, worker.count):
483                 for y in range(0, worker.weight):
484                     grabbag.append(worker)
485
486         for attempt in range(0, 5):
487             random.shuffle(grabbag)
488             worker = grabbag[0]
489             if worker.machine != machine_to_avoid or attempt > 2:
490                 if worker.count > 0:
491                     worker.count -= 1
492                     logger.debug(f'Selected worker {worker}')
493                     return worker
494         msg = 'Unexpectedly could not find a worker, retrying...'
495         logger.warning(msg)
496         return None
497
498
499 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
500     def __init__(self) -> None:
501         self.index = 0
502
503     @overrides
504     def is_worker_available(self) -> bool:
505         for worker in self.workers:
506             if worker.count > 0:
507                 return True
508         return False
509
510     @overrides
511     def acquire_worker(
512             self,
513             machine_to_avoid: Optional[str] = None
514     ) -> Optional[RemoteWorkerRecord]:
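        # Walk the worker list starting at the saved index, wrapping around;
        # give up if we come all the way back without finding spare capacity.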
515         x = self.index
516         while True:
517             worker = self.workers[x]
518             if worker.count > 0:
519                 worker.count -= 1
520                 x += 1
521                 if x >= len(self.workers):
522                     x = 0
523                 self.index = x
524                 logger.debug(f'Selected worker {worker}')
525                 return worker
526             x += 1
527             if x >= len(self.workers):
528                 x = 0
529             if x == self.index:
530                 msg = 'Unexpectedly could not find a worker, retrying...'
531                 logger.warning(msg)
532                 return None
533
534
535 class RemoteExecutor(BaseExecutor):
536     def __init__(self,
537                  workers: List[RemoteWorkerRecord],
538                  policy: RemoteWorkerSelectionPolicy) -> None:
539         super().__init__()
540         self.workers = workers
541         self.policy = policy
542         self.worker_count = 0
543         for worker in self.workers:
544             self.worker_count += worker.count
545         if self.worker_count <= 0:
546             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
547             logger.critical(msg)
548             raise RemoteExecutorException(msg)
549         self.policy.register_worker_pool(self.workers)
550         self.cv = threading.Condition()
551         logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
552         self._helper_executor = fut.ThreadPoolExecutor(
553             thread_name_prefix="remote_executor_helper",
554             max_workers=self.worker_count,
555         )
556         self.status = RemoteExecutorStatus(self.worker_count)
557         self.total_bundles_submitted = 0
558         self.backup_lock = threading.Lock()
559         self.last_backup = None
560
561     def is_worker_available(self) -> bool:
562         return self.policy.is_worker_available()
563
564     def acquire_worker(
565             self,
566             machine_to_avoid: Optional[str] = None
567     ) -> Optional[RemoteWorkerRecord]:
568         return self.policy.acquire_worker(machine_to_avoid)
569
570     def find_available_worker_or_block(
571             self,
572             machine_to_avoid: Optional[str] = None
573     ) -> RemoteWorkerRecord:
574         with self.cv:
575             while not self.is_worker_available():
576                 self.cv.wait()
577             worker = self.acquire_worker(machine_to_avoid)
578             if worker is not None:
579                 return worker
580         msg = "We should never reach this point in the code"
581         logger.critical(msg)
582         raise Exception(msg)
583
584     def release_worker(self, worker: RemoteWorkerRecord) -> None:
585         logger.debug(f'Released worker {worker}')
586         with self.cv:
587             worker.count += 1
588             self.cv.notify()
589
590     def heartbeat(self) -> None:
591         with self.status.lock:
592             # Regular progress report
593             self.status.periodic_dump(self.total_bundles_submitted)
594
595             # Look for bundles to reschedule.
596             num_done = len(self.status.finished_bundle_timings)
597             num_idle_workers = self.worker_count - self.task_count
598             now = time.time()
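            # Only consider scheduling a backup when the feature is enabled, we
            # have some completed timings to compare against, idle workers exist
            # to soak up the extra work, and we haven't scheduled another backup
            # within the last second.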
599             if (
600                     config.config['executors_schedule_remote_backups']
601                     and num_done > 2
602                     and num_idle_workers > 1
603                     and (self.last_backup is None or (now - self.last_backup > 1.0))
604                     and self.backup_lock.acquire(blocking=False)
605             ):
606                 try:
607                     assert self.backup_lock.locked()
608
609                     bundle_to_backup = None
610                     best_score = None
611                     for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
612                         # Prefer to schedule backups of bundles on slower machines.
613                         base_score = 0
614                         for record in self.workers:
615                             if worker.machine == record.machine:
616                                 base_score = float(record.weight)
617                                 base_score = 1.0 / base_score
618                                 base_score *= 200.0
619                                 base_score = int(base_score)
620                                 break
621
622                         for uuid in bundle_uuids:
623                             bundle = self.status.bundle_details_by_uuid.get(uuid, None)
624                             if (
625                                     bundle is not None
626                                     and bundle.src_bundle is None
627                                     and bundle.backup_bundles is not None
628                             ):
629                                 score = base_score
630
631                                 # Schedule backups of bundles running longer; especially those
632                                 # that are unexpectedly slow.
633                                 start_ts = self.status.start_per_bundle[uuid]
634                                 if start_ts is not None:
635                                     runtime = now - start_ts
636                                     score += runtime
637                                     logger.debug(f'score[{bundle}] => {score}  # latency boost')
638
639                                     if bundle.slower_than_local_p95:
640                                         score += runtime / 2
641                                         logger.debug(f'score[{bundle}] => {score}  # >worker p95')
642
643                                     if bundle.slower_than_global_p95:
644                                         score += runtime / 2
645                                         logger.debug(f'score[{bundle}] => {score}  # >global p95')
646
647                                 # Prefer backups of bundles that don't have backups already.
648                                 backup_count = len(bundle.backup_bundles)
649                                 if backup_count == 0:
650                                     score *= 2
651                                 elif backup_count == 1:
652                                     score /= 2
653                                 elif backup_count == 2:
654                                     score /= 8
655                                 else:
656                                     score = 0
657                                 logger.debug(f'score[{bundle}] => {score}  # {backup_count} dup backup factor')
658
659                                 if (
660                                         score != 0
661                                         and (best_score is None or score > best_score)
662                                 ):
663                                     bundle_to_backup = bundle
664                                     assert bundle is not None
665                                     assert bundle.backup_bundles is not None
666                                     assert bundle.src_bundle is None
667                                     best_score = score
668
669                     if bundle_to_backup is not None:
670                         self.last_backup = now
671                         logger.info(f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <=====')
672                         self.schedule_backup_for_bundle(bundle_to_backup)
673                 finally:
674                     self.backup_lock.release()
675
676     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
677         with self.status.lock:
678             if bundle.is_cancelled.wait(timeout=0.0):
679                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
680                 bundle.was_cancelled = True
681                 return True
682         return False
683
684     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
685         """Find a worker for bundle or block until one is available."""
686         self.adjust_task_count(+1)
687         uuid = bundle.uuid
688         hostname = bundle.hostname
689         avoid_machine = override_avoid_machine
690         is_original = bundle.src_bundle is None
691
692         # Try not to schedule a backup on the same host as the original.
693         if avoid_machine is None and bundle.src_bundle is not None:
694             avoid_machine = bundle.src_bundle.machine
695         worker = None
696         while worker is None:
697             worker = self.find_available_worker_or_block(avoid_machine)
698
699         # Ok, found a worker.
700         bundle.worker = worker
701         machine = bundle.machine = worker.machine
702         username = bundle.username = worker.username
703
704         self.status.record_acquire_worker(worker, uuid)
705         logger.debug(f'{bundle}: Running bundle on {worker}...')
706
707         # Before we do any work, make sure the bundle is still viable.
708         if self.check_if_cancelled(bundle):
709             try:
710                 return self.post_launch_work(bundle)
711             except Exception as e:
712                 logger.exception(e)
713                 logger.error(
714                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
715                 )
716                 assert bundle.worker is not None
717                 self.status.record_release_worker(
718                     bundle.worker,
719                     bundle.uuid,
720                     True,
721                 )
722                 self.release_worker(bundle.worker)
723                 self.adjust_task_count(-1)
724                 if is_original:
725                     # Weird.  We are the original owner of this
726                     # bundle.  For it to have been cancelled, a backup
727                     # must have already started and completed before
728                             # we even got started.  Moreover, the backup says
729                     # it is done but we can't find the results it
730                     # should have copied over.  Reschedule the whole
731                     # thing.
732                     return self.emergency_retry_nasty_bundle(bundle)
733                 else:
734                     # Expected(?).  We're a backup and our bundle is
735                     # cancelled before we even got started.  Something
736                     # went bad in post_launch_work (I actually don't
737                     # see what?) but probably not worth worrying
738                     # about.
739                     return None
740
741         # Send input code / data to worker machine if it's not local.
742         if hostname not in machine:
743             try:
744                 cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
745                 start_ts = time.time()
746                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
747                 run_silently(cmd)
748                 xfer_latency = time.time() - start_ts
749                 logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.")
750             except Exception as e:
751                 assert bundle.worker is not None
752                 self.status.record_release_worker(
753                     bundle.worker,
754                     bundle.uuid,
755                     True,
756                 )
757                 self.release_worker(bundle.worker)
758                 self.adjust_task_count(-1)
759                 if is_original:
760                     # Weird.  We tried to copy the code to the worker and it failed...
761                     # And we're the original bundle.  We have to retry.
762                     logger.exception(e)
763                     logger.error(
764                         f'{bundle}: Failed to send instructions to the worker machine?! ' +
765                         'This is not expected; we\'re the original bundle so this shouldn\'t ' +
766                         'be a race condition.  Attempting an emergency retry...'
767                     )
768                     return self.emergency_retry_nasty_bundle(bundle)
769                 else:
770                     # This is actually expected; we're a backup.
771                     # There's a race condition where someone else
772                     # already finished the work and removed the source
773                     # code file before we could copy it.  No biggie.
774                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
775                     msg += 'We\'re a backup and this may be caused by the original (or some '
776                     msg += 'other backup) already finishing this work.  Ignoring this.'
777                     logger.warning(msg)
778                     return None
779
780         # Kick off the work.  Note that if this fails we let
781         # wait_for_process deal with it.
782         self.status.record_processing_began(uuid)
783         cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
784                f'"source py38-venv/bin/activate &&'
785                f' /home/scott/lib/python_modules/remote_worker.py'
786                f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
787         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
788         p = cmd_in_background(cmd, silent=True)
789         bundle.pid = pid = p.pid
790         logger.debug(f'{bundle}: Local ssh process pid={pid}; remote worker is {machine}.')
791         return self.wait_for_process(p, bundle, 0)
792
793     def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
794         machine = bundle.machine
795         pid = p.pid
796         if depth > 3:
797             logger.error(
798                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
799             )
800             p.terminate()
801             self.status.record_release_worker(
802                 bundle.worker,
803                 bundle.uuid,
804                 True,
805             )
806             self.release_worker(bundle.worker)
807             self.adjust_task_count(-1)
808             return self.emergency_retry_nasty_bundle(bundle)
809
810         # Spin until either the ssh job we scheduled finishes the
811         # bundle or some backup worker signals that they finished it
812         # before we could.
813         while True:
814             try:
815                 p.wait(timeout=0.25)
816             except subprocess.TimeoutExpired:
817                 self.heartbeat()
818                 if self.check_if_cancelled(bundle):
819                     logger.info(
820                         f'{bundle}: another worker finished bundle, checking it out...'
821                     )
822                     break
823             else:
824                 logger.info(
825                     f"{bundle}: pid {pid} ({machine}) our ssh finished, checking it out..."
826                 )
827                 p = None
828                 break
829
830         # If we get here we believe the bundle is done; either the ssh
831         # subprocess finished (hopefully successfully) or we noticed
832         # that some other worker seems to have completed the bundle
833         # and we're bailing out.
834         try:
835             ret = self.post_launch_work(bundle)
836             if ret is not None and p is not None:
837                 p.terminate()
838             return ret
839
840         # Something went wrong; e.g. we could not copy the results
841         # back, cleanup after ourselves on the remote machine, or
842         # unpickle the results we got from the remote machine.  If we
843         # still have an active ssh subprocess, keep waiting on it.
844         # Otherwise, time for an emergency reschedule.
845         except Exception as e:
846             logger.exception(e)
847             logger.error(f'{bundle}: Something unexpected just happened...')
848             if p is not None:
849                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
850                 logger.warning(msg)
851                 return self.wait_for_process(p, bundle, depth + 1)
852             else:
853                 self.status.record_release_worker(
854                     bundle.worker,
855                     bundle.uuid,
856                     True,
857                 )
858                 self.release_worker(bundle.worker)
859                 self.adjust_task_count(-1)
860                 return self.emergency_retry_nasty_bundle(bundle)
861
862     def post_launch_work(self, bundle: BundleDetails) -> Any:
863         with self.status.lock:
864             is_original = bundle.src_bundle is None
865             was_cancelled = bundle.was_cancelled
866             username = bundle.username
867             machine = bundle.machine
868             result_file = bundle.result_file
869             code_file = bundle.code_file
870
871             # Whether original or backup, if we finished first we must
872             # fetch the results if the computation happened on a
873             # remote machine.
874             bundle.end_ts = time.time()
875             if not was_cancelled:
876                 assert bundle.machine is not None
877                 if bundle.hostname not in bundle.machine:
878                     cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
879                     logger.info(
880                         f"{bundle}: Fetching results from {username}@{machine} via {cmd}"
881                     )
882
883                     # If either of these throw they are handled in
884                     # wait_for_process.
885                     run_silently(cmd)
886                     run_silently(f'{SSH} {username}@{machine}'
887                                  f' "/bin/rm -f {code_file} {result_file}"')
888                 dur = bundle.end_ts - bundle.start_ts
889                 self.histogram.add_item(dur)
890
891         # Only the original worker should unpickle the file contents
892         # though since it's the only one whose result matters.  The
893         # original is also the only job that may delete result_file
894         # from disk.  Note that the original may have been cancelled
895         # if one of the backups finished first; it still must read the
896         # result from disk.
897         if is_original:
898             logger.debug(f"{bundle}: Unpickling {result_file}.")
899             try:
900                 with open(result_file, 'rb') as rb:
901                     serialized = rb.read()
902                 result = cloudpickle.loads(serialized)
903             except Exception as e:
904                 msg = f'Failed to load {result_file}, this is bad news.'
905                 logger.critical(msg)
906                 self.status.record_release_worker(
907                     bundle.worker,
908                     bundle.uuid,
909                     True,
910                 )
911                 self.release_worker(bundle.worker)
912
913                 # Re-raise the exception; the code in wait_for_process may
914                 # decide to emergency_retry_nasty_bundle here.
915                 raise Exception(e)
916
917             logger.debug(
918                 f'Removing local (master) {code_file} and {result_file}.'
919             )
920             os.remove(f'{result_file}')
921             os.remove(f'{code_file}')
922
923             # Notify any backups that the original is done so they
924             # should stop ASAP.  Do this whether or not we
925             # finished first since there could be more than one
926             # backup.
927             if bundle.backup_bundles is not None:
928                 for backup in bundle.backup_bundles:
929                     logger.debug(
930                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
931                     )
932                     backup.is_cancelled.set()
933
934         # This is a backup job and, by now, we have already fetched
935         # the bundle results.
936         else:
937             # Backup results don't matter, they just need to leave the
938             # result file in the right place for their originals to
939             # read/unpickle later.
940             result = None
941
942             # Tell the original to stop if we finished first.
943             if not was_cancelled:
944                 logger.debug(
945                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
946                 )
947                 bundle.src_bundle.is_cancelled.set()
948
949         assert bundle.worker is not None
950         self.status.record_release_worker(
951             bundle.worker,
952             bundle.uuid,
953             was_cancelled,
954         )
955         self.release_worker(bundle.worker)
956         self.adjust_task_count(-1)
957         return result
958
959     def create_original_bundle(self, pickle, fname: str):
960         from string_utils import generate_uuid
961         uuid = generate_uuid(omit_dashes=True)
962         code_file = f'/tmp/{uuid}.code.bin'
963         result_file = f'/tmp/{uuid}.result.bin'
964
965         logger.debug(f'Writing pickled code to {code_file}')
966         with open(f'{code_file}', 'wb') as wb:
967             wb.write(pickle)
968
969         bundle = BundleDetails(
970             pickled_code = pickle,
971             uuid = uuid,
972             fname = fname,
973             worker = None,
974             username = None,
975             machine = None,
976             hostname = platform.node(),
977             code_file = code_file,
978             result_file = result_file,
979             pid = 0,
980             start_ts = time.time(),
981             end_ts = 0.0,
982             slower_than_local_p95 = False,
983             slower_than_global_p95 = False,
984             src_bundle = None,
985             is_cancelled = threading.Event(),
986             was_cancelled = False,
987             backup_bundles = [],
988             failure_count = 0,
989         )
990         self.status.record_bundle_details(bundle)
991         logger.debug(f'{bundle}: Created an original bundle')
992         return bundle
993
994     def create_backup_bundle(self, src_bundle: BundleDetails):
995         assert src_bundle.backup_bundles is not None
996         n = len(src_bundle.backup_bundles)
997         uuid = src_bundle.uuid + f'_backup#{n}'
998
999         backup_bundle = BundleDetails(
1000             pickled_code = src_bundle.pickled_code,
1001             uuid = uuid,
1002             fname = src_bundle.fname,
1003             worker = None,
1004             username = None,
1005             machine = None,
1006             hostname = src_bundle.hostname,
1007             code_file = src_bundle.code_file,
1008             result_file = src_bundle.result_file,
1009             pid = 0,
1010             start_ts = time.time(),
1011             end_ts = 0.0,
1012             slower_than_local_p95 = False,
1013             slower_than_global_p95 = False,
1014             src_bundle = src_bundle,
1015             is_cancelled = threading.Event(),
1016             was_cancelled = False,
1017             backup_bundles = None,    # backups of backups are not allowed
1018             failure_count = 0,
1019         )
1020         src_bundle.backup_bundles.append(backup_bundle)
1021         self.status.record_bundle_details_already_locked(backup_bundle)
1022         logger.debug(f'{backup_bundle}: Created a backup bundle')
1023         return backup_bundle
1024
1025     def schedule_backup_for_bundle(self,
1026                                    src_bundle: BundleDetails):
1027         assert self.status.lock.locked()
1028         assert src_bundle is not None
1029         backup_bundle = self.create_backup_bundle(src_bundle)
1030         logger.debug(
1031             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1032         )
1033         self._helper_executor.submit(self.launch, backup_bundle)
1034
1035         # Results from backups don't matter; if they finish first
1036         # they will move the result_file to this machine and let
1037         # the original pick them up and unpickle them.
1038
1039     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
1040         is_original = bundle.src_bundle is None
1041         bundle.worker = None
1042         avoid_last_machine = bundle.machine
1043         bundle.machine = None
1044         bundle.username = None
1045         bundle.failure_count += 1
1046         if is_original:
1047             retry_limit = 3
1048         else:
1049             retry_limit = 2
1050
1051         if bundle.failure_count > retry_limit:
1052             logger.error(
1053                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1054             )
1055             if is_original:
1056                 raise RemoteExecutorException(
1057                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1058                 )
1059             else:
1060                 logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
1061             return None
1062         else:
1063             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1064             logger.warning(msg)
1065             warnings.warn(msg)
1066             return self.launch(bundle, avoid_last_machine)
1067
1068     @overrides
1069     def submit(self,
1070                function: Callable,
1071                *args,
1072                **kwargs) -> fut.Future:
1073         pickle = make_cloud_pickle(function, *args, **kwargs)
1074         bundle = self.create_original_bundle(pickle, function.__name__)
1075         self.total_bundles_submitted += 1
1076         return self._helper_executor.submit(self.launch, bundle)
1077
1078     @overrides
1079     def shutdown(self, wait=True) -> None:
1080         self._helper_executor.shutdown(wait)
1081         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1082         print(self.histogram)
1083
1084
1085 @singleton
1086 class DefaultExecutors(object):
1087     def __init__(self):
1088         self.thread_executor: Optional[ThreadExecutor] = None
1089         self.process_executor: Optional[ProcessExecutor] = None
1090         self.remote_executor: Optional[RemoteExecutor] = None
1091
1092     def ping(self, host) -> bool:
1093         logger.debug(f'RUN> ping -c 1 {host}')
1094         try:
1095             x = cmd_with_timeout(
1096                 f'ping -c 1 {host} >/dev/null 2>/dev/null',
1097                 timeout_seconds=1.0
1098             )
1099             return x == 0
1100         except Exception:
1101             return False
1102
1103     def thread_pool(self) -> ThreadExecutor:
1104         if self.thread_executor is None:
1105             self.thread_executor = ThreadExecutor()
1106         return self.thread_executor
1107
1108     def process_pool(self) -> ProcessExecutor:
1109         if self.process_executor is None:
1110             self.process_executor = ProcessExecutor()
1111         return self.process_executor
1112
1113     def remote_pool(self) -> RemoteExecutor:
1114         if self.remote_executor is None:
1115             logger.info('Looking for some helper machines...')
1116             pool: List[RemoteWorkerRecord] = []
1117             if self.ping('cheetah.house'):
1118                 logger.info('Found cheetah.house')
1119                 pool.append(
1120                     RemoteWorkerRecord(
1121                         username = 'scott',
1122                         machine = 'cheetah.house',
1123                         weight = 25,
1124                         count = 6,
1125                     ),
1126                 )
1127             if self.ping('meerkat.cabin'):
1128                 logger.info('Found meerkat.cabin')
1129                 pool.append(
1130                     RemoteWorkerRecord(
1131                         username = 'scott',
1132                         machine = 'meerkat.cabin',
1133                         weight = 5,
1134                         count = 2,
1135                     ),
1136                 )
1137             # if self.ping('kiosk.house'):
1138             #     logger.info('Found kiosk.house')
1139             #     pool.append(
1140             #         RemoteWorkerRecord(
1141             #             username = 'pi',
1142             #             machine = 'kiosk.house',
1143             #             weight = 1,
1144             #             count = 2,
1145             #         ),
1146             #     )
1147             if self.ping('hero.house'):
1148                 logger.info('Found hero.house')
1149                 pool.append(
1150                     RemoteWorkerRecord(
1151                         username = 'scott',
1152                         machine = 'hero.house',
1153                         weight = 30,
1154                         count = 10,
1155                     ),
1156                 )
1157             if self.ping('puma.cabin'):
1158                 logger.info('Found puma.cabin')
1159                 pool.append(
1160                     RemoteWorkerRecord(
1161                         username = 'scott',
1162                         machine = 'puma.cabin',
1163                         weight = 25,
1164                         count = 6,
1165                     ),
1166                 )
1167             if self.ping('backup.house'):
1168                 logger.info('Found backup.house')
1169                 pool.append(
1170                     RemoteWorkerRecord(
1171                         username = 'scott',
1172                         machine = 'backup.house',
1173                         weight = 3,
1174                         count = 2,
1175                     ),
1176                 )
1177
1178             # The controller machine has a lot to do; go easy on it.
1179             for record in pool:
1180                 if record.machine == platform.node() and record.count > 1:
1181                     logger.info(f'Reducing workload for {record.machine}.')
1182                     record.count = 1
1183
1184             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1185             policy.register_worker_pool(pool)
1186             self.remote_executor = RemoteExecutor(pool, policy)
1187         return self.remote_executor
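# Usage sketch (assumes the @singleton decorator lets callers do
# DefaultExecutors() to get the shared instance, and that config/commandline
# args were parsed elsewhere; some_function is a placeholder):
#
#     from executors import DefaultExecutors
#     pool = DefaultExecutors().thread_pool()
#     future = pool.submit(some_function, arg)
#     print(future.result())
#     pool.shutdown()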