1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """Defines three executors: a thread executor for doing work using a
7 threadpool, a process executor for doing work in other processes on
8 the same machine and a remote executor for farming out work to other
9 machines.
10
11 Also defines DefaultExecutors which is a container for references to
12 global executors / worker pools with automatic shutdown semantics."""
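# A minimal usage sketch (assuming the @singleton-decorated DefaultExecutors
# defined at the bottom of this file returns the shared instance when called,
# and that config's command-line arguments have already been parsed):
#
#     from executors import DefaultExecutors
#
#     def square(x: int) -> int:
#         return x * x
#
#     pool = DefaultExecutors().thread_pool()
#     future = pool.submit(square, 7)
#     print(future.result())    # -> 49
#     DefaultExecutors().shutdown()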
13
14 from __future__ import annotations
15 import concurrent.futures as fut
16 import logging
17 import os
18 import platform
19 import random
20 import subprocess
21 import threading
22 import time
23 import warnings
24 from abc import ABC, abstractmethod
25 from collections import defaultdict
26 from dataclasses import dataclass
27 from typing import Any, Callable, Dict, List, Optional, Set
28
29 import cloudpickle  # type: ignore
30 import numpy
31 from overrides import overrides
32
33 import argparse_utils
34 import config
35 import histogram as hist
36 import string_utils
37 from ansi import bg, fg, reset, underline
38 from decorator_utils import singleton
39 from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
40 from thread_utils import background_thread
41
42 logger = logging.getLogger(__name__)
43
44 parser = config.add_commandline_args(
45     f"Executors ({__file__})", "Args related to processing executors."
46 )
47 parser.add_argument(
48     '--executors_threadpool_size',
49     type=int,
50     metavar='#THREADS',
51     help='Number of threads in the default threadpool, leave unset for default',
52     default=None,
53 )
54 parser.add_argument(
55     '--executors_processpool_size',
56     type=int,
57     metavar='#PROCESSES',
58     help='Number of processes in the default processpool, leave unset for default',
59     default=None,
60 )
61 parser.add_argument(
62     '--executors_schedule_remote_backups',
63     default=True,
64     action=argparse_utils.ActionNoYes,
65     help='Should we schedule duplicative backup work if a remote bundle is slow',
66 )
67 parser.add_argument(
68     '--executors_max_bundle_failures',
69     type=int,
70     default=3,
71     metavar='#FAILURES',
72     help='Maximum number of failures before giving up on a bundle',
73 )
74
75 SSH = '/usr/bin/ssh -oForwardX11=no'
76 SCP = '/usr/bin/scp -C'
77
78
79 def make_cloud_pickle(fun, *args, **kwargs):
80     logger.debug("Making cloudpickled bundle at %s", fun.__name__)
81     return cloudpickle.dumps((fun, args, kwargs))
82
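# Round-trip sketch: the process and remote executors ship work as a
# cloudpickle of (function, args, kwargs) and the worker side unpacks and
# invokes it, roughly like this (pow is just an example workload):
#
#     blob = make_cloud_pickle(pow, 2, 10)
#     fun, args, kwargs = cloudpickle.loads(blob)
#     assert fun(*args, **kwargs) == 1024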
83
84 class BaseExecutor(ABC):
85     """The base executor interface definition."""
86
87     def __init__(self, *, title=''):
88         self.title = title
89         self.histogram = hist.SimpleHistogram(
90             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
91         )
92         self.task_count = 0
93
94     @abstractmethod
95     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
96         pass
97
98     @abstractmethod
99     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
100         pass
101
102     def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
103         """Shutdown the executor and return True if it is idle (i.e.
104         there are no pending or active tasks); otherwise do nothing and
105         return False.  Note: this should only be called by the launcher
106         process.
107
108         """
109         if self.task_count == 0:
110             self.shutdown(wait=True, quiet=quiet)
111             return True
112         return False
113
114     def adjust_task_count(self, delta: int) -> None:
115         """Change the task count.  Note: do not call this method from a
116         worker, it should only be called by the launcher process /
117         thread / machine.
118
119         """
120         self.task_count += delta
121         logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
122
123     def get_task_count(self) -> int:
124         """Return the current task count.  Note: do not call this method
125         from a worker, it should only be called by the launcher process /
126         thread / machine.
127
128         """
129         return self.task_count
130
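# A hedged sketch of the contract subclasses must implement; the
# SynchronousExecutor name is made up for illustration (real subclasses
# follow below):
#
#     class SynchronousExecutor(BaseExecutor):
#         @overrides
#         def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
#             self.adjust_task_count(+1)
#             f: fut.Future = fut.Future()
#             f.set_result(function(*args, **kwargs))
#             self.adjust_task_count(-1)
#             return f
#
#         @overrides
#         def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
#             pass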
131
132 class ThreadExecutor(BaseExecutor):
133     """A threadpool executor instance."""
134
135     def __init__(self, max_workers: Optional[int] = None):
136         super().__init__()
137         workers = None
138         if max_workers is not None:
139             workers = max_workers
140         elif 'executors_threadpool_size' in config.config:
141             workers = config.config['executors_threadpool_size']
142         logger.debug('Creating threadpool executor with %s workers', workers)
143         self._thread_pool_executor = fut.ThreadPoolExecutor(
144             max_workers=workers, thread_name_prefix="thread_executor_helper"
145         )
146         self.already_shutdown = False
147
148     # This is run on a different thread; do not adjust task count here.
149     @staticmethod
150     def run_local_bundle(fun, *args, **kwargs):
151         logger.debug("Running local bundle at %s", fun.__name__)
152         result = fun(*args, **kwargs)
153         return result
154
155     @overrides
156     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
157         if self.already_shutdown:
158             raise Exception('Submitted work after shutdown.')
159         self.adjust_task_count(+1)
160         newargs = []
161         newargs.append(function)
162         for arg in args:
163             newargs.append(arg)
164         start = time.time()
165         result = self._thread_pool_executor.submit(
166             ThreadExecutor.run_local_bundle, *newargs, **kwargs
167         )
168         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
169         result.add_done_callback(lambda _: self.adjust_task_count(-1))
170         return result
171
172     @overrides
173     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
174         if not self.already_shutdown:
175             logger.debug('Shutting down threadpool executor %s', self.title)
176             self._thread_pool_executor.shutdown(wait)
177             if not quiet:
178                 print(self.histogram.__repr__(label_formatter='%ds'))
179             self.already_shutdown = True
180
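# Hedged usage sketch for the thread pool (the lambda workload is purely
# illustrative; real callers typically go through DefaultExecutors below):
#
#     executor = ThreadExecutor(max_workers=4)
#     futures = [executor.submit(lambda n: n * n, i) for i in range(10)]
#     print([f.result() for f in futures])    # [0, 1, 4, ..., 81]
#     executor.shutdown(wait=True, quiet=True)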
181
182 class ProcessExecutor(BaseExecutor):
183     """A processpool executor."""
184
185     def __init__(self, max_workers=None):
186         super().__init__()
187         workers = None
188         if max_workers is not None:
189             workers = max_workers
190         elif 'executors_processpool_size' in config.config:
191             workers = config.config['executors_processpool_size']
192         logger.debug('Creating processpool executor with %s workers.', workers)
193         self._process_executor = fut.ProcessPoolExecutor(
194             max_workers=workers,
195         )
196         self.already_shutdown = False
197
198     # This is run in another process; do not adjust task count here.
199     @staticmethod
200     def run_cloud_pickle(pickle):
201         fun, args, kwargs = cloudpickle.loads(pickle)
202         logger.debug("Running pickled bundle at %s", fun.__name__)
203         result = fun(*args, **kwargs)
204         return result
205
206     @overrides
207     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
208         if self.already_shutdown:
209             raise Exception('Submitted work after shutdown.')
210         start = time.time()
211         self.adjust_task_count(+1)
212         pickle = make_cloud_pickle(function, *args, **kwargs)
213         result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
214         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
215         result.add_done_callback(lambda _: self.adjust_task_count(-1))
216         return result
217
218     @overrides
219     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
220         if not self.already_shutdown:
221             logger.debug('Shutting down processpool executor %s', self.title)
222             self._process_executor.shutdown(wait)
223             if not quiet:
224                 print(self.histogram.__repr__(label_formatter='%ds'))
225             self.already_shutdown = True
226
227     def __getstate__(self):
228         state = self.__dict__.copy()
229         state['_process_executor'] = None
230         return state
231
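# Hedged usage sketch for the process pool; because work is shipped via
# cloudpickle, even functions defined in __main__ can be submitted (fib is
# an illustrative workload):
#
#     def fib(n: int) -> int:
#         return n if n < 2 else fib(n - 1) + fib(n - 2)
#
#     executor = ProcessExecutor(max_workers=2)
#     future = executor.submit(fib, 20)
#     print(future.result())    # 6765
#     executor.shutdown(wait=True, quiet=True)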
232
233 class RemoteExecutorException(Exception):
234     """Thrown when a bundle cannot be executed despite several retries."""
235
236     pass
237
238
239 @dataclass
240 class RemoteWorkerRecord:
241     """A record of info about a remote worker."""
242
243     username: str
244     machine: str
245     weight: int
246     count: int
247
248     def __hash__(self):
249         return hash((self.username, self.machine))
250
251     def __repr__(self):
252         return f'{self.username}@{self.machine}'
253
254
255 @dataclass
256 class BundleDetails:
257     """All info necessary to define some unit of work that needs to be
258     done, where it is being run, its state, whether it is an original
259     bundle or a backup bundle, how many times it has failed, etc...
260
261     """
262
263     pickled_code: bytes
264     uuid: str
265     fname: str
266     worker: Optional[RemoteWorkerRecord]
267     username: Optional[str]
268     machine: Optional[str]
269     hostname: str
270     code_file: str
271     result_file: str
272     pid: int
273     start_ts: float
274     end_ts: float
275     slower_than_local_p95: bool
276     slower_than_global_p95: bool
277     src_bundle: Optional[BundleDetails]
278     is_cancelled: threading.Event
279     was_cancelled: bool
280     backup_bundles: Optional[List[BundleDetails]]
281     failure_count: int
282
283     def __repr__(self):
284         uuid = self.uuid
285         if uuid[-9:-2] == '_backup':
286             uuid = uuid[:-9]
287             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
288         else:
289             suffix = uuid[-6:]
290
291         colorz = [
292             fg('violet red'),
293             fg('red'),
294             fg('orange'),
295             fg('peach orange'),
296             fg('yellow'),
297             fg('marigold yellow'),
298             fg('green yellow'),
299             fg('tea green'),
300             fg('cornflower blue'),
301             fg('turquoise blue'),
302             fg('tropical blue'),
303             fg('lavender purple'),
304             fg('medium purple'),
305         ]
306         c = colorz[int(uuid[-2:], 16) % len(colorz)]
307         fname = self.fname if self.fname is not None else 'nofname'
308         machine = self.machine if self.machine is not None else 'nomachine'
309         return f'{c}{suffix}/{fname}/{machine}{reset()}'
310
311
312 class RemoteExecutorStatus:
313     """A status 'scoreboard' for a remote executor."""
314
315     def __init__(self, total_worker_count: int) -> None:
316         self.worker_count: int = total_worker_count
317         self.known_workers: Set[RemoteWorkerRecord] = set()
318         self.start_time: float = time.time()
319         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
320         self.end_per_bundle: Dict[str, float] = defaultdict(float)
321         self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
322         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
323         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
324         self.finished_bundle_timings: List[float] = []
325         self.last_periodic_dump: Optional[float] = None
326         self.total_bundles_submitted: int = 0
327
328         # Protects reads and modifications of self.  Also used
329         # as a memory fence for modifications to bundles.
330         self.lock: threading.Lock = threading.Lock()
331
332     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
333         with self.lock:
334             self.record_acquire_worker_already_locked(worker, uuid)
335
336     def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
337         assert self.lock.locked()
338         self.known_workers.add(worker)
339         self.start_per_bundle[uuid] = None
340         x = self.in_flight_bundles_by_worker.get(worker, set())
341         x.add(uuid)
342         self.in_flight_bundles_by_worker[worker] = x
343
344     def record_bundle_details(self, details: BundleDetails) -> None:
345         with self.lock:
346             self.record_bundle_details_already_locked(details)
347
348     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
349         assert self.lock.locked()
350         self.bundle_details_by_uuid[details.uuid] = details
351
352     def record_release_worker(
353         self,
354         worker: RemoteWorkerRecord,
355         uuid: str,
356         was_cancelled: bool,
357     ) -> None:
358         with self.lock:
359             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
360
361     def record_release_worker_already_locked(
362         self,
363         worker: RemoteWorkerRecord,
364         uuid: str,
365         was_cancelled: bool,
366     ) -> None:
367         assert self.lock.locked()
368         ts = time.time()
369         self.end_per_bundle[uuid] = ts
370         self.in_flight_bundles_by_worker[worker].remove(uuid)
371         if not was_cancelled:
372             start = self.start_per_bundle[uuid]
373             assert start is not None
374             bundle_latency = ts - start
375             x = self.finished_bundle_timings_per_worker.get(worker, [])
376             x.append(bundle_latency)
377             self.finished_bundle_timings_per_worker[worker] = x
378             self.finished_bundle_timings.append(bundle_latency)
379
380     def record_processing_began(self, uuid: str):
381         with self.lock:
382             self.start_per_bundle[uuid] = time.time()
383
384     def total_in_flight(self) -> int:
385         assert self.lock.locked()
386         total_in_flight = 0
387         for worker in self.known_workers:
388             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
389         return total_in_flight
390
391     def total_idle(self) -> int:
392         assert self.lock.locked()
393         return self.worker_count - self.total_in_flight()
394
395     def __repr__(self):
396         assert self.lock.locked()
397         ts = time.time()
398         total_finished = len(self.finished_bundle_timings)
399         total_in_flight = self.total_in_flight()
400         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
401         qall = None
402         if len(self.finished_bundle_timings) > 1:
403             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
404             ret += (
405                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
406                 f'✅={total_finished}/{self.total_bundles_submitted}, '
407                 f'💻n={total_in_flight}/{self.worker_count}\n'
408             )
409         else:
410             ret += (
411                 f'⏱={ts-self.start_time:.1f}s, '
412                 f'✅={total_finished}/{self.total_bundles_submitted}, '
413                 f'💻n={total_in_flight}/{self.worker_count}\n'
414             )
415
416         for worker in self.known_workers:
417             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
418             timings = self.finished_bundle_timings_per_worker.get(worker, [])
419             count = len(timings)
420             qworker = None
421             if count > 1:
422                 qworker = numpy.quantile(timings, [0.5, 0.95])
423                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
424             else:
425                 ret += '\n'
426             if count > 0:
427                 ret += f'    ...finished {count} total bundle(s) so far\n'
428             in_flight = len(self.in_flight_bundles_by_worker[worker])
429             if in_flight > 0:
430                 ret += f'    ...{in_flight} bundles currently in flight:\n'
431                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
432                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
433                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
434                     if self.start_per_bundle[bundle_uuid] is not None:
435                         sec = ts - self.start_per_bundle[bundle_uuid]
436                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
437                     else:
438                         ret += f'       {details} setting up / copying data...'
439                         sec = 0.0
440
441                     if qworker is not None:
442                         if sec > qworker[1]:
443                             ret += f'{bg("red")}>💻p95{reset()} '
444                             if details is not None:
445                                 details.slower_than_local_p95 = True
446                         else:
447                             if details is not None:
448                                 details.slower_than_local_p95 = False
449
450                     if qall is not None:
451                         if sec > qall[1]:
452                             ret += f'{bg("red")}>∀p95{reset()} '
453                             if details is not None:
454                                 details.slower_than_global_p95 = True
455                         elif details is not None:
456                             details.slower_than_global_p95 = False
457                     ret += '\n'
458         return ret
459
460     def periodic_dump(self, total_bundles_submitted: int) -> None:
461         assert self.lock.locked()
462         self.total_bundles_submitted = total_bundles_submitted
463         ts = time.time()
464         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
465             print(self)
466             self.last_periodic_dump = ts
467
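# The status object pairs each public method that takes self.lock with an
# *_already_locked variant for callers that already hold the lock; a hedged
# sketch of the pattern ('details' is assumed to be a BundleDetails):
#
#     status = RemoteExecutorStatus(total_worker_count=4)
#     with status.lock:
#         status.record_bundle_details_already_locked(details)
#         status.periodic_dump(total_bundles_submitted=10)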
468
469 class RemoteWorkerSelectionPolicy(ABC):
470     """Base class for policies that select a remote worker."""
471
472     def __init__(self):
473         self.workers: Optional[List[RemoteWorkerRecord]] = None
474
475     def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
476         self.workers = workers
477
478     @abstractmethod
479     def is_worker_available(self) -> bool:
480         pass
481
482     @abstractmethod
483     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
484         pass
485
486
487 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
488     """A remote worker selector that uses weighted RNG."""
489
490     @overrides
491     def is_worker_available(self) -> bool:
492         if self.workers:
493             for worker in self.workers:
494                 if worker.count > 0:
495                     return True
496         return False
497
498     @overrides
499     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
500         grabbag = []
501         if self.workers:
502             for worker in self.workers:
503                 if worker.machine != machine_to_avoid:
504                     if worker.count > 0:
505                         for _ in range(worker.count * worker.weight):
506                             grabbag.append(worker)
507
508         if len(grabbag) == 0:
509             logger.debug('There are no available workers that avoid %s', machine_to_avoid)
510             if self.workers:
511                 for worker in self.workers:
512                     if worker.count > 0:
513                         for _ in range(worker.count * worker.weight):
514                             grabbag.append(worker)
515
516         if len(grabbag) == 0:
517             logger.warning('There are no available workers?!')
518             return None
519
520         worker = random.sample(grabbag, 1)[0]
521         assert worker.count > 0
522         worker.count -= 1
523         logger.debug('Selected worker %s', worker)
524         return worker
525
526
527 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
528     """A remote worker selector that just round robins."""
529
530     def __init__(self) -> None:
531         super().__init__()
532         self.index = 0
533
534     @overrides
535     def is_worker_available(self) -> bool:
536         if self.workers:
537             for worker in self.workers:
538                 if worker.count > 0:
539                     return True
540         return False
541
542     @overrides
543     def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
544         if self.workers:
545             x = self.index
546             while True:
547                 worker = self.workers[x]
548                 if worker.count > 0:
549                     worker.count -= 1
550                     x += 1
551                     if x >= len(self.workers):
552                         x = 0
553                     self.index = x
554                     logger.debug('Selected worker %s', worker)
555                     return worker
556                 x += 1
557                 if x >= len(self.workers):
558                     x = 0
559                 if x == self.index:
560                     logger.warning('Unexpectedly could not find an available worker.')
561                     return None
562         return None
563
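# Hedged sketch of exercising a selection policy on its own (the host names
# are made up; RemoteExecutor below does this wiring for real):
#
#     workers = [
#         RemoteWorkerRecord(username='scott', machine='host1', weight=2, count=4),
#         RemoteWorkerRecord(username='scott', machine='host2', weight=1, count=2),
#     ]
#     policy = WeightedRandomRemoteWorkerSelectionPolicy()
#     policy.register_worker_pool(workers)
#     if policy.is_worker_available():
#         worker = policy.acquire_worker(machine_to_avoid='host1')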
564
565 class RemoteExecutor(BaseExecutor):
566     """A remote work executor."""
567
568     def __init__(
569         self,
570         workers: List[RemoteWorkerRecord],
571         policy: RemoteWorkerSelectionPolicy,
572     ) -> None:
573         super().__init__()
574         self.workers = workers
575         self.policy = policy
576         self.worker_count = 0
577         for worker in self.workers:
578             self.worker_count += worker.count
579         if self.worker_count <= 0:
580             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
581             logger.critical(msg)
582             raise RemoteExecutorException(msg)
583         self.policy.register_worker_pool(self.workers)
584         self.cv = threading.Condition()
585         logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
586         self._helper_executor = fut.ThreadPoolExecutor(
587             thread_name_prefix="remote_executor_helper",
588             max_workers=self.worker_count,
589         )
590         self.status = RemoteExecutorStatus(self.worker_count)
591         self.total_bundles_submitted = 0
592         self.backup_lock = threading.Lock()
593         self.last_backup = None
594         (
595             self.heartbeat_thread,
596             self.heartbeat_stop_event,
597         ) = self.run_periodic_heartbeat()
598         self.already_shutdown = False
599
600     @background_thread
601     def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
602         while not stop_event.is_set():
603             time.sleep(5.0)
604             logger.debug('Running periodic heartbeat code...')
605             self.heartbeat()
606         logger.debug('Periodic heartbeat thread shutting down.')
607
608     def heartbeat(self) -> None:
609         # Note: this is invoked on a background thread, not an
610         # executor thread.  Be careful what you do with it b/c it
611         # needs to get back and dump status again periodically.
612         with self.status.lock:
613             self.status.periodic_dump(self.total_bundles_submitted)
614
615             # Look for bundles to reschedule via executor.submit
616             if config.config['executors_schedule_remote_backups']:
617                 self.maybe_schedule_backup_bundles()
618
619     def maybe_schedule_backup_bundles(self):
620         assert self.status.lock.locked()
621         num_done = len(self.status.finished_bundle_timings)
622         num_idle_workers = self.worker_count - self.task_count
623         now = time.time()
624         if (
625             num_done >= 2
626             and num_idle_workers > 0
627             and (self.last_backup is None or (now - self.last_backup > 9.0))
628             and self.backup_lock.acquire(blocking=False)
629         ):
630             try:
631                 assert self.backup_lock.locked()
632
633                 bundle_to_backup = None
634                 best_score = None
635                 for (
636                     worker,
637                     bundle_uuids,
638                 ) in self.status.in_flight_bundles_by_worker.items():
639
640                     # Prefer to schedule backups of bundles running on
641                     # slower machines.
642                     base_score = 0
643                     for record in self.workers:
644                         if worker.machine == record.machine:
645                             base_score = float(record.weight)
646                             base_score = 1.0 / base_score
647                             base_score *= 200.0
648                             base_score = int(base_score)
649                             break
650
651                     for uuid in bundle_uuids:
652                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
653                         if (
654                             bundle is not None
655                             and bundle.src_bundle is None
656                             and bundle.backup_bundles is not None
657                         ):
658                             score = base_score
659
660                             # Schedule backups of bundles running
661                             # longer; especially those that are
662                             # unexpectedly slow.
663                             start_ts = self.status.start_per_bundle[uuid]
664                             if start_ts is not None:
665                                 runtime = now - start_ts
666                                 score += runtime
667                                 logger.debug('score[%s] => %.1f  # latency boost', bundle, score)
668
669                                 if bundle.slower_than_local_p95:
670                                     score += runtime / 2
671                                     logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)
672
673                                 if bundle.slower_than_global_p95:
674                                     score += runtime / 4
675                                     logger.debug('score[%s] => %.1f  # >global p95', bundle, score)
676
677                             # Prefer backups of bundles that don't
678                             # have backups already.
679                             backup_count = len(bundle.backup_bundles)
680                             if backup_count == 0:
681                                 score *= 2
682                             elif backup_count == 1:
683                                 score /= 2
684                             elif backup_count == 2:
685                                 score /= 8
686                             else:
687                                 score = 0
688                             logger.debug(
689                                 'score[%s] => %.1f  # %d dup backup factor',
690                                 bundle,
691                                 score, backup_count,
692                             )
693
694                             if score != 0 and (best_score is None or score > best_score):
695                                 bundle_to_backup = bundle
696                                 assert bundle is not None
697                                 assert bundle.backup_bundles is not None
698                                 assert bundle.src_bundle is None
699                                 best_score = score
700
701                 # Note: this is all still happening on the heartbeat
702                 # runner thread.  That's ok because
703                 # schedule_backup_for_bundle uses the executor to
704                 # submit the bundle again which will cause it to be
705                 # picked up by a worker thread and allow this thread
706                 # to return to run future heartbeats.
707                 if bundle_to_backup is not None:
708                     self.last_backup = now
709                     logger.info(
710                         '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
711                         bundle_to_backup,
712                         best_score,
713                     )
714                     self.schedule_backup_for_bundle(bundle_to_backup)
715             finally:
716                 self.backup_lock.release()
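    # Worked example of the scoring above (numbers are illustrative): a
    # bundle on a weight-10 worker gets base_score = int(200 / 10) = 20.
    # If it has been running for 30s, score = 20 + 30 = 50; if it is also
    # slower than that worker's p95, score += 30 / 2 -> 65.  With zero
    # existing backups the score doubles to 130, making this bundle the
    # most likely backup candidate.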
717
718     def is_worker_available(self) -> bool:
719         return self.policy.is_worker_available()
720
721     def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
722         return self.policy.acquire_worker(machine_to_avoid)
723
724     def find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
725         with self.cv:
726             while not self.is_worker_available():
727                 self.cv.wait()
728             worker = self.acquire_worker(machine_to_avoid)
729             if worker is not None:
730                 return worker
731         msg = "We should never reach this point in the code"
732         logger.critical(msg)
733         raise Exception(msg)
734
735     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
736         worker = bundle.worker
737         assert worker is not None
738         logger.debug('Released worker %s', worker)
739         self.status.record_release_worker(
740             worker,
741             bundle.uuid,
742             was_cancelled,
743         )
744         with self.cv:
745             worker.count += 1
746             self.cv.notify()
747         self.adjust_task_count(-1)
748
749     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
750         with self.status.lock:
751             if bundle.is_cancelled.wait(timeout=0.0):
752                 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
753                 bundle.was_cancelled = True
754                 return True
755         return False
756
757     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
758         """Find a worker for this bundle (blocking until one is available) and run it there."""
759
760         self.adjust_task_count(+1)
761         uuid = bundle.uuid
762         hostname = bundle.hostname
763         avoid_machine = override_avoid_machine
764         is_original = bundle.src_bundle is None
765
766         # Try not to schedule a backup on the same host as the original.
767         if avoid_machine is None and bundle.src_bundle is not None:
768             avoid_machine = bundle.src_bundle.machine
769         worker = None
770         while worker is None:
771             worker = self.find_available_worker_or_block(avoid_machine)
772         assert worker is not None
773
774         # Ok, found a worker.
775         bundle.worker = worker
776         machine = bundle.machine = worker.machine
777         username = bundle.username = worker.username
778         self.status.record_acquire_worker(worker, uuid)
779         logger.debug('%s: Running bundle on %s...', bundle, worker)
780
781         # Before we do any work, make sure the bundle is still viable.
782         # It may have been some time between when it was submitted and
783         # now due to lack of worker availability and someone else may
784         # have already finished it.
785         if self.check_if_cancelled(bundle):
786             try:
787                 return self.process_work_result(bundle)
788             except Exception as e:
789                 logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
790                 self.release_worker(bundle)
791                 if is_original:
792                     # Weird.  We are the original owner of this
793                     # bundle.  For it to have been cancelled, a backup
794                     # must have already started and completed before
795                     # we even got started.  Moreover, the backup says
796                     # it is done but we can't find the results it
797                     # should have copied over.  Reschedule the whole
798                     # thing.
799                     logger.exception(e)
800                     logger.error(
801                         '%s: We are the original owner thread and yet there are '
802                         'no results for this bundle.  This is unexpected and bad.',
803                         bundle,
804                     )
805                     return self.emergency_retry_nasty_bundle(bundle)
806                 else:
807                     # We're a backup and our bundle is cancelled
808                     # before we even got started.  Do nothing and let
809                     # the original bundle's thread worry about either
810                     # finding the results or complaining about it.
811                     return None
812
813         # Send input code / data to worker machine if it's not local.
814         if hostname not in machine:
815             try:
816                 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
817                 start_ts = time.time()
818                 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
819                 run_silently(cmd)
820                 xfer_latency = time.time() - start_ts
821                 logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
822             except Exception as e:
823                 self.release_worker(bundle)
824                 if is_original:
825                     # Weird.  We tried to copy the code to the worker
826                     # and it failed...  And we're the original bundle.
827                     # We have to retry.
828                     logger.exception(e)
829                     logger.error(
830                         "%s: Failed to send instructions to the worker machine?! "
831                         "This is not expected; we\'re the original bundle so this shouldn\'t "
832                         "be a race condition.  Attempting an emergency retry...",
833                         bundle,
834                     )
835                     return self.emergency_retry_nasty_bundle(bundle)
836                 else:
837                     # This is actually expected; we're a backup.
838                     # There's a race condition where someone else
839                     # already finished the work and removed the source
840                     # code_file before we could copy it.  Ignore.
841                     logger.warning(
842                         '%s: Failed to send instructions to the worker machine... '
843                         'We\'re a backup and this may be caused by the original (or '
844                         'some other backup) already finishing this work.  Ignoring.',
845                         bundle,
846                     )
847                     return None
848
849         # Kick off the work.  Note that if this fails we let
850         # wait_for_process deal with it.
851         self.status.record_processing_began(uuid)
852         cmd = (
853             f'{SSH} {bundle.username}@{bundle.machine} '
854             f'"source py38-venv/bin/activate &&'
855             f' /home/scott/lib/python_modules/remote_worker.py'
856             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
857         )
858         logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
859         p = cmd_in_background(cmd, silent=True)
860         bundle.pid = p.pid
861         logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
862         return self.wait_for_process(p, bundle, 0)
863
864     def wait_for_process(
865         self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
866     ) -> Any:
867         machine = bundle.machine
868         assert p is not None
869         pid = p.pid
870         if depth > 3:
871             logger.error(
872                 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
873             )
874             p.terminate()
875             self.release_worker(bundle)
876             return self.emergency_retry_nasty_bundle(bundle)
877
878         # Spin until either the ssh job we scheduled finishes the
879         # bundle or some backup worker signals that they finished it
880         # before we could.
881         while True:
882             try:
883                 p.wait(timeout=0.25)
884             except subprocess.TimeoutExpired:
885                 if self.check_if_cancelled(bundle):
886                     logger.info('%s: looks like another worker finished bundle...', bundle)
887                     break
888             else:
889                 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
890                 p = None
891                 break
892
893         # If we get here we believe the bundle is done; either the ssh
894         # subprocess finished (hopefully successfully) or we noticed
895         # that some other worker seems to have completed the bundle
896         # and we're bailing out.
897         try:
898             ret = self.process_work_result(bundle)
899             if ret is not None and p is not None:
900                 p.terminate()
901             return ret
902
903         # Something went wrong; e.g. we could not copy the results
904         # back, cleanup after ourselves on the remote machine, or
905         # unpickle the results we got from the remote machine.  If we
906         # still have an active ssh subprocess, keep waiting on it.
907         # Otherwise, time for an emergency reschedule.
908         except Exception as e:
909             logger.exception(e)
910             logger.error('%s: Something unexpected just happened...', bundle)
911             if p is not None:
912                 logger.warning(
913                     "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
914                 )
915                 return self.wait_for_process(p, bundle, depth + 1)
916             else:
917                 self.release_worker(bundle)
918                 return self.emergency_retry_nasty_bundle(bundle)
919
920     def process_work_result(self, bundle: BundleDetails) -> Any:
921         with self.status.lock:
922             is_original = bundle.src_bundle is None
923             was_cancelled = bundle.was_cancelled
924             username = bundle.username
925             machine = bundle.machine
926             result_file = bundle.result_file
927             code_file = bundle.code_file
928
929             # Whether original or backup, if we finished first we must
930             # fetch the results if the computation happened on a
931             # remote machine.
932             bundle.end_ts = time.time()
933             if not was_cancelled:
934                 assert bundle.machine is not None
935                 if bundle.hostname not in bundle.machine:
936                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
937                     logger.info(
938                         "%s: Fetching results back from %s@%s via %s",
939                         bundle,
940                         username,
941                         machine,
942                         cmd,
943                     )
944
945                     # If either of these throw they are handled in
946                     # wait_for_process.
947                     attempts = 0
948                     while True:
949                         try:
950                             run_silently(cmd)
951                         except Exception as e:
952                             attempts += 1
953                             if attempts >= 3:
954                                 raise e
955                         else:
956                             break
957
958                     # Cleanup remote /tmp files.
959                     run_silently(
960                         f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
961                     )
962                     logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
963                 dur = bundle.end_ts - bundle.start_ts
964                 self.histogram.add_item(dur)
965
966         # Only the original worker should unpickle the file contents
967         # though since it's the only one whose result matters.  The
968         # original is also the only job that may delete result_file
969         # from disk.  Note that the original may have been cancelled
970         # if one of the backups finished first; it still must read the
971         # result from disk.  It still does that here with is_cancelled
972         # set.
973         if is_original:
974             logger.debug("%s: Unpickling %s.", bundle, result_file)
975             try:
976                 with open(result_file, 'rb') as rb:
977                     serialized = rb.read()
978                 result = cloudpickle.loads(serialized)
979             except Exception as e:
980                 logger.exception(e)
981                 logger.error('Failed to load %s... this is bad news.', result_file)
982                 self.release_worker(bundle)
983
984                 # Re-raise the exception; the code in wait_for_process may
985                 # decide to emergency_retry_nasty_bundle here.
986                 raise e
987             logger.debug('Removing local (master) %s and %s.', code_file, result_file)
988             os.remove(result_file)
989             os.remove(code_file)
990
991             # Notify any backups that the original is done so they
992             # should stop ASAP.  Do this whether or not we
993             # finished first since there could be more than one
994             # backup.
995             if bundle.backup_bundles is not None:
996                 for backup in bundle.backup_bundles:
997                     logger.debug(
998                         '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
999                     )
1000                     backup.is_cancelled.set()
1001
1002         # This is a backup job and, by now, we have already fetched
1003         # the bundle results.
1004         else:
1005             # Backup results don't matter, they just need to leave the
1006             # result file in the right place for their originals to
1007             # read/unpickle later.
1008             result = None
1009
1010             # Tell the original to stop if we finished first.
1011             if not was_cancelled:
1012                 orig_bundle = bundle.src_bundle
1013                 assert orig_bundle is not None
1014                 logger.debug(
1015                     '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
1016                 )
1017                 orig_bundle.is_cancelled.set()
1018         self.release_worker(bundle, was_cancelled=was_cancelled)
1019         return result
1020
1021     def create_original_bundle(self, pickle, fname: str):
1022         uuid = string_utils.generate_uuid(omit_dashes=True)
1023         code_file = f'/tmp/{uuid}.code.bin'
1024         result_file = f'/tmp/{uuid}.result.bin'
1025
1026         logger.debug('Writing pickled code to %s', code_file)
1027         with open(code_file, 'wb') as wb:
1028             wb.write(pickle)
1029
1030         bundle = BundleDetails(
1031             pickled_code=pickle,
1032             uuid=uuid,
1033             fname=fname,
1034             worker=None,
1035             username=None,
1036             machine=None,
1037             hostname=platform.node(),
1038             code_file=code_file,
1039             result_file=result_file,
1040             pid=0,
1041             start_ts=time.time(),
1042             end_ts=0.0,
1043             slower_than_local_p95=False,
1044             slower_than_global_p95=False,
1045             src_bundle=None,
1046             is_cancelled=threading.Event(),
1047             was_cancelled=False,
1048             backup_bundles=[],
1049             failure_count=0,
1050         )
1051         self.status.record_bundle_details(bundle)
1052         logger.debug('%s: Created an original bundle', bundle)
1053         return bundle
1054
1055     def create_backup_bundle(self, src_bundle: BundleDetails):
1056         assert self.status.lock.locked()
1057         assert src_bundle.backup_bundles is not None
1058         n = len(src_bundle.backup_bundles)
1059         uuid = src_bundle.uuid + f'_backup#{n}'
1060
1061         backup_bundle = BundleDetails(
1062             pickled_code=src_bundle.pickled_code,
1063             uuid=uuid,
1064             fname=src_bundle.fname,
1065             worker=None,
1066             username=None,
1067             machine=None,
1068             hostname=src_bundle.hostname,
1069             code_file=src_bundle.code_file,
1070             result_file=src_bundle.result_file,
1071             pid=0,
1072             start_ts=time.time(),
1073             end_ts=0.0,
1074             slower_than_local_p95=False,
1075             slower_than_global_p95=False,
1076             src_bundle=src_bundle,
1077             is_cancelled=threading.Event(),
1078             was_cancelled=False,
1079             backup_bundles=None,  # backup backups not allowed
1080             failure_count=0,
1081         )
1082         src_bundle.backup_bundles.append(backup_bundle)
1083         self.status.record_bundle_details_already_locked(backup_bundle)
1084         logger.debug('%s: Created a backup bundle', backup_bundle)
1085         return backup_bundle
1086
1087     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1088         assert self.status.lock.locked()
1089         assert src_bundle is not None
1090         backup_bundle = self.create_backup_bundle(src_bundle)
1091         logger.debug(
1092             '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
1093         )
1094         self._helper_executor.submit(self.launch, backup_bundle)
1095
1096         # Results from backups don't matter; if they finish first
1097         # they will move the result_file to this machine and let
1098         # the original pick them up and unpickle them (and return
1099         # a result).
1100
1101     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Any:
1102         is_original = bundle.src_bundle is None
1103         bundle.worker = None
1104         avoid_last_machine = bundle.machine
1105         bundle.machine = None
1106         bundle.username = None
1107         bundle.failure_count += 1
1108         if is_original:
1109             retry_limit = 3
1110         else:
1111             retry_limit = 2
1112
1113         if bundle.failure_count > retry_limit:
1114             logger.error(
1115                 '%s: Tried this bundle too many times already (%dx); giving up.',
1116                 bundle,
1117                 retry_limit,
1118             )
1119             if is_original:
1120                 raise RemoteExecutorException(
1121                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1122                 )
1123             else:
1124                 logger.error(
1125                     '%s: At least it\'s only a backup; better luck with the others.', bundle
1126                 )
1127             return None
1128         else:
1129             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1130             logger.warning(msg)
1131             warnings.warn(msg)
1132             return self.launch(bundle, avoid_last_machine)
1133
1134     @overrides
1135     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1136         if self.already_shutdown:
1137             raise Exception('Submitted work after shutdown.')
1138         pickle = make_cloud_pickle(function, *args, **kwargs)
1139         bundle = self.create_original_bundle(pickle, function.__name__)
1140         self.total_bundles_submitted += 1
1141         return self._helper_executor.submit(self.launch, bundle)
1142
1143     @overrides
1144     def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1145         if not self.already_shutdown:
1146             logger.debug('Shutting down RemoteExecutor %s', self.title)
1147             self.heartbeat_stop_event.set()
1148             self.heartbeat_thread.join()
1149             self._helper_executor.shutdown(wait)
1150             if not quiet:
1151                 print(self.histogram.__repr__(label_formatter='%ds'))
1152             self.already_shutdown = True
1153
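# Hedged sketch of driving a RemoteExecutor directly; in practice the
# DefaultExecutors singleton below does this wiring.  It assumes each worker
# machine can run the remote_worker.py helper referenced in launch(); the
# worker record and workload names here are made up:
#
#     workers = [
#         RemoteWorkerRecord(username='scott', machine='example.house', weight=10, count=4),
#     ]
#     policy = WeightedRandomRemoteWorkerSelectionPolicy()
#     policy.register_worker_pool(workers)
#     rexec = RemoteExecutor(workers, policy)
#     future = rexec.submit(some_expensive_function, 42)
#     print(future.result())
#     rexec.shutdown()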
1154
1155 @singleton
1156 class DefaultExecutors(object):
1157     """A container for a default thread, process and remote executor.
1158     These are not created until needed and we take care to clean up
1159     before process exit.
1160
1161     """
1162
1163     def __init__(self):
1164         self.thread_executor: Optional[ThreadExecutor] = None
1165         self.process_executor: Optional[ProcessExecutor] = None
1166         self.remote_executor: Optional[RemoteExecutor] = None
1167
1168     @staticmethod
1169     def ping(host) -> bool:
1170         logger.debug('RUN> ping -c 1 %s', host)
1171         try:
1172             x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
1173             return x == 0
1174         except Exception:
1175             return False
1176
1177     def thread_pool(self) -> ThreadExecutor:
1178         if self.thread_executor is None:
1179             self.thread_executor = ThreadExecutor()
1180         return self.thread_executor
1181
1182     def process_pool(self) -> ProcessExecutor:
1183         if self.process_executor is None:
1184             self.process_executor = ProcessExecutor()
1185         return self.process_executor
1186
1187     def remote_pool(self) -> RemoteExecutor:
1188         if self.remote_executor is None:
1189             logger.info('Looking for some helper machines...')
1190             pool: List[RemoteWorkerRecord] = []
1191             if self.ping('cheetah.house'):
1192                 logger.info('Found cheetah.house')
1193                 pool.append(
1194                     RemoteWorkerRecord(
1195                         username='scott',
1196                         machine='cheetah.house',
1197                         weight=24,
1198                         count=5,
1199                     ),
1200                 )
1201             if self.ping('meerkat.cabin'):
1202                 logger.info('Found meerkat.cabin')
1203                 pool.append(
1204                     RemoteWorkerRecord(
1205                         username='scott',
1206                         machine='meerkat.cabin',
1207                         weight=12,
1208                         count=2,
1209                     ),
1210                 )
1211             if self.ping('wannabe.house'):
1212                 logger.info('Found wannabe.house')
1213                 pool.append(
1214                     RemoteWorkerRecord(
1215                         username='scott',
1216                         machine='wannabe.house',
1217                         weight=14,
1218                         count=2,
1219                     ),
1220                 )
1221             if self.ping('puma.cabin'):
1222                 logger.info('Found puma.cabin')
1223                 pool.append(
1224                     RemoteWorkerRecord(
1225                         username='scott',
1226                         machine='puma.cabin',
1227                         weight=24,
1228                         count=5,
1229                     ),
1230                 )
1231             if self.ping('backup.house'):
1232                 logger.info('Found backup.house')
1233                 pool.append(
1234                     RemoteWorkerRecord(
1235                         username='scott',
1236                         machine='backup.house',
1237                         weight=9,
1238                         count=2,
1239                     ),
1240                 )
1241
1242             # The controller machine has a lot to do; go easy on it.
1243             for record in pool:
1244                 if record.machine == platform.node() and record.count > 1:
1245                     logger.info('Reducing workload for %s.', record.machine)
1246                     record.count = max(int(record.count / 2), 1)
1247
1248             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1249             policy.register_worker_pool(pool)
1250             self.remote_executor = RemoteExecutor(pool, policy)
1251         return self.remote_executor
1252
1253     def shutdown(self) -> None:
1254         if self.thread_executor is not None:
1255             self.thread_executor.shutdown(wait=True, quiet=True)
1256             self.thread_executor = None
1257         if self.process_executor is not None:
1258             self.process_executor.shutdown(wait=True, quiet=True)
1259             self.process_executor = None
1260         if self.remote_executor is not None:
1261             self.remote_executor.shutdown(wait=True, quiet=True)
1262             self.remote_executor = None
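# Hedged end-to-end sketch of the container above (assumes the @singleton
# decorator returns the one shared instance on each call; workloads are
# illustrative):
#
#     defaults = DefaultExecutors()
#     tpool = defaults.thread_pool()
#     ppool = defaults.process_pool()
#     t = tpool.submit(sum, [1, 2, 3])
#     p = ppool.submit(sum, [4, 5, 6])
#     print(t.result() + p.result())    # 21
#     defaults.shutdown()               # tears down every pool that was created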