#!/usr/bin/env python3

from __future__ import annotations

from abc import ABC, abstractmethod
import concurrent.futures as fut
from collections import defaultdict
from dataclasses import dataclass
import logging
import numpy
import os
import platform
import random
import subprocess
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore

from ansi import bg, fg, underline, reset
import argparse_utils
import config
import exec_utils
from decorator_utils import singleton
import histogram
import string_utils

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})",
    "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    metavar='#THREADS',
    help='Number of threads in the default threadpool; leave unset for the default.',
    default=None,
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    metavar='#PROCESSES',
    help='Number of processes in the default processpool; leave unset for the default.',
    default=None,
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow?',
)

rsync = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
ssh = 'ssh -oForwardX11=no'


# Histogram of bundle execution times: 25 evenly spaced buckets
# covering 0-500s.
hist = histogram.SimpleHistogram(
    histogram.SimpleHistogram.n_evenly_spaced_buckets(0, 500, 25)
)


def run_local_bundle(fun, *args, **kwargs):
    logger.debug(f"Running local bundle at {fun.__name__}")
    start = time.time()
    result = fun(*args, **kwargs)
    end = time.time()
    duration = end - start
    logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
    hist.add_item(duration)
    return result


def run_cloud_pickle(pickle):
    fun, args, kwargs = cloudpickle.loads(pickle)
    logger.debug(f"Running pickled bundle at {fun.__name__}")
    start = time.time()
    result = fun(*args, **kwargs)
    end = time.time()
    duration = end - start
    logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
    return result


def make_cloud_pickle(fun, *args, **kwargs):
    logger.info(f"Making cloudpickled bundle at {fun.__name__}")
    return cloudpickle.dumps((fun, args, kwargs))


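# A minimal round-trip sketch (illustrative; not part of the original
# file): the executors below rely on cloudpickle to move a callable and
# its arguments across process and machine boundaries, since the stdlib
# pickler cannot serialize lambdas or closures.
def _demo_pickle_roundtrip() -> None:
    def add(a: int, b: int) -> int:
        return a + b

    payload = make_cloud_pickle(add, 2, 3)
    assert run_cloud_pickle(payload) == 5

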
class BaseExecutor(ABC):
    def __init__(self):
        pass

    @abstractmethod
    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self,
                 wait: bool = True) -> None:
        pass


class ThreadExecutor(BaseExecutor):
    def __init__(self,
                 max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers,
            thread_name_prefix="thread_executor_helper"
        )
        self.job_count = 0

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        self.job_count += 1
        logger.debug(
            f'Submitted work to threadpool; there are now {self.job_count} items.'
        )
        return self._thread_pool_executor.submit(
            run_local_bundle,
            function,
            *args,
            **kwargs)

    def shutdown(self,
                 wait: bool = True) -> None:
        logger.debug("Shutting down threadpool executor.")
        print(hist)
        self._thread_pool_executor.shutdown(wait)


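# Illustrative usage sketch (an addition, not original code): the
# executor is driven like a stock concurrent.futures pool; submit()
# hands back a Future.
def _demo_thread_executor() -> None:
    executor = ThreadExecutor(max_workers=4)
    futures = [executor.submit(lambda x: x * x, n) for n in range(10)]
    assert [f.result() for f in futures] == [n * n for n in range(10)]
    executor.shutdown()

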
class ProcessExecutor(BaseExecutor):
    def __init__(self,
                 max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.job_count = 0

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        # Bundle the work up with cloudpickle before submitting because
        # the stdlib pickler used by ProcessPoolExecutor can't handle
        # lambdas, closures, or interactively defined functions.
        pickle = make_cloud_pickle(function, *args, **kwargs)
        self.job_count += 1
        logger.debug(
            f'Submitting work to processpool executor; there are now {self.job_count} items.'
        )
        return self._process_executor.submit(run_cloud_pickle, pickle)

    def shutdown(self, wait=True) -> None:
        logger.debug('Shutting down processpool executor')
        print(hist)
        self._process_executor.shutdown(wait)


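# Illustrative usage sketch (an addition, not original code): thanks to
# the cloudpickle detour, even a lambda can cross the process boundary.
# Assumes this module is importable from the worker processes.
def _demo_process_executor() -> None:
    executor = ProcessExecutor(max_workers=2)
    future = executor.submit(lambda a, b: a + b, 2, 3)
    assert future.result() == 5
    executor.shutdown()

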
@dataclass
class RemoteWorkerRecord:
    username: str
    machine: str
    weight: int
    count: int

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    pickled_code: bytes
    uuid: str
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    hostname: str
    code_file: str
    result_file: str
    pid: int
    start_ts: float
    end_ts: float
    too_slow: bool
    super_slow: bool
    src_bundle: Optional[BundleDetails]    # None iff this is an original bundle
    is_cancelled: threading.Event
    was_cancelled: bool
    backup_bundles: Optional[List[BundleDetails]]


class RemoteExecutorStatus:
    def __init__(self, total_worker_count: int) -> None:
        self.worker_count = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_per_bundle: Dict[str, float] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord,
            List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[
            RemoteWorkerRecord,
            Set[str]
        ] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted = 0

        # Protects reads and modifications of self.  Also used as a
        # memory fence for modifications to bundles.
        self.lock = threading.Lock()

    def record_acquire_worker(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(
                worker,
                uuid
            )

    def record_acquire_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = time.time()
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(
            self,
            details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(
            self,
            details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str,
            was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            bundle_latency = ts - self.start_per_bundle[uuid]
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f' ✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(
                        bundle_uuid,
                        None
                    )
                    pid = str(details.pid) if details is not None else "TBD"
                    sec = ts - self.start_per_bundle[bundle_uuid]
                    ret += f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                        elif sec > qworker[0]:
                            ret += f'{fg("red")}>💻p50{reset()} '
                    if qall is not None:
                        if sec > qall[1] * 1.5:
                            ret += f'{bg("red")}!!!{reset()}'
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for another backup')
                                details.super_slow = True
                        elif sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for a backup')
                                details.too_slow = True
                        elif sec > qall[0]:
                            ret += f'{fg("red")}>∀p50{reset()}'
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if (
                self.last_periodic_dump is None
                or ts - self.last_periodic_dump > 5.0
        ):
            print(self)
            self.last_periodic_dump = ts


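# Illustrative sketch (an addition, not original code): how the executor
# records a bundle's lifecycle.  The *_already_locked variants require
# the caller to hold status.lock, hence the explicit "with" below.
def _demo_status_bookkeeping() -> None:
    worker = RemoteWorkerRecord(
        username = 'nobody',
        machine = 'worker.invalid',    # hypothetical host
        weight = 1,
        count = 1,
    )
    status = RemoteExecutorStatus(total_worker_count=1)
    status.record_acquire_worker(worker, 'bundle-0')
    with status.lock:
        assert status.total_in_flight() == 1
        status.record_release_worker_already_locked(
            worker, 'bundle-0', was_cancelled=False
        )
        assert status.total_in_flight() == 0
        assert status.total_idle() == 1

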
class RemoteWorkerSelectionPolicy(ABC):
    def register_worker_pool(self, workers):
        random.seed()
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        # Build a grab bag in which each worker with free slots appears
        # count * weight times; drawing from it uniformly is a weighted
        # random selection.
        grabbag = []
        for worker in self.workers:
            for _ in range(worker.count * worker.weight):
                grabbag.append(worker)
        if not grabbag:
            logger.warning("Couldn't find a worker; go fish.")
            return None

        # Try to honor machine_to_avoid for the first few attempts,
        # then take whatever we can get.
        for attempt in range(5):
            random.shuffle(grabbag)
            worker = grabbag[0]
            if worker.machine != machine_to_avoid or attempt > 2:
                if worker.count > 0:
                    worker.count -= 1
                    logger.debug(f'Selected worker {worker}')
                    return worker
        logger.warning("Couldn't find a worker; go fish.")
        return None


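# Illustrative sketch (an addition, not original code): with weights
# 10:1, the heavier worker should win the large majority of draws.
def _demo_weighted_policy() -> None:
    heavy = RemoteWorkerRecord(
        username = 'nobody',
        machine = 'heavy.invalid',    # hypothetical host
        weight = 10,
        count = 100,
    )
    light = RemoteWorkerRecord(
        username = 'nobody',
        machine = 'light.invalid',    # hypothetical host
        weight = 1,
        count = 100,
    )
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    policy.register_worker_pool([heavy, light])
    wins = sum(policy.acquire_worker() is heavy for _ in range(50))
    print(f'heavy won {wins}/50 draws')    # expect roughly 45/50

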
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def __init__(self) -> None:
        self.index = 0

    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        # Note: this policy makes no attempt to honor machine_to_avoid.
        x = self.index
        while True:
            worker = self.workers[x]
            if worker.count > 0:
                worker.count -= 1
                x += 1
                if x >= len(self.workers):
                    x = 0
                self.index = x
                logger.debug(f'Selected worker {worker}')
                return worker
            x += 1
            if x >= len(self.workers):
                x = 0
            if x == self.index:
                logger.warning("Couldn't find a worker; go fish.")
                return None


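# Illustrative sketch (an addition, not original code): the round robin
# policy hands out workers in pool order, skipping exhausted ones.
def _demo_round_robin_policy() -> None:
    a = RemoteWorkerRecord(
        username = 'nobody',
        machine = 'a.invalid',    # hypothetical host
        weight = 1,
        count = 2,
    )
    b = RemoteWorkerRecord(
        username = 'nobody',
        machine = 'b.invalid',    # hypothetical host
        weight = 1,
        count = 2,
    )
    policy = RoundRobinRemoteWorkerSelectionPolicy()
    policy.register_worker_pool([a, b])
    order = [policy.acquire_worker().machine for _ in range(4)]
    assert order == ['a.invalid', 'b.invalid', 'a.invalid', 'b.invalid']
    assert policy.acquire_worker() is None    # pool exhausted

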
class RemoteExecutor(BaseExecutor):
    def __init__(self,
                 workers: List[RemoteWorkerRecord],
                 policy: RemoteWorkerSelectionPolicy) -> None:
        super().__init__()
        self.workers = workers
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise Exception(msg)
        self.policy = policy
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        logger.debug(
            f'Creating remote processpool with {self.worker_count} remote endpoints.'
        )

    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, worker: RemoteWorkerRecord) -> None:
        logger.debug(f'Released worker {worker}')
        with self.cv:
            worker.count += 1
            self.cv.notify()

    def heartbeat(self) -> None:
        with self.status.lock:
            # Regular progress report
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule
            if len(self.status.finished_bundle_timings) > 7:
                for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                                bundle is not None and
                                bundle.too_slow and
                                bundle.src_bundle is None and
                                config.config['executors_schedule_remote_backups']
                        ):
                            self.consider_backup_for_bundle(bundle)

    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
        assert self.status.lock.locked()
        if (
            bundle.too_slow
            and len(bundle.backup_bundles) == 0       # one backup per
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return
        elif (
                bundle.super_slow
                and len(bundle.backup_bundles) < 2    # two backups in dire situations
                and self.status.total_idle() > 4
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False

    def launch(self, bundle: BundleDetails) -> Any:
        # Find a worker for bundle or block until one is available.
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = None
        if bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'Running bundle {uuid} on {worker}...')

        # Before we do work, make sure it's still viable.
        if self.check_if_cancelled(bundle):
            return self.post_launch_work(bundle)

        # Send input to machine if it's not local.
        if hostname not in machine:
            cmd = f'{rsync} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
            logger.debug(f"Copying work to {worker} via {cmd}")
            exec_utils.run_silently(cmd)

        # Before we do more work, make sure it's still viable.
        if self.check_if_cancelled(bundle):
            return self.post_launch_work(bundle)

        # macOS ships a python3 binary in /usr/sbin that is not the one
        # we want and is protected by the OS, so make sure that
        # /usr/local/bin is early in the path.
        cmd = (f'{ssh} {bundle.username}@{bundle.machine} '
               f'"export PATH=/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/home/scott/bin:/home/scott/.local/bin; /home/scott/lib/python_modules/remote_worker.py'
               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
        p = exec_utils.cmd_in_background(cmd, silent=True)
        bundle.pid = pid = p.pid
        logger.debug(f"Running {cmd} in the background as process {pid}")

        while True:
            try:
                p.wait(timeout=0.5)
            except subprocess.TimeoutExpired:
                self.heartbeat()

                # Both source and backup bundles can be cancelled by
                # the other depending on which finishes first.
                if self.check_if_cancelled(bundle):
                    p.terminate()
                    break
            else:
                logger.debug(
                    f"{pid}/{bundle.uuid} has finished its work normally."
                )
                break
        return self.post_launch_work(bundle)

    def post_launch_work(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{rsync} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.debug(
                        f"Fetching results from {username}@{machine} via {cmd}"
                    )
                    try:
                        exec_utils.run_silently(cmd)
                    except subprocess.CalledProcessError:
                        pass
                    exec_utils.run_silently(f'{ssh} {username}@{machine}'
                                            f' "/bin/rm -f {code_file} {result_file}"')
            bundle.end_ts = time.time()
            assert bundle.worker is not None
            self.status.record_release_worker_already_locked(
                bundle.worker,
                bundle.uuid,
                was_cancelled
            )
            if not was_cancelled:
                dur = bundle.end_ts - bundle.start_ts
                hist.add_item(dur)

        # Only the original bundle should unpickle the file contents,
        # since it's the only one whose result matters.
        if is_original:
            logger.debug(f"Unpickling {result_file}.")
            with open(f'{result_file}', 'rb') as rb:
                serialized = rb.read()
            result = cloudpickle.loads(serialized)
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        f'Notifying backup {backup.uuid} that it is cancelled'
                    )
                    backup.is_cancelled.set()

        # This is a backup.
        else:
            # Backup results don't matter; they just need to leave the
            # result file in the right place for their original to
            # read later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                logger.debug(
                    f'Notifying original {bundle.src_bundle.uuid} that it is cancelled'
                )
                bundle.src_bundle.is_cancelled.set()

        assert bundle.worker is not None
        self.release_worker(bundle.worker)
        return result

    def create_original_bundle(self, pickle):
        uuid = string_utils.generate_uuid(as_hex=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code = pickle,
            uuid = uuid,
            worker = None,
            username = None,
            machine = None,
            hostname = platform.node(),
            code_file = code_file,
            result_file = result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = None,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = [],
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'Created original bundle {uuid}')
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code = src_bundle.pickled_code,
            uuid = uuid,
            worker = None,
            username = None,
            machine = None,
            hostname = src_bundle.hostname,
            code_file = src_bundle.code_file,
            result_file = src_bundle.result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = src_bundle,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = None,    # backup backups not allowed
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'Created backup bundle {uuid}')
        return backup_bundle

    def schedule_backup_for_bundle(self,
                                   src_bundle: BundleDetails):
        assert self.status.lock.locked()
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'Scheduling backup bundle {backup_bundle.uuid} for execution'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle)
        self.total_bundles_submitted += 1
        logger.debug(
            f'Submitted work to remote executor; {self.total_bundles_submitted} items now submitted'
        )
        return self._helper_executor.submit(self.launch, bundle)

    def shutdown(self, wait=True) -> None:
        self._helper_executor.shutdown(wait)
        print(hist)


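# Illustrative wiring sketch (an addition, not original code): building
# a RemoteExecutor by hand.  The host name is a placeholder; real use
# requires passwordless ssh/rsync access to each machine with
# remote_worker.py installed there.
def _demo_remote_executor() -> RemoteExecutor:
    workers = [
        RemoteWorkerRecord(
            username = 'nobody',
            machine = 'worker.invalid',    # hypothetical host
            weight = 1,
            count = 4,
        ),
    ]
    policy = RoundRobinRemoteWorkerSelectionPolicy()
    policy.register_worker_pool(workers)
    # submit() on the returned executor ships work to a worker via rsync/ssh.
    return RemoteExecutor(workers, policy)

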
@singleton
class DefaultExecutors(object):
    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        command = ['ping', '-c', '1', host]
        return subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        ) == 0

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            pool: List[RemoteWorkerRecord] = []
            if self.ping('cheetah.house'):
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'cheetah.house',
                        weight = 10,
                        count = 6,
                    ),
                )
            if self.ping('video.house'):
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'video.house',
                        weight = 2,
                        count = 4,
                    ),
                )
            if self.ping('wannabe.house'):
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'wannabe.house',
                        weight = 2,
                        count = 4,
                    ),
                )
            if self.ping('meerkat.cabin'):
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'meerkat.cabin',
                        weight = 7,
                        count = 2,
                    ),
                )
            if self.ping('backup.house'):
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'backup.house',
                        weight = 3,
                        count = 2,
                    ),
                )
            if self.ping('puma.cabin'):
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'puma.cabin',
                        weight = 10,
                        count = 6,
                    ),
                )
            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor
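

# Illustrative smoke test (an addition, not original code): exercise the
# local executors and selection policies end to end.  The demos pass
# explicit pool sizes, so this does not depend on command line config.
# Typical application use would instead be, e.g.:
#     f = DefaultExecutors().thread_pool().submit(fn, arg)
if __name__ == '__main__':
    _demo_pickle_roundtrip()
    _demo_thread_executor()
    _demo_process_executor()
    _demo_status_bookkeeping()
    _demo_weighted_policy()
    _demo_round_robin_policy()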