[python_utils.git] / executors.py
#!/usr/bin/env python3

from __future__ import annotations

from abc import ABC, abstractmethod
import concurrent.futures as fut
from collections import defaultdict
from dataclasses import dataclass
import logging
import os
import platform
import random
import subprocess
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
import numpy

from ansi import bg, fg, underline, reset
import argparse_utils
import config
import exec_utils
from decorator_utils import singleton
import histogram
import string_utils

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})",
    "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    metavar='#THREADS',
    help='Number of threads in the default threadpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    metavar='#PROCESSES',
    help='Number of processes in the default processpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    metavar='#FAILURES',
    help='Maximum number of failures before giving up on a bundle',
)

RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
SSH = 'ssh -oForwardX11=no'
HIST = histogram.SimpleHistogram(
    histogram.SimpleHistogram.n_evenly_spaced_buckets(0, 500, 25)
)


def run_local_bundle(fun, *args, **kwargs):
    """Run fun(*args, **kwargs), record its duration in HIST, return its result."""
    logger.debug(f"Running local bundle at {fun.__name__}")
    start = time.time()
    result = fun(*args, **kwargs)
    duration = time.time() - start
    logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
    HIST.add_item(duration)
    return result


def run_cloud_pickle(pickle):
    """Deserialize a cloudpickled (fun, args, kwargs) bundle and run it."""
    fun, args, kwargs = cloudpickle.loads(pickle)
    logger.debug(f"Running pickled bundle at {fun.__name__}")
    start = time.time()
    result = fun(*args, **kwargs)
    duration = time.time() - start
    logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
    return result


def make_cloud_pickle(fun, *args, **kwargs):
    """Serialize fun and its arguments into a bundle with cloudpickle."""
    logger.info(f"Making cloudpickled bundle at {fun.__name__}")
    return cloudpickle.dumps((fun, args, kwargs))


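# A minimal round-trip sketch for the helpers above (the lambda is
# illustrative; cloudpickle, unlike the stdlib pickle, can serialize
# lambdas and closures, which is the point of using it here):
#
#   payload = make_cloud_pickle(lambda x: x * x, 7)
#   assert run_cloud_pickle(payload) == 49

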
class BaseExecutor(ABC):
    def __init__(self):
        pass

    @abstractmethod
    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self,
                 wait: bool = True) -> None:
        pass


class ThreadExecutor(BaseExecutor):
    def __init__(self,
                 max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers,
            thread_name_prefix="thread_executor_helper"
        )
        self.job_count = 0

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        self.job_count += 1
        logger.debug(
            f'Submitted work to threadpool; there are now {self.job_count} items.'
        )
        return self._thread_pool_executor.submit(
            run_local_bundle,
            function,
            *args,
            **kwargs)

    def shutdown(self,
                 wait: bool = True) -> None:
        logger.debug("Shutting down threadpool executor.")
        print(HIST)
        self._thread_pool_executor.shutdown(wait)


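# A usage sketch (illustrative; assumes the enclosing program has already
# initialized the config module so that config.config is populated):
#
#   executor = ThreadExecutor(max_workers=4)
#   future = executor.submit(sum, [1, 2, 3])
#   assert future.result() == 6
#   executor.shutdown()

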
class ProcessExecutor(BaseExecutor):
    def __init__(self,
                 max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.job_count = 0

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        # Bundle it up with cloudpickle before submitting because the
        # stdlib pickle can't handle lambdas, closures, etc...
        pickle = make_cloud_pickle(function, *args, **kwargs)
        self.job_count += 1
        logger.debug(
            f'Submitting work to processpool executor; there are now {self.job_count} items.'
        )
        return self._process_executor.submit(run_cloud_pickle, pickle)

    def shutdown(self, wait: bool = True) -> None:
        logger.debug('Shutting down processpool executor')
        print(HIST)
        self._process_executor.shutdown(wait)


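# ProcessPoolExecutor normally serializes the submitted callable itself
# with the stdlib pickle; pre-serializing with cloudpickle means only
# run_cloud_pickle (a plain module-level function) and a bytes payload
# cross the process boundary, so even lambdas work.  E.g. (illustrative):
#
#   executor = ProcessExecutor(max_workers=2)
#   future = executor.submit(lambda x: x + 1, 41)
#   assert future.result() == 42
#   executor.shutdown()

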
@dataclass
class RemoteWorkerRecord:
    username: str
    machine: str
    weight: int   # relative scheduling weight
    count: int    # number of open slots for concurrent bundles

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


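# Note: the custom __hash__ keys on (username, machine) only, so a record
# keeps a stable hash while its mutable count field changes; that lets it
# serve as a dict key in the status tracker below.  E.g. (illustrative):
#
#   w = RemoteWorkerRecord('scott', 'cheetah.house', weight=12, count=4)
#   h = hash(w)
#   w.count -= 1
#   assert hash(w) == h

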
@dataclass
class BundleDetails:
    pickled_code: bytes
    uuid: str
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    hostname: str
    code_file: str
    result_file: str
    pid: int
    start_ts: float
    end_ts: float
    too_slow: bool
    super_slow: bool
    src_bundle: Optional[BundleDetails]   # None iff this is an original bundle
    is_cancelled: threading.Event
    was_cancelled: bool
    backup_bundles: Optional[List[BundleDetails]]   # None for backup bundles
    failure_count: int


class RemoteExecutorStatus:
    def __init__(self, total_worker_count: int) -> None:
        self.worker_count = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_per_bundle: Dict[str, float] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord,
            List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[
            RemoteWorkerRecord,
            Set[str]
        ] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted = 0

        # Protects reads and modifications of self.  Also used as a
        # memory fence for modifications to bundles.
        self.lock = threading.Lock()

    def record_acquire_worker(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(
                worker,
                uuid
            )

    def record_acquire_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = time.time()
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(
            self,
            details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(
            self,
            details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str,
            was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            bundle_latency = ts - self.start_per_bundle[uuid]
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(
                        bundle_uuid,
                        None
                    )
                    pid = str(details.pid) if details is not None else "TBD"
                    sec = ts - self.start_per_bundle[bundle_uuid]
                    ret += f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                        elif sec > qworker[0]:
                            ret += f'{fg("red")}>💻p50{reset()} '
                    if qall is not None:
                        if sec > qall[1] * 1.5:
                            ret += f'{bg("red")}!!!{reset()}'
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for another backup')
                                details.super_slow = True
                        elif sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for a backup')
                                details.too_slow = True
                        elif sec > qall[0]:
                            ret += f'{fg("red")}>∀p50{reset()} '
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if (
                self.last_periodic_dump is None
                or ts - self.last_periodic_dump > 5.0
        ):
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    def register_worker_pool(self, workers):
        random.seed()
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        # Each worker appears weight * count times in the grab bag so
        # that heavier / less busy workers are picked more often.
        grabbag = []
        for worker in self.workers:
            for _ in range(worker.count * worker.weight):
                grabbag.append(worker)
        if not grabbag:
            logger.warning("Couldn't find a worker; go fish.")
            return None

        # Try to honor machine_to_avoid for a few attempts, then take
        # any available worker rather than failing outright.
        for attempt in range(5):
            random.shuffle(grabbag)
            worker = grabbag[0]
            if worker.machine != machine_to_avoid or attempt > 2:
                if worker.count > 0:
                    worker.count -= 1
                    logger.debug(f'Selected worker {worker}')
                    return worker
        logger.warning("Couldn't find a worker; go fish.")
        return None


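# Worked example of the weighting above (numbers from the default pool
# below): cheetah.house (weight=12, count=4) gets 48 grab bag entries
# while video.house (weight=1, count=4) gets 4, so cheetah wins the draw
# about 92% of the time while both still have open slots.

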
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def __init__(self) -> None:
        self.index = 0

    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        # Note: this policy makes no attempt to honor machine_to_avoid.
        x = self.index
        while True:
            worker = self.workers[x]
            if worker.count > 0:
                worker.count -= 1
                x += 1
                if x >= len(self.workers):
                    x = 0
                self.index = x
                logger.debug(f'Selected worker {worker}')
                return worker
            x += 1
            if x >= len(self.workers):
                x = 0
            if x == self.index:
                logger.warning("Couldn't find a worker; go fish.")
                return None


class RemoteExecutor(BaseExecutor):
    def __init__(self,
                 workers: List[RemoteWorkerRecord],
                 policy: RemoteWorkerSelectionPolicy) -> None:
        super().__init__()
        self.workers = workers
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise Exception(msg)
        self.policy = policy
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        logger.debug(
            f'Creating remote processpool with {self.worker_count} remote endpoints.'
        )

    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, worker: RemoteWorkerRecord) -> None:
        logger.debug(f'Released worker {worker}')
        with self.cv:
            worker.count += 1
            self.cv.notify()

    def heartbeat(self) -> None:
        with self.status.lock:
            # Regular progress report
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule; wait until enough bundles
            # have finished that the timing quantiles mean something.
            if len(self.status.finished_bundle_timings) > 7:
                for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                                bundle is not None and
                                bundle.too_slow and
                                bundle.src_bundle is None and
                                config.config['executors_schedule_remote_backups']
                        ):
                            self.consider_backup_for_bundle(bundle)

    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
        assert self.status.lock.locked()
        if (
            bundle.too_slow
            and len(bundle.backup_bundles) == 0       # one backup per bundle
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
        elif (
                bundle.super_slow
                and len(bundle.backup_bundles) < 2    # two backups in dire situations
                and self.status.total_idle() > 4
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False

    def launch(self, bundle: BundleDetails) -> Any:
        """Find a worker for bundle or block until one is available."""
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = None

        # Try not to schedule a backup on the same host as the original.
        if bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'Running bundle {uuid} on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        if self.check_if_cancelled(bundle):
            try:
                return self.post_launch_work(bundle)
            except Exception as e:
                logger.exception(e)
                logger.info(f"Bundle {uuid} seems to have failed?!")
                if bundle.failure_count < config.config['executors_max_bundle_failures']:
                    return self.launch(bundle)
                else:
                    logger.info(f"Bundle {uuid} is poison, giving up on it.")
                    return None

        # Send input to machine if it's not local (the substring check
        # lets a short local hostname match its fully qualified name).
        if hostname not in machine:
            cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
            logger.info(f"Copying work to {worker} via {cmd}")
            exec_utils.run_silently(cmd)

        # Do it.
        cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
               f'"source remote-execution/bin/activate &&'
               f' /home/scott/lib/python_modules/remote_worker.py'
               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
        p = exec_utils.cmd_in_background(cmd, silent=True)
        bundle.pid = pid = p.pid
        logger.info(f"Running {cmd} in the background as process {pid}")

        while True:
            try:
                p.wait(timeout=0.5)
            except subprocess.TimeoutExpired:
                self.heartbeat()

                # Both source and backup bundles can be cancelled by
                # the other depending on which finishes first.
                if self.check_if_cancelled(bundle):
                    p.terminate()
                    break
            else:
                logger.debug(
                    f"{pid}/{bundle.uuid} has finished its work normally."
                )
                break

        try:
            return self.post_launch_work(bundle)
        except Exception as e:
            logger.exception(e)
            logger.info(f"Bundle {uuid} seems to have failed?!")
            if bundle.failure_count < config.config['executors_max_bundle_failures']:
                return self.launch(bundle)
            logger.info(f"Bundle {uuid} is poison, giving up on it.")
            return None

    def post_launch_work(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"Fetching results from {username}@{machine} via {cmd}"
                    )
                    try:
                        exec_utils.run_silently(cmd)
                    except subprocess.CalledProcessError:
                        pass
                    exec_utils.run_silently(f'{SSH} {username}@{machine}'
                                            f' "/bin/rm -f {code_file} {result_file}"')
            bundle.end_ts = time.time()
            assert bundle.worker is not None
            self.status.record_release_worker_already_locked(
                bundle.worker,
                bundle.uuid,
                was_cancelled
            )
            if not was_cancelled:
                dur = bundle.end_ts - bundle.start_ts
                HIST.add_item(dur)

        # Original or not, the results should be back on the local
        # machine by now.  Are they?
        if not os.path.exists(result_file):
            msg = f'{result_file} unexpectedly missing, wtf?!'
            logger.critical(msg)
            bundle.failure_count += 1
            self.release_worker(bundle.worker)
            raise Exception(msg)

        # Only the original bundle should unpickle the file contents,
        # though, since it's the only one whose result matters.
        if is_original:
            logger.debug(f"Unpickling {result_file}.")
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                msg = f'Failed to load {result_file}'
                logger.critical(msg)
                bundle.failure_count += 1
                self.release_worker(bundle.worker)
                raise Exception(msg) from e
            os.remove(result_file)
            os.remove(code_file)

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we finished
            # first since there could be more than one backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        f'Notifying backup {backup.uuid} that it is cancelled'
                    )
                    backup.is_cancelled.set()

        # This is a backup bundle.
        else:
            # Backup results don't matter; they just need to leave the
            # result file in the right place for their original to
            # read later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                logger.debug(
                    f'Notifying original {bundle.src_bundle.uuid} that it is cancelled'
                )
                bundle.src_bundle.is_cancelled.set()

        assert bundle.worker is not None
        self.release_worker(bundle.worker)
        return result

    def create_original_bundle(self, pickle):
        uuid = string_utils.generate_uuid(as_hex=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(code_file, 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code = pickle,
            uuid = uuid,
            worker = None,
            username = None,
            machine = None,
            hostname = platform.node(),
            code_file = code_file,
            result_file = result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = None,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = [],
            failure_count = 0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'Created original bundle {uuid}')
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code = src_bundle.pickled_code,
            uuid = uuid,
            worker = None,
            username = None,
            machine = None,
            hostname = src_bundle.hostname,
            code_file = src_bundle.code_file,
            result_file = src_bundle.result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = src_bundle,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = None,    # backups of backups not allowed
            failure_count = 0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'Created backup bundle {uuid}')
        return backup_bundle

    def schedule_backup_for_bundle(self,
                                   src_bundle: BundleDetails):
        assert self.status.lock.locked()
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'Scheduling backup bundle {backup_bundle.uuid} for execution'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle)
        self.total_bundles_submitted += 1
        logger.debug(
            f'Submitted work to remote executor; {self.total_bundles_submitted} items now submitted'
        )
        return self._helper_executor.submit(self.launch, bundle)

    def shutdown(self, wait: bool = True) -> None:
        self._helper_executor.shutdown(wait)
        print(HIST)


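# A construction sketch for a custom worker pool (hostnames, weights and
# counts are illustrative; DefaultExecutors below wires up the real pool):
#
#   workers = [
#       RemoteWorkerRecord('scott', 'cheetah.house', weight=12, count=4),
#       RemoteWorkerRecord('scott', 'video.house', weight=1, count=4),
#   ]
#   policy = WeightedRandomRemoteWorkerSelectionPolicy()
#   policy.register_worker_pool(workers)
#   executor = RemoteExecutor(workers, policy)
#   future = executor.submit(sum, [1, 2, 3])

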
@singleton
class DefaultExecutors(object):
    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        command = ['ping', '-c', '1', host]
        return subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        ) == 0

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            pool: List[RemoteWorkerRecord] = []
            for machine, weight, count in [
                    ('cheetah.house', 12, 4),
                    ('video.house', 1, 4),
                    ('wannabe.house', 2, 4),
                    ('meerkat.cabin', 7, 2),
                    ('backup.house', 1, 4),
                    ('puma.cabin', 12, 4),
            ]:
                if self.ping(machine):
                    pool.append(
                        RemoteWorkerRecord(
                            username = 'scott',
                            machine = machine,
                            weight = weight,
                            count = count,
                        )
                    )
            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor
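

# A usage sketch for the DefaultExecutors singleton above (illustrative;
# it assumes the enclosing program has already initialized the config
# module so that config.config is populated before any executor is built):
#
#   executors = DefaultExecutors()
#   future = executors.thread_pool().submit(sum, [1, 2, 3])
#   assert future.result() == 6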