Bugfixes in executors.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18
19 import cloudpickle  # type: ignore
20 from overrides import overrides
21
22 from ansi import bg, fg, underline, reset
23 import argparse_utils
24 import config
25 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
26 from decorator_utils import singleton
27 import histogram as hist
28
29 logger = logging.getLogger(__name__)
30
31 parser = config.add_commandline_args(
32     f"Executors ({__file__})",
33     "Args related to processing executors."
34 )
35 parser.add_argument(
36     '--executors_threadpool_size',
37     type=int,
38     metavar='#THREADS',
39     help='Number of threads in the default threadpool, leave unset for default',
40     default=None
41 )
42 parser.add_argument(
43     '--executors_processpool_size',
44     type=int,
45     metavar='#PROCESSES',
46     help='Number of processes in the default processpool, leave unset for default',
47     default=None,
48 )
49 parser.add_argument(
50     '--executors_schedule_remote_backups',
51     default=True,
52     action=argparse_utils.ActionNoYes,
53     help='Should we schedule duplicative backup work if a remote bundle is slow',
54 )
55 parser.add_argument(
56     '--executors_max_bundle_failures',
57     type=int,
58     default=3,
59     metavar='#FAILURES',
60     help='Maximum number of failures before giving up on a bundle',
61 )
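# These knobs are read back out of config.config when the executors below
# are constructed.  Illustrative invocation (my_program.py is hypothetical;
# ActionNoYes presumably also generates a negated form of
# --executors_schedule_remote_backups, not shown here):
#
#   ./my_program.py --executors_threadpool_size 16 \
#                   --executors_processpool_size 4 \
#                   --executors_max_bundle_failures 5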
62
63 RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
64 SSH = 'ssh -oForwardX11=no'
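# RSYNC flags: -q/--no-motd keep output quiet, -W copies whole files (no
# delta-transfer), --ignore-existing skips files already present on the
# receiver, --timeout=60 abandons stalled transfers, --size-only trusts
# matching file sizes, and -z compresses in transit.  SSH disables X11
# forwarding since the remote workers are non-interactive.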
65
66
67 def make_cloud_pickle(fun, *args, **kwargs):
68     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
69     return cloudpickle.dumps((fun, args, kwargs))
70
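# Illustrative round trip -- this is what the process / remote workers do on
# their side of the pipe:
#
#   import math
#   blob = make_cloud_pickle(math.pow, 2, 10)
#   fun, args, kwargs = cloudpickle.loads(blob)
#   assert fun(*args, **kwargs) == 1024.0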
71
72 class BaseExecutor(ABC):
73     def __init__(self, *, title=''):
74         self.title = title
75         self.task_count = 0
76         self.histogram = hist.SimpleHistogram(
77             hist.SimpleHistogram.n_evenly_spaced_buckets(
78                 int(0), int(500), 50
79             )
80         )
81
82     @abstractmethod
83     def submit(self,
84                function: Callable,
85                *args,
86                **kwargs) -> fut.Future:
87         pass
88
89     @abstractmethod
90     def shutdown(self,
91                  wait: bool = True) -> None:
92         pass
93
94     def adjust_task_count(self, delta: int) -> None:
95         self.task_count += delta
96         logger.debug(f'Executor current task count is {self.task_count}')
97
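# BaseExecutor tracks per-task latency in a histogram of 50 evenly spaced
# buckets spanning 0..500 seconds and keeps a running count of in-flight
# tasks; concrete executors below only need to provide submit() and
# shutdown().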
98
99 class ThreadExecutor(BaseExecutor):
100     def __init__(self,
101                  max_workers: Optional[int] = None):
102         super().__init__()
103         workers = None
104         if max_workers is not None:
105             workers = max_workers
106         elif 'executors_threadpool_size' in config.config:
107             workers = config.config['executors_threadpool_size']
108         logger.debug(f'Creating threadpool executor with {workers} workers')
109         self._thread_pool_executor = fut.ThreadPoolExecutor(
110             max_workers=workers,
111             thread_name_prefix="thread_executor_helper"
112         )
113
114     def run_local_bundle(self, fun, *args, **kwargs):
115         logger.debug(f"Running local bundle at {fun.__name__}")
116         start = time.time()
117         result = fun(*args, **kwargs)
118         end = time.time()
119         self.adjust_task_count(-1)
120         duration = end - start
121         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
122         self.histogram.add_item(duration)
123         return result
124
125     @overrides
126     def submit(self,
127                function: Callable,
128                *args,
129                **kwargs) -> fut.Future:
130         self.adjust_task_count(+1)
131         newargs = []
132         newargs.append(function)
133         for arg in args:
134             newargs.append(arg)
135         return self._thread_pool_executor.submit(
136             self.run_local_bundle,
137             *newargs,
138             **kwargs)
139
140     @overrides
141     def shutdown(self,
142                  wait: bool = True) -> None:
143         logger.debug(f'Shutting down threadpool executor {self.title}')
144         print(self.histogram)
145         self._thread_pool_executor.shutdown(wait)
146
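# Usage sketch (fetch_url and urls are hypothetical; when max_workers is
# omitted the pool size comes from --executors_threadpool_size):
#
#   executor = ThreadExecutor(max_workers=10)
#   futures = [executor.submit(fetch_url, url) for url in urls]
#   results = [f.result() for f in futures]
#   executor.shutdown()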
147
148 class ProcessExecutor(BaseExecutor):
149     def __init__(self,
150                  max_workers=None):
151         super().__init__()
152         workers = None
153         if max_workers is not None:
154             workers = max_workers
155         elif 'executors_processpool_size' in config.config:
156             workers = config.config['executors_processpool_size']
157         logger.debug(f'Creating processpool executor with {workers} workers.')
158         self._process_executor = fut.ProcessPoolExecutor(
159             max_workers=workers,
160         )
161
162     def run_cloud_pickle(self, pickle):
163         fun, args, kwargs = cloudpickle.loads(pickle)
164         logger.debug(f"Running pickled bundle at {fun.__name__}")
165         result = fun(*args, **kwargs)
166         self.adjust_task_count(-1)
167         return result
168
169     @overrides
170     def submit(self,
171                function: Callable,
172                *args,
173                **kwargs) -> fut.Future:
174         start = time.time()
175         self.adjust_task_count(+1)
176         pickle = make_cloud_pickle(function, *args, **kwargs)
177         result = self._process_executor.submit(
178             self.run_cloud_pickle,
179             pickle
180         )
181         result.add_done_callback(
182             lambda _: self.histogram.add_item(
183                 time.time() - start
184             )
185         )
186         return result
187
188     @overrides
189     def shutdown(self, wait=True) -> None:
190         logger.debug(f'Shutting down processpool executor {self.title}')
191         self._process_executor.shutdown(wait)
192         print(self.histogram)
193
194     def __getstate__(self):
195         state = self.__dict__.copy()
196         state['_process_executor'] = None
197         return state
198
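# __getstate__ above drops the live pool handle so that objects holding a
# ProcessExecutor reference can still be pickled.  Usage sketch
# (crunch_numbers and dataset are hypothetical; work submitted here must be
# cloudpickle-able):
#
#   executor = ProcessExecutor()
#   future = executor.submit(crunch_numbers, dataset)
#   print(future.result())
#   executor.shutdown()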
199
200 class RemoteExecutorException(Exception):
201     """Raised when a bundle cannot be executed despite several retries."""
202     pass
203
204
205 @dataclass
206 class RemoteWorkerRecord:
207     username: str
208     machine: str
209     weight: int
210     count: int
211
212     def __hash__(self):
213         return hash((self.username, self.machine))
214
215     def __repr__(self):
216         return f'{self.username}@{self.machine}'
217
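# Example record -- weight biases the weighted-random policy below toward
# beefier machines and count caps concurrent bundles per machine:
#
#   RemoteWorkerRecord(username='scott', machine='cheetah.house',
#                      weight=14, count=4)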
218
219 @dataclass
220 class BundleDetails:
221     pickled_code: bytes
222     uuid: str
223     fname: str
224     worker: Optional[RemoteWorkerRecord]
225     username: Optional[str]
226     machine: Optional[str]
227     hostname: str
228     code_file: str
229     result_file: str
230     pid: int
231     start_ts: float
232     end_ts: float
233     too_slow: bool
234     super_slow: bool
235     src_bundle: BundleDetails
236     is_cancelled: threading.Event
237     was_cancelled: bool
238     backup_bundles: Optional[List[BundleDetails]]
239     failure_count: int
240
241     def __repr__(self):
242         uuid = self.uuid
243         if uuid[-9:-2] == '_backup':
244             uuid = uuid[:-9]
245             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
246         else:
247             suffix = uuid[-6:]
248
249         colorz = [
250             fg('violet red'),
251             fg('red'),
252             fg('orange'),
253             fg('peach orange'),
254             fg('yellow'),
255             fg('marigold yellow'),
256             fg('green yellow'),
257             fg('tea green'),
258             fg('cornflower blue'),
259             fg('turquoise blue'),
260             fg('tropical blue'),
261             fg('lavender purple'),
262             fg('medium purple'),
263         ]
264         c = colorz[int(uuid[-2:], 16) % len(colorz)]
265         fname = self.fname if self.fname is not None else 'nofname'
266         machine = self.machine if self.machine is not None else 'nomachine'
267         return f'{c}{suffix}/{fname}/{machine}{reset()}'
268
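# Backup bundles reuse the original's uuid plus a '_backup#<n>' suffix (see
# create_backup_bundle below); __repr__ strips that suffix and renders a
# short colorized tag along the lines of 'a1b2c3_b1/myfunc/cheetah.house'.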
269
270 class RemoteExecutorStatus:
271     def __init__(self, total_worker_count: int) -> None:
272         self.worker_count = total_worker_count
273         self.known_workers: Set[RemoteWorkerRecord] = set()
274         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
275         self.end_per_bundle: Dict[str, float] = defaultdict(float)
276         self.finished_bundle_timings_per_worker: Dict[
277             RemoteWorkerRecord,
278             List[float]
279         ] = {}
280         self.in_flight_bundles_by_worker: Dict[
281             RemoteWorkerRecord,
282             Set[str]
283         ] = {}
284         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
285         self.finished_bundle_timings: List[float] = []
286         self.last_periodic_dump: Optional[float] = None
287         self.total_bundles_submitted = 0
288
289         # Protects reads and modifications of the state above.  Also
290         # used as a memory fence for modifications to bundles.
291         self.lock = threading.Lock()
292
293     def record_acquire_worker(
294             self,
295             worker: RemoteWorkerRecord,
296             uuid: str
297     ) -> None:
298         with self.lock:
299             self.record_acquire_worker_already_locked(
300                 worker,
301                 uuid
302             )
303
304     def record_acquire_worker_already_locked(
305             self,
306             worker: RemoteWorkerRecord,
307             uuid: str
308     ) -> None:
309         assert self.lock.locked()
310         self.known_workers.add(worker)
311         self.start_per_bundle[uuid] = None
312         x = self.in_flight_bundles_by_worker.get(worker, set())
313         x.add(uuid)
314         self.in_flight_bundles_by_worker[worker] = x
315
316     def record_bundle_details(
317             self,
318             details: BundleDetails) -> None:
319         with self.lock:
320             self.record_bundle_details_already_locked(details)
321
322     def record_bundle_details_already_locked(
323             self,
324             details: BundleDetails) -> None:
325         assert self.lock.locked()
326         self.bundle_details_by_uuid[details.uuid] = details
327
328     def record_release_worker(
329             self,
330             worker: RemoteWorkerRecord,
331             uuid: str,
332             was_cancelled: bool,
333     ) -> None:
334         with self.lock:
335             self.record_release_worker_already_locked(
336                 worker, uuid, was_cancelled
337             )
338
339     def record_release_worker_already_locked(
340             self,
341             worker: RemoteWorkerRecord,
342             uuid: str,
343             was_cancelled: bool,
344     ) -> None:
345         assert self.lock.locked()
346         ts = time.time()
347         self.end_per_bundle[uuid] = ts
348         self.in_flight_bundles_by_worker[worker].remove(uuid)
349         if not was_cancelled:
350             bundle_latency = ts - self.start_per_bundle[uuid]
351             x = self.finished_bundle_timings_per_worker.get(worker, list())
352             x.append(bundle_latency)
353             self.finished_bundle_timings_per_worker[worker] = x
354             self.finished_bundle_timings.append(bundle_latency)
355
356     def record_processing_began(self, uuid: str):
357         with self.lock:
358             self.start_per_bundle[uuid] = time.time()
359
360     def total_in_flight(self) -> int:
361         assert self.lock.locked()
362         total_in_flight = 0
363         for worker in self.known_workers:
364             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
365         return total_in_flight
366
367     def total_idle(self) -> int:
368         assert self.lock.locked()
369         return self.worker_count - self.total_in_flight()
370
371     def __repr__(self):
372         assert self.lock.locked()
373         ts = time.time()
374         total_finished = len(self.finished_bundle_timings)
375         total_in_flight = self.total_in_flight()
376         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
377         qall = None
378         if len(self.finished_bundle_timings) > 1:
379             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
380             ret += (
381                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
382                 f'✅={total_finished}/{self.total_bundles_submitted}, '
383                 f'💻n={total_in_flight}/{self.worker_count}\n'
384             )
385         else:
386             ret += (
387                 f' ✅={total_finished}/{self.total_bundles_submitted}, '
388                 f'💻n={total_in_flight}/{self.worker_count}\n'
389             )
390
391         for worker in self.known_workers:
392             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
393             timings = self.finished_bundle_timings_per_worker.get(worker, [])
394             count = len(timings)
395             qworker = None
396             if count > 1:
397                 qworker = numpy.quantile(timings, [0.5, 0.95])
398                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
399             else:
400                 ret += '\n'
401             if count > 0:
402                 ret += f'    ...finished {count} total bundle(s) so far\n'
403             in_flight = len(self.in_flight_bundles_by_worker[worker])
404             if in_flight > 0:
405                 ret += f'    ...{in_flight} bundles currently in flight:\n'
406                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
407                     details = self.bundle_details_by_uuid.get(
408                         bundle_uuid,
409                         None
410                     )
411                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
412                     if self.start_per_bundle[bundle_uuid] is not None:
413                         sec = ts - self.start_per_bundle[bundle_uuid]
414                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
415                     else:
416                         ret += f'       {details} setting up / copying data...'
417                         sec = 0.0
418
419                     if qworker is not None:
420                         if sec > qworker[1]:
421                             ret += f'{bg("red")}>💻p95{reset()} '
422                         elif sec > qworker[0]:
423                             ret += f'{fg("red")}>💻p50{reset()} '
424                     if qall is not None:
425                         if sec > qall[1] * 1.5:
426                             ret += f'{bg("red")}!!!{reset()}'
427                             if details is not None:
428                                 logger.debug(f'Flagging {details} for another backup')
429                                 details.super_slow = True
430                         elif sec > qall[1]:
431                             ret += f'{bg("red")}>∀p95{reset()} '
432                             if details is not None:
433                                 logger.debug(f'Flagging {details} for a backup')
434                                 details.too_slow = True
435                         elif sec > qall[0]:
436                             ret += f'{fg("red")}>∀p50{reset()}'
437                     ret += '\n'
438         return ret
439
440     def periodic_dump(self, total_bundles_submitted: int) -> None:
441         assert self.lock.locked()
442         self.total_bundles_submitted = total_bundles_submitted
443         ts = time.time()
444         if (
445                 self.last_periodic_dump is None
446                 or ts - self.last_periodic_dump > 5.0
447         ):
448             print(self)
449             self.last_periodic_dump = ts
450
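# Note: __repr__ above is not purely cosmetic -- while rendering the pool
# status it also marks bundles whose runtime exceeds the pool-wide p95 as
# too_slow (or super_slow beyond 1.5x p95), and heartbeat() consults those
# flags when deciding whether to schedule backups.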
451
452 class RemoteWorkerSelectionPolicy(ABC):
453     def register_worker_pool(self, workers):
454         random.seed()
455         self.workers = workers
456
457     @abstractmethod
458     def is_worker_available(self) -> bool:
459         pass
460
461     @abstractmethod
462     def acquire_worker(
463             self,
464             machine_to_avoid = None
465     ) -> Optional[RemoteWorkerRecord]:
466         pass
467
468
469 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
470     def is_worker_available(self) -> bool:
471         for worker in self.workers:
472             if worker.count > 0:
473                 return True
474         return False
475
476     def acquire_worker(
477             self,
478             machine_to_avoid = None
479     ) -> Optional[RemoteWorkerRecord]:
480         grabbag = []
481         for worker in self.workers:
482             for x in range(0, worker.count):
483                 for y in range(0, worker.weight):
484                     grabbag.append(worker)
485
486         for attempt in range(0, 5):
487             random.shuffle(grabbag)
488             worker = grabbag[0]
489             if worker.machine != machine_to_avoid or attempt > 2:
490                 if worker.count > 0:
491                     worker.count -= 1
492                     logger.debug(f'Selected worker {worker}')
493                     return worker
494         logger.warning("Couldn't find a worker; go fish.")
495         return None
496
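# Sketch: with the weights below, roughly three quarters of acquisitions
# should land on hosta (hosta/hostb are made-up machine names):
#
#   a = RemoteWorkerRecord(username='me', machine='hosta', weight=3, count=10)
#   b = RemoteWorkerRecord(username='me', machine='hostb', weight=1, count=10)
#   policy = WeightedRandomRemoteWorkerSelectionPolicy()
#   policy.register_worker_pool([a, b])
#   worker = policy.acquire_worker()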
497
498 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
499     def __init__(self) -> None:
500         self.index = 0
501
502     def is_worker_available(self) -> bool:
503         for worker in self.workers:
504             if worker.count > 0:
505                 return True
506         return False
507
508     def acquire_worker(
509             self,
510             machine_to_avoid: Optional[str] = None
511     ) -> Optional[RemoteWorkerRecord]:
512         x = self.index
513         while True:
514             worker = self.workers[x]
515             if worker.count > 0:
516                 worker.count -= 1
517                 x += 1
518                 if x >= len(self.workers):
519                     x = 0
520                 self.index = x
521                 logger.debug(f'Selected worker {worker}')
522                 return worker
523             x += 1
524             if x >= len(self.workers):
525                 x = 0
526             if x == self.index:
527                 logger.warning("Couldn't find a worker; go fish.")
528                 return None
529
530
531 class RemoteExecutor(BaseExecutor):
532     def __init__(self,
533                  workers: List[RemoteWorkerRecord],
534                  policy: RemoteWorkerSelectionPolicy) -> None:
535         super().__init__()
536         self.workers = workers
537         self.policy = policy
538         self.worker_count = 0
539         for worker in self.workers:
540             self.worker_count += worker.count
541         if self.worker_count <= 0:
542             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
543             logger.critical(msg)
544             raise RemoteExecutorException(msg)
545         self.policy.register_worker_pool(self.workers)
546         self.cv = threading.Condition()
547         logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
548         self._helper_executor = fut.ThreadPoolExecutor(
549             thread_name_prefix="remote_executor_helper",
550             max_workers=self.worker_count,
551         )
552         self.status = RemoteExecutorStatus(self.worker_count)
553         self.total_bundles_submitted = 0
554
555     def is_worker_available(self) -> bool:
556         return self.policy.is_worker_available()
557
558     def acquire_worker(
559             self,
560             machine_to_avoid: Optional[str] = None
561     ) -> Optional[RemoteWorkerRecord]:
562         return self.policy.acquire_worker(machine_to_avoid)
563
564     def find_available_worker_or_block(
565             self,
566             machine_to_avoid: Optional[str] = None
567     ) -> RemoteWorkerRecord:
568         with self.cv:
569             while not self.is_worker_available():
570                 self.cv.wait()
571             worker = self.acquire_worker(machine_to_avoid)
572             if worker is not None:
573                 return worker
574         msg = "We should never reach this point in the code"
575         logger.critical(msg)
576         raise Exception(msg)
577
578     def release_worker(self, worker: RemoteWorkerRecord) -> None:
579         logger.debug(f'Released worker {worker}')
580         with self.cv:
581             worker.count += 1
582             self.cv.notify()
583
584     def heartbeat(self) -> None:
585         with self.status.lock:
586             # Regular progress report
587             self.status.periodic_dump(self.total_bundles_submitted)
588
589             # Look for bundles to reschedule.
590             num_done = len(self.status.finished_bundle_timings)
591             if num_done > 7 or (num_done > 5 and self.is_worker_available()):
592                 for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
593                     for uuid in bundle_uuids:
594                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
595                         if (
596                                 bundle is not None and
597                                 bundle.too_slow and
598                                 bundle.src_bundle is None and
599                                 config.config['executors_schedule_remote_backups']
600                         ):
601                             self.consider_backup_for_bundle(bundle)
602
603     def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
604         assert self.status.lock.locked()
605         if (
606             bundle.too_slow
607             and len(bundle.backup_bundles) == 0       # one backup per
608         ):
609             msg = f"*** Rescheduling {bundle} (first backup) ***"
610             logger.debug(msg)
611             self.schedule_backup_for_bundle(bundle)
612             return
613         elif (
614                 bundle.super_slow
615                 and len(bundle.backup_bundles) < 2    # two backups in dire situations
616                 and self.status.total_idle() > 4
617         ):
618             msg = f"*** Rescheduling {bundle} (second backup) ***"
619             logger.debug(msg)
620             self.schedule_backup_for_bundle(bundle)
621             return
622
623     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
624         with self.status.lock:
625             if bundle.is_cancelled.wait(timeout=0.0):
626                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
627                 bundle.was_cancelled = True
628                 return True
629         return False
630
631     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
632         """Find a worker for bundle or block until one is available."""
633         self.adjust_task_count(+1)
634         uuid = bundle.uuid
635         hostname = bundle.hostname
636         avoid_machine = override_avoid_machine
637         is_original = bundle.src_bundle is None
638
639         # Try not to schedule a backup on the same host as the original.
640         if avoid_machine is None and bundle.src_bundle is not None:
641             avoid_machine = bundle.src_bundle.machine
642         worker = None
643         while worker is None:
644             worker = self.find_available_worker_or_block(avoid_machine)
645
646         # Ok, found a worker.
647         bundle.worker = worker
648         machine = bundle.machine = worker.machine
649         username = bundle.username = worker.username
650
651         self.status.record_acquire_worker(worker, uuid)
652         logger.debug(f'{bundle}: Running bundle on {worker}...')
653
654         # Before we do any work, make sure the bundle is still viable.
655         if self.check_if_cancelled(bundle):
656             try:
657                 return self.post_launch_work(bundle)
658             except Exception as e:
659                 logger.exception(e)
660                 logger.error(
661                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
662                 )
663                 assert bundle.worker is not None
664                 self.status.record_release_worker(
665                     bundle.worker,
666                     bundle.uuid,
667                     True,
668                 )
669                 self.release_worker(bundle.worker)
670                 self.adjust_task_count(-1)
671                 if is_original:
672                     # Weird.  We are the original owner of this
673                     # bundle.  For it to have been cancelled, a backup
674                     # must have already started and completed before
675                     # we even got started.  Moreover, the backup says
676                     # it is done but we can't find the results it
677                     # should have copied over.  Reschedule the whole
678                     # thing.
679                     return self.emergency_retry_nasty_bundle(bundle)
680                 else:
681                     # Expected(?).  We're a backup and our bundle is
682                     # cancelled before we even got started.  Something
683                     # went bad in post_launch_work (I actually don't
684                     # see what?) but probably not worth worrying
685                     # about.
686                     return None
687
688         # Send input code / data to worker machine if it's not local.
689         if hostname not in machine:
690             try:
691                 cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
692                 start_ts = time.time()
693                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
694                 run_silently(cmd)
695                 xfer_latency = time.time() - start_ts
696                 logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.")
697             except Exception as e:
698                 logger.exception(e)
699                 logger.error(
700                     f'{bundle}: failed to send instructions to worker machine?!?'
701                 )
702                 assert bundle.worker is not None
703                 self.status.record_release_worker(
704                     bundle.worker,
705                     bundle.uuid,
706                     True,
707                 )
708                 self.release_worker(bundle.worker)
709                 self.adjust_task_count(-1)
710                 if is_original:
711                     # Weird.  We tried to copy the code to the worker and it failed...
712                     # And we're the original bundle.  We have to retry.
713                     return self.emergency_retry_nasty_bundle(bundle)
714                 else:
715                     # This is actually expected; we're a backup.
716                     # There's a race condition where someone else
717                     # already finished the work and removed the source
718                     # code file before we could copy it.  No biggie.
719                     return None
720
721         # Kick off the work.  Note that if this fails we let
722         # wait_for_process deal with it.
723         self.status.record_processing_began(uuid)
724         cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
725                f'"source py39-venv/bin/activate &&'
726                f' /home/scott/lib/python_modules/remote_worker.py'
727                f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
728         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
729         p = cmd_in_background(cmd, silent=True)
730         bundle.pid = pid = p.pid
731         logger.debug(f'{bundle}: Local ssh process pid={pid}; remote worker is {machine}.')
732         return self.wait_for_process(p, bundle, 0)
733
734     def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
735         machine = bundle.machine
736         pid = p.pid
737         if depth > 3:
738             logger.error(
739                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
740             )
741             p.terminate()
742             self.status.record_release_worker(
743                 bundle.worker,
744                 bundle.uuid,
745                 True,
746             )
747             self.release_worker(bundle.worker)
748             self.adjust_task_count(-1)
749             return self.emergency_retry_nasty_bundle(bundle)
750
751         # Spin until either the ssh job we scheduled finishes the
752         # bundle or some backup worker signals that they finished it
753         # before we could.
754         while True:
755             try:
756                 p.wait(timeout=0.25)
757             except subprocess.TimeoutExpired:
758                 self.heartbeat()
759                 if self.check_if_cancelled(bundle):
760                     logger.info(
761                         f'{bundle}: another worker finished bundle, checking it out...'
762                     )
763                     break
764             else:
765                 logger.info(
766                     f"{bundle}: pid {pid} ({machine}) our ssh finished, checking it out..."
767                 )
768                 p = None
769                 break
770
771         # If we get here we believe the bundle is done; either the ssh
772         # subprocess finished (hopefully successfully) or we noticed
773         # that some other worker seems to have completed the bundle
774         # and we're bailing out.
775         try:
776             ret = self.post_launch_work(bundle)
777             if ret is not None and p is not None:
778                 p.terminate()
779             return ret
780
781         # Something went wrong; e.g. we could not copy the results
782         # back, cleanup after ourselves on the remote machine, or
783         # unpickle the results we got from the remote machine.  If we
784         # still have an active ssh subprocess, keep waiting on it.
785         # Otherwise, time for an emergency reschedule.
786         except Exception as e:
787             logger.exception(e)
788             logger.error(f'{bundle}: Something unexpected just happened...')
789             if p is not None:
790                 logger.warning(
791                     f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
792                 )
793                 return self.wait_for_process(p, bundle, depth + 1)
794             else:
795                 self.status.record_release_worker(
796                     bundle.worker,
797                     bundle.uuid,
798                     True,
799                 )
800                 self.release_worker(bundle.worker)
801                 self.adjust_task_count(-1)
802                 return self.emergency_retry_nasty_bundle(bundle)
803
804     def post_launch_work(self, bundle: BundleDetails) -> Any:
805         with self.status.lock:
806             is_original = bundle.src_bundle is None
807             was_cancelled = bundle.was_cancelled
808             username = bundle.username
809             machine = bundle.machine
810             result_file = bundle.result_file
811             code_file = bundle.code_file
812
813             # Whether original or backup, if we finished first we must
814             # fetch the results if the computation happened on a
815             # remote machine.
816             bundle.end_ts = time.time()
817             if not was_cancelled:
818                 assert bundle.machine is not None
819                 if bundle.hostname not in bundle.machine:
820                     cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
821                     logger.info(
822                         f"{bundle}: Fetching results from {username}@{machine} via {cmd}"
823                     )
824
825                     # If either of these throw they are handled in
826                     # wait_for_process.
827                     run_silently(cmd)
828                     run_silently(f'{SSH} {username}@{machine}'
829                                  f' "/bin/rm -f {code_file} {result_file}"')
830                 dur = bundle.end_ts - bundle.start_ts
831                 self.histogram.add_item(dur)
832
833         # Only the original worker should unpickle the file contents
834         # though since it's the only one whose result matters.  The
835         # original is also the only job that may delete result_file
836         # from disk.  Note that the original may have been cancelled
837         # if one of the backups finished first; it still must read the
838         # result from disk.
839         if is_original:
840             logger.debug(f"{bundle}: Unpickling {result_file}.")
841             try:
842                 with open(f'{result_file}', 'rb') as rb:
843                     serialized = rb.read()
844                 result = cloudpickle.loads(serialized)
845             except Exception as e:
846                 msg = f'Failed to load {result_file}, this is bad news.'
847                 logger.critical(msg)
848                 self.status.record_release_worker(
849                     bundle.worker,
850                     bundle.uuid,
851                     True,
852                 )
853                 self.release_worker(bundle.worker)
854
855                 # Re-raise the exception; the code in wait_for_process may
856                 # decide to emergency_retry_nasty_bundle here.
857                 raise
858
859             logger.debug(
860                 f'Removing local (master) {code_file} and {result_file}.'
861             )
862             os.remove(f'{result_file}')
863             os.remove(f'{code_file}')
864
865             # Notify any backups that the original is done so they
866             # should stop ASAP.  Do this whether or not we
867             # finished first since there could be more than one
868             # backup.
869             if bundle.backup_bundles is not None:
870                 for backup in bundle.backup_bundles:
871                     logger.debug(
872                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
873                     )
874                     backup.is_cancelled.set()
875
876         # This is a backup job and, by now, we have already fetched
877         # the bundle results.
878         else:
879             # Backup results don't matter, they just need to leave the
880             # result file in the right place for their originals to
881             # read/unpickle later.
882             result = None
883
884             # Tell the original to stop if we finished first.
885             if not was_cancelled:
886                 logger.debug(
887                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
888                 )
889                 bundle.src_bundle.is_cancelled.set()
890
891         assert bundle.worker is not None
892         self.status.record_release_worker(
893             bundle.worker,
894             bundle.uuid,
895             was_cancelled,
896         )
897         self.release_worker(bundle.worker)
898         self.adjust_task_count(-1)
899         return result
900
901     def create_original_bundle(self, pickle, fname: str):
902         from string_utils import generate_uuid
903         uuid = generate_uuid(as_hex=True)
904         code_file = f'/tmp/{uuid}.code.bin'
905         result_file = f'/tmp/{uuid}.result.bin'
906
907         logger.debug(f'Writing pickled code to {code_file}')
908         with open(f'{code_file}', 'wb') as wb:
909             wb.write(pickle)
910
911         bundle = BundleDetails(
912             pickled_code = pickle,
913             uuid = uuid,
914             fname = fname,
915             worker = None,
916             username = None,
917             machine = None,
918             hostname = platform.node(),
919             code_file = code_file,
920             result_file = result_file,
921             pid = 0,
922             start_ts = time.time(),
923             end_ts = 0.0,
924             too_slow = False,
925             super_slow = False,
926             src_bundle = None,
927             is_cancelled = threading.Event(),
928             was_cancelled = False,
929             backup_bundles = [],
930             failure_count = 0,
931         )
932         self.status.record_bundle_details(bundle)
933         logger.debug(f'{bundle}: Created an original bundle')
934         return bundle
935
936     def create_backup_bundle(self, src_bundle: BundleDetails):
937         assert src_bundle.backup_bundles is not None
938         n = len(src_bundle.backup_bundles)
939         uuid = src_bundle.uuid + f'_backup#{n}'
940
941         backup_bundle = BundleDetails(
942             pickled_code = src_bundle.pickled_code,
943             uuid = uuid,
944             fname = src_bundle.fname,
945             worker = None,
946             username = None,
947             machine = None,
948             hostname = src_bundle.hostname,
949             code_file = src_bundle.code_file,
950             result_file = src_bundle.result_file,
951             pid = 0,
952             start_ts = time.time(),
953             end_ts = 0.0,
954             too_slow = False,
955             super_slow = False,
956             src_bundle = src_bundle,
957             is_cancelled = threading.Event(),
958             was_cancelled = False,
959             backup_bundles = None,    # backup backups not allowed
960             failure_count = 0,
961         )
962         src_bundle.backup_bundles.append(backup_bundle)
963         self.status.record_bundle_details_already_locked(backup_bundle)
964         logger.debug(f'{backup_bundle}: Created a backup bundle')
965         return backup_bundle
966
967     def schedule_backup_for_bundle(self,
968                                    src_bundle: BundleDetails):
969         assert self.status.lock.locked()
970         backup_bundle = self.create_backup_bundle(src_bundle)
971         logger.debug(
972             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
973         )
974         self._helper_executor.submit(self.launch, backup_bundle)
975
976         # Results from backups don't matter; if they finish first
977         # they will move the result_file to this machine and let
978         # the original pick them up and unpickle them.
979
980     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Any:
981         is_original = bundle.src_bundle is None
982         bundle.worker = None
983         avoid_last_machine = bundle.machine
984         bundle.machine = None
985         bundle.username = None
986         bundle.failure_count += 1
987         if is_original:
988             retry_limit = 3
989         else:
990             retry_limit = 2
991
992         if bundle.failure_count > retry_limit:
993             logger.error(
994                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
995             )
996             if is_original:
997                 raise RemoteExecutorException(
998                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
999                 )
1000             else:
1001                 logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
1002             return None
1003         else:
1004             logger.warning(
1005                 f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1006             )
1007             return self.launch(bundle, avoid_last_machine)
1008
1009     @overrides
1010     def submit(self,
1011                function: Callable,
1012                *args,
1013                **kwargs) -> fut.Future:
1014         pickle = make_cloud_pickle(function, *args, **kwargs)
1015         bundle = self.create_original_bundle(pickle, function.__name__)
1016         self.total_bundles_submitted += 1
1017         return self._helper_executor.submit(self.launch, bundle)
1018
1019     @overrides
1020     def shutdown(self, wait=True) -> None:
1021         self._helper_executor.shutdown(wait)
1022         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1023         print(self.histogram)
1024
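# Usage sketch: launch() assumes each remote machine already has the
# hard-coded py39-venv and remote_worker.py paths above and is reachable via
# passwordless ssh/rsync.  my_function is hypothetical:
#
#   workers = [RemoteWorkerRecord(username='scott', machine='cheetah.house',
#                                 weight=10, count=4)]
#   executor = RemoteExecutor(workers, WeightedRandomRemoteWorkerSelectionPolicy())
#   future = executor.submit(my_function, 42)
#   print(future.result())
#   executor.shutdown()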
1025
1026 @singleton
1027 class DefaultExecutors(object):
1028     def __init__(self):
1029         self.thread_executor: Optional[ThreadExecutor] = None
1030         self.process_executor: Optional[ProcessExecutor] = None
1031         self.remote_executor: Optional[RemoteExecutor] = None
1032
1033     def ping(self, host) -> bool:
1034         logger.debug(f'RUN> ping -c 1 {host}')
1035         try:
1036             x = cmd_with_timeout(
1037                 f'ping -c 1 {host} >/dev/null 2>/dev/null',
1038                 timeout_seconds=1.0
1039             )
1040             return x == 0
1041         except Exception:
1042             return False
1043
1044     def thread_pool(self) -> ThreadExecutor:
1045         if self.thread_executor is None:
1046             self.thread_executor = ThreadExecutor()
1047         return self.thread_executor
1048
1049     def process_pool(self) -> ProcessExecutor:
1050         if self.process_executor is None:
1051             self.process_executor = ProcessExecutor()
1052         return self.process_executor
1053
1054     def remote_pool(self) -> RemoteExecutor:
1055         if self.remote_executor is None:
1056             logger.info('Looking for some helper machines...')
1057             pool: List[RemoteWorkerRecord] = []
1058             if self.ping('cheetah.house'):
1059                 logger.info('Found cheetah.house')
1060                 pool.append(
1061                     RemoteWorkerRecord(
1062                         username = 'scott',
1063                         machine = 'cheetah.house',
1064                         weight = 14,
1065                         count = 4,
1066                     ),
1067                 )
1068             if self.ping('video.house'):
1069                 logger.info('Found video.house')
1070                 pool.append(
1071                     RemoteWorkerRecord(
1072                         username = 'scott',
1073                         machine = 'video.house',
1074                         weight = 1,
1075                         count = 4,
1076                     ),
1077                 )
1078             if self.ping('wannabe.house'):
1079                 logger.info('Found wannabe.house')
1080                 pool.append(
1081                     RemoteWorkerRecord(
1082                         username = 'scott',
1083                         machine = 'wannabe.house',
1084                         weight = 2,
1085                         count = 4,
1086                     ),
1087                 )
1088             if self.ping('meerkat.cabin'):
1089                 logger.info('Found meerkat.cabin')
1090                 pool.append(
1091                     RemoteWorkerRecord(
1092                         username = 'scott',
1093                         machine = 'meerkat.cabin',
1094                         weight = 5,
1095                         count = 2,
1096                     ),
1097                 )
1098             if self.ping('backup.house'):
1099                 logger.info('Found backup.house')
1100                 pool.append(
1101                     RemoteWorkerRecord(
1102                         username = 'scott',
1103                         machine = 'backup.house',
1104                         weight = 1,
1105                         count = 4,
1106                     ),
1107                 )
1108             if self.ping('kiosk.house'):
1109                 logger.info('Found kiosk.house')
1110                 pool.append(
1111                     RemoteWorkerRecord(
1112                         username = 'pi',
1113                         machine = 'kiosk.house',
1114                         weight = 1,
1115                         count = 2,
1116                     ),
1117                 )
1118             if self.ping('puma.cabin'):
1119                 logger.info('Found puma.cabin')
1120                 pool.append(
1121                     RemoteWorkerRecord(
1122                         username = 'scott',
1123                         machine = 'puma.cabin',
1124                         weight = 12,
1125                         count = 4,
1126                     ),
1127                 )
1128
1129             # The controller machine has a lot to do; go easy on it.
1130             for record in pool:
1131                 if record.machine == platform.node() and record.count > 1:
1132                     logger.info(f'Reducing workload for {record.machine}.')
1133                     record.count = 1
1134
1135             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1136             policy.register_worker_pool(pool)
1137             self.remote_executor = RemoteExecutor(pool, policy)
1138         return self.remote_executor
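# Usage sketch: the @singleton decorator (from decorator_utils) presumably
# makes every DefaultExecutors() call return one shared instance, so the
# pools are created lazily and reused process-wide.  work_item is
# hypothetical:
#
#   pool = DefaultExecutors().thread_pool()
#   future = pool.submit(work_item, 123)
#   print(future.result())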