Make loadfile work.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18
19 import cloudpickle  # type: ignore
20
21 from ansi import bg, fg, underline, reset
22 import argparse_utils
23 import config
24 from exec_utils import run_silently, cmd_in_background
25 from decorator_utils import singleton
26 import histogram
27
28 logger = logging.getLogger(__name__)
29
# Command line flags for this module, registered with the shared config
# parser.  The parsed values are read back through config.config[...]
# elsewhere in this file.
parser = config.add_commandline_args(
    f"Executors ({__file__})",
    "Args related to processing executors."
)
# Thread pool size; ThreadExecutor consults this when it is constructed
# without an explicit max_workers.
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    metavar='#THREADS',
    help='Number of threads in the default threadpool, leave unset for default',
    default=None
)
# Process pool size; ProcessExecutor consults this when it is constructed
# without an explicit max_workers.
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    metavar='#PROCESSES',
    help='Number of processes in the default processpool, leave unset for default',
    default=None,
)
# Read by RemoteExecutor.heartbeat before it considers scheduling a
# backup for a bundle flagged as too slow.
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
# Read by RemoteExecutor.launch: a bundle whose failure_count reaches
# this value is abandoned rather than relaunched.
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    metavar='#FAILURES',
    help='Maximum number of failures before giving up on a bundle',
)

# Shell command prefixes used by RemoteExecutor to copy code/result files
# to and from workers (RSYNC) and to run commands on them (SSH).
RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
SSH = 'ssh -oForwardX11=no'

# Module-wide histogram of bundle durations in seconds.  It is fed by
# run_local_bundle and RemoteExecutor.post_launch_work and printed by
# each executor's shutdown().
# NOTE(review): the bucket layout is whatever n_evenly_spaced_buckets(0,
# 500, 25) produces; confirm against the histogram module whether 25 is
# a bucket count or a bucket width.
HIST = histogram.SimpleHistogram(
    histogram.SimpleHistogram.n_evenly_spaced_buckets(
        int(0), int(500), 25
    )
)
69
70
def run_local_bundle(fun, *args, **kwargs):
    """Call fun(*args, **kwargs) in the current process and time it.

    The wall-clock duration is logged at debug level and added to the
    module-level HIST histogram.  If fun raises, the exception propagates
    and no duration is recorded.

    Returns:
        Whatever fun returns.
    """
    logger.debug(f"Running local bundle at {fun.__name__}")
    began = time.time()
    outcome = fun(*args, **kwargs)
    elapsed = time.time() - began
    logger.debug(f"{fun.__name__} finished; used {elapsed:.1f}s")
    HIST.add_item(elapsed)
    return outcome
80
81
def run_cloud_pickle(pickle):
    """Unpack a cloudpickled (fun, args, kwargs) triple and execute it.

    This is the counterpart of make_cloud_pickle.  The call is timed and
    the duration logged at debug level; unlike run_local_bundle nothing is
    added to HIST here.

    NOTE: cloudpickle.loads executes arbitrary code, so the payload must
    come from a trusted source.

    Returns:
        Whatever the unpickled function returns.
    """
    bundle = cloudpickle.loads(pickle)
    fun, args, kwargs = bundle
    logger.debug(f"Running pickled bundle at {fun.__name__}")
    began = time.time()
    outcome = fun(*args, **kwargs)
    elapsed = time.time() - began
    logger.debug(f"{fun.__name__} finished; used {elapsed:.1f}s")
    return outcome
91
92
def make_cloud_pickle(fun, *args, **kwargs):
    """Serialize a function call as a cloudpickled (fun, args, kwargs) triple.

    The resulting bytes are what run_cloud_pickle expects as input.
    """
    logger.info(f"Making cloudpickled bundle at {fun.__name__}")
    payload = (fun, args, kwargs)
    return cloudpickle.dumps(payload)
