Making remote training work better.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18
19 import cloudpickle  # type: ignore
20 from overrides import overrides
21
22 from ansi import bg, fg, underline, reset
23 import argparse_utils
24 import config
25 from exec_utils import run_silently, cmd_in_background
26 from decorator_utils import singleton
27 import histogram as hist
28
29 logger = logging.getLogger(__name__)
30
31 parser = config.add_commandline_args(
32     f"Executors ({__file__})",
33     "Args related to processing executors."
34 )
35 parser.add_argument(
36     '--executors_threadpool_size',
37     type=int,
38     metavar='#THREADS',
39     help='Number of threads in the default threadpool, leave unset for default',
40     default=None
41 )
42 parser.add_argument(
43     '--executors_processpool_size',
44     type=int,
45     metavar='#PROCESSES',
46     help='Number of processes in the default processpool, leave unset for default',
47     default=None,
48 )
49 parser.add_argument(
50     '--executors_schedule_remote_backups',
51     default=True,
52     action=argparse_utils.ActionNoYes,
53     help='Should we schedule duplicative backup work if a remote bundle is slow',
54 )
55 parser.add_argument(
56     '--executors_max_bundle_failures',
57     type=int,
58     default=3,
59     metavar='#FAILURES',
60     help='Maximum number of failures before giving up on a bundle',
61 )
62
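# Illustrative sketch (hypothetical helper): once config has parsed the command
# line, the flags above are read back by key; this mirrors the lookup the
# executors in this module perform.
def _example_threadpool_size_from_config() -> Optional[int]:
    if 'executors_threadpool_size' in config.config:
        return config.config['executors_threadpool_size']
    return None
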
63 RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
64 SSH = 'ssh -oForwardX11=no'
65
66
67 def make_cloud_pickle(fun, *args, **kwargs):
68     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
69     return cloudpickle.dumps((fun, args, kwargs))
70
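# Illustrative sketch (hypothetical helper): the blob produced above round-trips
# through cloudpickle.loads into (function, args, kwargs), which is what the
# process and remote executors below unpack on the other side.
def _example_cloud_pickle_roundtrip() -> int:
    def add(a: int, b: int) -> int:
        return a + b
    blob = make_cloud_pickle(add, 2, b=3)
    fun, args, kwargs = cloudpickle.loads(blob)
    return fun(*args, **kwargs)   # == 5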
71
72 class BaseExecutor(ABC):
73     def __init__(self, *, title=''):
74         self.title = title
75         self.task_count = 0
76         self.histogram = hist.SimpleHistogram(
77             hist.SimpleHistogram.n_evenly_spaced_buckets(
78                 int(0), int(500), 50
79             )
80         )
81
82     @abstractmethod
83     def submit(self,
84                function: Callable,
85                *args,
86                **kwargs) -> fut.Future:
87         pass
88
89     @abstractmethod
90     def shutdown(self,
91                  wait: bool = True) -> None:
92         pass
93
94     def adjust_task_count(self, delta: int) -> None:
95         self.task_count += delta
96         logger.debug(f'Executor current task count is {self.task_count}')
97
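# Illustrative sketch (hypothetical subclass): a concrete executor only needs
# submit() and shutdown(); task counting and the latency histogram are
# inherited from BaseExecutor.
class _ExampleInlineExecutor(BaseExecutor):
    """Runs each submission synchronously; only useful to show the contract."""

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        self.adjust_task_count(+1)
        f: fut.Future = fut.Future()
        f.set_result(function(*args, **kwargs))
        self.adjust_task_count(-1)
        return f

    @overrides
    def shutdown(self, wait: bool = True) -> None:
        pass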
98
99 class ThreadExecutor(BaseExecutor):
100     def __init__(self,
101                  max_workers: Optional[int] = None):
102         super().__init__()
103         workers = None
104         if max_workers is not None:
105             workers = max_workers
106         elif 'executors_threadpool_size' in config.config:
107             workers = config.config['executors_threadpool_size']
108         logger.debug(f'Creating threadpool executor with {workers} workers')
109         self._thread_pool_executor = fut.ThreadPoolExecutor(
110             max_workers=workers,
111             thread_name_prefix="thread_executor_helper"
112         )
113
114     def run_local_bundle(self, fun, *args, **kwargs):
115         logger.debug(f"Running local bundle at {fun.__name__}")
116         start = time.time()
117         result = fun(*args, **kwargs)
118         end = time.time()
119         self.adjust_task_count(-1)
120         duration = end - start
121         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
122         self.histogram.add_item(duration)
123         return result
124
125     @overrides
126     def submit(self,
127                function: Callable,
128                *args,
129                **kwargs) -> fut.Future:
130         self.adjust_task_count(+1)
131         newargs = []
132         newargs.append(function)
133         for arg in args:
134             newargs.append(arg)
135         return self._thread_pool_executor.submit(
136             self.run_local_bundle,
137             *newargs,
138             **kwargs)
139
140     @overrides
141     def shutdown(self,
142                  wait: bool = True) -> None:
143         logger.debug(f'Shutting down threadpool executor {self.title}')
144         print(self.histogram)
145         self._thread_pool_executor.shutdown(wait)
146
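# Illustrative usage sketch (hypothetical): submit() wraps every callable in
# run_local_bundle, so the task count and the latency histogram stay current
# while the work runs on pool threads.
def _example_thread_executor_usage() -> List[int]:
    executor = ThreadExecutor(max_workers=4)
    futures = [executor.submit(pow, 2, n) for n in range(8)]
    results = [f.result() for f in fut.as_completed(futures)]
    executor.shutdown(wait=True)
    return results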
147
148 class ProcessExecutor(BaseExecutor):
149     def __init__(self,
150                  max_workers=None):
151         super().__init__()
152         workers = None
153         if max_workers is not None:
154             workers = max_workers
155         elif 'executors_processpool_size' in config.config:
156             workers = config.config['executors_processpool_size']
157         logger.debug(f'Creating processpool executor with {workers} workers.')
158         self._process_executor = fut.ProcessPoolExecutor(
159             max_workers=workers,
160         )
161
162     def run_cloud_pickle(self, pickle):
163         fun, args, kwargs = cloudpickle.loads(pickle)
164         logger.debug(f"Running pickled bundle at {fun.__name__}")
165         result = fun(*args, **kwargs)
166         self.adjust_task_count(-1)
167         return result
168
169     @overrides
170     def submit(self,
171                function: Callable,
172                *args,
173                **kwargs) -> fut.Future:
174         start = time.time()
175         self.adjust_task_count(+1)
176         pickle = make_cloud_pickle(function, *args, **kwargs)
177         result = self._process_executor.submit(
178             self.run_cloud_pickle,
179             pickle
180         )
181         result.add_done_callback(
182             lambda _: self.histogram.add_item(
183                 time.time() - start
184             )
185         )
186         return result
187
188     @overrides
189     def shutdown(self, wait=True) -> None:
190         logger.debug(f'Shutting down processpool executor {self.title}')
191         self._process_executor.shutdown(wait)
192         print(self.histogram)
193
194     def __getstate__(self):
195         state = self.__dict__.copy()
196         state['_process_executor'] = None
197         return state
198
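# Illustrative usage sketch (hypothetical): because work is shipped to the pool
# as a cloudpickle, closures and lambdas that the stdlib pickler rejects still
# run fine in worker processes.
def _example_process_executor_usage() -> int:
    executor = ProcessExecutor(max_workers=2)
    scale = 10
    f = executor.submit(lambda x: x * scale, 4)   # closure over a local variable
    answer = f.result()                           # == 40
    executor.shutdown()
    return answer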
199
200 class RemoteExecutorException(Exception):
201     """Thrown when a bundle cannot be executed despite several retries."""
202     pass
203
204
205 @dataclass
206 class RemoteWorkerRecord:
207     username: str
208     machine: str
209     weight: int
210     count: int
211
212     def __hash__(self):
213         return hash((self.username, self.machine))
214
215     def __repr__(self):
216         return f'{self.username}@{self.machine}'
217
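# Illustrative sketch (the hostname is made up): `count` is how many bundles a
# worker may run concurrently, while `weight` only biases how often the
# weighted-random policy below picks it.
_example_worker_record = RemoteWorkerRecord(
    username='alice',
    machine='gpu1.example',
    weight=12,
    count=4,
)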
218
219 @dataclass
220 class BundleDetails:
221     pickled_code: bytes
222     uuid: str
223     fname: str
224     worker: Optional[RemoteWorkerRecord]
225     username: Optional[str]
226     machine: Optional[str]
227     hostname: str
228     code_file: str
229     result_file: str
230     pid: int
231     start_ts: float
232     end_ts: float
233     too_slow: bool
234     super_slow: bool
235     src_bundle: Optional[BundleDetails]
236     is_cancelled: threading.Event
237     was_cancelled: bool
238     backup_bundles: Optional[List[BundleDetails]]
239     failure_count: int
240
241
242 class RemoteExecutorStatus:
243     def __init__(self, total_worker_count: int) -> None:
244         self.worker_count = total_worker_count
245         self.known_workers: Set[RemoteWorkerRecord] = set()
246         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
247         self.end_per_bundle: Dict[str, float] = defaultdict(float)
248         self.finished_bundle_timings_per_worker: Dict[
249             RemoteWorkerRecord,
250             List[float]
251         ] = {}
252         self.in_flight_bundles_by_worker: Dict[
253             RemoteWorkerRecord,
254             Set[str]
255         ] = {}
256         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
257         self.finished_bundle_timings: List[float] = []
258         self.last_periodic_dump: Optional[float] = None
259         self.total_bundles_submitted = 0
260
261         # Protects reads and modification using self.  Also used
262         # as a memory fence for modifications to bundle.
263         self.lock = threading.Lock()
264
265     def record_acquire_worker(
266             self,
267             worker: RemoteWorkerRecord,
268             uuid: str
269     ) -> None:
270         with self.lock:
271             self.record_acquire_worker_already_locked(
272                 worker,
273                 uuid
274             )
275
276     def record_acquire_worker_already_locked(
277             self,
278             worker: RemoteWorkerRecord,
279             uuid: str
280     ) -> None:
281         assert self.lock.locked()
282         self.known_workers.add(worker)
283         self.start_per_bundle[uuid] = None
284         x = self.in_flight_bundles_by_worker.get(worker, set())
285         x.add(uuid)
286         self.in_flight_bundles_by_worker[worker] = x
287
288     def record_bundle_details(
289             self,
290             details: BundleDetails) -> None:
291         with self.lock:
292             self.record_bundle_details_already_locked(details)
293
294     def record_bundle_details_already_locked(
295             self,
296             details: BundleDetails) -> None:
297         assert self.lock.locked()
298         self.bundle_details_by_uuid[details.uuid] = details
299
300     def record_release_worker(
301             self,
302             worker: RemoteWorkerRecord,
303             uuid: str,
304             was_cancelled: bool,
305     ) -> None:
306         with self.lock:
307             self.record_release_worker_already_locked(
308                 worker, uuid, was_cancelled
309             )
310
311     def record_release_worker_already_locked(
312             self,
313             worker: RemoteWorkerRecord,
314             uuid: str,
315             was_cancelled: bool,
316     ) -> None:
317         assert self.lock.locked()
318         ts = time.time()
319         self.end_per_bundle[uuid] = ts
320         self.in_flight_bundles_by_worker[worker].remove(uuid)
321         if not was_cancelled:
322             bundle_latency = ts - self.start_per_bundle[uuid]
323             x = self.finished_bundle_timings_per_worker.get(worker, list())
324             x.append(bundle_latency)
325             self.finished_bundle_timings_per_worker[worker] = x
326             self.finished_bundle_timings.append(bundle_latency)
327
328     def record_processing_began(self, uuid: str):
329         with self.lock:
330             self.start_per_bundle[uuid] = time.time()
331
332     def total_in_flight(self) -> int:
333         assert self.lock.locked()
334         total_in_flight = 0
335         for worker in self.known_workers:
336             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
337         return total_in_flight
338
339     def total_idle(self) -> int:
340         assert self.lock.locked()
341         return self.worker_count - self.total_in_flight()
342
343     def __repr__(self):
344         assert self.lock.locked()
345         ts = time.time()
346         total_finished = len(self.finished_bundle_timings)
347         total_in_flight = self.total_in_flight()
348         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
349         qall = None
350         if len(self.finished_bundle_timings) > 1:
351             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
352             ret += (
353                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
354                 f'✅={total_finished}/{self.total_bundles_submitted}, '
355                 f'💻n={total_in_flight}/{self.worker_count}\n'
356             )
357         else:
358             ret += (
359                 f' ✅={total_finished}/{self.total_bundles_submitted}, '
360                 f'💻n={total_in_flight}/{self.worker_count}\n'
361             )
362
363         for worker in self.known_workers:
364             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
365             timings = self.finished_bundle_timings_per_worker.get(worker, [])
366             count = len(timings)
367             qworker = None
368             if count > 1:
369                 qworker = numpy.quantile(timings, [0.5, 0.95])
370                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
371             else:
372                 ret += '\n'
373             if count > 0:
374                 ret += f'    ...finished {count} total bundle(s) so far\n'
375             in_flight = len(self.in_flight_bundles_by_worker[worker])
376             if in_flight > 0:
377                 ret += f'    ...{in_flight} bundles currently in flight:\n'
378                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
379                     details = self.bundle_details_by_uuid.get(
380                         bundle_uuid,
381                         None
382                     )
383                     pid = str(details.pid) if details is not None else "TBD"
384                     if self.start_per_bundle[bundle_uuid] is not None:
385                         sec = ts - self.start_per_bundle[bundle_uuid]
386                         ret += f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
387                     else:
388                         ret += f'       {bundle_uuid} setting up / copying data...'
389                         sec = 0.0
390
391                     if qworker is not None:
392                         if sec > qworker[1]:
393                             ret += f'{bg("red")}>💻p95{reset()} '
394                         elif sec > qworker[0]:
395                             ret += f'{fg("red")}>💻p50{reset()} '
396                     if qall is not None:
397                         if sec > qall[1] * 1.5:
398                             ret += f'{bg("red")}!!!{reset()}'
399                             if details is not None:
400                                 logger.debug(f'Flagging {details.uuid} for another backup')
401                                 details.super_slow = True
402                         elif sec > qall[1]:
403                             ret += f'{bg("red")}>∀p95{reset()} '
404                             if details is not None:
405                                 logger.debug(f'Flagging {details.uuid} for a backup')
406                                 details.too_slow = True
407                         elif sec > qall[0]:
408                             ret += f'{fg("red")}>∀p50{reset()}'
409                     ret += '\n'
410         return ret
411
412     def periodic_dump(self, total_bundles_submitted: int) -> None:
413         assert self.lock.locked()
414         self.total_bundles_submitted = total_bundles_submitted
415         ts = time.time()
416         if (
417                 self.last_periodic_dump is None
418                 or ts - self.last_periodic_dump > 5.0
419         ):
420             print(self)
421             self.last_periodic_dump = ts
422
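# Illustrative sketch (hypothetical helper): the *_already_locked variants let
# a caller that already holds status.lock record several events inside one
# critical section instead of re-acquiring the lock for each.
def _example_record_under_one_lock(status: RemoteExecutorStatus,
                                   worker: RemoteWorkerRecord,
                                   details: BundleDetails) -> None:
    with status.lock:
        status.record_bundle_details_already_locked(details)
        status.record_acquire_worker_already_locked(worker, details.uuid)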
423
424 class RemoteWorkerSelectionPolicy(ABC):
425     def register_worker_pool(self, workers):
426         random.seed()
427         self.workers = workers
428
429     @abstractmethod
430     def is_worker_available(self) -> bool:
431         pass
432
433     @abstractmethod
434     def acquire_worker(
435             self,
436             machine_to_avoid = None
437     ) -> Optional[RemoteWorkerRecord]:
438         pass
439
440
441 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
442     def is_worker_available(self) -> bool:
443         for worker in self.workers:
444             if worker.count > 0:
445                 return True
446         return False
447
448     def acquire_worker(
449             self,
450             machine_to_avoid = None
451     ) -> Optional[RemoteWorkerRecord]:
452         grabbag = []
453         for worker in self.workers:
454             for x in range(0, worker.count):
455                 for y in range(0, worker.weight):
456                     grabbag.append(worker)
457
458         for attempt in range(0, 5):
459             random.shuffle(grabbag)
460             worker = grabbag[0]
461             if worker.machine != machine_to_avoid or attempt > 2:
462                 if worker.count > 0:
463                     worker.count -= 1
464                     logger.debug(f'Selected worker {worker}')
465                     return worker
466         logger.warning("Couldn't find a worker; go fish.")
467         return None
468
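# Illustrative sketch (hostnames are made up): the grab bag holds weight * count
# copies of each record, so a machine with weight 12 is drawn roughly twelve
# times as often as one with weight 1, all else being equal.
def _example_weighted_selection() -> Optional[RemoteWorkerRecord]:
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    policy.register_worker_pool([
        RemoteWorkerRecord(username='alice', machine='fast.example', weight=12, count=4),
        RemoteWorkerRecord(username='alice', machine='slow.example', weight=1, count=4),
    ])
    return policy.acquire_worker(machine_to_avoid=None)   # usually fast.example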
469
470 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
471     def __init__(self) -> None:
472         self.index = 0
473
474     def is_worker_available(self) -> bool:
475         for worker in self.workers:
476             if worker.count > 0:
477                 return True
478         return False
479
480     def acquire_worker(
481             self,
482             machine_to_avoid: Optional[str] = None
483     ) -> Optional[RemoteWorkerRecord]:
484         x = self.index
485         while True:
486             worker = self.workers[x]
487             if worker.count > 0:
488                 worker.count -= 1
489                 x += 1
490                 if x >= len(self.workers):
491                     x = 0
492                 self.index = x
493                 logger.debug(f'Selected worker {worker}')
494                 return worker
495             x += 1
496             if x >= len(self.workers):
497                 x = 0
498             if x == self.index:
499                 logger.warning("Couldn't find a worker; go fish.")
500                 return None
501
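# Illustrative sketch (hostnames are made up): round robin walks the pool in
# order, wrapping around and skipping workers whose count is exhausted, so
# consecutive acquisitions spread across machines.
def _example_round_robin_selection() -> None:
    policy = RoundRobinRemoteWorkerSelectionPolicy()
    policy.register_worker_pool([
        RemoteWorkerRecord(username='alice', machine='a.example', weight=1, count=1),
        RemoteWorkerRecord(username='alice', machine='b.example', weight=1, count=1),
    ])
    first = policy.acquire_worker()    # a.example
    second = policy.acquire_worker()   # b.example
    assert first is not None and second is not None
    assert first.machine != second.machine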
502
503 class RemoteExecutor(BaseExecutor):
504     def __init__(self,
505                  workers: List[RemoteWorkerRecord],
506                  policy: RemoteWorkerSelectionPolicy) -> None:
507         super().__init__()
508         self.workers = workers
509         self.policy = policy
510         self.worker_count = 0
511         for worker in self.workers:
512             self.worker_count += worker.count
513         if self.worker_count <= 0:
514             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
515             logger.critical(msg)
516             raise RemoteExecutorException(msg)
517         self.policy.register_worker_pool(self.workers)
518         self.cv = threading.Condition()
519         logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
520         self._helper_executor = fut.ThreadPoolExecutor(
521             thread_name_prefix="remote_executor_helper",
522             max_workers=self.worker_count,
523         )
524         self.status = RemoteExecutorStatus(self.worker_count)
525         self.total_bundles_submitted = 0
526
527     def is_worker_available(self) -> bool:
528         return self.policy.is_worker_available()
529
530     def acquire_worker(
531             self,
532             machine_to_avoid: Optional[str] = None
533     ) -> Optional[RemoteWorkerRecord]:
534         return self.policy.acquire_worker(machine_to_avoid)
535
536     def find_available_worker_or_block(
537             self,
538             machine_to_avoid: Optional[str] = None
539     ) -> RemoteWorkerRecord:
540         with self.cv:
541             while not self.is_worker_available():
542                 self.cv.wait()
543             worker = self.acquire_worker(machine_to_avoid)
544             if worker is not None:
545                 return worker
546         msg = "We should never reach this point in the code"
547         logger.critical(msg)
548         raise Exception(msg)
549
550     def release_worker(self, worker: RemoteWorkerRecord) -> None:
551         logger.debug(f'Released worker {worker}')
552         with self.cv:
553             worker.count += 1
554             self.cv.notify()
555
556     def heartbeat(self) -> None:
557         with self.status.lock:
558             # Regular progress report
559             self.status.periodic_dump(self.total_bundles_submitted)
560
561             # Look for bundles to reschedule.
562             num_done = len(self.status.finished_bundle_timings)
563             if num_done > 7 or (num_done > 5 and self.is_worker_available()):
564                 for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
565                     for uuid in bundle_uuids:
566                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
567                         if (
568                                 bundle is not None and
569                                 bundle.too_slow and
570                                 bundle.src_bundle is None and
571                                 config.config['executors_schedule_remote_backups']
572                         ):
573                             self.consider_backup_for_bundle(bundle)
574
575     def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
576         assert self.status.lock.locked()
577         if (
578             bundle.too_slow
579             and len(bundle.backup_bundles) == 0       # one backup per bundle
580         ):
581             msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
582             logger.debug(msg)
583             self.schedule_backup_for_bundle(bundle)
584             return
585         elif (
586                 bundle.super_slow
587                 and len(bundle.backup_bundles) < 2    # two backups in dire situations
588                 and self.status.total_idle() > 4
589         ):
590             msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
591             logger.debug(msg)
592             self.schedule_backup_for_bundle(bundle)
593             return
594
595     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
596         with self.status.lock:
597             if bundle.is_cancelled.wait(timeout=0.0):
598                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
599                 bundle.was_cancelled = True
600                 return True
601         return False
602
603     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
604         """Find a worker for bundle or block until one is available."""
605         self.adjust_task_count(+1)
606         uuid = bundle.uuid
607         hostname = bundle.hostname
608         avoid_machine = override_avoid_machine
609         is_original = bundle.src_bundle is None
610
611         # Try not to schedule a backup on the same host as the original.
612         if avoid_machine is None and bundle.src_bundle is not None:
613             avoid_machine = bundle.src_bundle.machine
614         worker = None
615         while worker is None:
616             worker = self.find_available_worker_or_block(avoid_machine)
617
618         # Ok, found a worker.
619         bundle.worker = worker
620         machine = bundle.machine = worker.machine
621         username = bundle.username = worker.username
622         fname = bundle.fname
623         self.status.record_acquire_worker(worker, uuid)
624         logger.debug(f'{uuid}/{fname}: Running bundle on {worker}...')
625
626         # Before we do any work, make sure the bundle is still viable.
627         if self.check_if_cancelled(bundle):
628             try:
629                 return self.post_launch_work(bundle)
630             except Exception as e:
631                 logger.exception(e)
632                 logger.error(
633                     f'{uuid}/{fname}: bundle says it\'s cancelled upfront but no results?!'
634                 )
635                 assert bundle.worker is not None
636                 self.status.record_release_worker(
637                     bundle.worker,
638                     bundle.uuid,
639                     True,
640                 )
641                 self.release_worker(bundle.worker)
642                 self.adjust_task_count(-1)
643                 if is_original:
644                     # Weird.  We are the original owner of this
645                     # bundle.  For it to have been cancelled, a backup
646                     # must have already started and completed before
647                     # we even for started.  Moreover, the backup says
648                     # it is done but we can't find the results it
649                     # should have copied over.  Reschedule the whole
650                     # thing.
651                     return self.emergency_retry_nasty_bundle(bundle)
652                 else:
653                     # Expected(?).  We're a backup and our bundle is
654                     # cancelled before we even got started.  Something
655                     # went bad in post_launch_work (I actually don't
656                     # see what?) but probably not worth worrying
657                     # about.
658                     return None
659
660         # Send input code / data to worker machine if it's not local.
661         if hostname not in machine:
662             try:
663                 cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
664                 start_ts = time.time()
665                 logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.")
666                 run_silently(cmd)
667                 xfer_latency = time.time() - start_ts
668                 logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency:.1f}s.")
669             except Exception as e:
670                 logger.exception(e)
671                 logger.error(
672                     f'{uuid}/{fname}: failed to send instructions to worker machine?!?'
673                 )
674                 assert bundle.worker is not None
675                 self.status.record_release_worker(
676                     bundle.worker,
677                     bundle.uuid,
678                     True,
679                 )
680                 self.release_worker(bundle.worker)
681                 self.adjust_task_count(-1)
682                 if is_original:
683                     # Weird.  We tried to copy the code to the worker and it failed...
684                     # And we're the original bundle.  We have to retry.
685                     return self.emergency_retry_nasty_bundle(bundle)
686                 else:
687                     # This is actually expected; we're a backup.
688                     # There's a race condition where someone else
689                     # already finished the work and removed the source
690                     # code file before we could copy it.  No biggie.
691                     return None
692
693         # Kick off the work.  Note that if this fails we let
694         # wait_for_process deal with it.
695         self.status.record_processing_began(uuid)
696         cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
697                f'"source py39-venv/bin/activate &&'
698                f' /home/scott/lib/python_modules/remote_worker.py'
699                f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
700         logger.debug(f'{uuid}/{fname}: Executing {cmd} in the background to kick off work...')
701         p = cmd_in_background(cmd, silent=True)
702         bundle.pid = pid = p.pid
703         logger.debug(f'{uuid}/{fname}: Local ssh process pid={pid}; remote worker is {machine}.')
704         return self.wait_for_process(p, bundle, 0)
705
706     def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
707         uuid = bundle.uuid
708         machine = bundle.machine
709         fname = bundle.fname
710         pid = p.pid
711         if depth > 3:
712             logger.error(
713                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
714             )
715             p.terminate()
716             self.status.record_release_worker(
717                 bundle.worker,
718                 bundle.uuid,
719                 True,
720             )
721             self.release_worker(bundle.worker)
722             self.adjust_task_count(-1)
723             return self.emergency_retry_nasty_bundle(bundle)
724
725         # Spin until either the ssh job we scheduled finishes the
726         # bundle or some backup worker signals that they finished it
727         # before we could.
728         while True:
729             try:
730                 p.wait(timeout=0.25)
731             except subprocess.TimeoutExpired:
732                 self.heartbeat()
733                 if self.check_if_cancelled(bundle):
734                     logger.info(
735                         f'{uuid}/{fname}: another worker finished bundle, checking it out...'
736                     )
737                     break
738             else:
739                 logger.info(
740                     f"{uuid}/{fname}: pid {pid} ({machine}) our ssh finished, checking it out..."
741                 )
742                 p = None
743                 break
744
745         # If we get here we believe the bundle is done; either the ssh
746         # subprocess finished (hopefully successfully) or we noticed
747         # that some other worker seems to have completed the bundle
748         # and we're bailing out.
749         try:
750             ret = self.post_launch_work(bundle)
751             if ret is not None and p is not None:
752                 p.terminate()
753             return ret
754
755         # Something went wrong; e.g. we could not copy the results
756         # back, cleanup after ourselves on the remote machine, or
757         # unpickle the results we got from the remote machine.  If we
758         # still have an active ssh subprocess, keep waiting on it.
759         # Otherwise, time for an emergency reschedule.
760         except Exception as e:
761             logger.exception(e)
762             logger.error(f'{uuid}/{fname}: Something unexpected just happened...')
763             if p is not None:
764                 logger.warning(
765                     f"{uuid}/{fname}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
766                 )
767                 return self.wait_for_process(p, bundle, depth + 1)
768             else:
769                 self.status.record_release_worker(
770                     bundle.worker,
771                     bundle.uuid,
772                     True,
773                 )
774                 self.release_worker(bundle.worker)
775                 self.adjust_task_count(-1)
776                 return self.emergency_retry_nasty_bundle(bundle)
777
778     def post_launch_work(self, bundle: BundleDetails) -> Any:
779         with self.status.lock:
780             is_original = bundle.src_bundle is None
781             was_cancelled = bundle.was_cancelled
782             username = bundle.username
783             machine = bundle.machine
784             result_file = bundle.result_file
785             code_file = bundle.code_file
786             fname = bundle.fname
787             uuid = bundle.uuid
788
789             # Whether original or backup, if we finished first we must
790             # fetch the results if the computation happened on a
791             # remote machine.
792             bundle.end_ts = time.time()
793             if not was_cancelled:
794                 assert bundle.machine is not None
795                 if bundle.hostname not in bundle.machine:
796                     cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
797                     logger.info(
798                         f"{uuid}/{fname}: Fetching results from {username}@{machine} via {cmd}"
799                     )
800
801                     # If either of these throw they are handled in
802                     # wait_for_process.
803                     run_silently(cmd)
804                     run_silently(f'{SSH} {username}@{machine}'
805                                  f' "/bin/rm -f {code_file} {result_file}"')
806                 dur = bundle.end_ts - bundle.start_ts
807                 self.histogram.add_item(dur)
808
809         # Only the original worker should unpickle the file contents
810         # though since it's the only one whose result matters.  The
811         # original is also the only job that may delete result_file
812         # from disk.  Note that the original may have been cancelled
813         # if one of the backups finished first; it still must read the
814         # result from disk.
815         if is_original:
816             logger.debug(f"{uuid}/{fname}: Unpickling {result_file}.")
817             try:
818                 with open(f'{result_file}', 'rb') as rb:
819                     serialized = rb.read()
820                 result = cloudpickle.loads(serialized)
821             except Exception as e:
822                 msg = f'Failed to load {result_file}, this is bad news.'
823                 logger.critical(msg)
824                 self.status.record_release_worker(
825                     bundle.worker,
826                     bundle.uuid,
827                     True,
828                 )
829                 self.release_worker(bundle.worker)
830
831                 # Re-raise the exception; the code in wait_for_process may
832                 # decide to emergency_retry_nasty_bundle here.
833                 raise Exception(e)
834
835             logger.debug(
836                 f'Removing local (master) {code_file} and {result_file}.'
837             )
838             os.remove(f'{result_file}')
839             os.remove(f'{code_file}')
840
841             # Notify any backups that the original is done so they
842             # should stop ASAP.  Do this whether or not we
843             # finished first since there could be more than one
844             # backup.
845             if bundle.backup_bundles is not None:
846                 for backup in bundle.backup_bundles:
847                     logger.debug(
848                         f'{uuid}/{fname}: Notifying backup {backup.uuid} that it\'s cancelled'
849                     )
850                     backup.is_cancelled.set()
851
852         # This is a backup job and, by now, we have already fetched
853         # the bundle results.
854         else:
855             # Backup results don't matter, they just need to leave the
856             # result file in the right place for their originals to
857             # read/unpickle later.
858             result = None
859
860             # Tell the original to stop if we finished first.
861             if not was_cancelled:
862                 logger.debug(
863                     f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
864                 )
865                 bundle.src_bundle.is_cancelled.set()
866
867         assert bundle.worker is not None
868         self.status.record_release_worker(
869             bundle.worker,
870             bundle.uuid,
871             was_cancelled,
872         )
873         self.release_worker(bundle.worker)
874         self.adjust_task_count(-1)
875         return result
876
877     def create_original_bundle(self, pickle, fname: str):
878         from string_utils import generate_uuid
879         uuid = generate_uuid(as_hex=True)
880         code_file = f'/tmp/{uuid}.code.bin'
881         result_file = f'/tmp/{uuid}.result.bin'
882
883         logger.debug(f'Writing pickled code to {code_file}')
884         with open(f'{code_file}', 'wb') as wb:
885             wb.write(pickle)
886
887         bundle = BundleDetails(
888             pickled_code = pickle,
889             uuid = uuid,
890             fname = fname,
891             worker = None,
892             username = None,
893             machine = None,
894             hostname = platform.node(),
895             code_file = code_file,
896             result_file = result_file,
897             pid = 0,
898             start_ts = time.time(),
899             end_ts = 0.0,
900             too_slow = False,
901             super_slow = False,
902             src_bundle = None,
903             is_cancelled = threading.Event(),
904             was_cancelled = False,
905             backup_bundles = [],
906             failure_count = 0,
907         )
908         self.status.record_bundle_details(bundle)
909         logger.debug(f'{uuid}/{fname}: Created original bundle')
910         return bundle
911
912     def create_backup_bundle(self, src_bundle: BundleDetails):
913         assert src_bundle.backup_bundles is not None
914         n = len(src_bundle.backup_bundles)
915         uuid = src_bundle.uuid + f'_backup#{n}'
916
917         backup_bundle = BundleDetails(
918             pickled_code = src_bundle.pickled_code,
919             uuid = uuid,
920             fname = src_bundle.fname,
921             worker = None,
922             username = None,
923             machine = None,
924             hostname = src_bundle.hostname,
925             code_file = src_bundle.code_file,
926             result_file = src_bundle.result_file,
927             pid = 0,
928             start_ts = time.time(),
929             end_ts = 0.0,
930             too_slow = False,
931             super_slow = False,
932             src_bundle = src_bundle,
933             is_cancelled = threading.Event(),
934             was_cancelled = False,
935             backup_bundles = None,    # backup backups not allowed
936             failure_count = 0,
937         )
938         src_bundle.backup_bundles.append(backup_bundle)
939         self.status.record_bundle_details_already_locked(backup_bundle)
940         logger.debug(f'{uuid}/{src_bundle.fname}: Created backup bundle')
941         return backup_bundle
942
943     def schedule_backup_for_bundle(self,
944                                    src_bundle: BundleDetails):
945         assert self.status.lock.locked()
946         backup_bundle = self.create_backup_bundle(src_bundle)
947         logger.debug(
948             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
949         )
950         self._helper_executor.submit(self.launch, backup_bundle)
951
952         # Results from backups don't matter; if they finish first
953         # they will move the result_file to this machine and let
954         # the original pick them up and unpickle them.
955
956     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Any:
957         uuid = bundle.uuid
958         is_original = bundle.src_bundle is None
959         bundle.worker = None
960         avoid_last_machine = bundle.machine
961         bundle.machine = None
962         bundle.username = None
963         bundle.failure_count += 1
964         if is_original:
965             retry_limit = 3
966         else:
967             retry_limit = 2
968
969         if bundle.failure_count > retry_limit:
970             logger.error(
971                 f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.'
972             )
973             if is_original:
974                 raise RemoteExecutorException(
975                     f'{uuid}: This bundle can\'t be completed despite several backups and retries'
976                 )
977             else:
978                 logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.')
979             return None
980         else:
981             logger.warning(
982                 f'>>> Emergency rescheduling {uuid} because of unexpected errors (wtf?!) <<<'
983             )
984             return self.launch(bundle, avoid_last_machine)
985
986     @overrides
987     def submit(self,
988                function: Callable,
989                *args,
990                **kwargs) -> fut.Future:
991         pickle = make_cloud_pickle(function, *args, **kwargs)
992         bundle = self.create_original_bundle(pickle, function.__name__)
993         self.total_bundles_submitted += 1
994         return self._helper_executor.submit(self.launch, bundle)
995
996     @overrides
997     def shutdown(self, wait=True) -> None:
998         self._helper_executor.shutdown(wait)
999         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1000         print(self.histogram)
1001
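# Illustrative usage sketch (the hostname is made up): wire a worker pool to a
# selection policy and then use the executor like the local ones; launch()
# handles the per-bundle rsync/ssh plumbing.
def _example_remote_executor_usage() -> int:
    pool = [
        RemoteWorkerRecord(username='alice', machine='worker1.example', weight=1, count=2),
    ]
    executor = RemoteExecutor(pool, RoundRobinRemoteWorkerSelectionPolicy())
    f = executor.submit(sum, [1, 2, 3])
    answer = f.result()
    executor.shutdown()
    return answer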
1002
1003 @singleton
1004 class DefaultExecutors(object):
1005     def __init__(self):
1006         self.thread_executor: Optional[ThreadExecutor] = None
1007         self.process_executor: Optional[ProcessExecutor] = None
1008         self.remote_executor: Optional[RemoteExecutor] = None
1009
1010     def ping(self, host) -> bool:
1011         logger.debug(f'RUN> ping -c 1 {host}')
1012         command = ['ping', '-c', '1', host]
1013         return subprocess.call(
1014             command,
1015             stdout=subprocess.DEVNULL,
1016             stderr=subprocess.DEVNULL,
1017         ) == 0
1018
1019     def thread_pool(self) -> ThreadExecutor:
1020         if self.thread_executor is None:
1021             self.thread_executor = ThreadExecutor()
1022         return self.thread_executor
1023
1024     def process_pool(self) -> ProcessExecutor:
1025         if self.process_executor is None:
1026             self.process_executor = ProcessExecutor()
1027         return self.process_executor
1028
1029     def remote_pool(self) -> RemoteExecutor:
1030         if self.remote_executor is None:
1031             logger.info('Looking for some helper machines...')
1032             pool: List[RemoteWorkerRecord] = []
1033             if self.ping('cheetah.house'):
1034                 logger.info('Found cheetah.house')
1035                 pool.append(
1036                     RemoteWorkerRecord(
1037                         username = 'scott',
1038                         machine = 'cheetah.house',
1039                         weight = 12,
1040                         count = 4,
1041                     ),
1042                 )
1043             if self.ping('video.house'):
1044                 logger.info('Found video.house')
1045                 pool.append(
1046                     RemoteWorkerRecord(
1047                         username = 'scott',
1048                         machine = 'video.house',
1049                         weight = 1,
1050                         count = 4,
1051                     ),
1052                 )
1053             if self.ping('wannabe.house'):
1054                 logger.info('Found wannabe.house')
1055                 pool.append(
1056                     RemoteWorkerRecord(
1057                         username = 'scott',
1058                         machine = 'wannabe.house',
1059                         weight = 2,
1060                         count = 4,
1061                     ),
1062                 )
1063             if self.ping('meerkat.cabin'):
1064                 logger.info('Found meerkat.cabin')
1065                 pool.append(
1066                     RemoteWorkerRecord(
1067                         username = 'scott',
1068                         machine = 'meerkat.cabin',
1069                         weight = 5,
1070                         count = 2,
1071                     ),
1072                 )
1073             if self.ping('backup.house'):
1074                 logger.info('Found backup.house')
1075                 pool.append(
1076                     RemoteWorkerRecord(
1077                         username = 'scott',
1078                         machine = 'backup.house',
1079                         weight = 1,
1080                         count = 4,
1081                     ),
1082                 )
1083             if self.ping('kiosk.house'):
1084                 logger.info('Found kiosk.house')
1085                 pool.append(
1086                     RemoteWorkerRecord(
1087                         username = 'pi',
1088                         machine = 'kiosk.house',
1089                         weight = 1,
1090                         count = 2,
1091                     ),
1092                 )
1093             if self.ping('puma.cabin'):
1094                 logger.info('Found puma.cabin')
1095                 pool.append(
1096                     RemoteWorkerRecord(
1097                         username = 'scott',
1098                         machine = 'puma.cabin',
1099                         weight = 12,
1100                         count = 4,
1101                     ),
1102                 )
1103
1104             # The controller machine has a lot to do; go easy on it.
1105             for record in pool:
1106                 if record.machine == platform.node() and record.count > 1:
1107                     logger.info(f'Reducing workload for {record.machine}.')
1108                     record.count = 1
1109
1110             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1111             policy.register_worker_pool(pool)
1112             self.remote_executor = RemoteExecutor(pool, policy)
1113         return self.remote_executor
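

# Illustrative usage sketch (assumes the @singleton decorator returns a shared
# instance): ask DefaultExecutors for whichever pool fits the workload and
# reuse that pool for the life of the program.
def _example_default_executors_usage() -> None:
    executors = DefaultExecutors()
    tpool = executors.thread_pool()     # I/O-bound work
    ppool = executors.process_pool()    # CPU-bound local work
    rpool = executors.remote_pool()     # fan work out across reachable machines
    print(tpool.submit(len, 'hello').result())
    print(ppool.submit(sum, range(10)).result())
    print(rpool.submit(sum, range(10)).result())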