Ran black code formatter on everything.
[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
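"""Thread, process and remote executors sharing a common BaseExecutor
interface.  Remote work is cloudpickled, copied to a worker machine with
scp, run there via remote_worker.py over ssh, and the result is copied
back.

A minimal usage sketch (it assumes this module is importable as
`executors` and that the commandline config has already been parsed;
the function and argument are illustrative only):

    from executors import DefaultExecutors

    def expensive(n: int) -> int:
        return sum(i * i for i in range(n))

    defaults = DefaultExecutors()
    future = defaults.thread_pool().submit(expensive, 1_000_000)
    print(future.result())
    defaults.shutdown()
"""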
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
30
31
32 logger = logging.getLogger(__name__)
33
34 parser = config.add_commandline_args(
35     f"Executors ({__file__})", "Args related to processing executors."
36 )
37 parser.add_argument(
38     '--executors_threadpool_size',
39     type=int,
40     metavar='#THREADS',
41     help='Number of threads in the default threadpool, leave unset for default',
42     default=None,
43 )
44 parser.add_argument(
45     '--executors_processpool_size',
46     type=int,
47     metavar='#PROCESSES',
48     help='Number of processes in the default processpool, leave unset for default',
49     default=None,
50 )
51 parser.add_argument(
52     '--executors_schedule_remote_backups',
53     default=True,
54     action=argparse_utils.ActionNoYes,
55     help='Should we schedule duplicative backup work if a remote bundle is slow',
56 )
57 parser.add_argument(
58     '--executors_max_bundle_failures',
59     type=int,
60     default=3,
61     metavar='#FAILURES',
62     help='Maximum number of failures before giving up on a bundle',
63 )
64
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp -C'
67
68
69 def make_cloud_pickle(fun, *args, **kwargs):
70     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71     return cloudpickle.dumps((fun, args, kwargs))
72
73
74 class BaseExecutor(ABC):
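    """Abstract base for all executors; subclasses implement submit() and
    shutdown() and inherit a task counter and a latency histogram."""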
75     def __init__(self, *, title=''):
76         self.title = title
77         self.task_count = 0
78         self.histogram = hist.SimpleHistogram(
79             hist.SimpleHistogram.n_evenly_spaced_buckets(0, 500, 50)
80         )
81
82     @abstractmethod
83     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
84         pass
85
86     @abstractmethod
87     def shutdown(self, wait: bool = True) -> None:
88         pass
89
90     def adjust_task_count(self, delta: int) -> None:
91         self.task_count += delta
92         logger.debug(f'Executor current task count is {self.task_count}')
93
94
95 class ThreadExecutor(BaseExecutor):
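    """Submits work to a local thread pool whose size comes from
    --executors_threadpool_size unless max_workers is given."""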
96     def __init__(self, max_workers: Optional[int] = None):
97         super().__init__()
98         workers = None
99         if max_workers is not None:
100             workers = max_workers
101         elif 'executors_threadpool_size' in config.config:
102             workers = config.config['executors_threadpool_size']
103         logger.debug(f'Creating threadpool executor with {workers} workers')
104         self._thread_pool_executor = fut.ThreadPoolExecutor(
105             max_workers=workers, thread_name_prefix="thread_executor_helper"
106         )
107
108     def run_local_bundle(self, fun, *args, **kwargs):
109         logger.debug(f"Running local bundle at {fun.__name__}")
110         start = time.time()
111         result = fun(*args, **kwargs)
112         end = time.time()
113         self.adjust_task_count(-1)
114         duration = end - start
115         logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
116         self.histogram.add_item(duration)
117         return result
118
119     @overrides
120     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
121         self.adjust_task_count(+1)
122         newargs = []
123         newargs.append(function)
124         for arg in args:
125             newargs.append(arg)
126         return self._thread_pool_executor.submit(
127             self.run_local_bundle, *newargs, **kwargs
128         )
129
130     @overrides
131     def shutdown(self, wait=True) -> None:
132         logger.debug(f'Shutting down threadpool executor {self.title}')
133         print(self.histogram)
134         self._thread_pool_executor.shutdown(wait)
135
136
137 class ProcessExecutor(BaseExecutor):
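    """Runs work in local child processes.  Work is serialized with
    cloudpickle so closures and lambdas survive the process boundary."""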
138     def __init__(self, max_workers=None):
139         super().__init__()
140         workers = None
141         if max_workers is not None:
142             workers = max_workers
143         elif 'executors_processpool_size' in config.config:
144             workers = config.config['executors_processpool_size']
145         logger.debug(f'Creating processpool executor with {workers} workers.')
146         self._process_executor = fut.ProcessPoolExecutor(
147             max_workers=workers,
148         )
149
150     def run_cloud_pickle(self, pickle):
151         fun, args, kwargs = cloudpickle.loads(pickle)
152         logger.debug(f"Running pickled bundle at {fun.__name__}")
153         result = fun(*args, **kwargs)
154         self.adjust_task_count(-1)
155         return result
156
157     @overrides
158     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
159         start = time.time()
160         self.adjust_task_count(+1)
161         pickle = make_cloud_pickle(function, *args, **kwargs)
162         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
163         result.add_done_callback(
164             lambda _: self.histogram.add_item(time.time() - start)
165         )
166         return result
167
168     @overrides
169     def shutdown(self, wait=True) -> None:
170         logger.debug(f'Shutting down processpool executor {self.title}')
171         self._process_executor.shutdown(wait)
172         print(self.histogram)
173
174     def __getstate__(self):
175         state = self.__dict__.copy()
176         state['_process_executor'] = None
177         return state
178
179
180 class RemoteExecutorException(Exception):
181     """Thrown when a bundle cannot be executed despite several retries."""
182
183     pass
184
185
186 @dataclass
187 class RemoteWorkerRecord:
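    """Describes one remote worker: ssh username, machine name, a
    scheduling weight and how many bundles it may run concurrently."""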
188     username: str
189     machine: str
190     weight: int
191     count: int
192
193     def __hash__(self):
194         return hash((self.username, self.machine))
195
196     def __repr__(self):
197         return f'{self.username}@{self.machine}'
198
199
200 @dataclass
201 class BundleDetails:
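    """Everything tracked for one unit of work (a "bundle"): where its
    code and result files live, which worker is running it, timing, and
    backup / cancellation state."""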
202     pickled_code: bytes
203     uuid: str
204     fname: str
205     worker: Optional[RemoteWorkerRecord]
206     username: Optional[str]
207     machine: Optional[str]
208     hostname: str
209     code_file: str
210     result_file: str
211     pid: int
212     start_ts: float
213     end_ts: float
214     slower_than_local_p95: bool
215     slower_than_global_p95: bool
216     src_bundle: BundleDetails
217     is_cancelled: threading.Event
218     was_cancelled: bool
219     backup_bundles: Optional[List[BundleDetails]]
220     failure_count: int
221
222     def __repr__(self):
223         uuid = self.uuid
224         if uuid[-9:-2] == '_backup':
225             uuid = uuid[:-9]
226             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
227         else:
228             suffix = uuid[-6:]
229
230         colorz = [
231             fg('violet red'),
232             fg('red'),
233             fg('orange'),
234             fg('peach orange'),
235             fg('yellow'),
236             fg('marigold yellow'),
237             fg('green yellow'),
238             fg('tea green'),
239             fg('cornflower blue'),
240             fg('turquoise blue'),
241             fg('tropical blue'),
242             fg('lavender purple'),
243             fg('medium purple'),
244         ]
245         c = colorz[int(uuid[-2:], 16) % len(colorz)]
246         fname = self.fname if self.fname is not None else 'nofname'
247         machine = self.machine if self.machine is not None else 'nomachine'
248         return f'{c}{suffix}/{fname}/{machine}{reset()}'
249
250
251 class RemoteExecutorStatus:
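    """Tracks in-flight and finished bundles across all remote workers and
    renders a periodic progress report.  self.lock protects all mutation."""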
252     def __init__(self, total_worker_count: int) -> None:
253         self.worker_count: int = total_worker_count
254         self.known_workers: Set[RemoteWorkerRecord] = set()
255         self.start_time: float = time.time()
256         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
257         self.end_per_bundle: Dict[str, float] = defaultdict(float)
258         self.finished_bundle_timings_per_worker: Dict[
259             RemoteWorkerRecord, List[float]
260         ] = {}
261         self.in_flight_bundles_by_worker: Dict[
262             RemoteWorkerRecord, Set[str]
263         ] = {}
264         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
265         self.finished_bundle_timings: List[float] = []
266         self.last_periodic_dump: Optional[float] = None
267         self.total_bundles_submitted: int = 0
268
269         # Protects reads and modification using self.  Also used
270         # as a memory fence for modifications to bundle.
271         self.lock: threading.Lock = threading.Lock()
272
273     def record_acquire_worker(
274         self, worker: RemoteWorkerRecord, uuid: str
275     ) -> None:
276         with self.lock:
277             self.record_acquire_worker_already_locked(worker, uuid)
278
279     def record_acquire_worker_already_locked(
280         self, worker: RemoteWorkerRecord, uuid: str
281     ) -> None:
282         assert self.lock.locked()
283         self.known_workers.add(worker)
284         self.start_per_bundle[uuid] = None
285         x = self.in_flight_bundles_by_worker.get(worker, set())
286         x.add(uuid)
287         self.in_flight_bundles_by_worker[worker] = x
288
289     def record_bundle_details(self, details: BundleDetails) -> None:
290         with self.lock:
291             self.record_bundle_details_already_locked(details)
292
293     def record_bundle_details_already_locked(
294         self, details: BundleDetails
295     ) -> None:
296         assert self.lock.locked()
297         self.bundle_details_by_uuid[details.uuid] = details
298
299     def record_release_worker(
300         self,
301         worker: RemoteWorkerRecord,
302         uuid: str,
303         was_cancelled: bool,
304     ) -> None:
305         with self.lock:
306             self.record_release_worker_already_locked(
307                 worker, uuid, was_cancelled
308             )
309
310     def record_release_worker_already_locked(
311         self,
312         worker: RemoteWorkerRecord,
313         uuid: str,
314         was_cancelled: bool,
315     ) -> None:
316         assert self.lock.locked()
317         ts = time.time()
318         self.end_per_bundle[uuid] = ts
319         self.in_flight_bundles_by_worker[worker].remove(uuid)
320         if not was_cancelled:
321             bundle_latency = ts - self.start_per_bundle[uuid]
322             x = self.finished_bundle_timings_per_worker.get(worker, list())
323             x.append(bundle_latency)
324             self.finished_bundle_timings_per_worker[worker] = x
325             self.finished_bundle_timings.append(bundle_latency)
326
327     def record_processing_began(self, uuid: str):
328         with self.lock:
329             self.start_per_bundle[uuid] = time.time()
330
331     def total_in_flight(self) -> int:
332         assert self.lock.locked()
333         total_in_flight = 0
334         for worker in self.known_workers:
335             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
336         return total_in_flight
337
338     def total_idle(self) -> int:
339         assert self.lock.locked()
340         return self.worker_count - self.total_in_flight()
341
342     def __repr__(self):
343         assert self.lock.locked()
344         ts = time.time()
345         total_finished = len(self.finished_bundle_timings)
346         total_in_flight = self.total_in_flight()
347         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
348         qall = None
349         if len(self.finished_bundle_timings) > 1:
350             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
351             ret += (
352                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
353                 f'✅={total_finished}/{self.total_bundles_submitted}, '
354                 f'💻n={total_in_flight}/{self.worker_count}\n'
355             )
356         else:
357             ret += (
358                 f'⏱={ts-self.start_time:.1f}s, '
359                 f'✅={total_finished}/{self.total_bundles_submitted}, '
360                 f'💻n={total_in_flight}/{self.worker_count}\n'
361             )
362
363         for worker in self.known_workers:
364             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
365             timings = self.finished_bundle_timings_per_worker.get(worker, [])
366             count = len(timings)
367             qworker = None
368             if count > 1:
369                 qworker = numpy.quantile(timings, [0.5, 0.95])
370                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
371             else:
372                 ret += '\n'
373             if count > 0:
374                 ret += f'    ...finished {count} total bundle(s) so far\n'
375             in_flight = len(self.in_flight_bundles_by_worker[worker])
376             if in_flight > 0:
377                 ret += f'    ...{in_flight} bundles currently in flight:\n'
378                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
379                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
380                     pid = (
381                         str(details.pid)
382                         if (details and details.pid != 0)
383                         else "TBD"
384                     )
385                     if self.start_per_bundle[bundle_uuid] is not None:
386                         sec = ts - self.start_per_bundle[bundle_uuid]
387                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
388                     else:
389                         ret += f'       {details} setting up / copying data...'
390                         sec = 0.0
391
392                     if qworker is not None:
393                         if sec > qworker[1]:
394                             ret += f'{bg("red")}>💻p95{reset()} '
395                             if details is not None:
396                                 details.slower_than_local_p95 = True
397                         else:
398                             if details is not None:
399                                 details.slower_than_local_p95 = False
400
401                     if qall is not None:
402                         if sec > qall[1]:
403                             ret += f'{bg("red")}>∀p95{reset()} '
404                             if details is not None:
405                                 details.slower_than_global_p95 = True
406                         elif details is not None:
407                             details.slower_than_global_p95 = False
408                     ret += '\n'
409         return ret
410
411     def periodic_dump(self, total_bundles_submitted: int) -> None:
412         assert self.lock.locked()
413         self.total_bundles_submitted = total_bundles_submitted
414         ts = time.time()
415         if (
416             self.last_periodic_dump is None
417             or ts - self.last_periodic_dump > 5.0
418         ):
419             print(self)
420             self.last_periodic_dump = ts
421
422
423 class RemoteWorkerSelectionPolicy(ABC):
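    """Base class for policies that decide which remote worker should
    receive the next bundle."""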
424     def register_worker_pool(self, workers):
425         self.workers = workers
426
427     @abstractmethod
428     def is_worker_available(self) -> bool:
429         pass
430
431     @abstractmethod
432     def acquire_worker(
433         self, machine_to_avoid=None
434     ) -> Optional[RemoteWorkerRecord]:
435         pass
436
437
438 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
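    """Picks a worker at random with probability proportional to its
    weight times its remaining slot count, trying to avoid
    machine_to_avoid when possible."""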
439     @overrides
440     def is_worker_available(self) -> bool:
441         for worker in self.workers:
442             if worker.count > 0:
443                 return True
444         return False
445
446     @overrides
447     def acquire_worker(
448         self, machine_to_avoid=None
449     ) -> Optional[RemoteWorkerRecord]:
450         grabbag = []
451         for worker in self.workers:
452             for x in range(0, worker.count):
453                 for y in range(0, worker.weight):
454                     grabbag.append(worker)
455
456         for _ in range(0, 5):
457             random.shuffle(grabbag)
458             worker = grabbag[0]
459             if worker.machine != machine_to_avoid or _ > 2:
460                 if worker.count > 0:
461                     worker.count -= 1
462                     logger.debug(f'Selected worker {worker}')
463                     return worker
464         msg = 'Unexpectedly could not find a worker, retrying...'
465         logger.warning(msg)
466         return None
467
468
469 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
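    """Hands out workers in round-robin order, skipping any worker with
    no remaining capacity."""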
470     def __init__(self) -> None:
471         self.index = 0
472
473     @overrides
474     def is_worker_available(self) -> bool:
475         for worker in self.workers:
476             if worker.count > 0:
477                 return True
478         return False
479
480     @overrides
481     def acquire_worker(
482         self, machine_to_avoid: Optional[str] = None
483     ) -> Optional[RemoteWorkerRecord]:
484         x = self.index
485         while True:
486             worker = self.workers[x]
487             if worker.count > 0:
488                 worker.count -= 1
489                 x += 1
490                 if x >= len(self.workers):
491                     x = 0
492                 self.index = x
493                 logger.debug(f'Selected worker {worker}')
494                 return worker
495             x += 1
496             if x >= len(self.workers):
497                 x = 0
498             if x == self.index:
499                 msg = 'Unexpectedly could not find a worker, retrying...'
500                 logger.warning(msg)
501                 return None
502
503
504 class RemoteExecutor(BaseExecutor):
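    """Farms bundles out to a pool of remote machines over ssh/scp and,
    when enabled via --executors_schedule_remote_backups, schedules
    duplicative backup bundles for stragglers."""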
505     def __init__(
506         self,
507         workers: List[RemoteWorkerRecord],
508         policy: RemoteWorkerSelectionPolicy,
509     ) -> None:
510         super().__init__()
511         self.workers = workers
512         self.policy = policy
513         self.worker_count = 0
514         for worker in self.workers:
515             self.worker_count += worker.count
516         if self.worker_count <= 0:
517             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
518             logger.critical(msg)
519             raise RemoteExecutorException(msg)
520         self.policy.register_worker_pool(self.workers)
521         self.cv = threading.Condition()
522         logger.debug(
523             f'Creating {self.worker_count} local threads, one per remote worker.'
524         )
525         self._helper_executor = fut.ThreadPoolExecutor(
526             thread_name_prefix="remote_executor_helper",
527             max_workers=self.worker_count,
528         )
529         self.status = RemoteExecutorStatus(self.worker_count)
530         self.total_bundles_submitted = 0
531         self.backup_lock = threading.Lock()
532         self.last_backup = None
533         (
534             self.heartbeat_thread,
535             self.heartbeat_stop_event,
536         ) = self.run_periodic_heartbeat()
537
538     @background_thread
539     def run_periodic_heartbeat(self, stop_event) -> None:
540         while not stop_event.is_set():
541             time.sleep(5.0)
542             logger.debug('Running periodic heartbeat code...')
543             self.heartbeat()
544         logger.debug('Periodic heartbeat thread shutting down.')
545
546     def heartbeat(self) -> None:
547         with self.status.lock:
548             # Dump regular progress report
549             self.status.periodic_dump(self.total_bundles_submitted)
550
551             # Look for bundles to reschedule via executor.submit
552             if config.config['executors_schedule_remote_backups']:
553                 self.maybe_schedule_backup_bundles()
554
555     def maybe_schedule_backup_bundles(self):
556         assert self.status.lock.locked()
557         num_done = len(self.status.finished_bundle_timings)
558         num_idle_workers = self.worker_count - self.task_count
559         now = time.time()
560         if (
561             num_done > 2
562             and num_idle_workers > 1
563             and (self.last_backup is None or (now - self.last_backup > 6.0))
564             and self.backup_lock.acquire(blocking=False)
565         ):
566             try:
567                 assert self.backup_lock.locked()
568
569                 bundle_to_backup = None
570                 best_score = None
571                 for (
572                     worker,
573                     bundle_uuids,
574                 ) in self.status.in_flight_bundles_by_worker.items():
575
576                     # Prefer to schedule backups of bundles running on
577                     # slower machines.
578                     base_score = 0
579                     for record in self.workers:
580                         if worker.machine == record.machine:
581                             base_score = float(record.weight)
582                             base_score = 1.0 / base_score
583                             base_score *= 200.0
584                             base_score = int(base_score)
585                             break
586
587                     for uuid in bundle_uuids:
588                         bundle = self.status.bundle_details_by_uuid.get(
589                             uuid, None
590                         )
591                         if (
592                             bundle is not None
593                             and bundle.src_bundle is None
594                             and bundle.backup_bundles is not None
595                         ):
596                             score = base_score
597
598                             # Schedule backups of bundles running
599                             # longer; especially those that are
600                             # unexpectedly slow.
601                             start_ts = self.status.start_per_bundle[uuid]
602                             if start_ts is not None:
603                                 runtime = now - start_ts
604                                 score += runtime
605                                 logger.debug(
606                                     f'score[{bundle}] => {score}  # latency boost'
607                                 )
608
609                                 if bundle.slower_than_local_p95:
610                                     score += runtime / 2
611                                     logger.debug(
612                                         f'score[{bundle}] => {score}  # >worker p95'
613                                     )
614
615                                 if bundle.slower_than_global_p95:
616                                     score += runtime / 4
617                                     logger.debug(
618                                         f'score[{bundle}] => {score}  # >global p95'
619                                     )
620
621                             # Prefer backups of bundles that don't
622                             # have backups already.
623                             backup_count = len(bundle.backup_bundles)
624                             if backup_count == 0:
625                                 score *= 2
626                             elif backup_count == 1:
627                                 score /= 2
628                             elif backup_count == 2:
629                                 score /= 8
630                             else:
631                                 score = 0
632                             logger.debug(
633                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
634                             )
635
636                             if score != 0 and (
637                                 best_score is None or score > best_score
638                             ):
639                                 bundle_to_backup = bundle
640                                 assert bundle is not None
641                                 assert bundle.backup_bundles is not None
642                                 assert bundle.src_bundle is None
643                                 best_score = score
644
645                 # Note: this is all still happening on the heartbeat
646                 # runner thread.  That's ok because
647                 # schedule_backup_for_bundle uses the executor to
648                 # submit the bundle again which will cause it to be
649                 # picked up by a worker thread and allow this thread
650                 # to return to run future heartbeats.
651                 if bundle_to_backup is not None:
652                     self.last_backup = now
653                     logger.info(
654                         f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
655                     )
656                     self.schedule_backup_for_bundle(bundle_to_backup)
657             finally:
658                 self.backup_lock.release()
659
660     def is_worker_available(self) -> bool:
661         return self.policy.is_worker_available()
662
663     def acquire_worker(
664         self, machine_to_avoid: Optional[str] = None
665     ) -> Optional[RemoteWorkerRecord]:
666         return self.policy.acquire_worker(machine_to_avoid)
667
668     def find_available_worker_or_block(
669         self, machine_to_avoid: Optional[str] = None
670     ) -> RemoteWorkerRecord:
671         with self.cv:
672             while not self.is_worker_available():
673                 self.cv.wait()
674             worker = self.acquire_worker(machine_to_avoid)
675             if worker is not None:
676                 return worker
677         msg = "We should never reach this point in the code"
678         logger.critical(msg)
679         raise Exception(msg)
680
681     def release_worker(
682         self, bundle: BundleDetails, *, was_cancelled=True
683     ) -> None:
684         worker = bundle.worker
685         assert worker is not None
686         logger.debug(f'Released worker {worker}')
687         self.status.record_release_worker(
688             worker,
689             bundle.uuid,
690             was_cancelled,
691         )
692         with self.cv:
693             worker.count += 1
694             self.cv.notify()
695         self.adjust_task_count(-1)
696
697     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
698         with self.status.lock:
699             if bundle.is_cancelled.wait(timeout=0.0):
700                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
701                 bundle.was_cancelled = True
702                 return True
703         return False
704
705     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
706         """Find a worker for bundle or block until one is available."""
707         self.adjust_task_count(+1)
708         uuid = bundle.uuid
709         hostname = bundle.hostname
710         avoid_machine = override_avoid_machine
711         is_original = bundle.src_bundle is None
712
713         # Try not to schedule a backup on the same host as the original.
714         if avoid_machine is None and bundle.src_bundle is not None:
715             avoid_machine = bundle.src_bundle.machine
716         worker = None
717         while worker is None:
718             worker = self.find_available_worker_or_block(avoid_machine)
719         assert worker
720
721         # Ok, found a worker.
722         bundle.worker = worker
723         machine = bundle.machine = worker.machine
724         username = bundle.username = worker.username
725         self.status.record_acquire_worker(worker, uuid)
726         logger.debug(f'{bundle}: Running bundle on {worker}...')
727
728         # Before we do any work, make sure the bundle is still viable.
729         # It may have been some time between when it was submitted and
730         # now due to lack of worker availability and someone else may
731         # have already finished it.
732         if self.check_if_cancelled(bundle):
733             try:
734                 return self.process_work_result(bundle)
735             except Exception as e:
736                 logger.warning(
737                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
738                 )
739                 self.release_worker(bundle)
740                 if is_original:
741                     # Weird.  We are the original owner of this
742                     # bundle.  For it to have been cancelled, a backup
743                     # must have already started and completed before
744                     # we even got started.  Moreover, the backup says
745                     # it is done but we can't find the results it
746                     # should have copied over.  Reschedule the whole
747                     # thing.
748                     logger.exception(e)
749                     logger.error(
750                         f'{bundle}: We are the original owner thread and yet there are '
751                         + 'no results for this bundle.  This is unexpected and bad.'
752                     )
753                     return self.emergency_retry_nasty_bundle(bundle)
754                 else:
755                     # Expected(?).  We're a backup and our bundle is
756                     # cancelled before we even got started.  Something
757                     # went bad in process_work_result (I actually don't
758                     # see what?) but probably not worth worrying
759                     # about.  Let the original thread worry about
760                     # either finding the results or complaining about
761                     # it.
762                     return None
763
764         # Send input code / data to worker machine if it's not local.
765         if hostname not in machine:
766             try:
767                 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
768                 start_ts = time.time()
769                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
770                 run_silently(cmd)
771                 xfer_latency = time.time() - start_ts
772                 logger.debug(
773                     f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s."
774                 )
775             except Exception as e:
776                 self.release_worker(bundle)
777                 if is_original:
778                     # Weird.  We tried to copy the code to the worker and it failed...
779                     # And we're the original bundle.  We have to retry.
780                     logger.exception(e)
781                     logger.error(
782                         f"{bundle}: Failed to send instructions to the worker machine?! "
783                         + "This is not expected; we\'re the original bundle so this shouldn\'t "
784                         + "be a race condition.  Attempting an emergency retry..."
785                     )
786                     return self.emergency_retry_nasty_bundle(bundle)
787                 else:
788                     # This is actually expected; we're a backup.
789                     # There's a race condition where someone else
790                     # already finished the work and removed the source
791                     # code file before we could copy it.  No biggie.
792                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
793                     msg += 'We\'re a backup and this may be caused by the original (or some '
794                     msg += 'other backup) already finishing this work.  Ignoring this.'
795                     logger.warning(msg)
796                     return None
797
798         # Kick off the work.  Note that if this fails we let
799         # wait_for_process deal with it.
800         self.status.record_processing_began(uuid)
801         cmd = (
802             f'{SSH} {bundle.username}@{bundle.machine} '
803             f'"source py38-venv/bin/activate &&'
804             f' /home/scott/lib/python_modules/remote_worker.py'
805             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
806         )
807         logger.debug(
808             f'{bundle}: Executing {cmd} in the background to kick off work...'
809         )
810         p = cmd_in_background(cmd, silent=True)
811         bundle.pid = p.pid
812         logger.debug(
813             f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
814         )
815         return self.wait_for_process(p, bundle, 0)
816
817     def wait_for_process(
818         self, p: subprocess.Popen, bundle: BundleDetails, depth: int
819     ) -> Any:
820         machine = bundle.machine
821         pid = p.pid
822         if depth > 3:
823             logger.error(
824                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
825             )
826             p.terminate()
827             self.release_worker(bundle)
828             return self.emergency_retry_nasty_bundle(bundle)
829
830         # Spin until either the ssh job we scheduled finishes the
831         # bundle or some backup worker signals that they finished it
832         # before we could.
833         while True:
834             try:
835                 p.wait(timeout=0.25)
836             except subprocess.TimeoutExpired:
837                 if self.check_if_cancelled(bundle):
838                     logger.info(
839                         f'{bundle}: looks like another worker finished bundle...'
840                     )
841                     break
842             else:
843                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
844                 p = None
845                 break
846
847         # If we get here we believe the bundle is done; either the ssh
848         # subprocess finished (hopefully successfully) or we noticed
849         # that some other worker seems to have completed the bundle
850         # and we're bailing out.
851         try:
852             ret = self.process_work_result(bundle)
853             if ret is not None and p is not None:
854                 p.terminate()
855             return ret
856
857         # Something went wrong; e.g. we could not copy the results
858         # back, cleanup after ourselves on the remote machine, or
859         # unpickle the results we got from the remote machine.  If we
860         # still have an active ssh subprocess, keep waiting on it.
861         # Otherwise, time for an emergency reschedule.
862         except Exception as e:
863             logger.exception(e)
864             logger.error(f'{bundle}: Something unexpected just happened...')
865             if p is not None:
866                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
867                 logger.warning(msg)
868                 return self.wait_for_process(p, bundle, depth + 1)
869             else:
870                 self.release_worker(bundle)
871                 return self.emergency_retry_nasty_bundle(bundle)
872
873     def process_work_result(self, bundle: BundleDetails) -> Any:
874         with self.status.lock:
875             is_original = bundle.src_bundle is None
876             was_cancelled = bundle.was_cancelled
877             username = bundle.username
878             machine = bundle.machine
879             result_file = bundle.result_file
880             code_file = bundle.code_file
881
882             # Whether original or backup, if we finished first we must
883             # fetch the results if the computation happened on a
884             # remote machine.
885             bundle.end_ts = time.time()
886             if not was_cancelled:
887                 assert bundle.machine is not None
888                 if bundle.hostname not in bundle.machine:
889                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
890                     logger.info(
891                         f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
892                     )
893
894                     # If either of these throw they are handled in
895                     # wait_for_process.
896                     attempts = 0
897                     while True:
898                         try:
899                             run_silently(cmd)
900                         except Exception as e:
901                             attempts += 1
902                             if attempts >= 3:
903                                 raise e
904                         else:
905                             break
906
907                     run_silently(
908                         f'{SSH} {username}@{machine}'
909                         f' "/bin/rm -f {code_file} {result_file}"'
910                     )
911                     logger.debug(
912                         f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
913                     )
914                 dur = bundle.end_ts - bundle.start_ts
915                 self.histogram.add_item(dur)
916
917         # Only the original worker should unpickle the file contents
918         # though since it's the only one whose result matters.  The
919         # original is also the only job that may delete result_file
920         # from disk.  Note that the original may have been cancelled
921         # if one of the backups finished first; it still must read the
922         # result from disk.
923         if is_original:
924             logger.debug(f"{bundle}: Unpickling {result_file}.")
925             try:
926                 with open(result_file, 'rb') as rb:
927                     serialized = rb.read()
928                 result = cloudpickle.loads(serialized)
929             except Exception as e:
930                 logger.exception(e)
931                 msg = f'Failed to load {result_file}... this is bad news.'
932                 logger.critical(msg)
933                 self.release_worker(bundle)
934
935                 # Re-raise the exception; the code in wait_for_process may
936                 # decide to emergency_retry_nasty_bundle here.
937                 raise e
938             logger.debug(
939                 f'Removing local (master) {code_file} and {result_file}.'
940             )
941             os.remove(f'{result_file}')
942             os.remove(f'{code_file}')
943
944             # Notify any backups that the original is done so they
945             # should stop ASAP.  Do this whether or not we
946             # finished first since there could be more than one
947             # backup.
948             if bundle.backup_bundles is not None:
949                 for backup in bundle.backup_bundles:
950                     logger.debug(
951                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
952                     )
953                     backup.is_cancelled.set()
954
955         # This is a backup job and, by now, we have already fetched
956         # the bundle results.
957         else:
958             # Backup results don't matter, they just need to leave the
959             # result file in the right place for their originals to
960             # read/unpickle later.
961             result = None
962
963             # Tell the original to stop if we finished first.
964             if not was_cancelled:
965                 logger.debug(
966                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
967                 )
968                 bundle.src_bundle.is_cancelled.set()
969         self.release_worker(bundle, was_cancelled=was_cancelled)
970         return result
971
972     def create_original_bundle(self, pickle, fname: str):
973         from string_utils import generate_uuid
974
975         uuid = generate_uuid(omit_dashes=True)
976         code_file = f'/tmp/{uuid}.code.bin'
977         result_file = f'/tmp/{uuid}.result.bin'
978
979         logger.debug(f'Writing pickled code to {code_file}')
980         with open(f'{code_file}', 'wb') as wb:
981             wb.write(pickle)
982
983         bundle = BundleDetails(
984             pickled_code=pickle,
985             uuid=uuid,
986             fname=fname,
987             worker=None,
988             username=None,
989             machine=None,
990             hostname=platform.node(),
991             code_file=code_file,
992             result_file=result_file,
993             pid=0,
994             start_ts=time.time(),
995             end_ts=0.0,
996             slower_than_local_p95=False,
997             slower_than_global_p95=False,
998             src_bundle=None,
999             is_cancelled=threading.Event(),
1000             was_cancelled=False,
1001             backup_bundles=[],
1002             failure_count=0,
1003         )
1004         self.status.record_bundle_details(bundle)
1005         logger.debug(f'{bundle}: Created an original bundle')
1006         return bundle
1007
1008     def create_backup_bundle(self, src_bundle: BundleDetails):
1009         assert src_bundle.backup_bundles is not None
1010         n = len(src_bundle.backup_bundles)
1011         uuid = src_bundle.uuid + f'_backup#{n}'
1012
1013         backup_bundle = BundleDetails(
1014             pickled_code=src_bundle.pickled_code,
1015             uuid=uuid,
1016             fname=src_bundle.fname,
1017             worker=None,
1018             username=None,
1019             machine=None,
1020             hostname=src_bundle.hostname,
1021             code_file=src_bundle.code_file,
1022             result_file=src_bundle.result_file,
1023             pid=0,
1024             start_ts=time.time(),
1025             end_ts=0.0,
1026             slower_than_local_p95=False,
1027             slower_than_global_p95=False,
1028             src_bundle=src_bundle,
1029             is_cancelled=threading.Event(),
1030             was_cancelled=False,
1031             backup_bundles=None,  # backup backups not allowed
1032             failure_count=0,
1033         )
1034         src_bundle.backup_bundles.append(backup_bundle)
1035         self.status.record_bundle_details_already_locked(backup_bundle)
1036         logger.debug(f'{backup_bundle}: Created a backup bundle')
1037         return backup_bundle
1038
1039     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1040         assert self.status.lock.locked()
1041         assert src_bundle is not None
1042         backup_bundle = self.create_backup_bundle(src_bundle)
1043         logger.debug(
1044             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1045         )
1046         self._helper_executor.submit(self.launch, backup_bundle)
1047
1048         # Results from backups don't matter; if they finish first
1049         # they will move the result_file to this machine and let
1050         # the original pick them up and unpickle them.
1051
1052     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Any:
1053         is_original = bundle.src_bundle is None
1054         bundle.worker = None
1055         avoid_last_machine = bundle.machine
1056         bundle.machine = None
1057         bundle.username = None
1058         bundle.failure_count += 1
1059         if is_original:
1060             retry_limit = 3
1061         else:
1062             retry_limit = 2
1063
1064         if bundle.failure_count > retry_limit:
1065             logger.error(
1066                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1067             )
1068             if is_original:
1069                 raise RemoteExecutorException(
1070                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1071                 )
1072             else:
1073                 logger.error(
1074                     f'{bundle}: At least it\'s only a backup; better luck with the others.'
1075                 )
1076             return None
1077         else:
1078             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1079             logger.warning(msg)
1080             warnings.warn(msg)
1081             return self.launch(bundle, avoid_last_machine)
1082
1083     @overrides
1084     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1085         pickle = make_cloud_pickle(function, *args, **kwargs)
1086         bundle = self.create_original_bundle(pickle, function.__name__)
1087         self.total_bundles_submitted += 1
1088         return self._helper_executor.submit(self.launch, bundle)
1089
1090     @overrides
1091     def shutdown(self, wait=True) -> None:
1092         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1093         self.heartbeat_stop_event.set()
1094         self.heartbeat_thread.join()
1095         self._helper_executor.shutdown(wait)
1096         print(self.histogram)
1097
1098
1099 @singleton
1100 class DefaultExecutors(object):
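    """A singleton that lazily creates and caches a default thread,
    process and remote executor; remote_pool() pings a hard-coded list
    of helper machines to build the worker pool."""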
1101     def __init__(self):
1102         self.thread_executor: Optional[ThreadExecutor] = None
1103         self.process_executor: Optional[ProcessExecutor] = None
1104         self.remote_executor: Optional[RemoteExecutor] = None
1105
1106     def ping(self, host) -> bool:
1107         logger.debug(f'RUN> ping -c 1 {host}')
1108         try:
1109             x = cmd_with_timeout(
1110                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1111             )
1112             return x == 0
1113         except Exception:
1114             return False
1115
1116     def thread_pool(self) -> ThreadExecutor:
1117         if self.thread_executor is None:
1118             self.thread_executor = ThreadExecutor()
1119         return self.thread_executor
1120
1121     def process_pool(self) -> ProcessExecutor:
1122         if self.process_executor is None:
1123             self.process_executor = ProcessExecutor()
1124         return self.process_executor
1125
1126     def remote_pool(self) -> RemoteExecutor:
1127         if self.remote_executor is None:
1128             logger.info('Looking for some helper machines...')
1129             pool: List[RemoteWorkerRecord] = []
1130             if self.ping('cheetah.house'):
1131                 logger.info('Found cheetah.house')
1132                 pool.append(
1133                     RemoteWorkerRecord(
1134                         username='scott',
1135                         machine='cheetah.house',
1136                         weight=34,
1137                         count=6,
1138                     ),
1139                 )
1140             if self.ping('meerkat.cabin'):
1141                 logger.info('Found meerkat.cabin')
1142                 pool.append(
1143                     RemoteWorkerRecord(
1144                         username='scott',
1145                         machine='meerkat.cabin',
1146                         weight=12,
1147                         count=2,
1148                     ),
1149                 )
1150             if self.ping('wannabe.house'):
1151                 logger.info('Found wannabe.house')
1152                 pool.append(
1153                     RemoteWorkerRecord(
1154                         username='scott',
1155                         machine='wannabe.house',
1156                         weight=25,
1157                         count=10,
1158                     ),
1159                 )
1160             if self.ping('puma.cabin'):
1161                 logger.info('Found puma.cabin')
1162                 pool.append(
1163                     RemoteWorkerRecord(
1164                         username='scott',
1165                         machine='puma.cabin',
1166                         weight=25,
1167                         count=6,
1168                     ),
1169                 )
1170             if self.ping('backup.house'):
1171                 logger.info('Found backup.house')
1172                 pool.append(
1173                     RemoteWorkerRecord(
1174                         username='scott',
1175                         machine='backup.house',
1176                         weight=7,
1177                         count=2,
1178                     ),
1179                 )
1180
1181             # The controller machine has a lot to do; go easy on it.
1182             for record in pool:
1183                 if record.machine == platform.node() and record.count > 1:
1184                     logger.info(f'Reducing workload for {record.machine}.')
1185                     record.count = 1
1186
1187             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1188             policy.register_worker_pool(pool)
1189             self.remote_executor = RemoteExecutor(pool, policy)
1190         return self.remote_executor
1191
1192     def shutdown(self) -> None:
1193         if self.thread_executor is not None:
1194             self.thread_executor.shutdown()
1195             self.thread_executor = None
1196         if self.process_executor is not None:
1197             self.process_executor.shutdown()
1198             self.process_executor = None
1199         if self.remote_executor is not None:
1200             self.remote_executor.shutdown()
1201             self.remote_executor = None
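

# A hedged sketch of constructing a RemoteExecutor directly with a custom
# worker pool rather than via DefaultExecutors.remote_pool().  The hostname,
# weight, count, and some_function below are illustrative only:
#
#     pool = [
#         RemoteWorkerRecord(
#             username='scott', machine='example.host', weight=10, count=4
#         ),
#     ]
#     policy = RoundRobinRemoteWorkerSelectionPolicy()
#     policy.register_worker_pool(pool)
#     executor = RemoteExecutor(pool, policy)
#     future = executor.submit(some_function, some_arg)
#     print(future.result())
#     executor.shutdown()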