Bugfixes.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18
19 import cloudpickle  # type: ignore
20
21 from ansi import bg, fg, underline, reset
22 import argparse_utils
23 import config
24 from exec_utils import run_silently, cmd_in_background
25 from decorator_utils import singleton
26 import histogram as hist
27
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Command-line flags that configure the executors defined in this file.
parser = config.add_commandline_args(
    f"Executors ({__file__})",
    "Args related to processing executors."
)

# Sizes of the default local pools; None leaves the choice to the pool.
parser.add_argument(
    '--executors_threadpool_size',
    metavar='#THREADS',
    type=int,
    default=None,
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    metavar='#PROCESSES',
    type=int,
    default=None,
    help='Number of processes in the default processpool, leave unset for default',
)

# Knobs for the remote executor: backup scheduling and retry limit.
parser.add_argument(
    '--executors_schedule_remote_backups',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    metavar='#FAILURES',
    type=int,
    default=3,
    help='Maximum number of failures before giving up on a bundle',
)

# Command prefixes used to copy files to/from, and run commands on,
# remote workers.
RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
SSH = 'ssh -oForwardX11=no'
64
65
def make_cloud_pickle(fun, *args, **kwargs):
    """Serialize a callable together with its arguments using cloudpickle.

    Args:
        fun: the callable to bundle.
        *args: positional arguments to bundle with it.
        **kwargs: keyword arguments to bundle with it.

    Returns:
        The bytes produced by cloudpickle.dumps() for the tuple
        (fun, args, kwargs).
    """
    # Not every callable has a __name__ (functools.partial objects and
    # callable instances do not), so fall back to repr() instead of
    # raising AttributeError just to produce a log line.
    name = getattr(fun, '__name__', repr(fun))
    logger.debug(f"Making cloudpickled bundle at {name}")
    return cloudpickle.dumps((fun, args, kwargs))
69
70
class BaseExecutor(ABC):
    """Abstract base class shared by the executors in this module.

    Holds a title, a running task count and a histogram; subclasses
    must implement submit() and shutdown().
    """

    def __init__(self, *, title=''):
        """Set up the bookkeeping common to every executor.

        Args:
            title: keyword-only label for this executor.
        """
        buckets = hist.SimpleHistogram.n_evenly_spaced_buckets(
            int(0), int(500), 50
        )
        self.histogram = hist.SimpleHistogram(buckets)
        self.task_count = 0
        self.title = title

    @abstractmethod
    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        """Schedule function(*args, **kwargs); implemented by subclasses."""

    @abstractmethod
    def shutdown(self,
                 wait: bool = True) -> None:
        """Shut the executor down; implemented by subclasses."""

    def adjust_task_count(self, delta: int) -> None:
        """Add delta (which may be negative) to the task count and log it."""
        self.task_count = self.task_count + delta
        logger.debug(f'Executor current task count is {self.task_count}')
96
97
class ThreadExecutor(BaseExecutor):
    """Executor that runs submitted callables on a local thread pool."""

    def __init__(self,
                 max_workers: Optional[int] = None):
        """Create the underlying ThreadPoolExecutor.

        Args:
            max_workers: pool size.  When None, the
                'executors_threadpool_size' config value is used if it
                is present; otherwise None is passed through to
                ThreadPoolExecutor.
        """
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers,
            thread_name_prefix="thread_executor_helper"
        )

    def run_local_bundle(self, fun, *args, **kwargs):
        """Run fun(*args, **kwargs), timing it and updating the task count.

        Returns:
            Whatever fun returns.

        Raises:
            Whatever fun raises.  The task count is still decremented in
            that case; the histogram only records successful runs.
        """
        # Callables such as functools.partial objects have no __name__.
        name = getattr(fun, '__name__', repr(fun))
        logger.debug(f"Running local bundle at {name}")
        start = time.time()
        try:
            result = fun(*args, **kwargs)
        finally:
            # Undo the +1 from submit() even when fun raises; otherwise
            # every failed task would inflate the count permanently.
            end = time.time()
            self.adjust_task_count(-1)
        duration = end - start
        logger.debug(f"{name} finished; used {duration:.1f}s")
        self.histogram.add_item(duration)
        return result

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        """Schedule function(*args, **kwargs) on the thread pool.

        Returns:
            The Future returned by the underlying ThreadPoolExecutor.
        """
        self.adjust_task_count(+1)
        return self._thread_pool_executor.submit(
            self.run_local_bundle,
            function,
            *args,
            **kwargs)

    def shutdown(self,
                 wait: bool = True) -> None:
        """Shut down the thread pool, then print the timing histogram.

        Args:
            wait: passed through to ThreadPoolExecutor.shutdown().
        """
        logger.debug(f'Shutting down threadpool executor {self.title}')
        # Shut down before printing (as the other executors in this
        # module do) so that, when wait is True, the histogram includes
        # the tasks that were still running.
        self._thread_pool_executor.shutdown(wait)
        print(self.histogram)
