Add doctests to some of this stuff.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18
19 import cloudpickle  # type: ignore
20
21 from ansi import bg, fg, underline, reset
22 import argparse_utils
23 import config
24 from exec_utils import run_silently, cmd_in_background
25 from decorator_utils import singleton
26 import histogram as hist
27
28 logger = logging.getLogger(__name__)
29
30 parser = config.add_commandline_args(
31     f"Executors ({__file__})",
32     "Args related to processing executors."
33 )
34 parser.add_argument(
35     '--executors_threadpool_size',
36     type=int,
37     metavar='#THREADS',
38     help='Number of threads in the default threadpool, leave unset for default',
39     default=None
40 )
41 parser.add_argument(
42     '--executors_processpool_size',
43     type=int,
44     metavar='#PROCESSES',
45     help='Number of processes in the default processpool, leave unset for default',
46     default=None,
47 )
48 parser.add_argument(
49     '--executors_schedule_remote_backups',
50     default=True,
51     action=argparse_utils.ActionNoYes,
52     help='Should we schedule duplicative backup work if a remote bundle is slow',
53 )
54 parser.add_argument(
55     '--executors_max_bundle_failures',
56     type=int,
57     default=3,
58     metavar='#FAILURES',
59     help='Maximum number of failures before giving up on a bundle',
60 )
61
62 RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
63 SSH = 'ssh -oForwardX11=no'
64
65
66 def make_cloud_pickle(fun, *args, **kwargs):
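    """Serialize fun and its arguments into a cloudpickle blob.

    Illustrative doctest sketch (the add() helper below is made up
    just for this example):

    >>> def add(x, y):
    ...     return x + y
    >>> f, args, kwargs = cloudpickle.loads(make_cloud_pickle(add, 1, 2))
    >>> f(*args, **kwargs)
    3
    """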
67     logger.info(f"Making cloudpickled bundle at {fun.__name__}")
68     return cloudpickle.dumps((fun, args, kwargs))
69
70
71 class BaseExecutor(ABC):
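    """Interface shared by the thread, process and remote executors
    below; tracks an outstanding task count and a histogram of task
    durations.
    """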
72     def __init__(self, *, title=''):
73         self.title = title
74         self.task_count = 0
75         self.histogram = hist.SimpleHistogram(
76             hist.SimpleHistogram.n_evenly_spaced_buckets(
77                 int(0), int(500), 50
78             )
79         )
80
81     @abstractmethod
82     def submit(self,
83                function: Callable,
84                *args,
85                **kwargs) -> fut.Future:
86         pass
87
88     @abstractmethod
89     def shutdown(self,
90                  wait: bool = True) -> None:
91         pass
92
93     def adjust_task_count(self, delta: int) -> None:
94         self.task_count += delta
95         logger.debug(f'Executor current task count is {self.task_count}')
96
97
98 class ThreadExecutor(BaseExecutor):
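    """Executor that runs submitted work in a local thread pool.

    Minimal doctest sketch (shutdown() prints the histogram, so it is
    not called here):

    >>> te = ThreadExecutor(max_workers=2)
    >>> te.submit(lambda x: x + 1, 41).result()
    42
    """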
99     def __init__(self,
100                  max_workers: Optional[int] = None):
101         super().__init__()
102         workers = None
103         if max_workers is not None:
104             workers = max_workers
105         elif 'executors_threadpool_size' in config.config:
106             workers = config.config['executors_threadpool_size']
107         logger.debug(f'Creating threadpool executor with {workers} workers')
108         self._thread_pool_executor = fut.ThreadPoolExecutor(
109             max_workers=workers,
110             thread_name_prefix="thread_executor_helper"
111         )
112
113     def run_local_bundle(self, fun, *args, **kwargs):
114         logger.debug(f"Running local bundle at {fun.__name__}")
115         start = time.time()
116         result = fun(*args, **kwargs)
117         end = time.time()
118         self.adjust_task_count(-1)
119         duration = end - start
120         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
121         self.histogram.add_item(duration)
122         return result
123
124     def submit(self,
125                function: Callable,
126                *args,
127                **kwargs) -> fut.Future:
128         self.adjust_task_count(+1)
129         newargs = []
130         newargs.append(function)
131         for arg in args:
132             newargs.append(arg)
133         return self._thread_pool_executor.submit(
134             self.run_local_bundle,
135             *newargs,
136             **kwargs)
137
138     def shutdown(self,
139                  wait: bool = True) -> None:
140         logger.debug(f'Shutting down threadpool executor {self.title}')
141         print(self.histogram)
142         self._thread_pool_executor.shutdown(wait)
143
144
145 class ProcessExecutor(BaseExecutor):
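    """Executor that runs cloudpickled work in a local process pool.

    Sketch of intended use; skipped under doctest because it forks
    worker processes:

    >>> pe = ProcessExecutor(max_workers=2)        # doctest: +SKIP
    >>> pe.submit(lambda x: x * x, 7).result()     # doctest: +SKIP
    49
    """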
146     def __init__(self,
147                  max_workers=None):
148         super().__init__()
149         workers = None
150         if max_workers is not None:
151             workers = max_workers
152         elif 'executors_processpool_size' in config.config:
153             workers = config.config['executors_processpool_size']
154         logger.debug(f'Creating processpool executor with {workers} workers.')
155         self._process_executor = fut.ProcessPoolExecutor(
156             max_workers=workers,
157         )
158
159     def run_cloud_pickle(self, pickle):
160         # Runs in a child process; the parent adjusts its own task count in submit().
161         fun, args, kwargs = cloudpickle.loads(pickle)
162         logger.debug(f"Running pickled bundle at {fun.__name__}")
163         result = fun(*args, **kwargs)
164         return result
165
166     def submit(self,
167                function: Callable,
168                *args,
169                **kwargs) -> fut.Future:
170         start = time.time()
171         self.adjust_task_count(+1)
172         pickle = make_cloud_pickle(function, *args, **kwargs)
173         result = self._process_executor.submit(
174             self.run_cloud_pickle,
175             pickle
176         )
177         # The callback runs in this (parent) process when the child finishes.
178         def record_done(_) -> None:
179             self.adjust_task_count(-1)
180             self.histogram.add_item(time.time() - start)
181         result.add_done_callback(record_done)
182         return result
183
184     def shutdown(self, wait=True) -> None:
185         logger.debug(f'Shutting down processpool executor {self.title}')
186         self._process_executor.shutdown(wait)
187         print(self.histogram)
188
189     def __getstate__(self):
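        # The underlying pool can't be pickled; drop it so that bound
        # methods (e.g. run_cloud_pickle) can be shipped to workers.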
190         state = self.__dict__.copy()
191         state['_process_executor'] = None
192         return state
193
194
195 @dataclass
196 class RemoteWorkerRecord:
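    """A remote machine available to run bundles, with a scheduling
    weight and a slot count.

    >>> RemoteWorkerRecord('scott', 'cheetah.house', weight=12, count=4)
    scott@cheetah.house
    """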
197     username: str
198     machine: str
199     weight: int
200     count: int
201
202     def __hash__(self):
203         return hash((self.username, self.machine))
204
205     def __repr__(self):
206         return f'{self.username}@{self.machine}'
207
208
209 @dataclass
210 class BundleDetails:
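    """Everything the RemoteExecutor tracks about one unit of work,
    whether it is an original bundle or one of its backups."""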
211     pickled_code: bytes
212     uuid: str
213     worker: Optional[RemoteWorkerRecord]
214     username: Optional[str]
215     machine: Optional[str]
216     hostname: str
217     code_file: str
218     result_file: str
219     pid: int
220     start_ts: float
221     end_ts: float
222     too_slow: bool
223     super_slow: bool
224     src_bundle: Optional[BundleDetails]
225     is_cancelled: threading.Event
226     was_cancelled: bool
227     backup_bundles: Optional[List[BundleDetails]]
228     failure_count: int
229
230
231 class RemoteExecutorStatus:
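    """Bookkeeping for RemoteExecutor: which bundles are in flight on
    which workers, per-worker timing quantiles, and the periodic
    status dump printed from heartbeat().
    """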
232     def __init__(self, total_worker_count: int) -> None:
233         self.worker_count = total_worker_count
234         self.known_workers: Set[RemoteWorkerRecord] = set()
235         self.start_per_bundle: Dict[str, float] = defaultdict(float)
236         self.end_per_bundle: Dict[str, float] = defaultdict(float)
237         self.finished_bundle_timings_per_worker: Dict[
238             RemoteWorkerRecord,
239             List[float]
240         ] = {}
241         self.in_flight_bundles_by_worker: Dict[
242             RemoteWorkerRecord,
243             Set[str]
244         ] = {}
245         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
246         self.finished_bundle_timings: List[float] = []
247         self.last_periodic_dump: Optional[float] = None
248         self.total_bundles_submitted = 0
249
250         # Protects reads and modification using self.  Also used
251         # as a memory fence for modifications to bundle.
252         self.lock = threading.Lock()
253
254     def record_acquire_worker(
255             self,
256             worker: RemoteWorkerRecord,
257             uuid: str
258     ) -> None:
259         with self.lock:
260             self.record_acquire_worker_already_locked(
261                 worker,
262                 uuid
263             )
264
265     def record_acquire_worker_already_locked(
266             self,
267             worker: RemoteWorkerRecord,
268             uuid: str
269     ) -> None:
270         assert self.lock.locked()
271         self.known_workers.add(worker)
272         self.start_per_bundle[uuid] = time.time()
273         x = self.in_flight_bundles_by_worker.get(worker, set())
274         x.add(uuid)
275         self.in_flight_bundles_by_worker[worker] = x
276
277     def record_bundle_details(
278             self,
279             details: BundleDetails) -> None:
280         with self.lock:
281             self.record_bundle_details_already_locked(details)
282
283     def record_bundle_details_already_locked(
284             self,
285             details: BundleDetails) -> None:
286         assert self.lock.locked()
287         self.bundle_details_by_uuid[details.uuid] = details
288
289     def record_release_worker_already_locked(
290             self,
291             worker: RemoteWorkerRecord,
292             uuid: str,
293             was_cancelled: bool,
294     ) -> None:
295         assert self.lock.locked()
296         ts = time.time()
297         self.end_per_bundle[uuid] = ts
298         self.in_flight_bundles_by_worker[worker].remove(uuid)
299         if not was_cancelled:
300             bundle_latency = ts - self.start_per_bundle[uuid]
301             x = self.finished_bundle_timings_per_worker.get(worker, list())
302             x.append(bundle_latency)
303             self.finished_bundle_timings_per_worker[worker] = x
304             self.finished_bundle_timings.append(bundle_latency)
305
306     def total_in_flight(self) -> int:
307         assert self.lock.locked()
308         total_in_flight = 0
309         for worker in self.known_workers:
310             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
311         return total_in_flight
312
313     def total_idle(self) -> int:
314         assert self.lock.locked()
315         return self.worker_count - self.total_in_flight()
316
317     def __repr__(self):
318         assert self.lock.locked()
319         ts = time.time()
320         total_finished = len(self.finished_bundle_timings)
321         total_in_flight = self.total_in_flight()
322         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
323         qall = None
324         if len(self.finished_bundle_timings) > 1:
325             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
326             ret += (
327                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
328                 f'✅={total_finished}/{self.total_bundles_submitted}, '
329                 f'💻n={total_in_flight}/{self.worker_count}\n'
330             )
331         else:
332             ret += (
333                 f'✅={total_finished}/{self.total_bundles_submitted}, '
334                 f'💻n={total_in_flight}/{self.worker_count}\n'
335             )
336
337         for worker in self.known_workers:
338             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
339             timings = self.finished_bundle_timings_per_worker.get(worker, [])
340             count = len(timings)
341             qworker = None
342             if count > 1:
343                 qworker = numpy.quantile(timings, [0.5, 0.95])
344                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
345             else:
346                 ret += '\n'
347             if count > 0:
348                 ret += f'    ...finished {count} total bundle(s) so far\n'
349             in_flight = len(self.in_flight_bundles_by_worker[worker])
350             if in_flight > 0:
351                 ret += f'    ...{in_flight} bundles currently in flight:\n'
352                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
353                     details = self.bundle_details_by_uuid.get(
354                         bundle_uuid,
355                         None
356                     )
357                     pid = str(details.pid) if details is not None else "TBD"
358                     sec = ts - self.start_per_bundle[bundle_uuid]
359                     ret += f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
360                     if qworker is not None:
361                         if sec > qworker[1]:
362                             ret += f'{bg("red")}>💻p95{reset()} '
363                         elif sec > qworker[0]:
364                             ret += f'{fg("red")}>💻p50{reset()} '
365                     if qall is not None:
366                         if sec > qall[1] * 1.5:
367                             ret += f'{bg("red")}!!!{reset()}'
368                             if details is not None:
369                                 logger.debug(f'Flagging {details.uuid} for another backup')
370                                 details.super_slow = True
371                         elif sec > qall[1]:
372                             ret += f'{bg("red")}>∀p95{reset()} '
373                             if details is not None:
374                                 logger.debug(f'Flagging {details.uuid} for a backup')
375                                 details.too_slow = True
376                         elif sec > qall[0]:
377                             ret += f'{fg("red")}>∀p50{reset()} '
378                     ret += '\n'
379         return ret
380
381     def periodic_dump(self, total_bundles_submitted: int) -> None:
382         assert self.lock.locked()
383         self.total_bundles_submitted = total_bundles_submitted
384         ts = time.time()
385         if (
386                 self.last_periodic_dump is None
387                 or ts - self.last_periodic_dump > 5.0
388         ):
389             print(self)
390             self.last_periodic_dump = ts
391
392
393 class RemoteWorkerSelectionPolicy(ABC):
394     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
395         random.seed()
396         self.workers = workers
397
398     @abstractmethod
399     def is_worker_available(self) -> bool:
400         pass
401
402     @abstractmethod
403     def acquire_worker(
404             self,
405             machine_to_avoid: Optional[str] = None
406     ) -> Optional[RemoteWorkerRecord]:
407         pass
408
409
410 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
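    """Pick a random worker, biased by each record's weight.

    Doctest sketch with a single one-slot worker:

    >>> p = WeightedRandomRemoteWorkerSelectionPolicy()
    >>> p.register_worker_pool(
    ...     [RemoteWorkerRecord('scott', 'cheetah.house', weight=1, count=1)]
    ... )
    >>> p.is_worker_available()
    True
    >>> p.acquire_worker()
    scott@cheetah.house
    >>> p.is_worker_available()
    False
    """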
411     def is_worker_available(self) -> bool:
412         for worker in self.workers:
413             if worker.count > 0:
414                 return True
415         return False
416
417     def acquire_worker(
418             self,
419             machine_to_avoid: Optional[str] = None
420     ) -> Optional[RemoteWorkerRecord]:
421         grabbag = []
422         for worker in self.workers:
423             for x in range(0, worker.count):
424                 for y in range(0, worker.weight):
425                     grabbag.append(worker)
426
427         for attempt in range(0, 5):
428             random.shuffle(grabbag)
429             worker = grabbag[0]
430             if worker.machine != machine_to_avoid or attempt > 2:
431                 if worker.count > 0:
432                     worker.count -= 1
433                     logger.debug(f'Selected worker {worker}')
434                     return worker
435         logger.warning("Couldn't find a worker; go fish.")
436         return None
437
438
439 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
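    """Hand out workers in round-robin order, skipping exhausted ones.

    Doctest sketch with two one-slot workers:

    >>> rr = RoundRobinRemoteWorkerSelectionPolicy()
    >>> rr.register_worker_pool([
    ...     RemoteWorkerRecord('scott', 'cheetah.house', weight=1, count=1),
    ...     RemoteWorkerRecord('scott', 'meerkat.cabin', weight=1, count=1),
    ... ])
    >>> rr.acquire_worker()
    scott@cheetah.house
    >>> rr.acquire_worker()
    scott@meerkat.cabin
    """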
440     def __init__(self) -> None:
441         self.index = 0
442
443     def is_worker_available(self) -> bool:
444         for worker in self.workers:
445             if worker.count > 0:
446                 return True
447         return False
448
449     def acquire_worker(
450             self,
451             machine_to_avoid: Optional[str] = None
452     ) -> Optional[RemoteWorkerRecord]:
453         x = self.index
454         while True:
455             worker = self.workers[x]
456             if worker.count > 0:
457                 worker.count -= 1
458                 x += 1
459                 if x >= len(self.workers):
460                     x = 0
461                 self.index = x
462                 logger.debug(f'Selected worker {worker}')
463                 return worker
464             x += 1
465             if x >= len(self.workers):
466                 x = 0
467             if x == self.index:
468                 logger.warning("Couldn't find a worker; go fish.")
469                 return None
470
471
472 class RemoteExecutor(BaseExecutor):
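    """Distributes cloudpickled work across a pool of remote machines.

    For each bundle, launch() picks a worker via the selection policy,
    rsyncs the pickled code to it, runs remote_worker.py there over
    ssh, then rsyncs the result file back and unpickles it.  Bundles
    that look slow relative to the pool's p95 may get duplicative
    backup bundles scheduled on other machines; whichever copy
    finishes first cancels the rest.
    """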
473     def __init__(self,
474                  workers: List[RemoteWorkerRecord],
475                  policy: RemoteWorkerSelectionPolicy) -> None:
476         super().__init__()
477         self.workers = workers
478         self.worker_count = 0
479         for worker in self.workers:
480             self.worker_count += worker.count
481         if self.worker_count <= 0:
482             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
483             logger.critical(msg)
484             raise Exception(msg)
485         self.policy = policy
486         self.policy.register_worker_pool(self.workers)
487         self.cv = threading.Condition()
488         self._helper_executor = fut.ThreadPoolExecutor(
489             thread_name_prefix="remote_executor_helper",
490             max_workers=self.worker_count,
491         )
492         self.status = RemoteExecutorStatus(self.worker_count)
493         self.total_bundles_submitted = 0
494         logger.debug(
495             f'Creating remote processpool with {self.worker_count} remote endpoints.'
496         )
497
498     def is_worker_available(self) -> bool:
499         return self.policy.is_worker_available()
500
501     def acquire_worker(
502             self,
503             machine_to_avoid: Optional[str] = None
504     ) -> Optional[RemoteWorkerRecord]:
505         return self.policy.acquire_worker(machine_to_avoid)
506
507     def find_available_worker_or_block(
508             self,
509             machine_to_avoid: Optional[str] = None
510     ) -> RemoteWorkerRecord:
511         with self.cv:
512             while not self.is_worker_available():
513                 self.cv.wait()
514             worker = self.acquire_worker(machine_to_avoid)
515             if worker is not None:
516                 return worker
517         msg = "We should never reach this point in the code"
518         logger.critical(msg)
519         raise Exception(msg)
520
521     def release_worker(self, worker: RemoteWorkerRecord) -> None:
522         logger.debug(f'Released worker {worker}')
523         with self.cv:
524             worker.count += 1
525             self.cv.notify()
526
527     def heartbeat(self) -> None:
528         with self.status.lock:
529             # Regular progress report
530             self.status.periodic_dump(self.total_bundles_submitted)
531
532             # Look for bundles to reschedule
533             if len(self.status.finished_bundle_timings) > 7:
534                 for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
535                     for uuid in bundle_uuids:
536                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
537                         if (
538                                 bundle is not None and
539                                 bundle.too_slow and
540                                 bundle.src_bundle is None and
541                                 config.config['executors_schedule_remote_backups']
542                         ):
543                             self.consider_backup_for_bundle(bundle)
544
545     def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
546         assert self.status.lock.locked()
547         if (
548             bundle.too_slow
549             and len(bundle.backup_bundles) == 0       # at most one backup per original
550         ):
551             msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
552             logger.debug(msg)
553             self.schedule_backup_for_bundle(bundle)
554             return
555         elif (
556                 bundle.super_slow
557                 and len(bundle.backup_bundles) < 2    # two backups in dire situations
558                 and self.status.total_idle() > 4
559         ):
560             msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
561             logger.debug(msg)
562             self.schedule_backup_for_bundle(bundle)
563             return
564
565     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
566         with self.status.lock:
567             if bundle.is_cancelled.wait(timeout=0.0):
568                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
569                 bundle.was_cancelled = True
570                 return True
571         return False
572
573     def launch(self, bundle: BundleDetails) -> Any:
574         """Find a worker for bundle or block until one is available."""
575         self.adjust_task_count(+1)
576         uuid = bundle.uuid
577         hostname = bundle.hostname
578         avoid_machine = None
579
580         # Try not to schedule a backup on the same host as the original.
581         if bundle.src_bundle is not None:
582             avoid_machine = bundle.src_bundle.machine
583         worker = None
584         while worker is None:
585             worker = self.find_available_worker_or_block(avoid_machine)
586         bundle.worker = worker
587         machine = bundle.machine = worker.machine
588         username = bundle.username = worker.username
589         self.status.record_acquire_worker(worker, uuid)
590         logger.debug(f'Running bundle {uuid} on {worker}...')
591
592         # Before we do any work, make sure the bundle is still viable.
593         if self.check_if_cancelled(bundle):
594             try:
595                 return self.post_launch_work(bundle)
596             except Exception as e:
597                 logger.exception(e)
598                 logger.info(f"Bundle {uuid} seems to have failed?!")
599                 if bundle.failure_count < config.config['executors_max_bundle_failures']:
600                     return self.launch(bundle)
601                 else:
602                     logger.info(f"Bundle {uuid} is poison, giving up on it.")
603                     return None
604
605         # Send input to machine if it's not local.
606         if hostname not in machine:
607             cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
608             logger.info(f"Copying work to {worker} via {cmd}")
609             run_silently(cmd)
610
611         # Do it.
612         cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
613                f'"source remote-execution/bin/activate &&'
614                f' /home/scott/lib/python_modules/remote_worker.py'
615                f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
616         p = cmd_in_background(cmd, silent=True)
617         bundle.pid = pid = p.pid
618         logger.info(f"Running {cmd} in the background as process {pid}")
619
620         while True:
621             try:
622                 p.wait(timeout=0.25)
623             except subprocess.TimeoutExpired:
624                 self.heartbeat()
625
626                 # Both source and backup bundles can be cancelled by
627                 # the other depending on which finishes first.
628                 if self.check_if_cancelled(bundle):
629                     p.terminate()
630                     break
631             else:
632                 logger.debug(
633                     f"{pid}/{bundle.uuid} has finished its work normally."
634                 )
635                 break
636
637         try:
638             return self.post_launch_work(bundle)
639         except Exception as e:
640             logger.exception(e)
641             logger.info(f"Bundle {uuid} seems to have failed?!")
642             if bundle.failure_count < config.config['executors_max_bundle_failures']:
643                 return self.launch(bundle)
644             logger.info(f"Bundle {uuid} is poison, giving up on it.")
645             return None
646
647     def post_launch_work(self, bundle: BundleDetails) -> Any:
648         with self.status.lock:
649             is_original = bundle.src_bundle is None
650             was_cancelled = bundle.was_cancelled
651             username = bundle.username
652             machine = bundle.machine
653             result_file = bundle.result_file
654             code_file = bundle.code_file
655
656             # Whether original or backup, if we finished first we must
657             # fetch the results if the computation happened on a
658             # remote machine.
659             bundle.end_ts = time.time()
660             if not was_cancelled:
661                 assert bundle.machine is not None
662                 if bundle.hostname not in bundle.machine:
663                     cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
664                     logger.info(
665                         f"Fetching results from {username}@{machine} via {cmd}"
666                     )
667                     try:
668                         run_silently(cmd)
669                     except subprocess.CalledProcessError:
670                         logger.critical(f'Failed to copy {username}@{machine}:{result_file}!')
671                     run_silently(f'{SSH} {username}@{machine}'
672                                  f' "/bin/rm -f {code_file} {result_file}"')
673                 dur = bundle.end_ts - bundle.start_ts
674                 self.histogram.add_item(dur)
675             assert bundle.worker is not None
676             self.status.record_release_worker_already_locked(
677                 bundle.worker,
678                 bundle.uuid,
679                 was_cancelled
680             )
681
682         # Only the original worker should unpickle the file contents
683         # though since it's the only one whose result matters.  The
684         # original is also the only job that may delete result_file
685         # from disk.  Note that the original may have been cancelled
686         # if one of the backups finished first; it still must read the
687         # result from disk.
688         if is_original:
689             logger.debug(f"Unpickling {result_file}.")
690             try:
691                 with open(f'{result_file}', 'rb') as rb:
692                     serialized = rb.read()
693                 result = cloudpickle.loads(serialized)
694             except Exception as e:
695                 msg = f'Failed to load {result_file}'
696                 logger.critical(msg)
697                 bundle.failure_count += 1
698                 self.release_worker(bundle.worker)
699                 raise Exception(msg) from e
700             os.remove(f'{result_file}')
701             os.remove(f'{code_file}')
702
703             # Notify any backups that the original is done so they
704             # should stop ASAP.  Do this whether or not we
705             # finished first since there could be more than one
706             # backup.
707             if bundle.backup_bundles is not None:
708                 for backup in bundle.backup_bundles:
709                     logger.debug(
710                         f'Notifying backup {backup.uuid} that it is cancelled'
711                     )
712                     backup.is_cancelled.set()
713
714         # This is a backup job.
715         else:
716             # Backup results don't matter, they just need to leave the
717             # result file in the right place for their originals to
718             # read/unpickle later.
719             result = None
720
721             # Tell the original to stop if we finished first.
722             if not was_cancelled:
723                 logger.debug(
724                     f'Notifying original {bundle.src_bundle.uuid} that it is cancelled'
725                 )
726                 bundle.src_bundle.is_cancelled.set()
727
728         assert bundle.worker is not None
729         self.release_worker(bundle.worker)
730         self.adjust_task_count(-1)
731         return result
732
733     def create_original_bundle(self, pickle):
734         from string_utils import generate_uuid
735         uuid = generate_uuid(as_hex=True)
736         code_file = f'/tmp/{uuid}.code.bin'
737         result_file = f'/tmp/{uuid}.result.bin'
738
739         logger.debug(f'Writing pickled code to {code_file}')
740         with open(f'{code_file}', 'wb') as wb:
741             wb.write(pickle)
742
743         bundle = BundleDetails(
744             pickled_code = pickle,
745             uuid = uuid,
746             worker = None,
747             username = None,
748             machine = None,
749             hostname = platform.node(),
750             code_file = code_file,
751             result_file = result_file,
752             pid = 0,
753             start_ts = time.time(),
754             end_ts = 0.0,
755             too_slow = False,
756             super_slow = False,
757             src_bundle = None,
758             is_cancelled = threading.Event(),
759             was_cancelled = False,
760             backup_bundles = [],
761             failure_count = 0,
762         )
763         self.status.record_bundle_details(bundle)
764         logger.debug(f'Created original bundle {uuid}')
765         return bundle
766
767     def create_backup_bundle(self, src_bundle: BundleDetails):
768         assert src_bundle.backup_bundles is not None
769         n = len(src_bundle.backup_bundles)
770         uuid = src_bundle.uuid + f'_backup#{n}'
771
772         backup_bundle = BundleDetails(
773             pickled_code = src_bundle.pickled_code,
774             uuid = uuid,
775             worker = None,
776             username = None,
777             machine = None,
778             hostname = src_bundle.hostname,
779             code_file = src_bundle.code_file,
780             result_file = src_bundle.result_file,
781             pid = 0,
782             start_ts = time.time(),
783             end_ts = 0.0,
784             too_slow = False,
785             super_slow = False,
786             src_bundle = src_bundle,
787             is_cancelled = threading.Event(),
788             was_cancelled = False,
789             backup_bundles = None,    # backup backups not allowed
790             failure_count = 0,
791         )
792         src_bundle.backup_bundles.append(backup_bundle)
793         self.status.record_bundle_details_already_locked(backup_bundle)
794         logger.debug(f'Created backup bundle {uuid}')
795         return backup_bundle
796
797     def schedule_backup_for_bundle(self,
798                                    src_bundle: BundleDetails):
799         assert self.status.lock.locked()
800         backup_bundle = self.create_backup_bundle(src_bundle)
801         logger.debug(
802             f'Scheduling backup bundle {backup_bundle.uuid} for execution'
803         )
804         self._helper_executor.submit(self.launch, backup_bundle)
805
806         # Results from backups don't matter; if they finish first
807         # they will move the result_file to this machine and let
808         # the original pick them up and unpickle them.
809
810     def submit(self,
811                function: Callable,
812                *args,
813                **kwargs) -> fut.Future:
814         pickle = make_cloud_pickle(function, *args, **kwargs)
815         bundle = self.create_original_bundle(pickle)
816         self.total_bundles_submitted += 1
817         return self._helper_executor.submit(self.launch, bundle)
818
819     def shutdown(self, wait=True) -> None:
820         self._helper_executor.shutdown(wait)
821         logger.debug(f'Shutting down RemoteExecutor {self.title}')
822         print(self.histogram)
823
824
825 @singleton
826 class DefaultExecutors(object):
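    """Lazily creates and caches one executor of each flavor.

    The remote pool is assembled from whichever of the known machines
    below answer a ping at the time remote_pool() is first called.
    """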
827     def __init__(self):
828         self.thread_executor: Optional[ThreadExecutor] = None
829         self.process_executor: Optional[ProcessExecutor] = None
830         self.remote_executor: Optional[RemoteExecutor] = None
831
832     def ping(self, host) -> bool:
833         command = ['ping', '-c', '1', host]
834         return subprocess.call(
835             command,
836             stdout=subprocess.DEVNULL,
837             stderr=subprocess.DEVNULL,
838         ) == 0
839
840     def thread_pool(self) -> ThreadExecutor:
841         if self.thread_executor is None:
842             self.thread_executor = ThreadExecutor()
843         return self.thread_executor
844
845     def process_pool(self) -> ProcessExecutor:
846         if self.process_executor is None:
847             self.process_executor = ProcessExecutor()
848         return self.process_executor
849
850     def remote_pool(self) -> RemoteExecutor:
851         if self.remote_executor is None:
852             pool: List[RemoteWorkerRecord] = []
853             if self.ping('cheetah.house'):
854                 pool.append(
855                     RemoteWorkerRecord(
856                         username = 'scott',
857                         machine = 'cheetah.house',
858                         weight = 12,
859                         count = 4,
860                     ),
861                 )
862             if self.ping('video.house'):
863                 pool.append(
864                     RemoteWorkerRecord(
865                         username = 'scott',
866                         machine = 'video.house',
867                         weight = 1,
868                         count = 4,
869                     ),
870                 )
871             if self.ping('wannabe.house'):
872                 pool.append(
873                     RemoteWorkerRecord(
874                         username = 'scott',
875                         machine = 'wannabe.house',
876                         weight = 2,
877                         count = 4,
878                     ),
879                 )
880             if self.ping('meerkat.cabin'):
881                 pool.append(
882                     RemoteWorkerRecord(
883                         username = 'scott',
884                         machine = 'meerkat.cabin',
885                         weight = 6,
886                         count = 2,
887                     ),
888                 )
889             if self.ping('backup.house'):
890                 pool.append(
891                     RemoteWorkerRecord(
892                         username = 'scott',
893                         machine = 'backup.house',
894                         weight = 1,
895                         count = 4,
896                     ),
897                 )
898             if self.ping('puma.cabin'):
899                 pool.append(
900                     RemoteWorkerRecord(
901                         username = 'scott',
902                         machine = 'puma.cabin',
903                         weight = 12,
904                         count = 4,
905                     ),
906                 )
907             policy = WeightedRandomRemoteWorkerSelectionPolicy()
908             policy.register_worker_pool(pool)
909             self.remote_executor = RemoteExecutor(pool, policy)
910         return self.remote_executor