Report overall runtime in periodic status dumps.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
30
31
32 logger = logging.getLogger(__name__)
33
34 parser = config.add_commandline_args(
35     f"Executors ({__file__})", "Args related to processing executors."
36 )
37 parser.add_argument(
38     '--executors_threadpool_size',
39     type=int,
40     metavar='#THREADS',
41     help='Number of threads in the default threadpool; leave unset to use the library default',
42     default=None,
43 )
44 parser.add_argument(
45     '--executors_processpool_size',
46     type=int,
47     metavar='#PROCESSES',
48     help='Number of processes in the default processpool; leave unset to use the library default',
49     default=None,
50 )
51 parser.add_argument(
52     '--executors_schedule_remote_backups',
53     default=True,
54     action=argparse_utils.ActionNoYes,
55     help='Should we schedule duplicative backup work if a remote bundle is slow',
56 )
57 parser.add_argument(
58     '--executors_max_bundle_failures',
59     type=int,
60     default=3,
61     metavar='#FAILURES',
62     help='Maximum number of failures before giving up on a bundle',
63 )
64
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp'
67
68
69 def make_cloud_pickle(fun, *args, **kwargs):
70     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71     return cloudpickle.dumps((fun, args, kwargs))
72
73
74 class BaseExecutor(ABC):
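    """The common executor interface: submit work, shut down, and keep a
    running task count plus a histogram of per-task latencies."""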
75     def __init__(self, *, title=''):
76         self.title = title
77         self.task_count = 0
78         self.histogram = hist.SimpleHistogram(
79             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
80         )
81
82     @abstractmethod
83     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
84         pass
85
86     @abstractmethod
87     def shutdown(self, wait: bool = True) -> None:
88         pass
89
90     def adjust_task_count(self, delta: int) -> None:
91         self.task_count += delta
92         logger.debug(f'Executor current task count is {self.task_count}')
93
94
95 class ThreadExecutor(BaseExecutor):
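    """An executor that runs each submitted callable on a local thread pool."""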
96     def __init__(self, max_workers: Optional[int] = None):
97         super().__init__()
98         workers = None
99         if max_workers is not None:
100             workers = max_workers
101         elif 'executors_threadpool_size' in config.config:
102             workers = config.config['executors_threadpool_size']
103         logger.debug(f'Creating threadpool executor with {workers} workers')
104         self._thread_pool_executor = fut.ThreadPoolExecutor(
105             max_workers=workers, thread_name_prefix="thread_executor_helper"
106         )
107
108     def run_local_bundle(self, fun, *args, **kwargs):
109         logger.debug(f"Running local bundle at {fun.__name__}")
110         start = time.time()
111         result = fun(*args, **kwargs)
112         end = time.time()
113         self.adjust_task_count(-1)
114         duration = end - start
115         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
116         self.histogram.add_item(duration)
117         return result
118
119     @overrides
120     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
121         self.adjust_task_count(+1)
122         newargs = []
123         newargs.append(function)
124         for arg in args:
125             newargs.append(arg)
126         return self._thread_pool_executor.submit(
127             self.run_local_bundle, *newargs, **kwargs
128         )
129
130     @overrides
131     def shutdown(self, wait=True) -> None:
132         logger.debug(f'Shutting down threadpool executor {self.title}')
133         print(self.histogram)
134         self._thread_pool_executor.shutdown(wait)
135
136
137 class ProcessExecutor(BaseExecutor):
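    """An executor that cloudpickles each callable and runs it in a local
    process pool."""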
138     def __init__(self, max_workers=None):
139         super().__init__()
140         workers = None
141         if max_workers is not None:
142             workers = max_workers
143         elif 'executors_processpool_size' in config.config:
144             workers = config.config['executors_processpool_size']
145         logger.debug(f'Creating processpool executor with {workers} workers.')
146         self._process_executor = fut.ProcessPoolExecutor(
147             max_workers=workers,
148         )
149
150     def run_cloud_pickle(self, pickle):
151         fun, args, kwargs = cloudpickle.loads(pickle)
152         logger.debug(f"Running pickled bundle at {fun.__name__}")
153         result = fun(*args, **kwargs)
154         self.adjust_task_count(-1)
155         return result
156
157     @overrides
158     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
159         start = time.time()
160         self.adjust_task_count(+1)
161         pickle = make_cloud_pickle(function, *args, **kwargs)
162         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
163         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
164         return result
165
166     @overrides
167     def shutdown(self, wait=True) -> None:
168         logger.debug(f'Shutting down processpool executor {self.title}')
169         self._process_executor.shutdown(wait)
170         print(self.histogram)
171
172     def __getstate__(self):
173         state = self.__dict__.copy()
174         state['_process_executor'] = None
175         return state
176
177
178 class RemoteExecutorException(Exception):
179     """Raised when a bundle cannot be executed despite several retries."""
180
181     pass
182
183
184 @dataclass
185 class RemoteWorkerRecord:
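    """Describes a remote worker: ssh username, machine name, scheduling
    weight, and how many concurrent bundles it will accept."""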
186     username: str
187     machine: str
188     weight: int
189     count: int
190
191     def __hash__(self):
192         return hash((self.username, self.machine))
193
194     def __repr__(self):
195         return f'{self.username}@{self.machine}'
196
197
198 @dataclass
199 class BundleDetails:
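    """Everything known about a unit of work ("bundle"): its pickled code,
    where and when it ran, cancellation state, and any backup bundles."""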
200     pickled_code: bytes
201     uuid: str
202     fname: str
203     worker: Optional[RemoteWorkerRecord]
204     username: Optional[str]
205     machine: Optional[str]
206     hostname: str
207     code_file: str
208     result_file: str
209     pid: int
210     start_ts: float
211     end_ts: float
212     slower_than_local_p95: bool
213     slower_than_global_p95: bool
214     src_bundle: BundleDetails
215     is_cancelled: threading.Event
216     was_cancelled: bool
217     backup_bundles: Optional[List[BundleDetails]]
218     failure_count: int
219
220     def __repr__(self):
221         uuid = self.uuid
222         if uuid[-9:-2] == '_backup':
223             uuid = uuid[:-9]
224             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
225         else:
226             suffix = uuid[-6:]
227
228         colorz = [
229             fg('violet red'),
230             fg('red'),
231             fg('orange'),
232             fg('peach orange'),
233             fg('yellow'),
234             fg('marigold yellow'),
235             fg('green yellow'),
236             fg('tea green'),
237             fg('cornflower blue'),
238             fg('turquoise blue'),
239             fg('tropical blue'),
240             fg('lavender purple'),
241             fg('medium purple'),
242         ]
243         c = colorz[int(uuid[-2:], 16) % len(colorz)]
244         fname = self.fname if self.fname is not None else 'nofname'
245         machine = self.machine if self.machine is not None else 'nomachine'
246         return f'{c}{suffix}/{fname}/{machine}{reset()}'
247
248
249 class RemoteExecutorStatus:
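    """Tracks the state of a RemoteExecutor: known workers, in-flight
    bundles, and per-bundle / per-worker timing data for status dumps."""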
250     def __init__(self, total_worker_count: int) -> None:
251         self.worker_count: int = total_worker_count
252         self.known_workers: Set[RemoteWorkerRecord] = set()
253         self.start_time: float = time.time()
254         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
255         self.end_per_bundle: Dict[str, float] = defaultdict(float)
256         self.finished_bundle_timings_per_worker: Dict[
257             RemoteWorkerRecord, List[float]
258         ] = {}
259         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
260         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
261         self.finished_bundle_timings: List[float] = []
262         self.last_periodic_dump: Optional[float] = None
263         self.total_bundles_submitted: int = 0
264
265         # Protects reads and modifications of this status object.  Also
266         # used as a memory fence for modifications to bundles.
267         self.lock: threading.Lock = threading.Lock()
268
269     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
270         with self.lock:
271             self.record_acquire_worker_already_locked(worker, uuid)
272
273     def record_acquire_worker_already_locked(
274         self, worker: RemoteWorkerRecord, uuid: str
275     ) -> None:
276         assert self.lock.locked()
277         self.known_workers.add(worker)
278         self.start_per_bundle[uuid] = None
279         x = self.in_flight_bundles_by_worker.get(worker, set())
280         x.add(uuid)
281         self.in_flight_bundles_by_worker[worker] = x
282
283     def record_bundle_details(self, details: BundleDetails) -> None:
284         with self.lock:
285             self.record_bundle_details_already_locked(details)
286
287     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
288         assert self.lock.locked()
289         self.bundle_details_by_uuid[details.uuid] = details
290
291     def record_release_worker(
292         self,
293         worker: RemoteWorkerRecord,
294         uuid: str,
295         was_cancelled: bool,
296     ) -> None:
297         with self.lock:
298             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
299
300     def record_release_worker_already_locked(
301         self,
302         worker: RemoteWorkerRecord,
303         uuid: str,
304         was_cancelled: bool,
305     ) -> None:
306         assert self.lock.locked()
307         ts = time.time()
308         self.end_per_bundle[uuid] = ts
309         self.in_flight_bundles_by_worker[worker].remove(uuid)
310         if not was_cancelled:
311             bundle_latency = ts - self.start_per_bundle[uuid]
312             x = self.finished_bundle_timings_per_worker.get(worker, list())
313             x.append(bundle_latency)
314             self.finished_bundle_timings_per_worker[worker] = x
315             self.finished_bundle_timings.append(bundle_latency)
316
317     def record_processing_began(self, uuid: str):
318         with self.lock:
319             self.start_per_bundle[uuid] = time.time()
320
321     def total_in_flight(self) -> int:
322         assert self.lock.locked()
323         total_in_flight = 0
324         for worker in self.known_workers:
325             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
326         return total_in_flight
327
328     def total_idle(self) -> int:
329         assert self.lock.locked()
330         return self.worker_count - self.total_in_flight()
331
332     def __repr__(self):
333         assert self.lock.locked()
334         ts = time.time()
335         total_finished = len(self.finished_bundle_timings)
336         total_in_flight = self.total_in_flight()
337         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
338         qall = None
339         if len(self.finished_bundle_timings) > 1:
340             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
341             ret += (
342                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
343                 f'✅={total_finished}/{self.total_bundles_submitted}, '
344                 f'💻n={total_in_flight}/{self.worker_count}\n'
345             )
346         else:
347             ret += (
348                 f'⏱={ts-self.start_time:.1f}s, '
349                 f'✅={total_finished}/{self.total_bundles_submitted}, '
350                 f'💻n={total_in_flight}/{self.worker_count}\n'
351             )
352
353         for worker in self.known_workers:
354             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
355             timings = self.finished_bundle_timings_per_worker.get(worker, [])
356             count = len(timings)
357             qworker = None
358             if count > 1:
359                 qworker = numpy.quantile(timings, [0.5, 0.95])
360                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
361             else:
362                 ret += '\n'
363             if count > 0:
364                 ret += f'    ...finished {count} total bundle(s) so far\n'
365             in_flight = len(self.in_flight_bundles_by_worker[worker])
366             if in_flight > 0:
367                 ret += f'    ...{in_flight} bundles currently in flight:\n'
368                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
369                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
370                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
371                     if self.start_per_bundle[bundle_uuid] is not None:
372                         sec = ts - self.start_per_bundle[bundle_uuid]
373                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
374                     else:
375                         ret += f'       {details} setting up / copying data...'
376                         sec = 0.0
377
378                     if qworker is not None:
379                         if sec > qworker[1]:
380                             ret += f'{bg("red")}>💻p95{reset()} '
381                             if details is not None:
382                                 details.slower_than_local_p95 = True
383                         else:
384                             if details is not None:
385                                 details.slower_than_local_p95 = False
386
387                     if qall is not None:
388                         if sec > qall[1]:
389                             ret += f'{bg("red")}>∀p95{reset()} '
390                             if details is not None:
391                                 details.slower_than_global_p95 = True
392                         elif details is not None:
393                             details.slower_than_global_p95 = False
394                     ret += '\n'
395         return ret
396
397     def periodic_dump(self, total_bundles_submitted: int) -> None:
398         assert self.lock.locked()
399         self.total_bundles_submitted = total_bundles_submitted
400         ts = time.time()
401         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
402             print(self)
403             self.last_periodic_dump = ts
404
405
406 class RemoteWorkerSelectionPolicy(ABC):
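    """A policy for picking the next remote worker to run a bundle on."""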
407     def register_worker_pool(self, workers):
408         self.workers = workers
409
410     @abstractmethod
411     def is_worker_available(self) -> bool:
412         pass
413
414     @abstractmethod
415     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
416         pass
417
418
419 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
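    """Selects a worker at random, biased by each worker's weight."""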
420     @overrides
421     def is_worker_available(self) -> bool:
422         for worker in self.workers:
423             if worker.count > 0:
424                 return True
425         return False
426
427     @overrides
428     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
429         grabbag = []
430         for worker in self.workers:
431             for x in range(0, worker.count):
432                 for y in range(0, worker.weight):
433                     grabbag.append(worker)
434
435         for attempt in range(0, 5):
436             random.shuffle(grabbag)
437             worker = grabbag[0]
438             if worker.machine != machine_to_avoid or attempt > 2:
439                 if worker.count > 0:
440                     worker.count -= 1
441                     logger.debug(f'Selected worker {worker}')
442                     return worker
443         msg = 'Unexpectedly could not find a worker, retrying...'
444         logger.warning(msg)
445         return None
446
447
448 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
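    """Selects workers in round-robin order, skipping saturated ones."""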
449     def __init__(self) -> None:
450         self.index = 0
451
452     @overrides
453     def is_worker_available(self) -> bool:
454         for worker in self.workers:
455             if worker.count > 0:
456                 return True
457         return False
458
459     @overrides
460     def acquire_worker(
461         self, machine_to_avoid: Optional[str] = None
462     ) -> Optional[RemoteWorkerRecord]:
463         x = self.index
464         while True:
465             worker = self.workers[x]
466             if worker.count > 0:
467                 worker.count -= 1
468                 x += 1
469                 if x >= len(self.workers):
470                     x = 0
471                 self.index = x
472                 logger.debug(f'Selected worker {worker}')
473                 return worker
474             x += 1
475             if x >= len(self.workers):
476                 x = 0
477             if x == self.index:
478                 msg = 'Unexpectedly could not find a worker, retrying...'
479                 logger.warning(msg)
480                 return None
481
482
483 class RemoteExecutor(BaseExecutor):
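    """An executor that ships cloudpickled bundles to remote workers over
    ssh/scp, optionally schedules backup copies of slow bundles, and
    periodically dumps a status report including overall runtime."""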
484     def __init__(
485         self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy
486     ) -> None:
487         super().__init__()
488         self.workers = workers
489         self.policy = policy
490         self.worker_count = 0
491         for worker in self.workers:
492             self.worker_count += worker.count
493         if self.worker_count <= 0:
494             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
495             logger.critical(msg)
496             raise RemoteExecutorException(msg)
497         self.policy.register_worker_pool(self.workers)
498         self.cv = threading.Condition()
499         logger.debug(
500             f'Creating {self.worker_count} local threads, one per remote worker.'
501         )
502         self._helper_executor = fut.ThreadPoolExecutor(
503             thread_name_prefix="remote_executor_helper",
504             max_workers=self.worker_count,
505         )
506         self.status = RemoteExecutorStatus(self.worker_count)
507         self.total_bundles_submitted = 0
508         self.backup_lock = threading.Lock()
509         self.last_backup = None
510         (
511             self.heartbeat_thread,
512             self.heartbeat_stop_event,
513         ) = self.run_periodic_heartbeat()
514
515     @background_thread
516     def run_periodic_heartbeat(self, stop_event) -> None:
517         while not stop_event.is_set():
518             time.sleep(5.0)
519             logger.debug('Running periodic heartbeat code...')
520             self.heartbeat()
521         logger.debug('Periodic heartbeat thread shutting down.')
522
523     def heartbeat(self) -> None:
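        """Periodically dump status and, if enabled, schedule backup bundles."""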
524         with self.status.lock:
525             # Dump regular progress report
526             self.status.periodic_dump(self.total_bundles_submitted)
527
528             # Look for bundles to reschedule via executor.submit
529             if config.config['executors_schedule_remote_backups']:
530                 self.maybe_schedule_backup_bundles()
531
532     def maybe_schedule_backup_bundles(self):
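        """Pick the in-flight bundle that most needs a backup (slow machine,
        long runtime, few existing backups) and schedule a duplicate of it."""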
533         assert self.status.lock.locked()
534         num_done = len(self.status.finished_bundle_timings)
535         num_idle_workers = self.worker_count - self.task_count
536         now = time.time()
537         if (
538             num_done > 2
539             and num_idle_workers > 1
540             and (self.last_backup is None or (now - self.last_backup > 6.0))
541             and self.backup_lock.acquire(blocking=False)
542         ):
543             try:
544                 assert self.backup_lock.locked()
545
546                 bundle_to_backup = None
547                 best_score = None
548                 for (
549                     worker,
550                     bundle_uuids,
551                 ) in self.status.in_flight_bundles_by_worker.items():
552
553                     # Prefer to schedule backups of bundles running on
554                     # slower machines.
555                     base_score = 0
556                     for record in self.workers:
557                         if worker.machine == record.machine:
558                             base_score = float(record.weight)
559                             base_score = 1.0 / base_score
560                             base_score *= 200.0
561                             base_score = int(base_score)
562                             break
563
564                     for uuid in bundle_uuids:
565                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
566                         if (
567                             bundle is not None
568                             and bundle.src_bundle is None
569                             and bundle.backup_bundles is not None
570                         ):
571                             score = base_score
572
573                             # Schedule backups of bundles running
574                             # longer; especially those that are
575                             # unexpectedly slow.
576                             start_ts = self.status.start_per_bundle[uuid]
577                             if start_ts is not None:
578                                 runtime = now - start_ts
579                                 score += runtime
580                                 logger.debug(
581                                     f'score[{bundle}] => {score}  # latency boost'
582                                 )
583
584                                 if bundle.slower_than_local_p95:
585                                     score += runtime / 2
586                                     logger.debug(
587                                         f'score[{bundle}] => {score}  # >worker p95'
588                                     )
589
590                                 if bundle.slower_than_global_p95:
591                                     score += runtime / 4
592                                     logger.debug(
593                                         f'score[{bundle}] => {score}  # >global p95'
594                                     )
595
596                             # Prefer backups of bundles that don't
597                             # have backups already.
598                             backup_count = len(bundle.backup_bundles)
599                             if backup_count == 0:
600                                 score *= 2
601                             elif backup_count == 1:
602                                 score /= 2
603                             elif backup_count == 2:
604                                 score /= 8
605                             else:
606                                 score = 0
607                             logger.debug(
608                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
609                             )
610
611                             if score != 0 and (
612                                 best_score is None or score > best_score
613                             ):
614                                 bundle_to_backup = bundle
615                                 assert bundle is not None
616                                 assert bundle.backup_bundles is not None
617                                 assert bundle.src_bundle is None
618                                 best_score = score
619
620                 # Note: this is all still happening on the heartbeat
621                 # runner thread.  That's ok because
622                 # schedule_backup_for_bundle uses the executor to
623                 # submit the bundle again which will cause it to be
624                 # picked up by a worker thread and allow this thread
625                 # to return to run future heartbeats.
626                 if bundle_to_backup is not None:
627                     self.last_backup = now
628                     logger.info(
629                         f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
630                     )
631                     self.schedule_backup_for_bundle(bundle_to_backup)
632             finally:
633                 self.backup_lock.release()
634
635     def is_worker_available(self) -> bool:
636         return self.policy.is_worker_available()
637
638     def acquire_worker(
639         self, machine_to_avoid: Optional[str] = None
640     ) -> Optional[RemoteWorkerRecord]:
641         return self.policy.acquire_worker(machine_to_avoid)
642
643     def find_available_worker_or_block(
644         self, machine_to_avoid: Optional[str] = None
645     ) -> RemoteWorkerRecord:
646         with self.cv:
647             while not self.is_worker_available():
648                 self.cv.wait()
649             worker = self.acquire_worker(machine_to_avoid)
650             if worker is not None:
651                 return worker
652         msg = "We should never reach this point in the code"
653         logger.critical(msg)
654         raise Exception(msg)
655
656     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
657         worker = bundle.worker
658         assert worker is not None
659         logger.debug(f'Released worker {worker}')
660         self.status.record_release_worker(
661             worker,
662             bundle.uuid,
663             was_cancelled,
664         )
665         with self.cv:
666             worker.count += 1
667             self.cv.notify()
668         self.adjust_task_count(-1)
669
670     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
671         with self.status.lock:
672             if bundle.is_cancelled.wait(timeout=0.0):
673                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
674                 bundle.was_cancelled = True
675                 return True
676         return False
677
678     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
679         """Find a worker for bundle (blocking until one is free) and run the work on it."""
680         self.adjust_task_count(+1)
681         uuid = bundle.uuid
682         hostname = bundle.hostname
683         avoid_machine = override_avoid_machine
684         is_original = bundle.src_bundle is None
685
686         # Try not to schedule a backup on the same host as the original.
687         if avoid_machine is None and bundle.src_bundle is not None:
688             avoid_machine = bundle.src_bundle.machine
689         worker = None
690         while worker is None:
691             worker = self.find_available_worker_or_block(avoid_machine)
692         assert worker
693
694         # Ok, found a worker.
695         bundle.worker = worker
696         machine = bundle.machine = worker.machine
697         username = bundle.username = worker.username
698         self.status.record_acquire_worker(worker, uuid)
699         logger.debug(f'{bundle}: Running bundle on {worker}...')
700
701         # Before we do any work, make sure the bundle is still viable.
702         # It may have been some time between when it was submitted and
703         # now due to lack of worker availability and someone else may
704         # have already finished it.
705         if self.check_if_cancelled(bundle):
706             try:
707                 return self.process_work_result(bundle)
708             except Exception as e:
709                 logger.warning(
710                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
711                 )
712                 self.release_worker(bundle)
713                 if is_original:
714                     # Weird.  We are the original owner of this
715                     # bundle.  For it to have been cancelled, a backup
716                     # must have already started and completed before
717                     # we even got started.  Moreover, the backup says
718                     # it is done but we can't find the results it
719                     # should have copied over.  Reschedule the whole
720                     # thing.
721                     logger.exception(e)
722                     logger.error(
723                         f'{bundle}: We are the original owner thread and yet there are '
724                         + 'no results for this bundle.  This is unexpected and bad.'
725                     )
726                     return self.emergency_retry_nasty_bundle(bundle)
727                 else:
728                     # Expected(?).  We're a backup and our bundle is
729                     # cancelled before we even got started.  Something
730                     # went bad in process_work_result (I acutually don't
731                     # went wrong in process_work_result (it's not clear
732                     # exactly what) but it's probably not worth worrying
733                     # either finding the results or complaining about
734                     # it.
735                     return None
736
737         # Send input code / data to worker machine if it's not local.
738         if hostname not in machine:
739             try:
740                 cmd = (
741                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
742                 )
743                 start_ts = time.time()
744                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
745                 run_silently(cmd)
746                 xfer_latency = time.time() - start_ts
747                 logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
748             except Exception as e:
749                 self.release_worker(bundle)
750                 if is_original:
751                     # Weird.  We tried to copy the code to the worker and it failed...
752                     # And we're the original bundle.  We have to retry.
753                     logger.exception(e)
754                     logger.error(
755                         f"{bundle}: Failed to send instructions to the worker machine?! "
756                         + "This is not expected; we\'re the original bundle so this shouldn\'t "
757                         + "be a race condition.  Attempting an emergency retry..."
758                     )
759                     return self.emergency_retry_nasty_bundle(bundle)
760                 else:
761                     # This is actually expected; we're a backup.
762                     # There's a race condition where someone else
763                     # already finished the work and removed the source
764                     # code file before we could copy it.  No biggie.
765                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
766                     msg += 'We\'re a backup and this may be caused by the original (or some '
767                     msg += 'other backup) already finishing this work.  Ignoring this.'
768                     logger.warning(msg)
769                     return None
770
771         # Kick off the work.  Note that if this fails we let
772         # wait_for_process deal with it.
773         self.status.record_processing_began(uuid)
774         cmd = (
775             f'{SSH} {bundle.username}@{bundle.machine} '
776             f'"source py38-venv/bin/activate &&'
777             f' /home/scott/lib/python_modules/remote_worker.py'
778             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
779         )
780         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
781         p = cmd_in_background(cmd, silent=True)
782         bundle.pid = p.pid
783         logger.debug(
784             f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
785         )
786         return self.wait_for_process(p, bundle, 0)
787
788     def wait_for_process(
789         self, p: subprocess.Popen, bundle: BundleDetails, depth: int
790     ) -> Any:
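        """Wait for the ssh subprocess running bundle, watching for a backup
        to finish it first; recurses (up to a small depth) on wrap-up errors."""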
791         machine = bundle.machine
792         pid = p.pid
793         if depth > 3:
794             logger.error(
795                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
796             )
797             p.terminate()
798             self.release_worker(bundle)
799             return self.emergency_retry_nasty_bundle(bundle)
800
801         # Spin until either the ssh job we scheduled finishes the
802         # bundle or some backup worker signals that they finished it
803         # before we could.
804         while True:
805             try:
806                 p.wait(timeout=0.25)
807             except subprocess.TimeoutExpired:
808                 if self.check_if_cancelled(bundle):
809                     logger.info(
810                         f'{bundle}: looks like another worker finished bundle...'
811                     )
812                     break
813             else:
814                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
815                 p = None
816                 break
817
818         # If we get here we believe the bundle is done; either the ssh
819         # subprocess finished (hopefully successfully) or we noticed
820         # that some other worker seems to have completed the bundle
821         # and we're bailing out.
822         try:
823             ret = self.process_work_result(bundle)
824             if ret is not None and p is not None:
825                 p.terminate()
826             return ret
827
828         # Something went wrong; e.g. we could not copy the results
829         # back, cleanup after ourselves on the remote machine, or
830         # unpickle the results we got from the remote machine.  If we
831         # still have an active ssh subprocess, keep waiting on it.
832         # Otherwise, time for an emergency reschedule.
833         except Exception as e:
834             logger.exception(e)
835             logger.error(f'{bundle}: Something unexpected just happened...')
836             if p is not None:
837                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
838                 logger.warning(msg)
839                 return self.wait_for_process(p, bundle, depth + 1)
840             else:
841                 self.release_worker(bundle)
842                 return self.emergency_retry_nasty_bundle(bundle)
843
844     def process_work_result(self, bundle: BundleDetails) -> Any:
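        """Fetch results back from the worker if needed, unpickle them if we
        are the original bundle, and tell counterpart bundles to stop."""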
845         with self.status.lock:
846             is_original = bundle.src_bundle is None
847             was_cancelled = bundle.was_cancelled
848             username = bundle.username
849             machine = bundle.machine
850             result_file = bundle.result_file
851             code_file = bundle.code_file
852
853             # Whether original or backup, if we finished first we must
854             # fetch the results if the computation happened on a
855             # remote machine.
856             bundle.end_ts = time.time()
857             if not was_cancelled:
858                 assert bundle.machine is not None
859                 if bundle.hostname not in bundle.machine:
860                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
861                     logger.info(
862                         f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
863                     )
864
865                     # If either of these throw they are handled in
866                     # wait_for_process.
867                     attempts = 0
868                     while True:
869                         try:
870                             run_silently(cmd)
871                         except Exception as e:
872                             attempts += 1
873                             if attempts >= 3:
874                                 raise e
875                         else:
876                             break
877
878                     run_silently(
879                         f'{SSH} {username}@{machine}'
880                         f' "/bin/rm -f {code_file} {result_file}"'
881                     )
882                     logger.debug(
883                         f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
884                     )
885                 dur = bundle.end_ts - bundle.start_ts
886                 self.histogram.add_item(dur)
887
888         # Only the original worker should unpickle the file contents
889         # though since it's the only one whose result matters.  The
890         # original is also the only job that may delete result_file
891         # from disk.  Note that the original may have been cancelled
892         # if one of the backups finished first; it still must read the
893         # result from disk.
894         if is_original:
895             logger.debug(f"{bundle}: Unpickling {result_file}.")
896             try:
897                 with open(result_file, 'rb') as rb:
898                     serialized = rb.read()
899                 result = cloudpickle.loads(serialized)
900             except Exception as e:
901                 logger.exception(e)
902                 msg = f'Failed to load {result_file}... this is bad news.'
903                 logger.critical(msg)
904                 self.release_worker(bundle)
905
906                 # Re-raise the exception; the code in wait_for_process may
907                 # decide to emergency_retry_nasty_bundle here.
908                 raise Exception(e) from e
909             logger.debug(f'Removing local (master) {code_file} and {result_file}.')
910             os.remove(f'{result_file}')
911             os.remove(f'{code_file}')
912
913             # Notify any backups that the original is done so they
914             # should stop ASAP.  Do this whether or not we
915             # finished first since there could be more than one
916             # backup.
917             if bundle.backup_bundles is not None:
918                 for backup in bundle.backup_bundles:
919                     logger.debug(
920                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
921                     )
922                     backup.is_cancelled.set()
923
924         # This is a backup job and, by now, we have already fetched
925         # the bundle results.
926         else:
927             # Backup results don't matter, they just need to leave the
928             # result file in the right place for their originals to
929             # read/unpickle later.
930             result = None
931
932             # Tell the original to stop if we finished first.
933             if not was_cancelled:
934                 logger.debug(
935                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
936                 )
937                 bundle.src_bundle.is_cancelled.set()
938         self.release_worker(bundle, was_cancelled=was_cancelled)
939         return result
940
941     def create_original_bundle(self, pickle, fname: str):
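        """Wrap pickled code in a fresh original BundleDetails and record it."""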
942         from string_utils import generate_uuid
943
944         uuid = generate_uuid(omit_dashes=True)
945         code_file = f'/tmp/{uuid}.code.bin'
946         result_file = f'/tmp/{uuid}.result.bin'
947
948         logger.debug(f'Writing pickled code to {code_file}')
949         with open(f'{code_file}', 'wb') as wb:
950             wb.write(pickle)
951
952         bundle = BundleDetails(
953             pickled_code=pickle,
954             uuid=uuid,
955             fname=fname,
956             worker=None,
957             username=None,
958             machine=None,
959             hostname=platform.node(),
960             code_file=code_file,
961             result_file=result_file,
962             pid=0,
963             start_ts=time.time(),
964             end_ts=0.0,
965             slower_than_local_p95=False,
966             slower_than_global_p95=False,
967             src_bundle=None,
968             is_cancelled=threading.Event(),
969             was_cancelled=False,
970             backup_bundles=[],
971             failure_count=0,
972         )
973         self.status.record_bundle_details(bundle)
974         logger.debug(f'{bundle}: Created an original bundle')
975         return bundle
976
977     def create_backup_bundle(self, src_bundle: BundleDetails):
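        """Create a backup BundleDetails that duplicates src_bundle's work."""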
978         assert src_bundle.backup_bundles is not None
979         n = len(src_bundle.backup_bundles)
980         uuid = src_bundle.uuid + f'_backup#{n}'
981
982         backup_bundle = BundleDetails(
983             pickled_code=src_bundle.pickled_code,
984             uuid=uuid,
985             fname=src_bundle.fname,
986             worker=None,
987             username=None,
988             machine=None,
989             hostname=src_bundle.hostname,
990             code_file=src_bundle.code_file,
991             result_file=src_bundle.result_file,
992             pid=0,
993             start_ts=time.time(),
994             end_ts=0.0,
995             slower_than_local_p95=False,
996             slower_than_global_p95=False,
997             src_bundle=src_bundle,
998             is_cancelled=threading.Event(),
999             was_cancelled=False,
1000             backup_bundles=None,  # backup backups not allowed
1001             failure_count=0,
1002         )
1003         src_bundle.backup_bundles.append(backup_bundle)
1004         self.status.record_bundle_details_already_locked(backup_bundle)
1005         logger.debug(f'{backup_bundle}: Created a backup bundle')
1006         return backup_bundle
1007
1008     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1009         assert self.status.lock.locked()
1010         assert src_bundle is not None
1011         backup_bundle = self.create_backup_bundle(src_bundle)
1012         logger.debug(
1013             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1014         )
1015         self._helper_executor.submit(self.launch, backup_bundle)
1016
1017         # Results from backups don't matter; if they finish first
1018         # they will move the result_file to this machine and let
1019         # the original pick them up and unpickle them.
1020
1021     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
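        """Last-ditch retry of a bundle that keeps failing, up to a retry limit."""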
1022         is_original = bundle.src_bundle is None
1023         bundle.worker = None
1024         avoid_last_machine = bundle.machine
1025         bundle.machine = None
1026         bundle.username = None
1027         bundle.failure_count += 1
1028         if is_original:
1029             retry_limit = 3
1030         else:
1031             retry_limit = 2
1032
1033         if bundle.failure_count > retry_limit:
1034             logger.error(
1035                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1036             )
1037             if is_original:
1038                 raise RemoteExecutorException(
1039                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1040                 )
1041             else:
1042                 logger.error(
1043                     f'{bundle}: At least it\'s only a backup; better luck with the others.'
1044                 )
1045             return None
1046         else:
1047             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1048             logger.warning(msg)
1049             warnings.warn(msg)
1050             return self.launch(bundle, avoid_last_machine)
1051
1052     @overrides
1053     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1054         pickle = make_cloud_pickle(function, *args, **kwargs)
1055         bundle = self.create_original_bundle(pickle, function.__name__)
1056         self.total_bundles_submitted += 1
1057         return self._helper_executor.submit(self.launch, bundle)
1058
1059     @overrides
1060     def shutdown(self, wait=True) -> None:
1061         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1062         self.heartbeat_stop_event.set()
1063         self.heartbeat_thread.join()
1064         self._helper_executor.shutdown(wait)
1065         print(self.histogram)
1066
1067
1068 @singleton
1069 class DefaultExecutors(object):
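    """A lazy singleton that hands out shared thread, process, and remote
    executors, pinging candidate machines to build the remote worker pool."""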
1070     def __init__(self):
1071         self.thread_executor: Optional[ThreadExecutor] = None
1072         self.process_executor: Optional[ProcessExecutor] = None
1073         self.remote_executor: Optional[RemoteExecutor] = None
1074
1075     def ping(self, host) -> bool:
1076         logger.debug(f'RUN> ping -c 1 {host}')
1077         try:
1078             x = cmd_with_timeout(
1079                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1080             )
1081             return x == 0
1082         except Exception:
1083             return False
1084
1085     def thread_pool(self) -> ThreadExecutor:
1086         if self.thread_executor is None:
1087             self.thread_executor = ThreadExecutor()
1088         return self.thread_executor
1089
1090     def process_pool(self) -> ProcessExecutor:
1091         if self.process_executor is None:
1092             self.process_executor = ProcessExecutor()
1093         return self.process_executor
1094
1095     def remote_pool(self) -> RemoteExecutor:
1096         if self.remote_executor is None:
1097             logger.info('Looking for some helper machines...')
1098             pool: List[RemoteWorkerRecord] = []
1099             if self.ping('cheetah.house'):
1100                 logger.info('Found cheetah.house')
1101                 pool.append(
1102                     RemoteWorkerRecord(
1103                         username='scott',
1104                         machine='cheetah.house',
1105                         weight=25,
1106                         count=6,
1107                     ),
1108                 )
1109             if self.ping('meerkat.cabin'):
1110                 logger.info('Found meerkat.cabin')
1111                 pool.append(
1112                     RemoteWorkerRecord(
1113                         username='scott',
1114                         machine='meerkat.cabin',
1115                         weight=12,
1116                         count=2,
1117                     ),
1118                 )
1119             if self.ping('wannabe.house'):
1120                 logger.info('Found wannabe.house')
1121                 pool.append(
1122                     RemoteWorkerRecord(
1123                         username='scott',
1124                         machine='wannabe.house',
1125                         weight=30,
1126                         count=10,
1127                     ),
1128                 )
1129             if self.ping('puma.cabin'):
1130                 logger.info('Found puma.cabin')
1131                 pool.append(
1132                     RemoteWorkerRecord(
1133                         username='scott',
1134                         machine='puma.cabin',
1135                         weight=25,
1136                         count=6,
1137                     ),
1138                 )
1139             if self.ping('backup.house'):
1140                 logger.info('Found backup.house')
1141                 pool.append(
1142                     RemoteWorkerRecord(
1143                         username='scott',
1144                         machine='backup.house',
1145                         weight=7,
1146                         count=2,
1147                     ),
1148                 )
1149
1150             # The controller machine has a lot to do; go easy on it.
1151             for record in pool:
1152                 if record.machine == platform.node() and record.count > 1:
1153                     logger.info(f'Reducing workload for {record.machine}.')
1154                     record.count = 1
1155
1156             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1157             policy.register_worker_pool(pool)
1158             self.remote_executor = RemoteExecutor(pool, policy)
1159         return self.remote_executor
1160
1161     def shutdown(self) -> None:
1162         if self.thread_executor is not None:
1163             self.thread_executor.shutdown()
1164             self.thread_executor = None
1165         if self.process_executor is not None:
1166             self.process_executor.shutdown()
1167             self.process_executor = None
1168         if self.remote_executor is not None:
1169             self.remote_executor.shutdown()
1170             self.remote_executor = None
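
# A minimal usage sketch (illustrative only; assumes the config module's
# commandline arguments have been parsed at startup the way this codebase
# normally does):
#
#     from executors import DefaultExecutors
#
#     def square(x: int) -> int:
#         return x * x
#
#     pool = DefaultExecutors().thread_pool()
#     future = pool.submit(square, 7)
#     print(future.result())  # 49
#     DefaultExecutors().shutdown()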