143
144
class ProcessExecutor(BaseExecutor):
    """Executor that runs cloudpickled callables on a local process pool."""

    def __init__(self,
                 max_workers=None):
        """Create the underlying ProcessPoolExecutor.

        Args:
            max_workers: pool size.  When None, the
                'executors_processpool_size' config value is used if it
                is present; otherwise None is passed through to
                ProcessPoolExecutor.
        """
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )

    def run_cloud_pickle(self, pickle):
        """Unpickle a (fun, args, kwargs) bundle and call it.

        This is the callable handed to the process pool, so it executes
        in a worker process against a pickled copy of this executor (see
        __getstate__).  Changes made to self here are not seen by the
        submitting process, which is why task accounting is done in
        submit() rather than here.

        Returns:
            Whatever the bundled function returns.
        """
        fun, args, kwargs = cloudpickle.loads(pickle)
        # Callables such as functools.partial objects have no __name__.
        name = getattr(fun, '__name__', repr(fun))
        logger.debug(f"Running pickled bundle at {name}")
        return fun(*args, **kwargs)

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        """Pickle function and its arguments and schedule them on the pool.

        Returns:
            The Future returned by the underlying ProcessPoolExecutor.
        """
        start = time.time()
        pickle = make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(
            self.run_cloud_pickle,
            pickle
        )
        # Count the task only once it has really been handed to the
        # pool, so a pickling or submission error cannot leak a count.
        self.adjust_task_count(+1)

        def on_done(_: fut.Future) -> None:
            # Runs in the submitting process, so the histogram and task
            # count being updated are the real ones.  If the future has
            # already completed, add_done_callback calls this at once.
            self.histogram.add_item(time.time() - start)
            self.adjust_task_count(-1)

        result.add_done_callback(on_done)
        return result

    def shutdown(self, wait=True) -> None:
        """Shut down the process pool, then print the timing histogram.

        Args:
            wait: passed through to ProcessPoolExecutor.shutdown().
        """
        logger.debug(f'Shutting down processpool executor {self.title}')
        self._process_executor.shutdown(wait)
        print(self.histogram)

    def __getstate__(self):
        """Return the instance state with the process pool replaced by None.

        The bound method run_cloud_pickle is what gets submitted to the
        pool, so this object is pickled along with it; the pool itself
        is dropped from the pickled state.
        """
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state
193
194
@dataclass
class RemoteWorkerRecord:
    """One remote endpoint (user + machine) that work can be sent to."""

    # Account and host; together they identify the worker for hashing
    # and are what repr() shows.
    username: str
    machine: str
    # Relative weight; read by the weighted-random selection policy.
    weight: int
    # Number of currently free slots on this worker; decremented when a
    # slot is acquired and incremented again on release.
    count: int

    def __hash__(self):
        """Hash on (username, machine) only, so it is stable as count changes."""
        identity = (self.username, self.machine)
        return hash(identity)

    def __repr__(self):
        """Render as 'username@machine'."""
        return '{}@{}'.format(self.username, self.machine)
207
208
@dataclass
class BundleDetails:
    """Everything the remote executor tracks about one unit of work."""

    # cloudpickled (function, args, kwargs) bytes.
    pickled_code: bytes
    # Identifier of this bundle; a backup's uuid is its source bundle's
    # uuid with a '_backup#<n>' suffix.
    uuid: str
    # Name of the submitted function (used in log messages).
    fname: str
    # Worker, user and machine the bundle runs on; None until a worker
    # has been acquired for it.
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    # platform.node() of the machine that created the bundle.
    hostname: str
    # Paths of the pickled input and the pickled result.
    code_file: str
    result_file: str
    # pid of the local background process running the remote command;
    # 0 until one has been started.
    pid: int
    start_ts: float
    end_ts: float
    # Slowness flags, set while the executor status report is rendered.
    too_slow: bool
    super_slow: bool
    # None for an original bundle; for a backup, the bundle it backs up.
    src_bundle: Optional[BundleDetails]
    # Set by the other side (original or backup) when it finishes first.
    is_cancelled: threading.Event
    # Recorded once the executor has observed is_cancelled.
    was_cancelled: bool
    # Backups created for an original bundle; None on a backup bundle.
    backup_bundles: Optional[List[BundleDetails]]
    # Number of times loading this bundle's result has failed.
    failure_count: int
