1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
30
31
32 logger = logging.getLogger(__name__)
33
34 parser = config.add_commandline_args(
35     f"Executors ({__file__})", "Args related to processing executors."
36 )
37 parser.add_argument(
38     '--executors_threadpool_size',
39     type=int,
40     metavar='#THREADS',
41     help='Number of threads in the default threadpool, leave unset for default',
42     default=None,
43 )
44 parser.add_argument(
45     '--executors_processpool_size',
46     type=int,
47     metavar='#PROCESSES',
48     help='Number of processes in the default processpool, leave unset for default',
49     default=None,
50 )
51 parser.add_argument(
52     '--executors_schedule_remote_backups',
53     default=True,
54     action=argparse_utils.ActionNoYes,
55     help='Should we schedule duplicative backup work if a remote bundle is slow',
56 )
57 parser.add_argument(
58     '--executors_max_bundle_failures',
59     type=int,
60     default=3,
61     metavar='#FAILURES',
62     help='Maximum number of failures before giving up on a bundle',
63 )
64
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp -C'
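# Command prefixes used below: SCP to copy bundle code / result files between
# machines and SSH to kick off remote_worker.py on a worker machine (see
# RemoteExecutor.launch and process_work_result).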
67
68
69 def make_cloud_pickle(fun, *args, **kwargs):
70     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71     return cloudpickle.dumps((fun, args, kwargs))
72
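# For illustration only (not executed here): a bundle is just a cloudpickled
# (function, args, kwargs) triple, so a worker can recover and run it with
# something like:
#
#   fun, args, kwargs = cloudpickle.loads(make_cloud_pickle(math.sqrt, 16))
#   fun(*args, **kwargs)  # => 4.0  (math.sqrt is an arbitrary example callable)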
73
74 class BaseExecutor(ABC):
75     def __init__(self, *, title=''):
76         self.title = title
77         self.task_count = 0
78         self.histogram = hist.SimpleHistogram(
79             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
80         )
81
82     @abstractmethod
83     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
84         pass
85
86     @abstractmethod
87     def shutdown(self, wait: bool = True) -> None:
88         pass
89
90     def adjust_task_count(self, delta: int) -> None:
91         self.task_count += delta
92         logger.debug(f'Executor current task count is {self.task_count}')
93
94
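# Rough usage sketch (illustrative, not executed here): the concrete executors
# below expose the same submit()/result()/shutdown() flow as a stdlib
# concurrent.futures executor, e.g.:
#
#   executor = ThreadExecutor()
#   future = executor.submit(some_function, arg1, arg2)  # placeholders
#   print(future.result())
#   executor.shutdown()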
95 class ThreadExecutor(BaseExecutor):
96     def __init__(self, max_workers: Optional[int] = None):
97         super().__init__()
98         workers = None
99         if max_workers is not None:
100             workers = max_workers
101         elif 'executors_threadpool_size' in config.config:
102             workers = config.config['executors_threadpool_size']
103         logger.debug(f'Creating threadpool executor with {workers} workers')
104         self._thread_pool_executor = fut.ThreadPoolExecutor(
105             max_workers=workers, thread_name_prefix="thread_executor_helper"
106         )
107
108     def run_local_bundle(self, fun, *args, **kwargs):
109         logger.debug(f"Running local bundle at {fun.__name__}")
110         start = time.time()
111         result = fun(*args, **kwargs)
112         end = time.time()
113         self.adjust_task_count(-1)
114         duration = end - start
115         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
116         self.histogram.add_item(duration)
117         return result
118
119     @overrides
120     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
121         self.adjust_task_count(+1)
122         newargs = []
123         newargs.append(function)
124         for arg in args:
125             newargs.append(arg)
126         return self._thread_pool_executor.submit(
127             self.run_local_bundle, *newargs, **kwargs
128         )
129
130     @overrides
131     def shutdown(self, wait=True) -> None:
132         logger.debug(f'Shutting down threadpool executor {self.title}')
133         print(self.histogram)
134         self._thread_pool_executor.shutdown(wait)
135
136
137 class ProcessExecutor(BaseExecutor):
138     def __init__(self, max_workers=None):
139         super().__init__()
140         workers = None
141         if max_workers is not None:
142             workers = max_workers
143         elif 'executors_processpool_size' in config.config:
144             workers = config.config['executors_processpool_size']
145         logger.debug(f'Creating processpool executor with {workers} workers.')
146         self._process_executor = fut.ProcessPoolExecutor(
147             max_workers=workers,
148         )
149
150     def run_cloud_pickle(self, pickle):
151         fun, args, kwargs = cloudpickle.loads(pickle)
152         logger.debug(f"Running pickled bundle at {fun.__name__}")
153         result = fun(*args, **kwargs)
154         self.adjust_task_count(-1)
155         return result
156
157     @overrides
158     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
159         start = time.time()
160         self.adjust_task_count(+1)
161         pickle = make_cloud_pickle(function, *args, **kwargs)
162         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
163         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
164         return result
165
166     @overrides
167     def shutdown(self, wait=True) -> None:
168         logger.debug(f'Shutting down processpool executor {self.title}')
169         self._process_executor.shutdown(wait)
170         print(self.histogram)
171
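    # Dropping the (unpicklable) process pool lets instances of this class be
    # pickled; that matters because submit() ships the bound method
    # run_cloud_pickle, and therefore a reference to self, to worker processes.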
172     def __getstate__(self):
173         state = self.__dict__.copy()
174         state['_process_executor'] = None
175         return state
176
177
178 class RemoteExecutorException(Exception):
179     """Thrown when a bundle cannot be executed despite several retries."""
180
181     pass
182
183
184 @dataclass
185 class RemoteWorkerRecord:
186     username: str
187     machine: str
188     weight: int
189     count: int
190
191     def __hash__(self):
192         return hash((self.username, self.machine))
193
194     def __repr__(self):
195         return f'{self.username}@{self.machine}'
196
197
198 @dataclass
199 class BundleDetails:
200     pickled_code: bytes
201     uuid: str
202     fname: str
203     worker: Optional[RemoteWorkerRecord]
204     username: Optional[str]
205     machine: Optional[str]
206     hostname: str
207     code_file: str
208     result_file: str
209     pid: int
210     start_ts: float
211     end_ts: float
212     slower_than_local_p95: bool
213     slower_than_global_p95: bool
214     src_bundle: BundleDetails
215     is_cancelled: threading.Event
216     was_cancelled: bool
217     backup_bundles: Optional[List[BundleDetails]]
218     failure_count: int
219
220     def __repr__(self):
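        # Backup bundle uuids carry a '_backup#<n>' suffix (see
        # create_backup_bundle); strip it and display a short '<tail>_b<n>'
        # form instead.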
221         uuid = self.uuid
222         if uuid[-9:-2] == '_backup':
223             uuid = uuid[:-9]
224             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
225         else:
226             suffix = uuid[-6:]
227
228         colorz = [
229             fg('violet red'),
230             fg('red'),
231             fg('orange'),
232             fg('peach orange'),
233             fg('yellow'),
234             fg('marigold yellow'),
235             fg('green yellow'),
236             fg('tea green'),
237             fg('cornflower blue'),
238             fg('turquoise blue'),
239             fg('tropical blue'),
240             fg('lavender purple'),
241             fg('medium purple'),
242         ]
243         c = colorz[int(uuid[-2:], 16) % len(colorz)]
244         fname = self.fname if self.fname is not None else 'nofname'
245         machine = self.machine if self.machine is not None else 'nomachine'
246         return f'{c}{suffix}/{fname}/{machine}{reset()}'
247
248
249 class RemoteExecutorStatus:
250     def __init__(self, total_worker_count: int) -> None:
251         self.worker_count: int = total_worker_count
252         self.known_workers: Set[RemoteWorkerRecord] = set()
253         self.start_time: float = time.time()
254         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
255         self.end_per_bundle: Dict[str, float] = defaultdict(float)
256         self.finished_bundle_timings_per_worker: Dict[
257             RemoteWorkerRecord, List[float]
258         ] = {}
259         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
260         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
261         self.finished_bundle_timings: List[float] = []
262         self.last_periodic_dump: Optional[float] = None
263         self.total_bundles_submitted: int = 0
264
265         # Protects reads and modifications of self.  Also used
266         # as a memory fence for modifications to bundles.
267         self.lock: threading.Lock = threading.Lock()
268
269     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
270         with self.lock:
271             self.record_acquire_worker_already_locked(worker, uuid)
272
273     def record_acquire_worker_already_locked(
274         self, worker: RemoteWorkerRecord, uuid: str
275     ) -> None:
276         assert self.lock.locked()
277         self.known_workers.add(worker)
278         self.start_per_bundle[uuid] = None
279         x = self.in_flight_bundles_by_worker.get(worker, set())
280         x.add(uuid)
281         self.in_flight_bundles_by_worker[worker] = x
282
283     def record_bundle_details(self, details: BundleDetails) -> None:
284         with self.lock:
285             self.record_bundle_details_already_locked(details)
286
287     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
288         assert self.lock.locked()
289         self.bundle_details_by_uuid[details.uuid] = details
290
291     def record_release_worker(
292         self,
293         worker: RemoteWorkerRecord,
294         uuid: str,
295         was_cancelled: bool,
296     ) -> None:
297         with self.lock:
298             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
299
300     def record_release_worker_already_locked(
301         self,
302         worker: RemoteWorkerRecord,
303         uuid: str,
304         was_cancelled: bool,
305     ) -> None:
306         assert self.lock.locked()
307         ts = time.time()
308         self.end_per_bundle[uuid] = ts
309         self.in_flight_bundles_by_worker[worker].remove(uuid)
310         if not was_cancelled:
311             bundle_latency = ts - self.start_per_bundle[uuid]
312             x = self.finished_bundle_timings_per_worker.get(worker, list())
313             x.append(bundle_latency)
314             self.finished_bundle_timings_per_worker[worker] = x
315             self.finished_bundle_timings.append(bundle_latency)
316
317     def record_processing_began(self, uuid: str):
318         with self.lock:
319             self.start_per_bundle[uuid] = time.time()
320
321     def total_in_flight(self) -> int:
322         assert self.lock.locked()
323         total_in_flight = 0
324         for worker in self.known_workers:
325             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
326         return total_in_flight
327
328     def total_idle(self) -> int:
329         assert self.lock.locked()
330         return self.worker_count - self.total_in_flight()
331
332     def __repr__(self):
333         assert self.lock.locked()
334         ts = time.time()
335         total_finished = len(self.finished_bundle_timings)
336         total_in_flight = self.total_in_flight()
337         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
338         qall = None
339         if len(self.finished_bundle_timings) > 1:
340             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
341             ret += (
342                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
343                 f'✅={total_finished}/{self.total_bundles_submitted}, '
344                 f'💻n={total_in_flight}/{self.worker_count}\n'
345             )
346         else:
347             ret += (
348                 f'⏱={ts-self.start_time:.1f}s, '
349                 f'✅={total_finished}/{self.total_bundles_submitted}, '
350                 f'💻n={total_in_flight}/{self.worker_count}\n'
351             )
352
353         for worker in self.known_workers:
354             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
355             timings = self.finished_bundle_timings_per_worker.get(worker, [])
356             count = len(timings)
357             qworker = None
358             if count > 1:
359                 qworker = numpy.quantile(timings, [0.5, 0.95])
360                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
361             else:
362                 ret += '\n'
363             if count > 0:
364                 ret += f'    ...finished {count} total bundle(s) so far\n'
365             in_flight = len(self.in_flight_bundles_by_worker[worker])
366             if in_flight > 0:
367                 ret += f'    ...{in_flight} bundles currently in flight:\n'
368                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
369                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
370                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
371                     if self.start_per_bundle[bundle_uuid] is not None:
372                         sec = ts - self.start_per_bundle[bundle_uuid]
373                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
374                     else:
375                         ret += f'       {details} setting up / copying data...'
376                         sec = 0.0
377
378                     if qworker is not None:
379                         if sec > qworker[1]:
380                             ret += f'{bg("red")}>💻p95{reset()} '
381                             if details is not None:
382                                 details.slower_than_local_p95 = True
383                         else:
384                             if details is not None:
385                                 details.slower_than_local_p95 = False
386
387                     if qall is not None:
388                         if sec > qall[1]:
389                             ret += f'{bg("red")}>∀p95{reset()} '
390                             if details is not None:
391                                 details.slower_than_global_p95 = True
392                         elif details is not None:
393                             details.slower_than_global_p95 = False
394                     ret += '\n'
395         return ret
396
397     def periodic_dump(self, total_bundles_submitted: int) -> None:
398         assert self.lock.locked()
399         self.total_bundles_submitted = total_bundles_submitted
400         ts = time.time()
401         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
402             print(self)
403             self.last_periodic_dump = ts
404
405
406 class RemoteWorkerSelectionPolicy(ABC):
407     def register_worker_pool(self, workers):
408         self.workers = workers
409
410     @abstractmethod
411     def is_worker_available(self) -> bool:
412         pass
413
414     @abstractmethod
415     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
416         pass
417
418
419 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
420     @overrides
421     def is_worker_available(self) -> bool:
422         for worker in self.workers:
423             if worker.count > 0:
424                 return True
425         return False
426
427     @overrides
428     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
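        # Build a grab bag containing each worker count x weight times so a
        # random draw is biased toward heavier and less-loaded workers.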
429         grabbag = []
430         for worker in self.workers:
431             for x in range(0, worker.count):
432                 for y in range(0, worker.weight):
433                     grabbag.append(worker)
434
435         for _ in range(0, 5):
436             random.shuffle(grabbag)
437             worker = grabbag[0]
438             if worker.machine != machine_to_avoid or _ > 2:
439                 if worker.count > 0:
440                     worker.count -= 1
441                     logger.debug(f'Selected worker {worker}')
442                     return worker
443         msg = 'Unexpectedly could not find a worker, retrying...'
444         logger.warning(msg)
445         return None
446
447
448 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
449     def __init__(self) -> None:
450         self.index = 0
451
452     @overrides
453     def is_worker_available(self) -> bool:
454         for worker in self.workers:
455             if worker.count > 0:
456                 return True
457         return False
458
459     @overrides
460     def acquire_worker(
461         self, machine_to_avoid: Optional[str] = None
462     ) -> Optional[RemoteWorkerRecord]:
463         x = self.index
464         while True:
465             worker = self.workers[x]
466             if worker.count > 0:
467                 worker.count -= 1
468                 x += 1
469                 if x >= len(self.workers):
470                     x = 0
471                 self.index = x
472                 logger.debug(f'Selected worker {worker}')
473                 return worker
474             x += 1
475             if x >= len(self.workers):
476                 x = 0
477             if x == self.index:
478                 msg = 'Unexpectedly could not find a worker, retrying...'
479                 logger.warning(msg)
480                 return None
481
482
483 class RemoteExecutor(BaseExecutor):
484     def __init__(
485         self,
486         workers: List[RemoteWorkerRecord],
487         policy: RemoteWorkerSelectionPolicy,
488     ) -> None:
489         super().__init__()
490         self.workers = workers
491         self.policy = policy
492         self.worker_count = 0
493         for worker in self.workers:
494             self.worker_count += worker.count
495         if self.worker_count <= 0:
496             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
497             logger.critical(msg)
498             raise RemoteExecutorException(msg)
499         self.policy.register_worker_pool(self.workers)
500         self.cv = threading.Condition()
501         logger.debug(
502             f'Creating {self.worker_count} local threads, one per remote worker.'
503         )
504         self._helper_executor = fut.ThreadPoolExecutor(
505             thread_name_prefix="remote_executor_helper",
506             max_workers=self.worker_count,
507         )
508         self.status = RemoteExecutorStatus(self.worker_count)
509         self.total_bundles_submitted = 0
510         self.backup_lock = threading.Lock()
511         self.last_backup = None
512         (
513             self.heartbeat_thread,
514             self.heartbeat_stop_event,
515         ) = self.run_periodic_heartbeat()
516
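    # The @background_thread decorator (from thread_utils) appears to hand back
    # a (thread, stop_event) pair when the decorated method is invoked; that is
    # what __init__ unpacks into heartbeat_thread / heartbeat_stop_event above.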
517     @background_thread
518     def run_periodic_heartbeat(self, stop_event) -> None:
519         while not stop_event.is_set():
520             time.sleep(5.0)
521             logger.debug('Running periodic heartbeat code...')
522             self.heartbeat()
523         logger.debug('Periodic heartbeat thread shutting down.')
524
525     def heartbeat(self) -> None:
526         with self.status.lock:
527             # Dump regular progress report
528             self.status.periodic_dump(self.total_bundles_submitted)
529
530             # Look for bundles to reschedule via executor.submit
531             if config.config['executors_schedule_remote_backups']:
532                 self.maybe_schedule_backup_bundles()
533
534     def maybe_schedule_backup_bundles(self):
535         assert self.status.lock.locked()
536         num_done = len(self.status.finished_bundle_timings)
537         num_idle_workers = self.worker_count - self.task_count
538         now = time.time()
539         if (
540             num_done > 2
541             and num_idle_workers > 1
542             and (self.last_backup is None or (now - self.last_backup > 6.0))
543             and self.backup_lock.acquire(blocking=False)
544         ):
545             try:
546                 assert self.backup_lock.locked()
547
548                 bundle_to_backup = None
549                 best_score = None
550                 for (
551                     worker,
552                     bundle_uuids,
553                 ) in self.status.in_flight_bundles_by_worker.items():
554
555                     # Prefer to schedule backups of bundles running on
556                     # slower machines.
557                     base_score = 0
558                     for record in self.workers:
559                         if worker.machine == record.machine:
560                             base_score = float(record.weight)
561                             base_score = 1.0 / base_score
562                             base_score *= 200.0
563                             base_score = int(base_score)
564                             break
565
566                     for uuid in bundle_uuids:
567                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
568                         if (
569                             bundle is not None
570                             and bundle.src_bundle is None
571                             and bundle.backup_bundles is not None
572                         ):
573                             score = base_score
574
575                             # Schedule backups of bundles running
576                             # longer; especially those that are
577                             # unexpectedly slow.
578                             start_ts = self.status.start_per_bundle[uuid]
579                             if start_ts is not None:
580                                 runtime = now - start_ts
581                                 score += runtime
582                                 logger.debug(
583                                     f'score[{bundle}] => {score}  # latency boost'
584                                 )
585
586                                 if bundle.slower_than_local_p95:
587                                     score += runtime / 2
588                                     logger.debug(
589                                         f'score[{bundle}] => {score}  # >worker p95'
590                                     )
591
592                                 if bundle.slower_than_global_p95:
593                                     score += runtime / 4
594                                     logger.debug(
595                                         f'score[{bundle}] => {score}  # >global p95'
596                                     )
597
598                             # Prefer backups of bundles that don't
599                             # have backups already.
600                             backup_count = len(bundle.backup_bundles)
601                             if backup_count == 0:
602                                 score *= 2
603                             elif backup_count == 1:
604                                 score /= 2
605                             elif backup_count == 2:
606                                 score /= 8
607                             else:
608                                 score = 0
609                             logger.debug(
610                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
611                             )
612
613                             if score != 0 and (
614                                 best_score is None or score > best_score
615                             ):
616                                 bundle_to_backup = bundle
617                                 assert bundle is not None
618                                 assert bundle.backup_bundles is not None
619                                 assert bundle.src_bundle is None
620                                 best_score = score
621
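                # Illustrative scoring example (assumed numbers): a bundle on a
                # weight=25 worker starts at base_score 200 / 25 = 8; if it has
                # been running for 30s and is slower than both the local and
                # global p95s it scores 8 + 30 + 15 + 7.5 = 60.5, and having no
                # backups yet doubles that to 121.
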
622                 # Note: this is all still happening on the heartbeat
623                 # runner thread.  That's ok because
624                 # schedule_backup_for_bundle uses the executor to
625                 # submit the bundle again which will cause it to be
626                 # picked up by a worker thread and allow this thread
627                 # to return to run future heartbeats.
628                 if bundle_to_backup is not None:
629                     self.last_backup = now
630                     logger.info(
631                         f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
632                     )
633                     self.schedule_backup_for_bundle(bundle_to_backup)
634             finally:
635                 self.backup_lock.release()
636
637     def is_worker_available(self) -> bool:
638         return self.policy.is_worker_available()
639
640     def acquire_worker(
641         self, machine_to_avoid: Optional[str] = None
642     ) -> Optional[RemoteWorkerRecord]:
643         return self.policy.acquire_worker(machine_to_avoid)
644
645     def find_available_worker_or_block(
646         self, machine_to_avoid: Optional[str] = None
647     ) -> RemoteWorkerRecord:
648         with self.cv:
649             while not self.is_worker_available():
650                 self.cv.wait()
651             worker = self.acquire_worker(machine_to_avoid)
652             if worker is not None:
653                 return worker
654         msg = "We should never reach this point in the code"
655         logger.critical(msg)
656         raise Exception(msg)
657
658     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
659         worker = bundle.worker
660         assert worker is not None
661         logger.debug(f'Released worker {worker}')
662         self.status.record_release_worker(
663             worker,
664             bundle.uuid,
665             was_cancelled,
666         )
667         with self.cv:
668             worker.count += 1
669             self.cv.notify()
670         self.adjust_task_count(-1)
671
672     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
673         with self.status.lock:
674             if bundle.is_cancelled.wait(timeout=0.0):
675                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
676                 bundle.was_cancelled = True
677                 return True
678         return False
679
680     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
681         """Find a worker for bundle or block until one is available."""
682         self.adjust_task_count(+1)
683         uuid = bundle.uuid
684         hostname = bundle.hostname
685         avoid_machine = override_avoid_machine
686         is_original = bundle.src_bundle is None
687
688         # Try not to schedule a backup on the same host as the original.
689         if avoid_machine is None and bundle.src_bundle is not None:
690             avoid_machine = bundle.src_bundle.machine
691         worker = None
692         while worker is None:
693             worker = self.find_available_worker_or_block(avoid_machine)
694         assert worker
695
696         # Ok, found a worker.
697         bundle.worker = worker
698         machine = bundle.machine = worker.machine
699         username = bundle.username = worker.username
700         self.status.record_acquire_worker(worker, uuid)
701         logger.debug(f'{bundle}: Running bundle on {worker}...')
702
703         # Before we do any work, make sure the bundle is still viable.
704         # It may have been some time between when it was submitted and
705         # now due to lack of worker availability and someone else may
706         # have already finished it.
707         if self.check_if_cancelled(bundle):
708             try:
709                 return self.process_work_result(bundle)
710             except Exception as e:
711                 logger.warning(
712                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
713                 )
714                 self.release_worker(bundle)
715                 if is_original:
716                     # Weird.  We are the original owner of this
717                     # bundle.  For it to have been cancelled, a backup
718                     # must have already started and completed before
719                     # we even for started.  Moreover, the backup says
720                     # it is done but we can't find the results it
721                     # should have copied over.  Reschedule the whole
722                     # thing.
723                     logger.exception(e)
724                     logger.error(
725                         f'{bundle}: We are the original owner thread and yet there are '
726                         + 'no results for this bundle.  This is unexpected and bad.'
727                     )
728                     return self.emergency_retry_nasty_bundle(bundle)
729                 else:
730                     # Expected(?).  We're a backup and our bundle is
731                     # cancelled before we even got started.  Something
732                     # went bad in process_work_result (I actually don't
733                     # see what?) but probably not worth worrying
734                     # about.  Let the original thread worry about
735                     # either finding the results or complaining about
736                     # it.
737                     return None
738
739         # Send input code / data to worker machine if it's not local.
740         if hostname not in machine:
741             try:
742                 cmd = (
743                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
744                 )
745                 start_ts = time.time()
746                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
747                 run_silently(cmd)
748                 xfer_latency = time.time() - start_ts
749                 logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
750             except Exception as e:
751                 self.release_worker(bundle)
752                 if is_original:
753                     # Weird.  We tried to copy the code to the worker and it failed...
754                     # And we're the original bundle.  We have to retry.
755                     logger.exception(e)
756                     logger.error(
757                         f"{bundle}: Failed to send instructions to the worker machine?! "
758                         + "This is not expected; we\'re the original bundle so this shouldn\'t "
759                         + "be a race condition.  Attempting an emergency retry..."
760                     )
761                     return self.emergency_retry_nasty_bundle(bundle)
762                 else:
763                     # This is actually expected; we're a backup.
764                     # There's a race condition where someone else
765                     # already finished the work and removed the source
766                     # code file before we could copy it.  No biggie.
767                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
768                     msg += 'We\'re a backup and this may be caused by the original (or some '
769                     msg += 'other backup) already finishing this work.  Ignoring this.'
770                     logger.warning(msg)
771                     return None
772
773         # Kick off the work.  Note that if this fails we let
774         # wait_for_process deal with it.
775         self.status.record_processing_began(uuid)
776         cmd = (
777             f'{SSH} {bundle.username}@{bundle.machine} '
778             f'"source py38-venv/bin/activate &&'
779             f' /home/scott/lib/python_modules/remote_worker.py'
780             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
781         )
782         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
783         p = cmd_in_background(cmd, silent=True)
784         bundle.pid = p.pid
785         logger.debug(
786             f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
787         )
788         return self.wait_for_process(p, bundle, 0)
789
790     def wait_for_process(
791         self, p: subprocess.Popen, bundle: BundleDetails, depth: int
792     ) -> Any:
793         machine = bundle.machine
794         pid = p.pid
795         if depth > 3:
796             logger.error(
797                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
798             )
799             p.terminate()
800             self.release_worker(bundle)
801             return self.emergency_retry_nasty_bundle(bundle)
802
803         # Spin until either the ssh job we scheduled finishes the
804         # bundle or some backup worker signals that they finished it
805         # before we could.
806         while True:
807             try:
808                 p.wait(timeout=0.25)
809             except subprocess.TimeoutExpired:
810                 if self.check_if_cancelled(bundle):
811                     logger.info(
812                         f'{bundle}: looks like another worker finished bundle...'
813                     )
814                     break
815             else:
816                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
817                 p = None
818                 break
819
820         # If we get here we believe the bundle is done; either the ssh
821         # subprocess finished (hopefully successfully) or we noticed
822         # that some other worker seems to have completed the bundle
823         # and we're bailing out.
824         try:
825             ret = self.process_work_result(bundle)
826             if ret is not None and p is not None:
827                 p.terminate()
828             return ret
829
830         # Something went wrong; e.g. we could not copy the results
831         # back, clean up after ourselves on the remote machine, or
832         # unpickle the results we got from the remote machine.  If we
833         # still have an active ssh subprocess, keep waiting on it.
834         # Otherwise, time for an emergency reschedule.
835         except Exception as e:
836             logger.exception(e)
837             logger.error(f'{bundle}: Something unexpected just happened...')
838             if p is not None:
839                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
840                 logger.warning(msg)
841                 return self.wait_for_process(p, bundle, depth + 1)
842             else:
843                 self.release_worker(bundle)
844                 return self.emergency_retry_nasty_bundle(bundle)
845
846     def process_work_result(self, bundle: BundleDetails) -> Any:
847         with self.status.lock:
848             is_original = bundle.src_bundle is None
849             was_cancelled = bundle.was_cancelled
850             username = bundle.username
851             machine = bundle.machine
852             result_file = bundle.result_file
853             code_file = bundle.code_file
854
855             # Whether original or backup, if we finished first we must
856             # fetch the results if the computation happened on a
857             # remote machine.
858             bundle.end_ts = time.time()
859             if not was_cancelled:
860                 assert bundle.machine is not None
861                 if bundle.hostname not in bundle.machine:
862                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
863                     logger.info(
864                         f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
865                     )
866
867                     # If either of these throw they are handled in
868                     # wait_for_process.
869                     attempts = 0
870                     while True:
871                         try:
872                             run_silently(cmd)
873                         except Exception as e:
874                             attempts += 1
875                             if attempts >= 3:
876                                 raise e
877                         else:
878                             break
879
880                     run_silently(
881                         f'{SSH} {username}@{machine}'
882                         f' "/bin/rm -f {code_file} {result_file}"'
883                     )
884                     logger.debug(
885                         f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
886                     )
887                 dur = bundle.end_ts - bundle.start_ts
888                 self.histogram.add_item(dur)
889
890         # Only the original worker should unpickle the file contents
891         # though since it's the only one whose result matters.  The
892         # original is also the only job that may delete result_file
893         # from disk.  Note that the original may have been cancelled
894         # if one of the backups finished first; it still must read the
895         # result from disk.
896         if is_original:
897             logger.debug(f"{bundle}: Unpickling {result_file}.")
898             try:
899                 with open(result_file, 'rb') as rb:
900                     serialized = rb.read()
901                 result = cloudpickle.loads(serialized)
902             except Exception as e:
903                 logger.exception(e)
904                 msg = f'Failed to load {result_file}... this is bad news.'
905                 logger.critical(msg)
906                 self.release_worker(bundle)
907
908                 # Re-raise the exception; the code in wait_for_process may
909                 # decide to emergency_retry_nasty_bundle here.
910                 raise e
911             logger.debug(f'Removing local (master) {code_file} and {result_file}.')
912             os.remove(f'{result_file}')
913             os.remove(f'{code_file}')
914
915             # Notify any backups that the original is done so they
916             # should stop ASAP.  Do this whether or not we
917             # finished first since there could be more than one
918             # backup.
919             if bundle.backup_bundles is not None:
920                 for backup in bundle.backup_bundles:
921                     logger.debug(
922                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
923                     )
924                     backup.is_cancelled.set()
925
926         # This is a backup job and, by now, we have already fetched
927         # the bundle results.
928         else:
929             # Backup results don't matter, they just need to leave the
930             # result file in the right place for their originals to
931             # read/unpickle later.
932             result = None
933
934             # Tell the original to stop if we finished first.
935             if not was_cancelled:
936                 logger.debug(
937                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
938                 )
939                 bundle.src_bundle.is_cancelled.set()
940         self.release_worker(bundle, was_cancelled=was_cancelled)
941         return result
942
943     def create_original_bundle(self, pickle, fname: str):
944         from string_utils import generate_uuid
945
946         uuid = generate_uuid(omit_dashes=True)
947         code_file = f'/tmp/{uuid}.code.bin'
948         result_file = f'/tmp/{uuid}.result.bin'
949
950         logger.debug(f'Writing pickled code to {code_file}')
951         with open(f'{code_file}', 'wb') as wb:
952             wb.write(pickle)
953
954         bundle = BundleDetails(
955             pickled_code=pickle,
956             uuid=uuid,
957             fname=fname,
958             worker=None,
959             username=None,
960             machine=None,
961             hostname=platform.node(),
962             code_file=code_file,
963             result_file=result_file,
964             pid=0,
965             start_ts=time.time(),
966             end_ts=0.0,
967             slower_than_local_p95=False,
968             slower_than_global_p95=False,
969             src_bundle=None,
970             is_cancelled=threading.Event(),
971             was_cancelled=False,
972             backup_bundles=[],
973             failure_count=0,
974         )
975         self.status.record_bundle_details(bundle)
976         logger.debug(f'{bundle}: Created an original bundle')
977         return bundle
978
979     def create_backup_bundle(self, src_bundle: BundleDetails):
980         assert src_bundle.backup_bundles is not None
981         n = len(src_bundle.backup_bundles)
982         uuid = src_bundle.uuid + f'_backup#{n}'
983
984         backup_bundle = BundleDetails(
985             pickled_code=src_bundle.pickled_code,
986             uuid=uuid,
987             fname=src_bundle.fname,
988             worker=None,
989             username=None,
990             machine=None,
991             hostname=src_bundle.hostname,
992             code_file=src_bundle.code_file,
993             result_file=src_bundle.result_file,
994             pid=0,
995             start_ts=time.time(),
996             end_ts=0.0,
997             slower_than_local_p95=False,
998             slower_than_global_p95=False,
999             src_bundle=src_bundle,
1000             is_cancelled=threading.Event(),
1001             was_cancelled=False,
1002             backup_bundles=None,  # backup backups not allowed
1003             failure_count=0,
1004         )
1005         src_bundle.backup_bundles.append(backup_bundle)
1006         self.status.record_bundle_details_already_locked(backup_bundle)
1007         logger.debug(f'{backup_bundle}: Created a backup bundle')
1008         return backup_bundle
1009
1010     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1011         assert self.status.lock.locked()
1012         assert src_bundle is not None
1013         backup_bundle = self.create_backup_bundle(src_bundle)
1014         logger.debug(
1015             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1016         )
1017         self._helper_executor.submit(self.launch, backup_bundle)
1018
1019         # Results from backups don't matter; if they finish first
1020         # they will move the result_file to this machine and let
1021         # the original pick them up and unpickle them.
1022
1023     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
1024         is_original = bundle.src_bundle is None
1025         bundle.worker = None
1026         avoid_last_machine = bundle.machine
1027         bundle.machine = None
1028         bundle.username = None
1029         bundle.failure_count += 1
1030         if is_original:
1031             retry_limit = 3
1032         else:
1033             retry_limit = 2
1034
1035         if bundle.failure_count > retry_limit:
1036             logger.error(
1037                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1038             )
1039             if is_original:
1040                 raise RemoteExecutorException(
1041                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1042                 )
1043             else:
1044                 logger.error(
1045                     f'{bundle}: At least it\'s only a backup; better luck with the others.'
1046                 )
1047             return None
1048         else:
1049             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1050             logger.warning(msg)
1051             warnings.warn(msg)
1052             return self.launch(bundle, avoid_last_machine)
1053
1054     @overrides
1055     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1056         pickle = make_cloud_pickle(function, *args, **kwargs)
1057         bundle = self.create_original_bundle(pickle, function.__name__)
1058         self.total_bundles_submitted += 1
1059         return self._helper_executor.submit(self.launch, bundle)
1060
1061     @overrides
1062     def shutdown(self, wait=True) -> None:
1063         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1064         self.heartbeat_stop_event.set()
1065         self.heartbeat_thread.join()
1066         self._helper_executor.shutdown(wait)
1067         print(self.histogram)
1068
1069
1070 @singleton
1071 class DefaultExecutors(object):
1072     def __init__(self):
1073         self.thread_executor: Optional[ThreadExecutor] = None
1074         self.process_executor: Optional[ProcessExecutor] = None
1075         self.remote_executor: Optional[RemoteExecutor] = None
1076
1077     def ping(self, host) -> bool:
1078         logger.debug(f'RUN> ping -c 1 {host}')
1079         try:
1080             x = cmd_with_timeout(
1081                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1082             )
1083             return x == 0
1084         except Exception:
1085             return False
1086
1087     def thread_pool(self) -> ThreadExecutor:
1088         if self.thread_executor is None:
1089             self.thread_executor = ThreadExecutor()
1090         return self.thread_executor
1091
1092     def process_pool(self) -> ProcessExecutor:
1093         if self.process_executor is None:
1094             self.process_executor = ProcessExecutor()
1095         return self.process_executor
1096
1097     def remote_pool(self) -> RemoteExecutor:
1098         if self.remote_executor is None:
1099             logger.info('Looking for some helper machines...')
1100             pool: List[RemoteWorkerRecord] = []
1101             if self.ping('cheetah.house'):
1102                 logger.info('Found cheetah.house')
1103                 pool.append(
1104                     RemoteWorkerRecord(
1105                         username='scott',
1106                         machine='cheetah.house',
1107                         weight=34,
1108                         count=6,
1109                     ),
1110                 )
1111             if self.ping('meerkat.cabin'):
1112                 logger.info('Found meerkat.cabin')
1113                 pool.append(
1114                     RemoteWorkerRecord(
1115                         username='scott',
1116                         machine='meerkat.cabin',
1117                         weight=12,
1118                         count=2,
1119                     ),
1120                 )
1121             if self.ping('wannabe.house'):
1122                 logger.info('Found wannabe.house')
1123                 pool.append(
1124                     RemoteWorkerRecord(
1125                         username='scott',
1126                         machine='wannabe.house',
1127                         weight=25,
1128                         count=10,
1129                     ),
1130                 )
1131             if self.ping('puma.cabin'):
1132                 logger.info('Found puma.cabin')
1133                 pool.append(
1134                     RemoteWorkerRecord(
1135                         username='scott',
1136                         machine='puma.cabin',
1137                         weight=25,
1138                         count=6,
1139                     ),
1140                 )
1141             if self.ping('backup.house'):
1142                 logger.info('Found backup.house')
1143                 pool.append(
1144                     RemoteWorkerRecord(
1145                         username='scott',
1146                         machine='backup.house',
1147                         weight=7,
1148                         count=2,
1149                     ),
1150                 )
1151
1152             # The controller machine has a lot to do; go easy on it.
1153             for record in pool:
1154                 if record.machine == platform.node() and record.count > 1:
1155                     logger.info(f'Reducing workload for {record.machine}.')
1156                     record.count = 1
1157
1158             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1159             policy.register_worker_pool(pool)
1160             self.remote_executor = RemoteExecutor(pool, policy)
1161         return self.remote_executor
1162
1163     def shutdown(self) -> None:
1164         if self.thread_executor is not None:
1165             self.thread_executor.shutdown()
1166             self.thread_executor = None
1167         if self.process_executor is not None:
1168             self.process_executor.shutdown()
1169             self.process_executor = None
1170         if self.remote_executor is not None:
1171             self.remote_executor.shutdown()
1172             self.remote_executor = None
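

# Rough usage sketch (illustrative, not executed here): thanks to @singleton,
# repeated construction is expected to hand back one shared DefaultExecutors,
# whose pools are created lazily on first use, e.g.:
#
#   pools = DefaultExecutors()
#   future = pools.thread_pool().submit(some_function, arg)  # placeholders
#   print(future.result())
#   pools.shutdown()
#
# remote_pool() additionally requires the helper machines listed above to be
# reachable (ping) and accessible over ssh.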