96
97
class BaseExecutor(ABC):
    """Abstract interface implemented by every executor in this module."""

    def __init__(self):
        pass

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Schedule function(*args, **kwargs) and return a Future for it."""
        pass

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None:
        """Shut the executor down; wait is forwarded to the underlying pool."""
        pass
113
114
class ThreadExecutor(BaseExecutor):
    """Executor that runs submitted work on a local thread pool."""

    def __init__(self, max_workers: Optional[int] = None):
        """Create the pool.

        Args:
            max_workers: explicit pool size.  When None, the
                executors_threadpool_size config value is used if it is
                present; otherwise None is handed to ThreadPoolExecutor,
                which then picks its own default.
        """
        super().__init__()
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        else:
            workers = None
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers,
            thread_name_prefix="thread_executor_helper",
        )
        # Number of submissions so far; it is never decremented.
        self.job_count = 0

    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Queue function(*args, **kwargs) on the pool via run_local_bundle.

        Returns:
            The Future produced by the underlying ThreadPoolExecutor.
        """
        self.job_count += 1
        logger.debug(
            f'Submitted work to threadpool; there are now {self.job_count} items.'
        )
        # run_local_bundle takes the callable as its first positional
        # argument, followed by the caller's own arguments.
        return self._thread_pool_executor.submit(
            run_local_bundle, function, *args, **kwargs
        )

    def shutdown(self, wait=True) -> None:
        """Print the duration histogram, then shut the pool down."""
        logger.debug("Shutting down threadpool executor.")
        print(HIST)
        self._thread_pool_executor.shutdown(wait)