230
231
class RemoteExecutorStatus:
    """Bookkeeping for workers and bundles, plus a printable status report.

    Methods whose name ends in ``_already_locked``, and the methods that
    assert it, must be called with ``self.lock`` held.
    """

    def __init__(self, total_worker_count: int) -> None:
        """Start with empty bookkeeping for total_worker_count worker slots."""
        # Protects reads and modification using self.  Also used
        # as a memory fence for modifications to bundle.
        self.lock = threading.Lock()

        self.worker_count = total_worker_count
        self.total_bundles_submitted = 0
        self.last_periodic_dump: Optional[float] = None

        # Workers that have been handed at least one bundle.
        self.known_workers: Set[RemoteWorkerRecord] = set()

        # Per-bundle start/end timestamps and details, keyed by uuid.
        self.start_per_bundle: Dict[str, float] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}

        # Latencies of finished (not cancelled) bundles, overall and
        # per worker, and the uuids currently running on each worker.
        self.finished_bundle_timings: List[float] = []
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord,
            List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[
            RemoteWorkerRecord,
            Set[str]
        ] = {}

    def record_acquire_worker(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        """Take the lock and record that bundle uuid started on worker."""
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        """Record that bundle uuid started on worker; lock must be held."""
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = time.time()
        self.in_flight_bundles_by_worker.setdefault(worker, set()).add(uuid)

    def record_bundle_details(
            self,
            details: BundleDetails) -> None:
        """Take the lock and index details by its uuid."""
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(
            self,
            details: BundleDetails) -> None:
        """Index details by its uuid; lock must be held."""
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str,
            was_cancelled: bool,
    ) -> None:
        """Record that bundle uuid is no longer running on worker.

        The latency is added to the timing lists only when the bundle
        was not cancelled.  The lock must be held.
        """
        assert self.lock.locked()
        now = time.time()
        self.end_per_bundle[uuid] = now
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if was_cancelled:
            return
        latency = now - self.start_per_bundle[uuid]
        self.finished_bundle_timings_per_worker.setdefault(
            worker, list()
        ).append(latency)
        self.finished_bundle_timings.append(latency)

    def total_in_flight(self) -> int:
        """Return the number of bundles in flight across known workers."""
        assert self.lock.locked()
        return sum(
            len(self.in_flight_bundles_by_worker[worker])
            for worker in self.known_workers
        )

    def total_idle(self) -> int:
        """Return worker_count minus the number of in-flight bundles."""
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        """Build the multi-line status report; lock must be held.

        Besides formatting, this marks in-flight bundles as too_slow or
        super_slow when their running time exceeds the overall p95 (or
        1.5 times it), which is what later triggers backups.
        """
        assert self.lock.locked()
        now = time.time()
        finished_count = len(self.finished_bundle_timings)
        in_flight_count = self.total_in_flight()
        pieces = [f'\n\n{underline()}Remote Executor Pool Status{reset()}: ']

        # Overall p50/p95 are only computed with at least two samples.
        overall = None
        if finished_count > 1:
            overall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            pieces.append(
                f'⏱=∀p50:{overall[0]:.1f}s, ∀p95:{overall[1]:.1f}s, '
            )
        else:
            pieces.append(' ')
        pieces.append(
            f'✅={finished_count}/{self.total_bundles_submitted}, '
            f'💻n={in_flight_count}/{self.worker_count}\n'
        )

        for worker in self.known_workers:
            pieces.append(
                f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            )
            worker_timings = self.finished_bundle_timings_per_worker.get(
                worker, []
            )
            done_here = len(worker_timings)
            per_worker = None
            if done_here > 1:
                per_worker = numpy.quantile(worker_timings, [0.5, 0.95])
                pieces.append(
                    f' 💻p50: {per_worker[0]:.1f}s, 💻p95: {per_worker[1]:.1f}s\n'
                )
            else:
                pieces.append('\n')
            if done_here > 0:
                pieces.append(
                    f'    ...finished {done_here} total bundle(s) so far\n'
                )

            running = self.in_flight_bundles_by_worker[worker]
            if not running:
                continue
            pieces.append(
                f'    ...{len(running)} bundles currently in flight:\n'
            )
            for bundle_uuid in running:
                details = self.bundle_details_by_uuid.get(bundle_uuid)
                pid = "TBD" if details is None else str(details.pid)
                elapsed = now - self.start_per_bundle[bundle_uuid]
                pieces.append(
                    f'       (pid={pid}): {bundle_uuid} for {elapsed:.1f}s so far '
                )

                # Compare against this worker's own percentiles.
                if per_worker is not None:
                    if elapsed > per_worker[1]:
                        pieces.append(f'{bg("red")}>💻p95{reset()} ')
                    elif elapsed > per_worker[0]:
                        pieces.append(f'{fg("red")}>💻p50{reset()} ')

                # Compare against the pool-wide percentiles and flag
                # stragglers.
                if overall is not None:
                    if elapsed > overall[1] * 1.5:
                        pieces.append(f'{bg("red")}!!!{reset()}')
                        if details is not None:
                            logger.debug(f'Flagging {details.uuid} for another backup')
                            details.super_slow = True
                    elif elapsed > overall[1]:
                        pieces.append(f'{bg("red")}>∀p95{reset()} ')
                        if details is not None:
                            logger.debug(f'Flagging {details.uuid} for a backup')
                            details.too_slow = True
                    elif elapsed > overall[0]:
                        pieces.append(f'{fg("red")}>∀p50{reset()}')
                pieces.append('\n')
        return ''.join(pieces)

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        """Print the status report, at most once every five seconds.

        The submitted-bundle total is stored on every call; the lock
        must be held.
        """
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        now = time.time()
        last = self.last_periodic_dump
        if last is not None and now - last <= 5.0:
            return
        print(self)
        self.last_periodic_dump = now
392
393
class RemoteWorkerSelectionPolicy(ABC):
    """Interface for strategies that choose which remote worker to use."""

    def register_worker_pool(self, workers):
        """Remember the pool of workers to pick from and reseed ``random``."""
        self.workers = workers
        random.seed()

    @abstractmethod
    def is_worker_available(self) -> bool:
        """Report whether a worker can be acquired; implemented by subclasses."""

    @abstractmethod
    def acquire_worker(
            self,
            machine_to_avoid = None
    ) -> Optional[RemoteWorkerRecord]:
        """Pick a worker, or return None; implemented by subclasses."""
409
410
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Pick workers at random, proportionally to free slots times weight."""

    def is_worker_available(self) -> bool:
        """Return True if any worker in the pool has a free slot."""
        return any(worker.count > 0 for worker in self.workers)

    def acquire_worker(
            self,
            machine_to_avoid = None
    ) -> Optional[RemoteWorkerRecord]:
        """Choose a worker and take one of its slots.

        Each worker is drawn with probability proportional to
        count * weight.  The first three draws reject machine_to_avoid;
        after that any draw is accepted.

        Args:
            machine_to_avoid: machine name to steer away from, if any.

        Returns:
            The chosen worker (with its count already decremented), or
            None when no worker has both a free slot and a positive
            weight.
        """
        # Only workers with a free slot and a positive weight can ever
        # be drawn.  Checking for the empty case up front avoids the
        # IndexError an empty grab bag would otherwise cause.
        candidates = [
            worker for worker in self.workers
            if worker.count > 0 and worker.weight > 0
        ]
        if not candidates:
            logger.warning("Couldn't find a worker; go fish.")
            return None
        weights = [worker.count * worker.weight for worker in candidates]

        for attempt in range(5):
            worker = random.choices(candidates, weights=weights, k=1)[0]
            if worker.machine != machine_to_avoid or attempt > 2:
                worker.count -= 1
                logger.debug(f'Selected worker {worker}')
                return worker

        # Attempts 3 and 4 accept any draw, so this is only a safety net.
        logger.warning("Couldn't find a worker; go fish.")
        return None
438
439
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Hand out workers in pool order, resuming after the last one used."""

    def __init__(self) -> None:
        # Position in self.workers where the next search starts.
        self.index = 0

    def is_worker_available(self) -> bool:
        """Return True if any worker in the pool has a free slot."""
        return any(worker.count > 0 for worker in self.workers)

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        """Take a slot on the next worker that has one free.

        The search starts at self.index and makes one full pass over the
        pool, wrapping around at the end.

        Args:
            machine_to_avoid: accepted for interface compatibility; this
                policy does not use it.

        Returns:
            The chosen worker (with its count already decremented), or
            None when the pool is empty or no worker has a free slot.
        """
        pool_size = len(self.workers)
        # range(0) is empty, so an empty pool falls straight through to
        # the "no worker" path instead of raising IndexError.
        for offset in range(pool_size):
            position = (self.index + offset) % pool_size
            worker = self.workers[position]
            if worker.count > 0:
                worker.count -= 1
                self.index = (position + 1) % pool_size
                logger.debug(f'Selected worker {worker}')
                return worker
        logger.warning("Couldn't find a worker; go fish.")
        return None
