Make remote workers die if no longer needed; cleanups in executors.
[python_utils.git] / executors.py
#!/usr/bin/env python3

from __future__ import annotations

from abc import ABC, abstractmethod
import concurrent.futures as fut
from collections import defaultdict
from dataclasses import dataclass
import logging
import numpy
import os
import platform
import random
import subprocess
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
from overrides import overrides

from ansi import bg, fg, underline, reset
import argparse_utils
import config
from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
from decorator_utils import singleton
import histogram as hist

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})",
    "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    metavar='#THREADS',
    help='Number of threads in the default threadpool, leave unset for default',
    default=None
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    metavar='#PROCESSES',
    help='Number of processes in the default processpool, leave unset for default',
    default=None,
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    metavar='#FAILURES',
    help='Maximum number of failures before giving up on a bundle',
)

RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
SSH = 'ssh -oForwardX11=no'


def make_cloud_pickle(fun, *args, **kwargs):
    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
    return cloudpickle.dumps((fun, args, kwargs))

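
# Illustrative sketch (not part of the original module): the consuming
# side of a cloudpickled bundle just reverses make_cloud_pickle.  This is
# presumably what remote_worker.py does with its --code_file; the helper
# name below is hypothetical.
def _run_pickled_bundle_example(blob: bytes) -> Any:
    fun, args, kwargs = cloudpickle.loads(blob)
    return fun(*args, **kwargs)
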

class BaseExecutor(ABC):
    """Common interface shared by the thread, process and remote executors."""

    def __init__(self, *, title=''):
        self.title = title
        self.task_count = 0
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(0, 500, 50)
        )

    @abstractmethod
    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self,
                 wait: bool = True) -> None:
        pass

    def adjust_task_count(self, delta: int) -> None:
        self.task_count += delta
        logger.debug(f'Executor current task count is {self.task_count}')


class ThreadExecutor(BaseExecutor):
    """Runs bundles on a local ThreadPoolExecutor."""

    def __init__(self,
                 max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers,
            thread_name_prefix="thread_executor_helper"
        )

    def run_local_bundle(self, fun, *args, **kwargs):
        logger.debug(f"Running local bundle at {fun.__name__}")
        start = time.time()
        result = fun(*args, **kwargs)
        end = time.time()
        self.adjust_task_count(-1)
        duration = end - start
        logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
        self.histogram.add_item(duration)
        return result

    @overrides
    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        self.adjust_task_count(+1)
        return self._thread_pool_executor.submit(
            self.run_local_bundle,
            function,
            *args,
            **kwargs)

    @overrides
    def shutdown(self,
                 wait: bool = True) -> None:
        logger.debug(f'Shutting down threadpool executor {self.title}')
        print(self.histogram)
        self._thread_pool_executor.shutdown(wait)

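
# Illustrative usage sketch (assumption: config has already been parsed;
# the values below are made up).
def _thread_executor_example() -> None:
    executor = ThreadExecutor(max_workers=4)
    future = executor.submit(lambda x: x * 2, 21)
    assert future.result() == 42
    executor.shutdown(wait=True)
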

class ProcessExecutor(BaseExecutor):
    """Runs cloudpickled bundles on a local ProcessPoolExecutor."""

    def __init__(self,
                 max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )

    def run_cloud_pickle(self, pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug(f"Running pickled bundle at {fun.__name__}")
        result = fun(*args, **kwargs)
        self.adjust_task_count(-1)
        return result

    @overrides
    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        start = time.time()
        self.adjust_task_count(+1)
        pickle = make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(
            self.run_cloud_pickle,
            pickle
        )
        result.add_done_callback(
            lambda _: self.histogram.add_item(
                time.time() - start
            )
        )
        return result

    @overrides
    def shutdown(self, wait=True) -> None:
        logger.debug(f'Shutting down processpool executor {self.title}')
        self._process_executor.shutdown(wait)
        print(self.histogram)

    def __getstate__(self):
        # Drop the unpicklable pool so that instances can ride along
        # inside pickled work.
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state

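
# Illustrative usage sketch; pow pickles cleanly, and cloudpickle also
# handles lambdas and closures that the stdlib pickle would reject.
def _process_executor_example() -> None:
    executor = ProcessExecutor(max_workers=2)
    future = executor.submit(pow, 2, 10)
    assert future.result() == 1024
    executor.shutdown(wait=True)
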

class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""
    pass


@dataclass
class RemoteWorkerRecord:
    """A remote worker: who and where, plus a scheduling weight and the
    number of bundles it may run concurrently."""
    username: str
    machine: str
    weight: int
    count: int

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    """Everything we track about one unit of work in flight."""
    pickled_code: bytes
    uuid: str
    fname: str
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    hostname: str
    code_file: str
    result_file: str
    pid: int
    start_ts: float
    end_ts: float
    too_slow: bool
    super_slow: bool
    src_bundle: Optional[BundleDetails]
    is_cancelled: threading.Event
    was_cancelled: bool
    backup_bundles: Optional[List[BundleDetails]]
    failure_count: int


class RemoteExecutorStatus:
    """Tracks and periodically reports what every remote worker is doing."""

    def __init__(self, total_worker_count: int) -> None:
        self.worker_count = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord,
            List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[
            RemoteWorkerRecord,
            Set[str]
        ] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted = 0

        # Protects reads and modifications of self.  Also used as a
        # memory fence for modifications to bundles.
        self.lock = threading.Lock()

    def record_acquire_worker(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(
                worker,
                uuid
            )

    def record_acquire_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str
    ) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(
            self,
            details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(
            self,
            details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
            self,
            worker: RemoteWorkerRecord,
            uuid: str,
            was_cancelled: bool,
    ) -> None:
        with self.lock:
            self.record_release_worker_already_locked(
                worker, uuid, was_cancelled
            )

    def record_release_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str,
            was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            bundle_latency = ts - self.start_per_bundle[uuid]
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def record_processing_began(self, uuid: str):
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f' ✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(
                        bundle_uuid,
                        None
                    )
                    pid = str(details.pid) if details is not None else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
                    else:
                        ret += f'       {bundle_uuid} setting up / copying data...'
                        sec = 0.0

                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                        elif sec > qworker[0]:
                            ret += f'{fg("red")}>💻p50{reset()} '
                    if qall is not None:
                        if sec > qall[1] * 1.5:
                            ret += f'{bg("red")}!!!{reset()}'
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for another backup')
                                details.super_slow = True
                        elif sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for a backup')
                                details.too_slow = True
                        elif sec > qall[0]:
                            ret += f'{fg("red")}>∀p50{reset()}'
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if (
                self.last_periodic_dump is None
                or ts - self.last_periodic_dump > 5.0
        ):
            print(self)
            self.last_periodic_dump = ts

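
# Illustrative sketch of the locking convention above: the *_already_locked
# variants assert that self.lock is already held by the caller, while the
# plain variants take the lock themselves.  The names below are made up.
def _status_locking_example(status: RemoteExecutorStatus,
                            worker: RemoteWorkerRecord) -> None:
    status.record_acquire_worker(worker, 'some-uuid')    # takes the lock
    with status.lock:
        status.record_release_worker_already_locked(     # caller holds it
            worker, 'some-uuid', was_cancelled=True
        )
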

class RemoteWorkerSelectionPolicy(ABC):
    """Decides which remote worker gets the next bundle."""

    def register_worker_pool(self, workers):
        random.seed()
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Picks workers at random, biased by their weight."""

    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        for worker in self.workers:
            for x in range(0, worker.count):
                for y in range(0, worker.weight):
                    grabbag.append(worker)

        for attempt in range(0, 5):
            random.shuffle(grabbag)
            worker = grabbag[0]
            # After a few attempts, accept machine_to_avoid rather than
            # returning nothing at all.
            if worker.machine != machine_to_avoid or attempt > 2:
                if worker.count > 0:
                    worker.count -= 1
                    logger.debug(f'Selected worker {worker}')
                    return worker
        logger.warning("Couldn't find a worker; go fish.")
        return None


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Cycles through workers in order, skipping busy ones."""

    def __init__(self) -> None:
        self.index = 0

    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        x = self.index
        while True:
            worker = self.workers[x]
            if worker.count > 0:
                worker.count -= 1
                x += 1
                if x >= len(self.workers):
                    x = 0
                self.index = x
                logger.debug(f'Selected worker {worker}')
                return worker
            x += 1
            if x >= len(self.workers):
                x = 0
            if x == self.index:
                logger.warning("Couldn't find a worker; go fish.")
                return None

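
# Illustrative sketch of driving a policy directly (host names made up).
# A worker's count caps its concurrent bundles; weight and remaining count
# together bias the random draw toward fast.example here.
def _policy_example() -> None:
    pool = [
        RemoteWorkerRecord(username='me', machine='fast.example', weight=10, count=4),
        RemoteWorkerRecord(username='me', machine='slow.example', weight=1, count=2),
    ]
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    policy.register_worker_pool(pool)
    worker = policy.acquire_worker(machine_to_avoid=None)
    if worker is not None:
        worker.count += 1    # callers "release" by restoring the count
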

class RemoteExecutor(BaseExecutor):
    """Farms bundles out to remote workers over ssh/rsync, scheduling
    backup copies of slow bundles and retrying failures."""

    def __init__(self,
                 workers: List[RemoteWorkerRecord],
                 policy: RemoteWorkerSelectionPolicy) -> None:
        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0

    def bundle_prefix(self, bundle: BundleDetails) -> str:
        colorz = [
            fg('violet red'),
            fg('red'),
            fg('orange'),
            fg('peach orange'),
            fg('yellow'),
            fg('marigold yellow'),
            fg('green yellow'),
            fg('tea green'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('tropical blue'),
            fg('lavender purple'),
            fg('medium purple'),
        ]
        c = colorz[int(bundle.uuid[-2:], 16) % len(colorz)]
        fname = bundle.fname if bundle.fname is not None else 'nofname'
        machine = bundle.machine if bundle.machine is not None else 'nomachine'
        return f'{c}{bundle.uuid[-8:]}/{fname}/{machine}{reset()}'

    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
            self,
            machine_to_avoid: Optional[str] = None
    ) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, worker: RemoteWorkerRecord) -> None:
        logger.debug(f'Released worker {worker}')
        with self.cv:
            worker.count += 1
            self.cv.notify()

    def heartbeat(self) -> None:
        with self.status.lock:
            # Regular progress report
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule.
            num_done = len(self.status.finished_bundle_timings)
            if num_done > 7 or (num_done > 5 and self.is_worker_available()):
                for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                                bundle is not None and
                                bundle.too_slow and
                                bundle.src_bundle is None and
                                config.config['executors_schedule_remote_backups']
                        ):
                            self.consider_backup_for_bundle(bundle)

    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
        assert self.status.lock.locked()
        if (
            bundle.too_slow
            and len(bundle.backup_bundles) == 0       # one backup per bundle
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return
        elif (
                bundle.super_slow
                and len(bundle.backup_bundles) < 2    # two backups in dire situations
                and self.status.total_idle() > 4
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False

    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""
        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        fname = bundle.fname
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'{self.bundle_prefix(bundle)}: Running bundle on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        if self.check_if_cancelled(bundle):
            try:
                return self.post_launch_work(bundle)
            except Exception as e:
                logger.exception(e)
                logger.error(
                    f'{self.bundle_prefix(bundle)}: bundle says it\'s cancelled upfront but no results?!'
                )
                assert bundle.worker is not None
                self.status.record_release_worker(
                    bundle.worker,
                    bundle.uuid,
                    True,
                )
                self.release_worker(bundle.worker)
                self.adjust_task_count(-1)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even started.  Moreover, the backup says it
                    # is done but we can't find the results it should
                    # have copied over.  Reschedule the whole thing.
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # Expected(?).  We're a backup and our bundle is
                    # cancelled before we even got started.  Something
                    # went bad in post_launch_work (I actually don't
                    # see what?) but probably not worth worrying
                    # about.
                    return None

        # Send input code / data to worker machine if it's not local.
        if hostname not in machine:
            try:
                cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                start_ts = time.time()
                logger.info(f"{self.bundle_prefix(bundle)}: Copying work to {worker} via {cmd}.")
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.info(f"{self.bundle_prefix(bundle)}: Copying done to {worker} in {xfer_latency:.1f}s.")
            except Exception as e:
                logger.exception(e)
                logger.error(
                    f'{self.bundle_prefix(bundle)}: failed to send instructions to worker machine?!?'
                )
                assert bundle.worker is not None
                self.status.record_release_worker(
                    bundle.worker,
                    bundle.uuid,
                    True,
                )
                self.release_worker(bundle.worker)
                self.adjust_task_count(-1)
                if is_original:
                    # Weird.  We tried to copy the code to the worker and it failed...
                    # And we're the original bundle.  We have to retry.
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code file before we could copy it.  No biggie.
                    return None

        # Kick off the work.  Note that if this fails we let
        # wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
               f'"source py39-venv/bin/activate &&'
               f' /home/scott/lib/python_modules/remote_worker.py'
               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
        logger.debug(f'{self.bundle_prefix(bundle)}: Executing {cmd} in the background to kick off work...')
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = pid = p.pid
        logger.debug(f'{self.bundle_prefix(bundle)}: Local ssh process pid={pid}; remote worker is {machine}.')
        return self.wait_for_process(p, bundle, 0)

    def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
        uuid = bundle.uuid
        machine = bundle.machine
        fname = bundle.fname
        pid = p.pid
        if depth > 3:
            logger.error(
                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
            )
            p.terminate()
            self.status.record_release_worker(
                bundle.worker,
                bundle.uuid,
                True,
            )
            self.release_worker(bundle.worker)
            self.adjust_task_count(-1)
            return self.emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                self.heartbeat()
                if self.check_if_cancelled(bundle):
                    logger.info(
                        f'{self.bundle_prefix(bundle)}: another worker finished bundle, checking it out...'
                    )
                    break
            else:
                logger.info(
                    f"{self.bundle_prefix(bundle)}: pid {pid} ({machine}) our ssh finished, checking it out..."
                )
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # and we're bailing out.
        try:
            ret = self.post_launch_work(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, clean up after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine.  If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception as e:
            logger.exception(e)
            logger.error(f'{self.bundle_prefix(bundle)}: Something unexpected just happened...')
            if p is not None:
                logger.warning(
                    f"{self.bundle_prefix(bundle)}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
                )
                return self.wait_for_process(p, bundle, depth + 1)
            else:
                self.status.record_release_worker(
                    bundle.worker,
                    bundle.uuid,
                    True,
                )
                self.release_worker(bundle.worker)
                self.adjust_task_count(-1)
                return self.emergency_retry_nasty_bundle(bundle)

    def post_launch_work(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file
            fname = bundle.fname
            uuid = bundle.uuid

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"{self.bundle_prefix(bundle)}: Fetching results from {username}@{machine} via {cmd}"
                    )

                    # If either of these throw they are handled in
                    # wait_for_process.
                    run_silently(cmd)
                    run_silently(f'{SSH} {username}@{machine}'
                                 f' "/bin/rm -f {code_file} {result_file}"')
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.
        if is_original:
            logger.debug(f"{self.bundle_prefix(bundle)}: Unpickling {result_file}.")
            try:
                with open(f'{result_file}', 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                msg = f'Failed to load {result_file}, this is bad news.'
                logger.critical(msg)
                self.status.record_release_worker(
                    bundle.worker,
                    bundle.uuid,
                    True,
                )
                self.release_worker(bundle.worker)

                # Re-raise the exception; the code in wait_for_process may
                # decide to emergency_retry_nasty_bundle here.
                raise Exception(e)

            logger.debug(
                f'Removing local (master) {code_file} and {result_file}.'
            )
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        f'{self.bundle_prefix(bundle)}: Notifying backup {backup.uuid} that it\'s cancelled'
                    )
                    backup.is_cancelled.set()

        # This is a backup job and, by now, we have already fetched
        # the bundle results.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                logger.debug(
                    f'{self.bundle_prefix(bundle)}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
                )
                bundle.src_bundle.is_cancelled.set()

        assert bundle.worker is not None
        self.status.record_release_worker(
            bundle.worker,
            bundle.uuid,
            was_cancelled,
        )
        self.release_worker(bundle.worker)
        self.adjust_task_count(-1)
        return result

    def create_original_bundle(self, pickle, fname: str):
        from string_utils import generate_uuid
        uuid = generate_uuid(as_hex=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code = pickle,
            uuid = uuid,
            fname = fname,
            worker = None,
            username = None,
            machine = None,
            hostname = platform.node(),
            code_file = code_file,
            result_file = result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = None,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = [],
            failure_count = 0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'{self.bundle_prefix(bundle)}: Created an original bundle')
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code = src_bundle.pickled_code,
            uuid = uuid,
            fname = src_bundle.fname,
            worker = None,
            username = None,
            machine = None,
            hostname = src_bundle.hostname,
            code_file = src_bundle.code_file,
            result_file = src_bundle.result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = src_bundle,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = None,    # backup backups not allowed
            failure_count = 0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'{self.bundle_prefix(backup_bundle)}: Created a backup bundle')
        return backup_bundle

    def schedule_backup_for_bundle(self,
                                   src_bundle: BundleDetails):
        assert self.status.lock.locked()
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.

    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Any:
        uuid = bundle.uuid
        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        if is_original:
            retry_limit = 3
        else:
            retry_limit = 2

        if bundle.failure_count > retry_limit:
            logger.error(
                f'{self.bundle_prefix(bundle)}: Tried this bundle too many times already ({retry_limit}x); giving up.'
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{self.bundle_prefix(bundle)}: This bundle can\'t be completed despite several backups and retries'
                )
            else:
                logger.error(f'{self.bundle_prefix(bundle)}: At least it\'s only a backup; better luck with the others.')
            return None
        else:
            logger.warning(
                f'>>> Emergency rescheduling {self.bundle_prefix(bundle)} because of unexpected errors (wtf?!) <<<'
            )
            return self.launch(bundle, avoid_last_machine)

    @overrides
    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self.launch, bundle)

    @overrides
    def shutdown(self, wait=True) -> None:
        self._helper_executor.shutdown(wait)
        logger.debug(f'Shutting down RemoteExecutor {self.title}')
        print(self.histogram)

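
# Illustrative sketch (assumes passwordless ssh to the named host and a
# compatible remote_worker.py installed there, per the command built in
# launch above; the host below is made up).
def _remote_executor_example() -> None:
    workers = [
        RemoteWorkerRecord(username='me', machine='worker.example', weight=1, count=2),
    ]
    executor = RemoteExecutor(workers, WeightedRandomRemoteWorkerSelectionPolicy())
    future = executor.submit(sum, [1, 2, 3])
    print(future.result())
    executor.shutdown()
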

@singleton
class DefaultExecutors(object):
    """Lazily constructed, process-wide executors; pings known hosts to
    build the remote worker pool."""

    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        logger.debug(f'RUN> ping -c 1 {host}')
        try:
            x = cmd_with_timeout(
                f'ping -c 1 {host} >/dev/null 2>/dev/null',
                timeout_seconds=1.0
            )
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            pool: List[RemoteWorkerRecord] = []
            if self.ping('cheetah.house'):
                logger.info('Found cheetah.house')
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'cheetah.house',
                        weight = 12,
                        count = 4,
                    ),
                )
            if self.ping('video.house'):
                logger.info('Found video.house')
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'video.house',
                        weight = 1,
                        count = 4,
                    ),
                )
            if self.ping('wannabe.house'):
                logger.info('Found wannabe.house')
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'wannabe.house',
                        weight = 2,
                        count = 4,
                    ),
                )
            if self.ping('meerkat.cabin'):
                logger.info('Found meerkat.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'meerkat.cabin',
                        weight = 5,
                        count = 2,
                    ),
                )
            if self.ping('backup.house'):
                logger.info('Found backup.house')
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'backup.house',
                        weight = 1,
                        count = 4,
                    ),
                )
            if self.ping('kiosk.house'):
                logger.info('Found kiosk.house')
                pool.append(
                    RemoteWorkerRecord(
                        username = 'pi',
                        machine = 'kiosk.house',
                        weight = 1,
                        count = 2,
                    ),
                )
            if self.ping('puma.cabin'):
                logger.info('Found puma.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username = 'scott',
                        machine = 'puma.cabin',
                        weight = 12,
                        count = 4,
                    ),
                )

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info(f'Reducing workload for {record.machine}.')
                    record.count = 1

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor
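

# Illustrative usage of the DefaultExecutors singleton: lazily build the
# shared pools and pick the one that fits the job at hand.
def _default_executors_example() -> None:
    executors = DefaultExecutors()
    tp = executors.thread_pool()     # cheap, I/O-bound work
    pp = executors.process_pool()    # CPU-bound local work
    future = tp.submit(lambda: 'hello from the shared threadpool')
    print(future.result())
    tp.shutdown()
    pp.shutdown()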