153
154
class ProcessExecutor(BaseExecutor):
    """Executor that runs submitted work on a local process pool."""

    def __init__(self, max_workers=None):
        """Create the pool.

        Args:
            max_workers: explicit pool size.  When None, the
                executors_processpool_size config value is used if it is
                present; otherwise None is handed to ProcessPoolExecutor,
                which then picks its own default.
        """
        super().__init__()
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        else:
            workers = None
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(max_workers=workers)
        # Number of submissions so far; it is never decremented.
        self.job_count = 0

    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Cloudpickle the call and queue it on the process pool.

        The call is serialized with cloudpickle up front and only the
        resulting bytes are handed to the pool; the child process runs
        them through run_cloud_pickle.

        Returns:
            The Future produced by the underlying ProcessPoolExecutor.
        """
        payload = make_cloud_pickle(function, *args, **kwargs)
        self.job_count += 1
        logger.debug(
            f'Submitting work to processpool executor; there are now {self.job_count} items.'
        )
        return self._process_executor.submit(run_cloud_pickle, payload)

    def shutdown(self, wait=True) -> None:
        """Print the duration histogram, then shut the pool down."""
        logger.debug('Shutting down processpool executor')
        print(HIST)
        self._process_executor.shutdown(wait)
186
187
@dataclass
class RemoteWorkerRecord:
    """One remote machine that bundles can be dispatched to.

    Hashing uses only (username, machine), so the hash stays stable while
    count changes.  The dataclass-generated __eq__ still compares every
    field.
    """

    username: str
    machine: str
    # Relative weight used by the weighted-random selection policy.
    weight: int
    # Free slots: decremented when a policy hands the worker out and
    # incremented again by RemoteExecutor.release_worker.
    count: int

    def __hash__(self):
        identity = (self.username, self.machine)
        return hash(identity)

    def __repr__(self):
        user, host = self.username, self.machine
        return f'{user}@{host}'
200
201
@dataclass
class BundleDetails:
    """Mutable record describing one unit of work run by RemoteExecutor.

    Instances are built by RemoteExecutor.create_original_bundle and
    RemoteExecutor.create_backup_bundle and are updated in place as the
    bundle is launched, monitored and finished.
    """

    # The cloudpickled (function, args, kwargs) payload.
    pickled_code: bytes
    # Unique id; a backup reuses its source's uuid plus a '_backup#N' suffix.
    uuid: str
    # Worker, username and machine stay None until launch() picks a worker.
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    # Name of the submitting host (platform.node()); launch() compares it
    # against the worker's machine to decide whether files must be copied.
    hostname: str
    # Paths of the pickled input and pickled result files; a backup shares
    # both paths with its source bundle.
    code_file: str
    result_file: str
    # Pid of the local background process running the remote command;
    # 0 until launch() starts it.
    pid: int
    # Creation time and completion time (time.time()); end_ts is 0.0 until
    # post_launch_work() runs.
    start_ts: float
    end_ts: float
    # Slowness flags; they are set as a side effect of
    # RemoteExecutorStatus.__repr__ and read when considering backups.
    too_slow: bool
    super_slow: bool
    # None for an original bundle; for a backup, the bundle it duplicates.
    src_bundle: Optional[BundleDetails]
    # Set by the counterpart bundle (original or backup) when it finishes.
    is_cancelled: threading.Event
    # Latched to True by RemoteExecutor.check_if_cancelled once it observes
    # is_cancelled.
    was_cancelled: bool
    # Backups scheduled for this bundle; None on backups themselves, which
    # may not have backups of their own.
    backup_bundles: Optional[List[BundleDetails]]
    # Incremented by post_launch_work() each time the result is missing or
    # cannot be unpickled.
    failure_count: int
222
223
class RemoteExecutorStatus:
    """Bookkeeping for a RemoteExecutor.

    Tracks which bundles are in flight on which worker and how long
    finished bundles took.  Every method except record_acquire_worker and
    record_bundle_details asserts that self.lock is already held.
    """

    def __init__(self, total_worker_count: int) -> None:
        """Start with no known workers and no recorded bundles.

        Args:
            total_worker_count: total number of remote slots; used as the
                denominator in reports and by total_idle().
        """
        self.worker_count = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_per_bundle: Dict[str, float] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord, List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[
            RemoteWorkerRecord, Set[str]
        ] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted = 0

        # Guards every read and write of this object's state; also serves
        # as a memory fence for changes made to bundles.
        self.lock = threading.Lock()

    def record_acquire_worker(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        """Locking wrapper around record_acquire_worker_already_locked."""
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        """Note that bundle uuid has just started on worker."""
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = time.time()
        self.in_flight_bundles_by_worker.setdefault(worker, set()).add(uuid)

    def record_bundle_details(
            self,
            details: BundleDetails) -> None:
        """Locking wrapper around record_bundle_details_already_locked."""
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(
            self,
            details: BundleDetails) -> None:
        """Index details under its uuid so reports can look it up."""
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str,
            was_cancelled: bool,
    ) -> None:
        """Note that bundle uuid is no longer running on worker.

        The latency of a cancelled bundle is not added to the timing
        lists.  Raises KeyError if uuid was not in flight on worker.
        """
        assert self.lock.locked()
        now = time.time()
        self.end_per_bundle[uuid] = now
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if was_cancelled:
            return
        latency = now - self.start_per_bundle[uuid]
        self.finished_bundle_timings_per_worker.setdefault(
            worker, []
        ).append(latency)
        self.finished_bundle_timings.append(latency)

    def total_in_flight(self) -> int:
        """Return the number of bundles in flight across all known workers."""
        assert self.lock.locked()
        return sum(
            len(self.in_flight_bundles_by_worker[worker])
            for worker in self.known_workers
        )

    def total_idle(self) -> int:
        """Return worker_count minus the number of in-flight bundles."""
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        """Render a multi-line, ANSI-colored status report.

        NOTE: this is not a pure formatter.  While rendering it marks
        in-flight bundles as too_slow / super_slow when their running time
        exceeds the global p95 (or 1.5x the global p95); heartbeat() relies
        on those flags to schedule backups.
        """
        assert self.lock.locked()
        now = time.time()
        finished = len(self.finished_bundle_timings)
        running = self.total_in_flight()
        parts = [f'\n\n{underline()}Remote Executor Pool Status{reset()}: ']

        # Global [p50, p95]; quantiles are only computed once at least two
        # bundles have finished.
        overall = None
        if finished > 1:
            overall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            parts.append(
                f'⏱=∀p50:{overall[0]:.1f}s, ∀p95:{overall[1]:.1f}s, '
                f'✅={finished}/{self.total_bundles_submitted}, '
                f'💻n={running}/{self.worker_count}\n'
            )
        else:
            parts.append(
                f' ✅={finished}/{self.total_bundles_submitted}, '
                f'💻n={running}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            parts.append(
                f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            )
            worker_timings = self.finished_bundle_timings_per_worker.get(
                worker, []
            )
            done = len(worker_timings)

            # Per-worker [p50, p95], under the same two-sample rule.
            per_worker = None
            if done > 1:
                per_worker = numpy.quantile(worker_timings, [0.5, 0.95])
                parts.append(
                    f' 💻p50: {per_worker[0]:.1f}s, 💻p95: {per_worker[1]:.1f}s\n'
                )
            else:
                parts.append('\n')
            if done > 0:
                parts.append(f'    ...finished {done} total bundle(s) so far\n')

            active = self.in_flight_bundles_by_worker[worker]
            if len(active) <= 0:
                continue
            parts.append(f'    ...{len(active)} bundles currently in flight:\n')
            for bundle_uuid in active:
                details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                if details is not None:
                    pid = str(details.pid)
                else:
                    pid = "TBD"
                sec = now - self.start_per_bundle[bundle_uuid]
                parts.append(
                    f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
                )
                if per_worker is not None:
                    if sec > per_worker[1]:
                        parts.append(f'{bg("red")}>💻p95{reset()} ')
                    elif sec > per_worker[0]:
                        parts.append(f'{fg("red")}>💻p50{reset()} ')
                if overall is not None:
                    if sec > overall[1] * 1.5:
                        parts.append(f'{bg("red")}!!!{reset()}')
                        if details is not None:
                            logger.debug(f'Flagging {details.uuid} for another backup')
                            details.super_slow = True
                    elif sec > overall[1]:
                        parts.append(f'{bg("red")}>∀p95{reset()} ')
                        if details is not None:
                            logger.debug(f'Flagging {details.uuid} for a backup')
                            details.too_slow = True
                    elif sec > overall[0]:
                        parts.append(f'{fg("red")}>∀p50{reset()}')
                parts.append('\n')
        return ''.join(parts)

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        """Print the status report if more than 5s passed since the last one.

        total_bundles_submitted is stored on every call, whether or not
        a report is printed.
        """
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        now = time.time()
        previous = self.last_periodic_dump
        if previous is not None and now - previous <= 5.0:
            return
        print(self)
        self.last_periodic_dump = now
384
385
class RemoteWorkerSelectionPolicy(ABC):
    """Strategy interface for choosing which remote worker runs a bundle."""

    def register_worker_pool(self, workers):
        """Remember the worker pool to choose from and reseed random."""
        self.workers = workers
        random.seed()

    @abstractmethod
    def is_worker_available(self) -> bool:
        """Return True if some worker in the pool has a free slot."""
        pass

    @abstractmethod
    def acquire_worker(
            self,
            machine_to_avoid=None
    ) -> Optional[RemoteWorkerRecord]:
        """Claim a slot on a worker, or return None if none can be claimed.

        machine_to_avoid names a machine the caller would prefer not to
        be given.
        """
        pass
401
402
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Pick workers at random, biased by weight and free slot count."""

    def is_worker_available(self) -> bool:
        """Return True if any worker in the pool has count > 0."""
        return any(worker.count > 0 for worker in self.workers)

    def acquire_worker(
            self,
            machine_to_avoid = None
    ) -> Optional[RemoteWorkerRecord]:
        """Claim a slot on a randomly chosen worker.

        Each worker appears in the draw count * weight times, so machines
        with more free slots and a higher weight are chosen more often.
        The first three draws reject machine_to_avoid; after that any
        worker is accepted, so avoidance is a preference and not a
        guarantee.

        Returns:
            The chosen worker with its count already decremented, or None
            when no worker has both a free slot and a positive weight.
        """
        grabbag = [
            worker
            for worker in self.workers
            for _slot in range(0, worker.count)
            for _share in range(0, worker.weight)
        ]

        # With nothing to draw from, indexing the bag would raise
        # IndexError; report "no worker" the same way as a failed draw.
        if not grabbag:
            logger.warning("Couldn't find a worker; go fish.")
            return None

        for attempt in range(0, 5):
            # One uniform draw has the same distribution as shuffling the
            # whole bag and taking its first element.
            worker = random.choice(grabbag)
            if worker.machine != machine_to_avoid or attempt > 2:
                # Everything in the bag has count > 0 by construction; the
                # check is kept as a cheap safety net.
                if worker.count > 0:
                    worker.count -= 1
                    logger.debug(f'Selected worker {worker}')
                    return worker
        logger.warning("Couldn't find a worker; go fish.")
        return None
430
431
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Hand out workers in pool order, resuming after the last one chosen."""

    def __init__(self) -> None:
        super().__init__()
        # Position in self.workers where the next search starts.
        self.index = 0

    def is_worker_available(self) -> bool:
        """Return True if any worker in the pool has count > 0."""
        return any(worker.count > 0 for worker in self.workers)

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        """Claim a slot on the next worker, in rotation, that has one free.

        The search starts at self.index, wraps around and visits each
        worker at most once.  machine_to_avoid is accepted for interface
        compatibility but is not consulted by this policy.

        Returns:
            The chosen worker with its count already decremented, or None
            when the pool is empty or no worker has a free slot.
        """
        pool_size = len(self.workers)
        if pool_size > 0:
            # The modulo keeps a stale index (e.g. left over from a larger
            # pool registered earlier) inside the current pool.
            start = self.index % pool_size
            for offset in range(pool_size):
                position = (start + offset) % pool_size
                worker = self.workers[position]
                if worker.count > 0:
                    worker.count -= 1
                    self.index = (position + 1) % pool_size
                    logger.debug(f'Selected worker {worker}')
                    return worker
        logger.warning("Couldn't find a worker; go fish.")
        return None
463
464
class RemoteExecutor(BaseExecutor):
    """Executor that runs cloudpickled work on remote machines over ssh.

    Each submitted call becomes a "bundle".  A helper thread per bundle
    picks a worker, copies the pickled code there, runs the remote worker
    script and fetches the pickled result.  Slow bundles may get duplicate
    "backup" bundles on other workers; whichever copy finishes first
    cancels the other(s).
    """

    def __init__(self,
                 workers: List[RemoteWorkerRecord],
                 policy: RemoteWorkerSelectionPolicy) -> None:
        """Set up the helper thread pool and status tracking.

        Args:
            workers: the remote machines available for work.
            policy: strategy used to choose a worker for each bundle.

        Raises:
            Exception: if the workers offer no slots in total.
        """
        super().__init__()
        self.workers = workers
        self.worker_count = sum(worker.count for worker in self.workers)
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise Exception(msg)
        self.policy = policy
        self.policy.register_worker_pool(self.workers)
        # Guards worker.count changes; waited on while no slot is free.
        self.cv = threading.Condition()
        # One helper thread per remote slot.
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        logger.debug(
            f'Creating remote processpool with {self.worker_count} remote endpoints.'
        )

    def is_worker_available(self) -> bool:
        """Ask the selection policy whether any worker has a free slot."""
        return self.policy.is_worker_available()

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        """Ask the selection policy to claim a worker slot."""
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> RemoteWorkerRecord:
        """Block until a slot is free, then claim and return a worker.

        Raises:
            Exception: if the policy reports availability but then fails
                to hand out a worker.
        """
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, worker: RemoteWorkerRecord) -> None:
        """Give a slot back to worker and wake one waiting launcher."""
        logger.debug(f'Released worker {worker}')
        with self.cv:
            worker.count += 1
            self.cv.notify()

    def heartbeat(self) -> None:
        """Periodic housekeeping, called while waiting on a remote process.

        Prints the status report (rate limited by periodic_dump) and, once
        more than 7 bundles have finished, considers a backup for every
        in-flight original bundle flagged as too slow.
        """
        with self.status.lock:
            # Regular progress report.  Rendering it is also what sets the
            # too_slow / super_slow flags checked below.
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule.
            if len(self.status.finished_bundle_timings) > 7:
                for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                                bundle is not None and
                                bundle.too_slow and
                                bundle.src_bundle is None and
                                config.config['executors_schedule_remote_backups']
                        ):
                            self.consider_backup_for_bundle(bundle)

    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
        """Schedule a backup for bundle if the backup budget allows it.

        A too_slow bundle gets one backup.  A super_slow bundle may get a
        second one, but only while more than 4 slots are idle.  The caller
        must hold self.status.lock.
        """
        assert self.status.lock.locked()
        if (
                # One backup per slow bundle...
                (bundle.too_slow and len(bundle.backup_bundles) == 0)
                # ...and a second one in dire situations.
                or (
                    bundle.super_slow
                    and len(bundle.backup_bundles) < 2
                    and self.status.total_idle() > 4
                )
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        """Return True, latching bundle.was_cancelled, if bundle is cancelled."""
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False

    def _finish_or_retry(self, bundle: BundleDetails) -> Any:
        """Run post_launch_work; on failure relaunch bundle or give up.

        Returns:
            The bundle's result, or None once failure_count has reached
            the executors_max_bundle_failures config value.
        """
        try:
            return self.post_launch_work(bundle)
        except Exception as e:
            # NOTE(review): only the failure paths inside post_launch_work
            # that bump failure_count are bounded by the limit below; an
            # exception raised before that point is retried without being
            # counted.
            logger.exception(e)
            logger.info(f"Bundle {bundle.uuid} seems to have failed?!")
            if bundle.failure_count < config.config['executors_max_bundle_failures']:
                return self.launch(bundle)
            logger.info(f"Bundle {bundle.uuid} is poison, giving up on it.")
            return None

    def launch(self, bundle: BundleDetails) -> Any:
        """Find a worker for bundle or block until one is available.

        Runs the bundle on that worker, polling every 0.5s for completion
        or cancellation, then hands over to post_launch_work.

        Returns:
            The unpickled result for an original bundle, or None for a
            backup or a bundle that was given up on.
        """
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = None

        # Try not to schedule a backup on the same host as the original.
        if bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'Running bundle {uuid} on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        if self.check_if_cancelled(bundle):
            return self._finish_or_retry(bundle)

        # Send input to machine if it's not local.
        if hostname not in machine:
            cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
            logger.info(f"Copying work to {worker} via {cmd}")
            run_silently(cmd)

        # Do it.
        cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
               f'"source remote-execution/bin/activate &&'
               f' /home/scott/lib/python_modules/remote_worker.py'
               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = pid = p.pid
        logger.info(f"Running {cmd} in the background as process {pid}")

        while True:
            try:
                p.wait(timeout=0.5)
            except subprocess.TimeoutExpired:
                self.heartbeat()

                # Both source and backup bundles can be cancelled by
                # the other depending on which finishes first.
                if self.check_if_cancelled(bundle):
                    p.terminate()
                    break
            else:
                logger.debug(
                    f"{pid}/{bundle.uuid} has finished its work normally."
                )
                break

        return self._finish_or_retry(bundle)

    def post_launch_work(self, bundle: BundleDetails) -> Any:
        """Collect the result of bundle and release its worker.

        For a bundle that was not cancelled and ran on another machine,
        the result file is fetched and the remote temp files are removed.
        An original bundle unpickles the result, deletes the local temp
        files and cancels its backups.  A backup only tells its original
        to stop.

        Returns:
            The unpickled result for an original bundle, None for a backup.

        Raises:
            Exception: if the result file is missing or cannot be
                unpickled; failure_count is incremented and the worker is
                released before raising.
        """
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"Fetching results from {username}@{machine} via {cmd}"
                    )
                    try:
                        run_silently(cmd)
                    except subprocess.CalledProcessError:
                        # A failed fetch is caught by the result-file
                        # existence check below, so it is not fatal here.
                        pass
                    run_silently(f'{SSH} {username}@{machine}'
                                 f' "/bin/rm -f {code_file} {result_file}"')
            bundle.end_ts = time.time()
            assert bundle.worker is not None
            self.status.record_release_worker_already_locked(
                bundle.worker,
                bundle.uuid,
                was_cancelled
            )
            if not was_cancelled:
                dur = bundle.end_ts - bundle.start_ts
                HIST.add_item(dur)

        # A backup is only cancelled by its original, and only after the
        # original has unpickled the shared result file, which it then
        # deletes.  A missing file is therefore expected here and must not
        # count as a failure that triggers a relaunch.
        if was_cancelled and not is_original:
            logger.debug(
                f'Backup {bundle.uuid} was cancelled; nothing left to do.'
            )
            self.release_worker(bundle.worker)
            return None

        # Original or not, the results should be back on the local
        # machine.  Are they?
        if not os.path.exists(result_file):
            msg = f'{result_file} unexpectedly missing, wtf?!'
            logger.critical(msg)
            bundle.failure_count += 1
            self.release_worker(bundle.worker)
            raise Exception(msg)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.
        if is_original:
            logger.debug(f"Unpickling {result_file}.")
            try:
                with open(f'{result_file}', 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                msg = f'Failed to load {result_file}'
                logger.critical(msg)
                bundle.failure_count += 1
                self.release_worker(bundle.worker)
                # Chain the cause so the original traceback is kept.
                raise Exception(e) from e
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        f'Notifying backup {backup.uuid} that it is cancelled'
                    )
                    backup.is_cancelled.set()

        # This is a backup that finished before it was cancelled.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their original to
            # read later.
            result = None

            # Tell the original to stop, since we finished first.
            logger.debug(
                f'Notifying original {bundle.src_bundle.uuid} that it is cancelled'
            )
            bundle.src_bundle.is_cancelled.set()

        assert bundle.worker is not None
        self.release_worker(bundle.worker)
        return result

    def create_original_bundle(self, pickle):
        """Write pickle to a temp code file and build its BundleDetails.

        The new bundle is registered with self.status.

        Args:
            pickle: cloudpickled (function, args, kwargs) bytes.

        Returns:
            The new original bundle (src_bundle None, no backups yet).
        """
        from string_utils import generate_uuid
        uuid = generate_uuid(as_hex=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code = pickle,
            uuid = uuid,
            worker = None,
            username = None,
            machine = None,
            hostname = platform.node(),
            code_file = code_file,
            result_file = result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = None,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = [],
            failure_count = 0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'Created original bundle {uuid}')
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        """Build a backup of src_bundle sharing its code and result files.

        The backup is appended to src_bundle.backup_bundles and registered
        with self.status; the caller must hold self.status.lock.
        """
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code = src_bundle.pickled_code,
            uuid = uuid,
            worker = None,
            username = None,
            machine = None,
            hostname = src_bundle.hostname,
            code_file = src_bundle.code_file,
            result_file = src_bundle.result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = src_bundle,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = None,    # backup backups not allowed
            failure_count = 0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'Created backup bundle {uuid}')
        return backup_bundle

    def schedule_backup_for_bundle(self,
                                   src_bundle: BundleDetails):
        """Create a backup of src_bundle and queue it on the helper pool.

        The backup's Future is discarded on purpose: results from backups
        don't matter.  If a backup finishes first it moves the result file
        to this machine and lets the original pick it up and unpickle it.
        """
        assert self.status.lock.locked()
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'Scheduling backup bundle {backup_bundle.uuid} for execution'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        """Cloudpickle the call, wrap it in a bundle and queue its launch.

        Returns:
            A Future that resolves to whatever launch() returns.
        """
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle)
        self.total_bundles_submitted += 1
        logger.debug(
            f'Submitted work to remote executor; {self.total_bundles_submitted} items now submitted'
        )
        return self._helper_executor.submit(self.launch, bundle)

    def shutdown(self, wait=True) -> None:
        """Shut down the helper thread pool, then print the histogram."""
        self._helper_executor.shutdown(wait)
        print(HIST)
822
823
@singleton
class DefaultExecutors(object):
    """Lazily created, shared instances of each executor type."""

    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        """Return True if one `ping -c 1 host` exits with status 0."""
        status = subprocess.call(
            ['ping', '-c', '1', host],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return status == 0

    def thread_pool(self) -> ThreadExecutor:
        """Return the shared ThreadExecutor, creating it on first use."""
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        """Return the shared ProcessExecutor, creating it on first use."""
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        """Return the shared RemoteExecutor, creating it on first use.

        On first use each candidate machine is pinged once, in the order
        listed, and only the ones that answer join the worker pool.
        """
        if self.remote_executor is not None:
            return self.remote_executor

        # (machine, weight, count) for every machine we might use.
        candidates = (
            ('cheetah.house', 12, 4),
            ('video.house', 1, 4),
            ('wannabe.house', 2, 4),
            ('meerkat.cabin', 7, 2),
            ('backup.house', 1, 4),
            ('puma.cabin', 12, 4),
        )
        pool: List[RemoteWorkerRecord] = []
        for machine, weight, count in candidates:
            if self.ping(machine):
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine=machine,
                        weight=weight,
                        count=count,
                    ),
                )
        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        policy.register_worker_pool(pool)
        self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor