1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18
19 import cloudpickle  # type: ignore
20 from overrides import overrides
21
22 from ansi import bg, fg, underline, reset
23 import argparse_utils
24 import config
25 from exec_utils import run_silently, cmd_in_background
26 from decorator_utils import singleton
27 import histogram as hist
28
29 logger = logging.getLogger(__name__)
30
31 parser = config.add_commandline_args(
32     f"Executors ({__file__})",
33     "Args related to processing executors."
34 )
35 parser.add_argument(
36     '--executors_threadpool_size',
37     type=int,
38     metavar='#THREADS',
39     help='Number of threads in the default threadpool, leave unset for default',
40     default=None
41 )
42 parser.add_argument(
43     '--executors_processpool_size',
44     type=int,
45     metavar='#PROCESSES',
46     help='Number of processes in the default processpool, leave unset for default',
47     default=None,
48 )
49 parser.add_argument(
50     '--executors_schedule_remote_backups',
51     default=True,
52     action=argparse_utils.ActionNoYes,
53     help='Should we schedule duplicative backup work if a remote bundle is slow',
54 )
55 parser.add_argument(
56     '--executors_max_bundle_failures',
57     type=int,
58     default=3,
59     metavar='#FAILURES',
60     help='Maximum number of failures before giving up on a bundle',
61 )
62
63 RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
64 SSH = 'ssh -oForwardX11=no'
65
66
67 def make_cloud_pickle(fun, *args, **kwargs):
68     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
69     return cloudpickle.dumps((fun, args, kwargs))
70
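# Illustrative round trip for the helper above (a sketch, not executed by this
# module): cloudpickle can serialize closures and lambdas that the stdlib
# pickle cannot, which is what lets arbitrary callables travel to worker
# processes and machines as bytes.  ProcessExecutor.run_cloud_pickle below
# decodes the payload the same way:
#
#     payload = make_cloud_pickle(lambda a, b: a + b, 2, 3)
#     fun, args, kwargs = cloudpickle.loads(payload)
#     fun(*args, **kwargs)   # -> 5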
71
72 class BaseExecutor(ABC):
73     def __init__(self, *, title=''):
74         self.title = title
75         self.task_count = 0
76         self.histogram = hist.SimpleHistogram(
77             hist.SimpleHistogram.n_evenly_spaced_buckets(
78                 int(0), int(500), 50
79             )
80         )
81
82     @abstractmethod
83     def submit(self,
84                function: Callable,
85                *args,
86                **kwargs) -> fut.Future:
87         pass
88
89     @abstractmethod
90     def shutdown(self,
91                  wait: bool = True) -> None:
92         pass
93
94     def adjust_task_count(self, delta: int) -> None:
95         self.task_count += delta
96         logger.debug(f'Executor current task count is {self.task_count}')
97
98
99 class ThreadExecutor(BaseExecutor):
100     def __init__(self,
101                  max_workers: Optional[int] = None):
102         super().__init__()
103         workers = None
104         if max_workers is not None:
105             workers = max_workers
106         elif 'executors_threadpool_size' in config.config:
107             workers = config.config['executors_threadpool_size']
108         logger.debug(f'Creating threadpool executor with {workers} workers')
109         self._thread_pool_executor = fut.ThreadPoolExecutor(
110             max_workers=workers,
111             thread_name_prefix="thread_executor_helper"
112         )
113
114     def run_local_bundle(self, fun, *args, **kwargs):
115         logger.debug(f"Running local bundle at {fun.__name__}")
116         start = time.time()
117         result = fun(*args, **kwargs)
118         end = time.time()
119         self.adjust_task_count(-1)
120         duration = end - start
121         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
122         self.histogram.add_item(duration)
123         return result
124
125     @overrides
126     def submit(self,
127                function: Callable,
128                *args,
129                **kwargs) -> fut.Future:
130         self.adjust_task_count(+1)
131         newargs = []
132         newargs.append(function)
133         for arg in args:
134             newargs.append(arg)
135         return self._thread_pool_executor.submit(
136             self.run_local_bundle,
137             *newargs,
138             **kwargs)
139
140     @overrides
141     def shutdown(self,
142                  wait: bool = True) -> None:
143         logger.debug(f'Shutting down threadpool executor {self.title}')
144         print(self.histogram)
145         self._thread_pool_executor.shutdown(wait)
146
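# Illustrative usage sketch (not part of the module; assumes the config
# module has already parsed the program's command-line arguments).  submit()
# wraps the callable in run_local_bundle so its duration lands in the
# histogram that shutdown() prints:
#
#     executor = ThreadExecutor(max_workers=4)
#     future = executor.submit(sum, [1, 2, 3])
#     assert future.result() == 6
#     executor.shutdown(wait=True)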
147
148 class ProcessExecutor(BaseExecutor):
149     def __init__(self,
150                  max_workers=None):
151         super().__init__()
152         workers = None
153         if max_workers is not None:
154             workers = max_workers
155         elif 'executors_processpool_size' in config.config:
156             workers = config.config['executors_processpool_size']
157         logger.debug(f'Creating processpool executor with {workers} workers.')
158         self._process_executor = fut.ProcessPoolExecutor(
159             max_workers=workers,
160         )
161
162     def run_cloud_pickle(self, pickle):
163         fun, args, kwargs = cloudpickle.loads(pickle)
164         logger.debug(f"Running pickled bundle at {fun.__name__}")
165         result = fun(*args, **kwargs)
166         self.adjust_task_count(-1)
167         return result
168
169     @overrides
170     def submit(self,
171                function: Callable,
172                *args,
173                **kwargs) -> fut.Future:
174         start = time.time()
175         self.adjust_task_count(+1)
176         pickle = make_cloud_pickle(function, *args, **kwargs)
177         result = self._process_executor.submit(
178             self.run_cloud_pickle,
179             pickle
180         )
181         result.add_done_callback(
182             lambda _: self.histogram.add_item(
183                 time.time() - start
184             )
185         )
186         return result
187
188     @overrides
189     def shutdown(self, wait: bool = True) -> None:
190         logger.debug(f'Shutting down processpool executor {self.title}')
191         self._process_executor.shutdown(wait)
192         print(self.histogram)
193
194     def __getstate__(self):
195         state = self.__dict__.copy()
196         state['_process_executor'] = None
197         return state
198
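# Note: submit() ships self.run_cloud_pickle (a bound method) to the worker
# process, so the ProcessExecutor instance itself must be picklable; that is
# what __getstate__ above is for.  Illustrative usage sketch (assumes
# arguments have already been parsed):
#
#     executor = ProcessExecutor(max_workers=2)
#     future = executor.submit(lambda x: x * x, 12)  # cloudpickle handles the lambda
#     print(future.result())                         # 144
#     executor.shutdown()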
199
200 @dataclass
201 class RemoteWorkerRecord:
202     username: str
203     machine: str
204     weight: int
205     count: int
206
207     def __hash__(self):
208         return hash((self.username, self.machine))
209
210     def __repr__(self):
211         return f'{self.username}@{self.machine}'
212
213
214 @dataclass
215 class BundleDetails:
216     pickled_code: bytes
217     uuid: str
218     fname: str
219     worker: Optional[RemoteWorkerRecord]
220     username: Optional[str]
221     machine: Optional[str]
222     hostname: str
223     code_file: str
224     result_file: str
225     pid: int
226     start_ts: float
227     end_ts: float
228     too_slow: bool
229     super_slow: bool
230     src_bundle: BundleDetails
231     is_cancelled: threading.Event
232     was_cancelled: bool
233     backup_bundles: Optional[List[BundleDetails]]
234     failure_count: int
235
236
237 class RemoteExecutorStatus:
238     def __init__(self, total_worker_count: int) -> None:
239         self.worker_count = total_worker_count
240         self.known_workers: Set[RemoteWorkerRecord] = set()
241         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
242         self.end_per_bundle: Dict[str, float] = defaultdict(float)
243         self.finished_bundle_timings_per_worker: Dict[
244             RemoteWorkerRecord,
245             List[float]
246         ] = {}
247         self.in_flight_bundles_by_worker: Dict[
248             RemoteWorkerRecord,
249             Set[str]
250         ] = {}
251         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
252         self.finished_bundle_timings: List[float] = []
253         self.last_periodic_dump: Optional[float] = None
254         self.total_bundles_submitted = 0
255
256         # Protects reads and modification using self.  Also used
257         # as a memory fence for modifications to bundle.
258         self.lock = threading.Lock()
259
260     def record_acquire_worker(
261             self,
262             worker: RemoteWorkerRecord,
263             uuid: str
264     ) -> None:
265         with self.lock:
266             self.record_acquire_worker_already_locked(
267                 worker,
268                 uuid
269             )
270
271     def record_acquire_worker_already_locked(
272             self,
273             worker: RemoteWorkerRecord,
274             uuid: str
275     ) -> None:
276         assert self.lock.locked()
277         self.known_workers.add(worker)
278         self.start_per_bundle[uuid] = None
279         x = self.in_flight_bundles_by_worker.get(worker, set())
280         x.add(uuid)
281         self.in_flight_bundles_by_worker[worker] = x
282
283     def record_bundle_details(
284             self,
285             details: BundleDetails) -> None:
286         with self.lock:
287             self.record_bundle_details_already_locked(details)
288
289     def record_bundle_details_already_locked(
290             self,
291             details: BundleDetails) -> None:
292         assert self.lock.locked()
293         self.bundle_details_by_uuid[details.uuid] = details
294
295     def record_release_worker(
296             self,
297             worker: RemoteWorkerRecord,
298             uuid: str,
299             was_cancelled: bool,
300     ) -> None:
301         with self.lock:
302             self.record_release_worker_already_locked(
303                 worker, uuid, was_cancelled
304             )
305
306     def record_release_worker_already_locked(
307             self,
308             worker: RemoteWorkerRecord,
309             uuid: str,
310             was_cancelled: bool,
311     ) -> None:
312         assert self.lock.locked()
313         ts = time.time()
314         self.end_per_bundle[uuid] = ts
315         self.in_flight_bundles_by_worker[worker].remove(uuid)
316         if not was_cancelled:
317             bundle_latency = ts - self.start_per_bundle[uuid]
318             x = self.finished_bundle_timings_per_worker.get(worker, list())
319             x.append(bundle_latency)
320             self.finished_bundle_timings_per_worker[worker] = x
321             self.finished_bundle_timings.append(bundle_latency)
322
323     def record_processing_began(self, uuid: str):
324         with self.lock:
325             self.start_per_bundle[uuid] = time.time()
326
327     def total_in_flight(self) -> int:
328         assert self.lock.locked()
329         total_in_flight = 0
330         for worker in self.known_workers:
331             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
332         return total_in_flight
333
334     def total_idle(self) -> int:
335         assert self.lock.locked()
336         return self.worker_count - self.total_in_flight()
337
338     def __repr__(self):
339         assert self.lock.locked()
340         ts = time.time()
341         total_finished = len(self.finished_bundle_timings)
342         total_in_flight = self.total_in_flight()
343         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
344         qall = None
345         if len(self.finished_bundle_timings) > 1:
346             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
347             ret += (
348                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
349                 f'✅={total_finished}/{self.total_bundles_submitted}, '
350                 f'💻n={total_in_flight}/{self.worker_count}\n'
351             )
352         else:
353             ret += (
354                 f' ✅={total_finished}/{self.total_bundles_submitted}, '
355                 f'💻n={total_in_flight}/{self.worker_count}\n'
356             )
357
358         for worker in self.known_workers:
359             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
360             timings = self.finished_bundle_timings_per_worker.get(worker, [])
361             count = len(timings)
362             qworker = None
363             if count > 1:
364                 qworker = numpy.quantile(timings, [0.5, 0.95])
365                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
366             else:
367                 ret += '\n'
368             if count > 0:
369                 ret += f'    ...finished {count} total bundle(s) so far\n'
370             in_flight = len(self.in_flight_bundles_by_worker[worker])
371             if in_flight > 0:
372                 ret += f'    ...{in_flight} bundles currently in flight:\n'
373                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
374                     details = self.bundle_details_by_uuid.get(
375                         bundle_uuid,
376                         None
377                     )
378                     pid = str(details.pid) if details is not None else "TBD"
379                     if self.start_per_bundle[bundle_uuid] is not None:
380                         sec = ts - self.start_per_bundle[bundle_uuid]
381                         ret += f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
382                     else:
383                         ret += f'       {bundle_uuid} setting up / copying data...'
384                         sec = 0.0
385
386                     if qworker is not None:
387                         if sec > qworker[1]:
388                             ret += f'{bg("red")}>💻p95{reset()} '
389                         elif sec > qworker[0]:
390                             ret += f'{fg("red")}>💻p50{reset()} '
391                     if qall is not None:
392                         if sec > qall[1] * 1.5:
393                             ret += f'{bg("red")}!!!{reset()}'
394                             if details is not None:
395                                 logger.debug(f'Flagging {details.uuid} for another backup')
396                                 details.super_slow = True
397                         elif sec > qall[1]:
398                             ret += f'{bg("red")}>∀p95{reset()} '
399                             if details is not None:
400                                 logger.debug(f'Flagging {details.uuid} for a backup')
401                                 details.too_slow = True
402                         elif sec > qall[0]:
403                             ret += f'{fg("red")}>∀p50{reset()}'
404                     ret += '\n'
405         return ret
406
407     def periodic_dump(self, total_bundles_submitted: int) -> None:
408         assert self.lock.locked()
409         self.total_bundles_submitted = total_bundles_submitted
410         ts = time.time()
411         if (
412                 self.last_periodic_dump is None
413                 or ts - self.last_periodic_dump > 5.0
414         ):
415             print(self)
416             self.last_periodic_dump = ts
417
418
419 class RemoteWorkerSelectionPolicy(ABC):
420     def register_worker_pool(self, workers):
421         random.seed()
422         self.workers = workers
423
424     @abstractmethod
425     def is_worker_available(self) -> bool:
426         pass
427
428     @abstractmethod
429     def acquire_worker(
430             self,
431             machine_to_avoid = None
432     ) -> Optional[RemoteWorkerRecord]:
433         pass
434
435
436 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
437     def is_worker_available(self) -> bool:
438         for worker in self.workers:
439             if worker.count > 0:
440                 return True
441         return False
442
443     def acquire_worker(
444             self,
445             machine_to_avoid = None
446     ) -> Optional[RemoteWorkerRecord]:
447         grabbag = []
448         for worker in self.workers:
449             for x in range(0, worker.count):
450                 for y in range(0, worker.weight):
451                     grabbag.append(worker)
452
453         for attempt in range(0, 5):
454             random.shuffle(grabbag)
455             worker = grabbag[0]
456             if worker.machine != machine_to_avoid or attempt > 2:
457                 if worker.count > 0:
458                     worker.count -= 1
459                     logger.debug(f'Selected worker {worker}')
460                     return worker
461         logger.warning("Couldn't find a worker; go fish.")
462         return None
463
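# Worked example of the weighted draw above: each worker is placed in the grab
# bag count * weight times, so with the default pool defined in
# DefaultExecutors.remote_pool() below, cheetah.house (weight=12, count=4)
# occupies 48 slots while video.house (weight=1, count=4) occupies 4, making
# cheetah roughly 12x more likely to be handed any given bundle while both
# still have capacity.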
464
465 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
466     def __init__(self) -> None:
467         self.index = 0
468
469     def is_worker_available(self) -> bool:
470         for worker in self.workers:
471             if worker.count > 0:
472                 return True
473         return False
474
475     def acquire_worker(
476             self,
477             machine_to_avoid: Optional[str] = None
478     ) -> Optional[RemoteWorkerRecord]:
479         x = self.index
480         while True:
481             worker = self.workers[x]
482             if worker.count > 0:
483                 worker.count -= 1
484                 x += 1
485                 if x >= len(self.workers):
486                     x = 0
487                 self.index = x
488                 logger.debug(f'Selected worker {worker}')
489                 return worker
490             x += 1
491             if x >= len(self.workers):
492                 x = 0
493             if x == self.index:
494                 logger.warning("Couldn't find a worker; go fish.")
495                 return None
496
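# Illustrative wiring sketch for a remote pool (my_function and its args are
# placeholders; this only does useful work where each host is reachable over
# ssh and has remote_worker.py installed, as launch() below assumes):
#
#     workers = [
#         RemoteWorkerRecord(username='scott', machine='cheetah.house',
#                            weight=12, count=4),
#         RemoteWorkerRecord(username='scott', machine='video.house',
#                            weight=1, count=4),
#     ]
#     policy = RoundRobinRemoteWorkerSelectionPolicy()
#     executor = RemoteExecutor(workers, policy)   # registers the pool itself
#     future = executor.submit(my_function, arg1, arg2)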
497
498 class RemoteExecutor(BaseExecutor):
499     def __init__(self,
500                  workers: List[RemoteWorkerRecord],
501                  policy: RemoteWorkerSelectionPolicy) -> None:
502         super().__init__()
503         self.workers = workers
504         self.policy = policy
505         self.worker_count = 0
506         for worker in self.workers:
507             self.worker_count += worker.count
508         if self.worker_count <= 0:
509             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
510             logger.critical(msg)
511             raise Exception(msg)
512         self.policy.register_worker_pool(self.workers)
513         self.cv = threading.Condition()
514         logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
515         self._helper_executor = fut.ThreadPoolExecutor(
516             thread_name_prefix="remote_executor_helper",
517             max_workers=self.worker_count,
518         )
519         self.status = RemoteExecutorStatus(self.worker_count)
520         self.total_bundles_submitted = 0
521         logger.debug(
522             f'Creating remote processpool with {self.worker_count} remote worker threads.'
523         )
524
525     def is_worker_available(self) -> bool:
526         return self.policy.is_worker_available()
527
528     def acquire_worker(
529             self,
530             machine_to_avoid: Optional[str] = None
531     ) -> Optional[RemoteWorkerRecord]:
532         return self.policy.acquire_worker(machine_to_avoid)
533
534     def find_available_worker_or_block(
535             self,
536             machine_to_avoid: Optional[str] = None
537     ) -> RemoteWorkerRecord:
538         with self.cv:
539             while not self.is_worker_available():
540                 self.cv.wait()
541             worker = self.acquire_worker(machine_to_avoid)
542             if worker is not None:
543                 return worker
544         msg = "We should never reach this point in the code"
545         logger.critical(msg)
546         raise Exception(msg)
547
548     def release_worker(self, worker: RemoteWorkerRecord) -> None:
549         logger.debug(f'Released worker {worker}')
550         with self.cv:
551             worker.count += 1
552             self.cv.notify()
553
554     def heartbeat(self) -> None:
555         with self.status.lock:
556             # Regular progress report
557             self.status.periodic_dump(self.total_bundles_submitted)
558
559             # Look for bundles to reschedule
560             num_done = len(self.status.finished_bundle_timings)
561             if num_done > 7 or (num_done > 5 and self.is_worker_available()):
562                 for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
563                     for uuid in bundle_uuids:
564                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
565                         if (
566                                 bundle is not None and
567                                 bundle.too_slow and
568                                 bundle.src_bundle is None and
569                                 config.config['executors_schedule_remote_backups']
570                         ):
571                             self.consider_backup_for_bundle(bundle)
572
573     def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
574         assert self.status.lock.locked()
575         if (
576             bundle.too_slow
577             and len(bundle.backup_bundles) == 0       # one backup per bundle
578         ):
579             msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
580             logger.debug(msg)
581             self.schedule_backup_for_bundle(bundle)
582             return
583         elif (
584                 bundle.super_slow
585                 and len(bundle.backup_bundles) < 2    # two backups in dire situations
586                 and self.status.total_idle() > 4
587         ):
588             msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
589             logger.debug(msg)
590             self.schedule_backup_for_bundle(bundle)
591             return
592
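    # Worked example of the heuristic above (illustrative numbers): if the
    # pool-wide p95 bundle latency is 60s, a bundle still running past 60s is
    # flagged too_slow during the periodic status dump and earns one backup;
    # past 90s (1.5 x p95) it is flagged super_slow and may earn a second
    # backup, but only while more than 4 workers are idle.
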
593     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
594         with self.status.lock:
595             if bundle.is_cancelled.wait(timeout=0.0):
596                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
597                 bundle.was_cancelled = True
598                 return True
599         return False
600
601     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
602         """Find a worker for bundle or block until one is available."""
603         self.adjust_task_count(+1)
604         uuid = bundle.uuid
605         hostname = bundle.hostname
606         avoid_machine = override_avoid_machine
607         is_original = bundle.src_bundle is None
608
609         # Try not to schedule a backup on the same host as the original.
610         if avoid_machine is None and bundle.src_bundle is not None:
611             avoid_machine = bundle.src_bundle.machine
612         worker = None
613         while worker is None:
614             worker = self.find_available_worker_or_block(avoid_machine)
615
616         # Ok, found a worker.
617         bundle.worker = worker
618         machine = bundle.machine = worker.machine
619         username = bundle.username = worker.username
620         fname = bundle.fname
621         self.status.record_acquire_worker(worker, uuid)
622         logger.debug(f'{uuid}/{fname}: Running bundle on {worker}...')
623
624         # Before we do any work, make sure the bundle is still viable.
625         if self.check_if_cancelled(bundle):
626             try:
627                 return self.post_launch_work(bundle)
628             except Exception as e:
629                 logger.exception(e)
630                 logger.error(
631                     f'{uuid}/{fname}: bundle says it\'s cancelled upfront but no results?!'
632                 )
633                 assert bundle.worker is not None
634                 self.status.record_release_worker(
635                     bundle.worker,
636                     bundle.uuid,
637                     True,
638                 )
639                 self.release_worker(bundle.worker)
640                 self.adjust_task_count(-1)
641                 if is_original:
642                     # Weird.  We are the original owner of this
643                     # bundle.  For it to have been cancelled, a backup
644                     # must have already started and completed before
645                     # we even got started.  Moreover, the backup says
646                     # it is done but we can't find the results it
647                     # should have copied over.  Reschedule the whole
648                     # thing.
649                     return self.emergency_retry_nasty_bundle(bundle)
650                 else:
651                     # Expected(?).  We're a backup and our bundle is
652                     # cancelled before we even got started.  Something
653                     # went bad in post_launch_work (I actually don't
654                     # see what?) but probably not worth worrying
655                     # about.
656                     return None
657
658         # Send input code / data to worker machine if it's not local.
659         if hostname not in machine:
660             try:
661                 cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
662                 start_ts = time.time()
663                 logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.")
664                 run_silently(cmd)
665                 xfer_latency = time.time() - start_ts
666                 logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency}s.")
667             except Exception as e:
668                 logger.exception(e)
669                 logger.error(
670                     f'{uuid}/{fname}: failed to send instructions to worker machine?!?'
671                 )
672                 assert bundle.worker is not None
673                 self.status.record_release_worker(
674                     bundle.worker,
675                     bundle.uuid,
676                     True,
677                 )
678                 self.release_worker(bundle.worker)
679                 self.adjust_task_count(-1)
680                 if is_original:
681                     # Weird.  We tried to copy the code to the worker and it failed...
682                     # And we're the original bundle.  We have to retry.
683                     return self.emergency_retry_nasty_bundle(bundle)
684                 else:
685                     # This is actually expected; we're a backup.
686                     # There's a race condition where someone else
687                     # already finished the work and removed the source
688                     # code file before we could copy it.  No biggie.
689                     return None
690
691         # Kick off the work.  Note that if this fails we let
692         # wait_for_process deal with it.
693         self.status.record_processing_began(uuid)
694         cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
695                f'"source py39-venv/bin/activate &&'
696                f' /home/scott/lib/python_modules/remote_worker.py'
697                f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
698         logger.debug(f'{uuid}/{fname}: Executing {cmd} in the background to kick off work...')
699         p = cmd_in_background(cmd, silent=True)
700         bundle.pid = pid = p.pid
701         logger.debug(f'{uuid}/{fname}: Local ssh process pid={pid}; remote worker is {machine}.')
702         return self.wait_for_process(p, bundle, 0)
703
704     def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
705         uuid = bundle.uuid
706         machine = bundle.machine
707         fname = bundle.fname
708         pid = p.pid
709         if depth > 3:
710             logger.error(
711                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
712             )
713             p.terminate()
714             self.status.record_release_worker(
715                 bundle.worker,
716                 bundle.uuid,
717                 True,
718             )
719             self.release_worker(bundle.worker)
720             self.adjust_task_count(-1)
721             return self.emergency_retry_nasty_bundle(bundle)
722
723         # Spin until either the ssh job we scheduled finishes the
724         # bundle or some backup worker signals that they finished it
725         # before we could.
726         while True:
727             try:
728                 p.wait(timeout=0.25)
729             except subprocess.TimeoutExpired:
730                 self.heartbeat()
731                 if self.check_if_cancelled(bundle):
732                     logger.info(
733                         f'{uuid}/{fname}: another worker finished bundle, checking it out...'
734                     )
735                     break
736             else:
737                 logger.info(
738                     f"{uuid}/{fname}: pid {pid} ({machine}) our ssh finished, checking it out..."
739                 )
740                 p = None
741                 break
742
743         # If we get here we believe the bundle is done; either the ssh
744         # subprocess finished (hopefully successfully) or we noticed
745         # that some other worker seems to have completed the bundle
746         # and we're bailing out.
747         try:
748             ret = self.post_launch_work(bundle)
749             if ret is not None and p is not None:
750                 p.terminate()
751             return ret
752
753         # Something went wrong; e.g. we could not copy the results
754         # back, cleanup after ourselves on the remote machine, or
755         # unpickle the results we got from the remote machine.  If we
756         # still have an active ssh subprocess, keep waiting on it.
757         # Otherwise, time for an emergency reschedule.
758         except Exception as e:
759             logger.exception(e)
760             logger.error(f'{uuid}/{fname}: Something unexpected just happened...')
761             if p is not None:
762                 logger.warning(
763                     f"{uuid}/{fname}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
764                 )
765                 return self.wait_for_process(p, bundle, depth + 1)
766             else:
767                 self.status.record_release_worker(
768                     bundle.worker,
769                     bundle.uuid,
770                     True,
771                 )
772                 self.release_worker(bundle.worker)
773                 self.adjust_task_count(-1)
774                 return self.emergency_retry_nasty_bundle(bundle)
775
776     def post_launch_work(self, bundle: BundleDetails) -> Any:
777         with self.status.lock:
778             is_original = bundle.src_bundle is None
779             was_cancelled = bundle.was_cancelled
780             username = bundle.username
781             machine = bundle.machine
782             result_file = bundle.result_file
783             code_file = bundle.code_file
784             fname = bundle.fname
785             uuid = bundle.uuid
786
787             # Whether original or backup, if we finished first we must
788             # fetch the results if the computation happened on a
789             # remote machine.
790             bundle.end_ts = time.time()
791             if not was_cancelled:
792                 assert bundle.machine is not None
793                 if bundle.hostname not in bundle.machine:
794                     cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
795                     logger.info(
796                         f"{uuid}/{fname}: Fetching results from {username}@{machine} via {cmd}"
797                     )
798
799                     # If either of these throw they are handled in
800                     # wait_for_process.
801                     run_silently(cmd)
802                     run_silently(f'{SSH} {username}@{machine}'
803                                  f' "/bin/rm -f {code_file} {result_file}"')
804                 dur = bundle.end_ts - bundle.start_ts
805                 self.histogram.add_item(dur)
806
807         # Only the original worker should unpickle the file contents
808         # though since it's the only one whose result matters.  The
809         # original is also the only job that may delete result_file
810         # from disk.  Note that the original may have been cancelled
811         # if one of the backups finished first; it still must read the
812         # result from disk.
813         if is_original:
814             logger.debug(f"{uuid}/{fname}: Unpickling {result_file}.")
815             try:
816                 with open(f'{result_file}', 'rb') as rb:
817                     serialized = rb.read()
818                 result = cloudpickle.loads(serialized)
819             except Exception as e:
820                 msg = f'Failed to load {result_file}, this is bad news.'
821                 logger.critical(msg)
822                 self.status.record_release_worker(
823                     bundle.worker,
824                     bundle.uuid,
825                     True,
826                 )
827                 self.release_worker(bundle.worker)
828
829                 # Re-raise the exception; the code in wait_for_process may
830                 # decide to emergency_retry_nasty_bundle here.
831                 raise Exception(msg) from e
832
833             logger.debug(
834                 f'Removing local (master) {code_file} and {result_file}.'
835             )
836             os.remove(f'{result_file}')
837             os.remove(f'{code_file}')
838
839             # Notify any backups that the original is done so they
840             # should stop ASAP.  Do this whether or not we
841             # finished first since there could be more than one
842             # backup.
843             if bundle.backup_bundles is not None:
844                 for backup in bundle.backup_bundles:
845                     logger.debug(
846                         f'{uuid}/{fname}: Notifying backup {backup.uuid} that it\'s cancelled'
847                     )
848                     backup.is_cancelled.set()
849
850         # This is a backup job and, by now, we have already fetched
851         # the bundle results.
852         else:
853             # Backup results don't matter, they just need to leave the
854             # result file in the right place for their originals to
855             # read/unpickle later.
856             result = None
857
858             # Tell the original to stop if we finished first.
859             if not was_cancelled:
860                 logger.debug(
861                     f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
862                 )
863                 bundle.src_bundle.is_cancelled.set()
864
865         assert bundle.worker is not None
866         self.status.record_release_worker(
867             bundle.worker,
868             bundle.uuid,
869             was_cancelled,
870         )
871         self.release_worker(bundle.worker)
872         self.adjust_task_count(-1)
873         return result
874
875     def create_original_bundle(self, pickle, fname: str):
876         from string_utils import generate_uuid
877         uuid = generate_uuid(as_hex=True)
878         code_file = f'/tmp/{uuid}.code.bin'
879         result_file = f'/tmp/{uuid}.result.bin'
880
881         logger.debug(f'Writing pickled code to {code_file}')
882         with open(f'{code_file}', 'wb') as wb:
883             wb.write(pickle)
884
885         bundle = BundleDetails(
886             pickled_code = pickle,
887             uuid = uuid,
888             fname = fname,
889             worker = None,
890             username = None,
891             machine = None,
892             hostname = platform.node(),
893             code_file = code_file,
894             result_file = result_file,
895             pid = 0,
896             start_ts = time.time(),
897             end_ts = 0.0,
898             too_slow = False,
899             super_slow = False,
900             src_bundle = None,
901             is_cancelled = threading.Event(),
902             was_cancelled = False,
903             backup_bundles = [],
904             failure_count = 0,
905         )
906         self.status.record_bundle_details(bundle)
907         logger.debug(f'{uuid}/{fname}: Created original bundle')
908         return bundle
909
910     def create_backup_bundle(self, src_bundle: BundleDetails):
911         assert src_bundle.backup_bundles is not None
912         n = len(src_bundle.backup_bundles)
913         uuid = src_bundle.uuid + f'_backup#{n}'
914
915         backup_bundle = BundleDetails(
916             pickled_code = src_bundle.pickled_code,
917             uuid = uuid,
918             fname = src_bundle.fname,
919             worker = None,
920             username = None,
921             machine = None,
922             hostname = src_bundle.hostname,
923             code_file = src_bundle.code_file,
924             result_file = src_bundle.result_file,
925             pid = 0,
926             start_ts = time.time(),
927             end_ts = 0.0,
928             too_slow = False,
929             super_slow = False,
930             src_bundle = src_bundle,
931             is_cancelled = threading.Event(),
932             was_cancelled = False,
933             backup_bundles = None,    # backup backups not allowed
934             failure_count = 0,
935         )
936         src_bundle.backup_bundles.append(backup_bundle)
937         self.status.record_bundle_details_already_locked(backup_bundle)
938         logger.debug(f'{uuid}/{src_bundle.fname}: Created backup bundle')
939         return backup_bundle
940
941     def schedule_backup_for_bundle(self,
942                                    src_bundle: BundleDetails):
943         assert self.status.lock.locked()
944         backup_bundle = self.create_backup_bundle(src_bundle)
945         logger.debug(
946             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
947         )
948         self._helper_executor.submit(self.launch, backup_bundle)
949
950         # Results from backups don't matter; if they finish first
951         # they will move the result_file to this machine and let
952         # the original pick them up and unpickle them.
953
954     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Any:
955         uuid = bundle.uuid
956         is_original = bundle.src_bundle is None
957         bundle.worker = None
958         avoid_last_machine = bundle.machine
959         bundle.machine = None
960         bundle.username = None
961         bundle.failure_count += 1
962         if is_original:
963             retry_limit = 3
964         else:
965             retry_limit = 2
966
967         if bundle.failure_count > retry_limit:
968             logger.error(
969                 f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.'
970             )
971             if is_original:
972                 logger.critical(
973                     f'{uuid}: This is the original of the bundle; results will be incomplete.'
974                 )
975             else:
976                 logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.')
977             return None
978         else:
979             logger.warning(
980                 f'>>> Emergency rescheduling {uuid} because of unexpected errors (wtf?!) <<<'
981             )
982             return self.launch(bundle, avoid_last_machine)
983
984     @overrides
985     def submit(self,
986                function: Callable,
987                *args,
988                **kwargs) -> fut.Future:
989         pickle = make_cloud_pickle(function, *args, **kwargs)
990         bundle = self.create_original_bundle(pickle, function.__name__)
991         self.total_bundles_submitted += 1
992         return self._helper_executor.submit(self.launch, bundle)
993
994     @overrides
995     def shutdown(self, wait: bool = True) -> None:
996         self._helper_executor.shutdown(wait)
997         logger.debug(f'Shutting down RemoteExecutor {self.title}')
998         print(self.histogram)
999
1000
1001 @singleton
1002 class DefaultExecutors(object):
1003     def __init__(self):
1004         self.thread_executor: Optional[ThreadExecutor] = None
1005         self.process_executor: Optional[ProcessExecutor] = None
1006         self.remote_executor: Optional[RemoteExecutor] = None
1007
1008     def ping(self, host) -> bool:
1009         logger.debug(f'RUN> ping -c 1 {host}')
1010         command = ['ping', '-c', '1', host]
1011         return subprocess.call(
1012             command,
1013             stdout=subprocess.DEVNULL,
1014             stderr=subprocess.DEVNULL,
1015         ) == 0
1016
1017     def thread_pool(self) -> ThreadExecutor:
1018         if self.thread_executor is None:
1019             self.thread_executor = ThreadExecutor()
1020         return self.thread_executor
1021
1022     def process_pool(self) -> ProcessExecutor:
1023         if self.process_executor is None:
1024             self.process_executor = ProcessExecutor()
1025         return self.process_executor
1026
1027     def remote_pool(self) -> RemoteExecutor:
1028         if self.remote_executor is None:
1029             logger.info('Looking for some helper machines...')
1030             pool: List[RemoteWorkerRecord] = []
1031             if self.ping('cheetah.house'):
1032                 logger.info('Found cheetah.house')
1033                 pool.append(
1034                     RemoteWorkerRecord(
1035                         username = 'scott',
1036                         machine = 'cheetah.house',
1037                         weight = 12,
1038                         count = 4,
1039                     ),
1040                 )
1041             if self.ping('video.house'):
1042                 logger.info('Found video.house')
1043                 pool.append(
1044                     RemoteWorkerRecord(
1045                         username = 'scott',
1046                         machine = 'video.house',
1047                         weight = 1,
1048                         count = 4,
1049                     ),
1050                 )
1051             if self.ping('wannabe.house'):
1052                 logger.info('Found wannabe.house')
1053                 pool.append(
1054                     RemoteWorkerRecord(
1055                         username = 'scott',
1056                         machine = 'wannabe.house',
1057                         weight = 2,
1058                         count = 4,
1059                     ),
1060                 )
1061             if self.ping('meerkat.cabin'):
1062                 logger.info('Found meerkat.cabin')
1063                 pool.append(
1064                     RemoteWorkerRecord(
1065                         username = 'scott',
1066                         machine = 'meerkat.cabin',
1067                         weight = 5,
1068                         count = 2,
1069                     ),
1070                 )
1071             if self.ping('backup.house'):
1072                 logger.info('Found backup.house')
1073                 pool.append(
1074                     RemoteWorkerRecord(
1075                         username = 'scott',
1076                         machine = 'backup.house',
1077                         weight = 1,
1078                         count = 4,
1079                     ),
1080                 )
1081             if self.ping('kiosk.house'):
1082                 logger.info('Found kiosk.house')
1083                 pool.append(
1084                     RemoteWorkerRecord(
1085                         username = 'pi',
1086                         machine = 'kiosk.house',
1087                         weight = 1,
1088                         count = 2,
1089                     ),
1090                 )
1091             if self.ping('puma.cabin'):
1092                 logger.info('Found puma.cabin')
1093                 pool.append(
1094                     RemoteWorkerRecord(
1095                         username = 'scott',
1096                         machine = 'puma.cabin',
1097                         weight = 12,
1098                         count = 4,
1099                     ),
1100                 )
1101
1102             # The controller machine has a lot to do; go easy on it.
1103             for record in pool:
1104                 if record.machine == platform.node() and record.count > 1:
1105                     logger.info(f'Reducing workload for {record.machine}.')
1106                     record.count = 1
1107
1108             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1109             policy.register_worker_pool(pool)
1110             self.remote_executor = RemoteExecutor(pool, policy)
1111         return self.remote_executor
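

# Illustrative end-to-end usage (a sketch; do_work and items are placeholders,
# and this assumes the config module has already parsed the program's
# command-line arguments):
#
#     from executors import DefaultExecutors
#
#     pool = DefaultExecutors().thread_pool()
#     futures = [pool.submit(do_work, item) for item in items]
#     results = [f.result() for f in futures]
#     pool.shutdown()
#
# Because DefaultExecutors is decorated with @singleton, repeated
# constructions are expected to hand back the same instance, so the thread,
# process, and remote pools are each created lazily at most once.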