471
472
class RemoteExecutor(BaseExecutor):
    """Executor that runs cloudpickled bundles on remote machines.

    Each submitted function is pickled to a file, copied to a worker
    with rsync when the worker is not the local host, and executed
    there through ssh.  A helper thread per bundle waits for the remote
    process, emits periodic status reports and, when a bundle is flagged
    as slow, may schedule a duplicate "backup" bundle elsewhere.
    Whichever of original and backup finishes first cancels the other.
    """

    def __init__(self,
                 workers: List[RemoteWorkerRecord],
                 policy: RemoteWorkerSelectionPolicy) -> None:
        """Set up the worker pool, selection policy and helper threads.

        Args:
            workers: the remote endpoints work may be scheduled on.
            policy: strategy used to pick a worker for each bundle.

        Raises:
            Exception: if the workers provide no slots at all.
        """
        super().__init__()
        self.workers = workers
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise Exception(msg)
        self.policy = policy
        self.policy.register_worker_pool(self.workers)
        # Guards worker acquisition/release and wakes blocked launchers.
        self.cv = threading.Condition()
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        logger.debug(
            f'Creating remote processpool with {self.worker_count} remote endpoints.'
        )

    def is_worker_available(self) -> bool:
        """Ask the selection policy whether a worker can be acquired."""
        return self.policy.is_worker_available()

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        """Ask the selection policy for a worker; may return None."""
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> RemoteWorkerRecord:
        """Wait until the policy reports a free worker, then acquire one.

        Args:
            machine_to_avoid: machine name passed on to the policy.

        Returns:
            The acquired worker.

        Raises:
            Exception: if the policy reported availability but then
                returned no worker.
        """
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, worker: RemoteWorkerRecord) -> None:
        """Give a slot back to worker and wake one blocked launcher."""
        logger.debug(f'Released worker {worker}')
        with self.cv:
            worker.count += 1
            self.cv.notify()

    def heartbeat(self) -> None:
        """Emit the periodic status report and look for slow bundles.

        Backups are only considered once more than seven bundles have
        finished, and only for original bundles flagged too_slow while
        the 'executors_schedule_remote_backups' config value is truthy.
        """
        with self.status.lock:
            # Regular progress report
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule
            if len(self.status.finished_bundle_timings) > 7:
                for bundle_uuids in self.status.in_flight_bundles_by_worker.values():
                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                                bundle is not None and
                                bundle.too_slow and
                                bundle.src_bundle is None and
                                config.config['executors_schedule_remote_backups']
                        ):
                            self.consider_backup_for_bundle(bundle)

    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
        """Schedule a backup for bundle if the backup limits allow it.

        Must be called with self.status.lock held.
        """
        assert self.status.lock.locked()
        backups = bundle.backup_bundles
        if backups is None:
            # Backup bundles carry no backup list of their own.
            return
        if (
            bundle.too_slow
            and len(backups) == 0       # one backup per
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return
        elif (
                bundle.super_slow
                and len(backups) < 2    # two backups in dire situations
                and self.status.total_idle() > 4
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        """Return True (and record it on bundle) if bundle was cancelled."""
        with self.status.lock:
            if bundle.is_cancelled.is_set():
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False

    def launch(self, bundle: BundleDetails) -> Any:
        """Find a worker for bundle or block until one is available.

        Then copy the code to the worker if needed, run it, wait for it
        to finish (or to be cancelled) and hand off to
        post_launch_work().  If post_launch_work() raises, the bundle is
        relaunched while bundle.failure_count is below the
        'executors_max_bundle_failures' config value.

        Returns:
            The result of post_launch_work(), or None once the bundle is
            given up on.
        """
        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = None

        # Try not to schedule a backup on the same host as the original.
        if bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        fname = bundle.fname
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'Running bundle {uuid} on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        if self.check_if_cancelled(bundle):
            try:
                return self.post_launch_work(bundle)
            except Exception as e:
                logger.exception(e)
                logger.info(f"{uuid}/{fname}: bundle seems to have failed?!")
                if bundle.failure_count < config.config['executors_max_bundle_failures']:
                    return self.launch(bundle)
                else:
                    logger.info(f"{uuid}/{fname}: bundle is poison, giving up on it.")
                    return None

        # Send input to machine if it's not local.
        if hostname not in machine:
            cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
            logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}")
            run_silently(cmd)

        # Do it.
        cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
               f'"source py39-venv/bin/activate &&'
               f' /home/scott/lib/python_modules/remote_worker.py'
               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = pid = p.pid
        logger.info(f"{uuid}/{fname}: Start training on {worker} via {cmd} (background pid {pid})")

        # Poll the background process; each timeout doubles as a chance
        # to run the heartbeat and to notice a cancellation.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                self.heartbeat()

                # Both source and backup bundles can be cancelled by
                # the other depending on which finishes first.
                if self.check_if_cancelled(bundle):
                    p.terminate()
                    break
            else:
                logger.debug(
                    f"{uuid}/{fname}: pid {pid} has finished its work normally."
                )
                break

        try:
            return self.post_launch_work(bundle)
        except Exception as e:
            logger.exception(e)
            logger.info(f"{uuid}: Bundle seems to have failed?!")
            if bundle.failure_count < config.config['executors_max_bundle_failures']:
                return self.launch(bundle)
            logger.info(f"{uuid}: Bundle is poison, giving up on it.")
            return None

    def post_launch_work(self, bundle: BundleDetails) -> Any:
        """Collect results and release resources after a bundle stops.

        If the bundle was not cancelled and ran on another machine, its
        result file is fetched and the remote files are removed.  An
        original bundle then unpickles the result file and cancels its
        backups; a backup that finished first cancels its original.

        Returns:
            The unpickled result for an original bundle, None for a
            backup.

        Raises:
            Exception: if an original bundle's result file cannot be
                read or unpickled.  The worker has been released and
                the task count decremented by then.
        """
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file
            fname = bundle.fname
            uuid = bundle.uuid

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"{uuid}/{fname}: Fetching results from {username}@{machine} via {cmd}"
                    )
                    try:
                        run_silently(cmd)
                    except subprocess.CalledProcessError:
                        logger.critical(f'Failed to copy {username}@{machine}:{result_file}!')
                    run_silently(f'{SSH} {username}@{machine}'
                                 f' "/bin/rm -f {code_file} {result_file}"')
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)
            assert bundle.worker is not None
            self.status.record_release_worker_already_locked(
                bundle.worker,
                bundle.uuid,
                was_cancelled
            )

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.
        if is_original:
            logger.debug(f"{uuid}/{fname}: Unpickling {result_file}.")
            try:
                with open(f'{result_file}', 'rb') as rb:
                    serialized = rb.read()
                # NOTE(review): unpickling runs whatever the result file
                # encodes, so this relies on the workers (and /tmp)
                # being trusted.
                result = cloudpickle.loads(serialized)
            except Exception as e:
                msg = f'Failed to load {result_file}'
                logger.critical(msg)
                bundle.failure_count += 1
                self.release_worker(bundle.worker)
                # Balance the +1 from launch(): a retry goes through
                # launch() again and counts itself anew, so without this
                # every failed attempt would leak one task.
                self.adjust_task_count(-1)
                raise Exception(e) from e
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        f'{uuid}/{fname}: Notifying backup {backup.uuid} that it\'s cancelled'
                    )
                    backup.is_cancelled.set()

        # This is a backup job.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                logger.debug(
                    f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} that it\'s cancelled'
                )
                bundle.src_bundle.is_cancelled.set()

        assert bundle.worker is not None
        self.release_worker(bundle.worker)
        self.adjust_task_count(-1)
        return result

    def create_original_bundle(self, pickle, fname: str):
        """Write pickle to a code file under /tmp and describe it as a bundle.

        Args:
            pickle: the cloudpickled (function, args, kwargs) bytes.
            fname: name of the function, used in log messages.

        Returns:
            The new BundleDetails, already recorded in self.status.
        """
        from string_utils import generate_uuid
        uuid = generate_uuid(as_hex=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code = pickle,
            uuid = uuid,
            fname = fname,
            worker = None,
            username = None,
            machine = None,
            hostname = platform.node(),
            code_file = code_file,
            result_file = result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = None,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = [],
            failure_count = 0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'{uuid}/{fname}: Created original bundle')
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        """Create a backup bundle that shares src_bundle's code and files.

        The backup is appended to src_bundle.backup_bundles and recorded
        in self.status, whose lock must already be held.

        Returns:
            The new backup BundleDetails.
        """
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code = src_bundle.pickled_code,
            uuid = uuid,
            fname = src_bundle.fname,
            worker = None,
            username = None,
            machine = None,
            hostname = src_bundle.hostname,
            code_file = src_bundle.code_file,
            result_file = src_bundle.result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = src_bundle,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = None,    # backup backups not allowed
            failure_count = 0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'{uuid}/{src_bundle.fname}: Created backup bundle')
        return backup_bundle

    def schedule_backup_for_bundle(self,
                                   src_bundle: BundleDetails):
        """Create a backup for src_bundle and queue it on the helper pool.

        Must be called with self.status.lock held.
        """
        assert self.status.lock.locked()
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        """Pickle function and its arguments and schedule them remotely.

        Returns:
            A Future for the helper thread running launch() on the new
            bundle.
        """
        pickle = make_cloud_pickle(function, *args, **kwargs)
        # Callables such as functools.partial objects have no __name__.
        fname = getattr(function, '__name__', repr(function))
        bundle = self.create_original_bundle(pickle, fname)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self.launch, bundle)

    def shutdown(self, wait=True) -> None:
        """Shut down the helper thread pool, then print the histogram.

        Args:
            wait: passed through to ThreadPoolExecutor.shutdown().
        """
        self._helper_executor.shutdown(wait)
        # Use the module logger, as everywhere else in this file, rather
        # than the root logging module.
        logger.debug(f'Shutting down RemoteExecutor {self.title}')
        print(self.histogram)
829
830
@singleton
class DefaultExecutors(object):
    """Lazily constructed default thread, process and remote executors.

    Each accessor builds its executor on first use and returns the same
    object on later calls made through the same instance.
    NOTE(review): the @singleton decorator presumably makes that one
    shared instance for the whole process -- confirm in decorator_utils.
    """

    # Login name used for every remote worker record.
    _REMOTE_USERNAME = 'scott'

    # (machine, weight, count) for each remote machine remote_pool()
    # probes, in probe order.
    # NOTE(review): weight/count are passed verbatim to
    # RemoteWorkerRecord; presumably a relative selection weight and a
    # number of worker slots -- confirm against RemoteWorkerRecord.
    _REMOTE_MACHINES = (
        ('cheetah.house', 12, 4),
        ('video.house', 1, 4),
        ('wannabe.house', 2, 4),
        ('meerkat.cabin', 5, 2),
        ('backup.house', 1, 4),
        ('puma.cabin', 12, 4),
    )

    def __init__(self):
        # All three executors start unset; see the accessors below.
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        """Return True iff `ping -c 1 <host>` exits with status 0.

        The command's stdout and stderr are discarded.
        """
        command = ['ping', '-c', '1', host]
        exit_status = subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return exit_status == 0

    def thread_pool(self) -> ThreadExecutor:
        """Return the default ThreadExecutor, creating it on first use."""
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        """Return the default ProcessExecutor, creating it on first use."""
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        """Return the default RemoteExecutor, creating it on first use.

        On first use every machine in _REMOTE_MACHINES is pinged, in
        order, and only the machines that answer get a
        RemoteWorkerRecord in the worker pool.  The pool is registered
        with a WeightedRandomRemoteWorkerSelectionPolicy and both are
        handed to the RemoteExecutor constructor.
        """
        if self.remote_executor is None:
            # One table-driven pass replaces a copy-pasted stanza per
            # machine; each host is pinged before its record is built.
            pool: List[RemoteWorkerRecord] = [
                RemoteWorkerRecord(
                    username=self._REMOTE_USERNAME,
                    machine=machine,
                    weight=weight,
                    count=count,
                )
                for machine, weight, count in self._REMOTE_MACHINES
                if self.ping(machine)
            ]
            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor