[python_utils.git] / executors.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
9 import logging
10 import numpy
11 import os
12 import platform
13 import random
14 import subprocess
15 import threading
16 import time
17 from typing import Any, Callable, Dict, List, Optional, Set
18 import warnings
19
20 import cloudpickle  # type: ignore
21 from overrides import overrides
22
23 from ansi import bg, fg, underline, reset
24 import argparse_utils
25 import config
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
30
31
32 logger = logging.getLogger(__name__)
33
34 parser = config.add_commandline_args(
35     f"Executors ({__file__})", "Args related to processing executors."
36 )
37 parser.add_argument(
38     '--executors_threadpool_size',
39     type=int,
40     metavar='#THREADS',
41     help='Number of threads in the default threadpool, leave unset for default',
42     default=None,
43 )
44 parser.add_argument(
45     '--executors_processpool_size',
46     type=int,
47     metavar='#PROCESSES',
48     help='Number of processes in the default processpool, leave unset for default',
49     default=None,
50 )
51 parser.add_argument(
52     '--executors_schedule_remote_backups',
53     default=True,
54     action=argparse_utils.ActionNoYes,
55     help='Should we schedule duplicative backup work if a remote bundle is slow',
56 )
57 parser.add_argument(
58     '--executors_max_bundle_failures',
59     type=int,
60     default=3,
61     metavar='#FAILURES',
62     help='Maximum number of failures before giving up on a bundle',
63 )
64
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp -C'
67
68
69 def make_cloud_pickle(fun, *args, **kwargs):
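    """Serialize a callable plus its args/kwargs into one cloudpickle byte blob."""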
70     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71     return cloudpickle.dumps((fun, args, kwargs))
72
73
74 class BaseExecutor(ABC):
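    """Abstract base class for executors in this module.  Subclasses provide
    submit() and shutdown(); the base tracks a latency histogram and a task
    count that should only be adjusted from the launcher side (see
    adjust_task_count below).
    """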
75     def __init__(self, *, title=''):
76         self.title = title
77         self.histogram = hist.SimpleHistogram(
78             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
79         )
80         self.task_count = 0
81
82     @abstractmethod
83     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
84         pass
85
86     @abstractmethod
87     def shutdown(self, wait: bool = True) -> None:
88         pass
89
90     def adjust_task_count(self, delta: int) -> None:
91         """Change the task count.  Note: do not call this method from a
92         worker, it should only be called by the launcher process /
93         thread / machine.
94
95         """
96         self.task_count += delta
97         print(f'(adjusted task count by {delta} to {self.task_count})')
98         logger.debug(f'Executor current task count is {self.task_count}')
99
100     def get_task_count(self) -> int:
101         """Return the current task count.  Note: do not call this
102         method from a worker, it should only be called by the launcher
103         process / thread / machine.
104
105         """
106         return self.task_count
107
108
109 class ThreadExecutor(BaseExecutor):
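    """An executor that runs work on a local
    concurrent.futures.ThreadPoolExecutor.  Pool size comes from the
    max_workers argument or the --executors_threadpool_size config flag.
    """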
110     def __init__(self, max_workers: Optional[int] = None):
111         super().__init__()
112         workers = None
113         if max_workers is not None:
114             workers = max_workers
115         elif 'executors_threadpool_size' in config.config:
116             workers = config.config['executors_threadpool_size']
117         logger.debug(f'Creating threadpool executor with {workers} workers')
118         self._thread_pool_executor = fut.ThreadPoolExecutor(
119             max_workers=workers, thread_name_prefix="thread_executor_helper"
120         )
121
122     # This is run on a different thread; do not adjust task count here.
123     def run_local_bundle(self, fun, *args, **kwargs):
124         logger.debug(f"Running local bundle at {fun.__name__}")
125         result = fun(*args, **kwargs)
126         return result
127
128     @overrides
129     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
130         self.adjust_task_count(+1)
131         newargs = []
132         newargs.append(function)
133         for arg in args:
134             newargs.append(arg)
135         start = time.time()
136         result = self._thread_pool_executor.submit(
137             self.run_local_bundle, *newargs, **kwargs
138         )
139         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
140         result.add_done_callback(lambda _: self.adjust_task_count(-1))
            return result
141
142     @overrides
143     def shutdown(self, wait=True) -> None:
144         logger.debug(f'Shutting down threadpool executor {self.title}')
145         print(self.histogram)
146         self._thread_pool_executor.shutdown(wait)
147
148
149 class ProcessExecutor(BaseExecutor):
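    """An executor that runs work on a local
    concurrent.futures.ProcessPoolExecutor.  Work is cloudpickled before
    submission so that closures and lambdas survive the trip into the
    child process.
    """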
150     def __init__(self, max_workers=None):
151         super().__init__()
152         workers = None
153         if max_workers is not None:
154             workers = max_workers
155         elif 'executors_processpool_size' in config.config:
156             workers = config.config['executors_processpool_size']
157         logger.debug(f'Creating processpool executor with {workers} workers.')
158         self._process_executor = fut.ProcessPoolExecutor(
159             max_workers=workers,
160         )
161
162     # This is run in another process; do not adjust task count here.
163     def run_cloud_pickle(self, pickle):
164         fun, args, kwargs = cloudpickle.loads(pickle)
165         logger.debug(f"Running pickled bundle at {fun.__name__}")
166         result = fun(*args, **kwargs)
167         return result
168
169     @overrides
170     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
171         start = time.time()
172         self.adjust_task_count(+1)
173         pickle = make_cloud_pickle(function, *args, **kwargs)
174         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
175         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
176         result.add_done_callback(lambda _: self.adjust_task_count(-1))
177         return result
178
179     @overrides
180     def shutdown(self, wait=True) -> None:
181         logger.debug(f'Shutting down processpool executor {self.title}')
182         self._process_executor.shutdown(wait)
183         print(self.histogram)
184
185     def __getstate__(self):
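        """Drop the (unpicklable) pool when serializing this executor; an
        unpickled copy will see _process_executor as None."""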
186         state = self.__dict__.copy()
187         state['_process_executor'] = None
188         return state
189
190
191 class RemoteExecutorException(Exception):
192     """Thrown when a bundle cannot be executed despite several retries."""
193
194     pass
195
196
197 @dataclass
198 class RemoteWorkerRecord:
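    """Describes one remote worker host: the ssh username, the machine
    name, a scheduling weight, and how many concurrent bundles it may
    run (count)."""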
199     username: str
200     machine: str
201     weight: int
202     count: int
203
204     def __hash__(self):
205         return hash((self.username, self.machine))
206
207     def __repr__(self):
208         return f'{self.username}@{self.machine}'
209
210
211 @dataclass
212 class BundleDetails:
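    """Everything the RemoteExecutor tracks about one unit of work: the
    pickled code, where it is running, timing information, and
    backup/cancellation bookkeeping."""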
213     pickled_code: bytes
214     uuid: str
215     fname: str
216     worker: Optional[RemoteWorkerRecord]
217     username: Optional[str]
218     machine: Optional[str]
219     hostname: str
220     code_file: str
221     result_file: str
222     pid: int
223     start_ts: float
224     end_ts: float
225     slower_than_local_p95: bool
226     slower_than_global_p95: bool
227     src_bundle: BundleDetails
228     is_cancelled: threading.Event
229     was_cancelled: bool
230     backup_bundles: Optional[List[BundleDetails]]
231     failure_count: int
232
233     def __repr__(self):
234         uuid = self.uuid
235         if uuid[-9:-2] == '_backup':
236             uuid = uuid[:-9]
237             suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
238         else:
239             suffix = uuid[-6:]
240
241         colorz = [
242             fg('violet red'),
243             fg('red'),
244             fg('orange'),
245             fg('peach orange'),
246             fg('yellow'),
247             fg('marigold yellow'),
248             fg('green yellow'),
249             fg('tea green'),
250             fg('cornflower blue'),
251             fg('turquoise blue'),
252             fg('tropical blue'),
253             fg('lavender purple'),
254             fg('medium purple'),
255         ]
256         c = colorz[int(uuid[-2:], 16) % len(colorz)]
257         fname = self.fname if self.fname is not None else 'nofname'
258         machine = self.machine if self.machine is not None else 'nomachine'
259         return f'{c}{suffix}/{fname}/{machine}{reset()}'
260
261
262 class RemoteExecutorStatus:
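    """Thread-safe bookkeeping for the RemoteExecutor: which workers are
    known, which bundles are in flight on each, and the per-bundle /
    per-worker timings used in the periodic status dumps."""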
263     def __init__(self, total_worker_count: int) -> None:
264         self.worker_count: int = total_worker_count
265         self.known_workers: Set[RemoteWorkerRecord] = set()
266         self.start_time: float = time.time()
267         self.start_per_bundle: Dict[str, float] = defaultdict(float)
268         self.end_per_bundle: Dict[str, float] = defaultdict(float)
269         self.finished_bundle_timings_per_worker: Dict[
270             RemoteWorkerRecord, List[float]
271         ] = {}
272         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
273         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
274         self.finished_bundle_timings: List[float] = []
275         self.last_periodic_dump: Optional[float] = None
276         self.total_bundles_submitted: int = 0
277
278         # Protects reads and modifications of self.  Also used
279         # as a memory fence for modifications to bundles.
280         self.lock: threading.Lock = threading.Lock()
281
282     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
283         with self.lock:
284             self.record_acquire_worker_already_locked(worker, uuid)
285
286     def record_acquire_worker_already_locked(
287         self, worker: RemoteWorkerRecord, uuid: str
288     ) -> None:
289         assert self.lock.locked()
290         self.known_workers.add(worker)
291         self.start_per_bundle[uuid] = None
292         x = self.in_flight_bundles_by_worker.get(worker, set())
293         x.add(uuid)
294         self.in_flight_bundles_by_worker[worker] = x
295
296     def record_bundle_details(self, details: BundleDetails) -> None:
297         with self.lock:
298             self.record_bundle_details_already_locked(details)
299
300     def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
301         assert self.lock.locked()
302         self.bundle_details_by_uuid[details.uuid] = details
303
304     def record_release_worker(
305         self,
306         worker: RemoteWorkerRecord,
307         uuid: str,
308         was_cancelled: bool,
309     ) -> None:
310         with self.lock:
311             self.record_release_worker_already_locked(worker, uuid, was_cancelled)
312
313     def record_release_worker_already_locked(
314         self,
315         worker: RemoteWorkerRecord,
316         uuid: str,
317         was_cancelled: bool,
318     ) -> None:
319         assert self.lock.locked()
320         ts = time.time()
321         self.end_per_bundle[uuid] = ts
322         self.in_flight_bundles_by_worker[worker].remove(uuid)
323         if not was_cancelled:
324             bundle_latency = ts - self.start_per_bundle[uuid]
325             x = self.finished_bundle_timings_per_worker.get(worker, list())
326             x.append(bundle_latency)
327             self.finished_bundle_timings_per_worker[worker] = x
328             self.finished_bundle_timings.append(bundle_latency)
329
330     def record_processing_began(self, uuid: str):
331         with self.lock:
332             self.start_per_bundle[uuid] = time.time()
333
334     def total_in_flight(self) -> int:
335         assert self.lock.locked()
336         total_in_flight = 0
337         for worker in self.known_workers:
338             total_in_flight += len(self.in_flight_bundles_by_worker[worker])
339         return total_in_flight
340
341     def total_idle(self) -> int:
342         assert self.lock.locked()
343         return self.worker_count - self.total_in_flight()
344
345     def __repr__(self):
346         assert self.lock.locked()
347         ts = time.time()
348         total_finished = len(self.finished_bundle_timings)
349         total_in_flight = self.total_in_flight()
350         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
351         qall = None
352         if len(self.finished_bundle_timings) > 1:
353             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
354             ret += (
355                 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
356                 f'✅={total_finished}/{self.total_bundles_submitted}, '
357                 f'💻n={total_in_flight}/{self.worker_count}\n'
358             )
359         else:
360             ret += (
361                 f'⏱={ts-self.start_time:.1f}s, '
362                 f'✅={total_finished}/{self.total_bundles_submitted}, '
363                 f'💻n={total_in_flight}/{self.worker_count}\n'
364             )
365
366         for worker in self.known_workers:
367             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
368             timings = self.finished_bundle_timings_per_worker.get(worker, [])
369             count = len(timings)
370             qworker = None
371             if count > 1:
372                 qworker = numpy.quantile(timings, [0.5, 0.95])
373                 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
374             else:
375                 ret += '\n'
376             if count > 0:
377                 ret += f'    ...finished {count} total bundle(s) so far\n'
378             in_flight = len(self.in_flight_bundles_by_worker[worker])
379             if in_flight > 0:
380                 ret += f'    ...{in_flight} bundles currently in flight:\n'
381                 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
382                     details = self.bundle_details_by_uuid.get(bundle_uuid, None)
383                     pid = str(details.pid) if (details and details.pid != 0) else "TBD"
384                     if self.start_per_bundle[bundle_uuid] is not None:
385                         sec = ts - self.start_per_bundle[bundle_uuid]
386                         ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
387                     else:
388                         ret += f'       {details} setting up / copying data...'
389                         sec = 0.0
390
391                     if qworker is not None:
392                         if sec > qworker[1]:
393                             ret += f'{bg("red")}>💻p95{reset()} '
394                             if details is not None:
395                                 details.slower_than_local_p95 = True
396                         else:
397                             if details is not None:
398                                 details.slower_than_local_p95 = False
399
400                     if qall is not None:
401                         if sec > qall[1]:
402                             ret += f'{bg("red")}>∀p95{reset()} '
403                             if details is not None:
404                                 details.slower_than_global_p95 = True
405                         else:
406                             if details is not None:
                                    details.slower_than_global_p95 = False
407                     ret += '\n'
408         return ret
409
410     def periodic_dump(self, total_bundles_submitted: int) -> None:
411         assert self.lock.locked()
412         self.total_bundles_submitted = total_bundles_submitted
413         ts = time.time()
414         if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
415             print(self)
416             self.last_periodic_dump = ts
417
418
419 class RemoteWorkerSelectionPolicy(ABC):
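    """Interface for choosing which remote worker gets the next bundle."""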
420     def register_worker_pool(self, workers):
421         self.workers = workers
422
423     @abstractmethod
424     def is_worker_available(self) -> bool:
425         pass
426
427     @abstractmethod
428     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
429         pass
430
431
432 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
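    """Chooses a worker at random, weighted by each worker's remaining
    count times its configured weight."""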
433     @overrides
434     def is_worker_available(self) -> bool:
435         for worker in self.workers:
436             if worker.count > 0:
437                 return True
438         return False
439
440     @overrides
441     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
442         grabbag = []
443         for worker in self.workers:
444             if worker.machine != machine_to_avoid:
445                 if worker.count > 0:
446                     for _ in range(worker.count * worker.weight):
447                         grabbag.append(worker)
448
449         if len(grabbag) == 0:
450             logger.debug(
451                 f'There are no available workers that avoid {machine_to_avoid}...'
452             )
453             for worker in self.workers:
454                 if worker.count > 0:
455                     for _ in range(worker.count * worker.weight):
456                         grabbag.append(worker)
457
458         if len(grabbag) == 0:
459             logger.warning('There are no available workers?!')
460             return None
461
462         worker = random.sample(grabbag, 1)[0]
463         assert worker.count > 0
464         worker.count -= 1
465         logger.debug(f'Chose worker {worker}')
466         return worker
467
468
469 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
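    """Cycles through the worker list, handing out the next worker that
    still has available capacity."""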
470     def __init__(self) -> None:
471         self.index = 0
472
473     @overrides
474     def is_worker_available(self) -> bool:
475         for worker in self.workers:
476             if worker.count > 0:
477                 return True
478         return False
479
480     @overrides
481     def acquire_worker(
482         self, machine_to_avoid: Optional[str] = None
483     ) -> Optional[RemoteWorkerRecord]:
484         x = self.index
485         while True:
486             worker = self.workers[x]
487             if worker.count > 0:
488                 worker.count -= 1
489                 x += 1
490                 if x >= len(self.workers):
491                     x = 0
492                 self.index = x
493                 logger.debug(f'Selected worker {worker}')
494                 return worker
495             x += 1
496             if x >= len(self.workers):
497                 x = 0
498             if x == self.index:
499                 msg = 'Unexpectedly could not find an available worker.'
500                 logger.warning(msg)
501                 return None
502
503
504 class RemoteExecutor(BaseExecutor):
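    """An executor that ships cloudpickled work bundles to remote machines
    over ssh/scp and, optionally, schedules backup copies of bundles that
    look slow.  Each remote host is expected to have remote_worker.py
    available (see the ssh command in launch).  Illustrative setup (names
    here are placeholders; see DefaultExecutors.remote_pool below for the
    real thing):

        workers = [RemoteWorkerRecord('me', 'somehost', weight=10, count=4)]
        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        executor = RemoteExecutor(workers, policy)
        future = executor.submit(some_function, arg)
    """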
505     def __init__(
506         self,
507         workers: List[RemoteWorkerRecord],
508         policy: RemoteWorkerSelectionPolicy,
509     ) -> None:
510         super().__init__()
511         self.workers = workers
512         self.policy = policy
513         self.worker_count = 0
514         for worker in self.workers:
515             self.worker_count += worker.count
516         if self.worker_count <= 0:
517             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
518             logger.critical(msg)
519             raise RemoteExecutorException(msg)
520         self.policy.register_worker_pool(self.workers)
521         self.cv = threading.Condition()
522         logger.debug(
523             f'Creating {self.worker_count} local threads, one per remote worker.'
524         )
525         self._helper_executor = fut.ThreadPoolExecutor(
526             thread_name_prefix="remote_executor_helper",
527             max_workers=self.worker_count,
528         )
529         self.status = RemoteExecutorStatus(self.worker_count)
530         self.total_bundles_submitted = 0
531         self.backup_lock = threading.Lock()
532         self.last_backup = None
533         (
534             self.heartbeat_thread,
535             self.heartbeat_stop_event,
536         ) = self.run_periodic_heartbeat()
537
538     @background_thread
539     def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
540         while not stop_event.is_set():
541             time.sleep(5.0)
542             logger.debug('Running periodic heartbeat code...')
543             self.heartbeat()
544         logger.debug('Periodic heartbeat thread shutting down.')
545
546     def heartbeat(self) -> None:
547         # Note: this is invoked on a background thread, not an
548         # executor thread.  Be careful what you do with it b/c it
549         # needs to get back and dump status again periodically.
550         with self.status.lock:
551             self.status.periodic_dump(self.total_bundles_submitted)
552
553             # Look for bundles to reschedule via executor.submit
554             if config.config['executors_schedule_remote_backups']:
555                 self.maybe_schedule_backup_bundles()
556
557     def maybe_schedule_backup_bundles(self):
558         assert self.status.lock.locked()
559         num_done = len(self.status.finished_bundle_timings)
560         num_idle_workers = self.worker_count - self.task_count
561         now = time.time()
562         if (
563             num_done > 2
564             and num_idle_workers > 1
565             and (self.last_backup is None or (now - self.last_backup > 9.0))
566             and self.backup_lock.acquire(blocking=False)
567         ):
568             try:
569                 assert self.backup_lock.locked()
570
571                 bundle_to_backup = None
572                 best_score = None
573                 for (
574                     worker,
575                     bundle_uuids,
576                 ) in self.status.in_flight_bundles_by_worker.items():
577
578                     # Prefer to schedule backups of bundles running on
579                     # slower machines.
580                     base_score = 0
581                     for record in self.workers:
582                         if worker.machine == record.machine:
583                             base_score = float(record.weight)
584                             base_score = 1.0 / base_score
585                             base_score *= 200.0
586                             base_score = int(base_score)
587                             break
588
589                     for uuid in bundle_uuids:
590                         bundle = self.status.bundle_details_by_uuid.get(uuid, None)
591                         if (
592                             bundle is not None
593                             and bundle.src_bundle is None
594                             and bundle.backup_bundles is not None
595                         ):
596                             score = base_score
597
598                             # Schedule backups of bundles running
599                             # longer; especially those that are
600                             # unexpectedly slow.
601                             start_ts = self.status.start_per_bundle[uuid]
602                             if start_ts is not None:
603                                 runtime = now - start_ts
604                                 score += runtime
605                                 logger.debug(
606                                     f'score[{bundle}] => {score}  # latency boost'
607                                 )
608
609                                 if bundle.slower_than_local_p95:
610                                     score += runtime / 2
611                                     logger.debug(
612                                         f'score[{bundle}] => {score}  # >worker p95'
613                                     )
614
615                                 if bundle.slower_than_global_p95:
616                                     score += runtime / 4
617                                     logger.debug(
618                                         f'score[{bundle}] => {score}  # >global p95'
619                                     )
620
621                             # Prefer backups of bundles that don't
622                             # have backups already.
623                             backup_count = len(bundle.backup_bundles)
624                             if backup_count == 0:
625                                 score *= 2
626                             elif backup_count == 1:
627                                 score /= 2
628                             elif backup_count == 2:
629                                 score /= 8
630                             else:
631                                 score = 0
632                             logger.debug(
633                                 f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
634                             )
635
636                             if score != 0 and (
637                                 best_score is None or score > best_score
638                             ):
639                                 bundle_to_backup = bundle
640                                 assert bundle is not None
641                                 assert bundle.backup_bundles is not None
642                                 assert bundle.src_bundle is None
643                                 best_score = score
644
645                 # Note: this is all still happening on the heartbeat
646                 # runner thread.  That's ok because
647                 # schedule_backup_for_bundle uses the executor to
648                 # submit the bundle again which will cause it to be
649                 # picked up by a worker thread and allow this thread
650                 # to return to run future heartbeats.
651                 if bundle_to_backup is not None:
652                     self.last_backup = now
653                     logger.info(
654                         f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
655                     )
656                     self.schedule_backup_for_bundle(bundle_to_backup)
657             finally:
658                 self.backup_lock.release()
659
660     def is_worker_available(self) -> bool:
661         return self.policy.is_worker_available()
662
663     def acquire_worker(
664         self, machine_to_avoid: Optional[str] = None
665     ) -> Optional[RemoteWorkerRecord]:
666         return self.policy.acquire_worker(machine_to_avoid)
667
668     def find_available_worker_or_block(
669         self, machine_to_avoid: Optional[str] = None
670     ) -> RemoteWorkerRecord:
671         with self.cv:
672             while not self.is_worker_available():
673                 self.cv.wait()
674             worker = self.acquire_worker(machine_to_avoid)
675             if worker is not None:
676                 return worker
677         msg = "We should never reach this point in the code"
678         logger.critical(msg)
679         raise Exception(msg)
680
681     def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
682         worker = bundle.worker
683         assert worker is not None
684         logger.debug(f'Released worker {worker}')
685         self.status.record_release_worker(
686             worker,
687             bundle.uuid,
688             was_cancelled,
689         )
690         with self.cv:
691             worker.count += 1
692             self.cv.notify()
693         self.adjust_task_count(-1)
694
695     def check_if_cancelled(self, bundle: BundleDetails) -> bool:
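        """Return True (and mark the bundle) if another copy of this work
        has already signalled completion via the bundle's is_cancelled
        event."""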
696         with self.status.lock:
697             if bundle.is_cancelled.wait(timeout=0.0):
698                 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
699                 bundle.was_cancelled = True
700                 return True
701         return False
702
703     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
704         """Find a worker for bundle or block until one is available."""
705         self.adjust_task_count(+1)
706         uuid = bundle.uuid
707         hostname = bundle.hostname
708         avoid_machine = override_avoid_machine
709         is_original = bundle.src_bundle is None
710
711         # Try not to schedule a backup on the same host as the original.
712         if avoid_machine is None and bundle.src_bundle is not None:
713             avoid_machine = bundle.src_bundle.machine
714         worker = None
715         while worker is None:
716             worker = self.find_available_worker_or_block(avoid_machine)
717         assert worker
718
719         # Ok, found a worker.
720         bundle.worker = worker
721         machine = bundle.machine = worker.machine
722         username = bundle.username = worker.username
723         self.status.record_acquire_worker(worker, uuid)
724         logger.debug(f'{bundle}: Running bundle on {worker}...')
725
726         # Before we do any work, make sure the bundle is still viable.
727         # It may have been some time between when it was submitted and
728         # now due to lack of worker availability and someone else may
729         # have already finished it.
730         if self.check_if_cancelled(bundle):
731             try:
732                 return self.process_work_result(bundle)
733             except Exception as e:
734                 logger.warning(
735                     f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
736                 )
737                 self.release_worker(bundle)
738                 if is_original:
739                     # Weird.  We are the original owner of this
740                     # bundle.  For it to have been cancelled, a backup
741                     # must have already started and completed before
742                     # we even for started.  Moreover, the backup says
743                     # we even got started.  Moreover, the backup says
744                     # should have copied over.  Reschedule the whole
745                     # thing.
746                     logger.exception(e)
747                     logger.error(
748                         f'{bundle}: We are the original owner thread and yet there are '
749                         + 'no results for this bundle.  This is unexpected and bad.'
750                     )
751                     return self.emergency_retry_nasty_bundle(bundle)
752                 else:
753                     # Expected(?).  We're a backup and our bundle is
754                     # cancelled before we even got started.  Something
755                     # went bad in process_work_result (I actually don't
756                     # see what?) but probably not worth worrying
757                     # about.  Let the original thread worry about
758                     # either finding the results or complaining about
759                     # it.
760                     return None
761
762         # Send input code / data to worker machine if it's not local.
763         if hostname not in machine:
764             try:
765                 cmd = (
766                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
767                 )
768                 start_ts = time.time()
769                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
770                 run_silently(cmd)
771                 xfer_latency = time.time() - start_ts
772                 logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
773             except Exception as e:
774                 self.release_worker(bundle)
775                 if is_original:
776                     # Weird.  We tried to copy the code to the worker and it failed...
777                     # And we're the original bundle.  We have to retry.
778                     logger.exception(e)
779                     logger.error(
780                         f"{bundle}: Failed to send instructions to the worker machine?! "
781                         + "This is not expected; we\'re the original bundle so this shouldn\'t "
782                         + "be a race condition.  Attempting an emergency retry..."
783                     )
784                     return self.emergency_retry_nasty_bundle(bundle)
785                 else:
786                     # This is actually expected; we're a backup.
787                     # There's a race condition where someone else
788                     # already finished the work and removed the source
789                     # code file before we could copy it.  No biggie.
790                     msg = f'{bundle}: Failed to send instructions to the worker machine... '
791                     msg += 'We\'re a backup and this may be caused by the original (or some '
792                     msg += 'other backup) already finishing this work.  Ignoring this.'
793                     logger.warning(msg)
794                     return None
795
796         # Kick off the work.  Note that if this fails we let
797         # wait_for_process deal with it.
798         self.status.record_processing_began(uuid)
799         cmd = (
800             f'{SSH} {bundle.username}@{bundle.machine} '
801             f'"source py38-venv/bin/activate &&'
802             f' /home/scott/lib/python_modules/remote_worker.py'
803             f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
804         )
805         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
806         p = cmd_in_background(cmd, silent=True)
807         bundle.pid = p.pid
808         logger.debug(
809             f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
810         )
811         return self.wait_for_process(p, bundle, 0)
812
813     def wait_for_process(
814         self, p: subprocess.Popen, bundle: BundleDetails, depth: int
815     ) -> Any:
816         machine = bundle.machine
817         pid = p.pid
818         if depth > 3:
819             logger.error(
820                 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
821             )
822             p.terminate()
823             self.release_worker(bundle)
824             return self.emergency_retry_nasty_bundle(bundle)
825
826         # Spin until either the ssh job we scheduled finishes the
827         # bundle or some backup worker signals that they finished it
828         # before we could.
829         while True:
830             try:
831                 p.wait(timeout=0.25)
832             except subprocess.TimeoutExpired:
833                 if self.check_if_cancelled(bundle):
834                     logger.info(
835                         f'{bundle}: looks like another worker finished bundle...'
836                     )
837                     break
838             else:
839                 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
840                 p = None
841                 break
842
843         # If we get here we believe the bundle is done; either the ssh
844         # subprocess finished (hopefully successfully) or we noticed
845         # that some other worker seems to have completed the bundle
846         # and we're bailing out.
847         try:
848             ret = self.process_work_result(bundle)
849             if ret is not None and p is not None:
850                 p.terminate()
851             return ret
852
853         # Something went wrong; e.g. we could not copy the results
854         # back, cleanup after ourselves on the remote machine, or
855         # unpickle the results we got from the remote machine.  If we
856         # still have an active ssh subprocess, keep waiting on it.
857         # Otherwise, time for an emergency reschedule.
858         except Exception as e:
859             logger.exception(e)
860             logger.error(f'{bundle}: Something unexpected just happened...')
861             if p is not None:
862                 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
863                 logger.warning(msg)
864                 return self.wait_for_process(p, bundle, depth + 1)
865             else:
866                 self.release_worker(bundle)
867                 return self.emergency_retry_nasty_bundle(bundle)
868
869     def process_work_result(self, bundle: BundleDetails) -> Any:
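        """Wrap up a finished bundle: copy results back if they live on a
        remote machine, unpickle them if we are the original bundle, and
        notify our original/backup counterparts so they can stop."""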
870         with self.status.lock:
871             is_original = bundle.src_bundle is None
872             was_cancelled = bundle.was_cancelled
873             username = bundle.username
874             machine = bundle.machine
875             result_file = bundle.result_file
876             code_file = bundle.code_file
877
878             # Whether original or backup, if we finished first we must
879             # fetch the results if the computation happened on a
880             # remote machine.
881             bundle.end_ts = time.time()
882             if not was_cancelled:
883                 assert bundle.machine is not None
884                 if bundle.hostname not in bundle.machine:
885                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
886                     logger.info(
887                         f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
888                     )
889
890                     # If either of these throw they are handled in
891                     # wait_for_process.
892                     attempts = 0
893                     while True:
894                         try:
895                             run_silently(cmd)
896                         except Exception as e:
897                             attempts += 1
898                             if attempts >= 3:
899                                 raise e
900                         else:
901                             break
902
903                     run_silently(
904                         f'{SSH} {username}@{machine}'
905                         f' "/bin/rm -f {code_file} {result_file}"'
906                     )
907                     logger.debug(
908                         f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
909                     )
910                 dur = bundle.end_ts - bundle.start_ts
911                 self.histogram.add_item(dur)
912
913         # Only the original worker should unpickle the file contents
914         # though since it's the only one whose result matters.  The
915         # original is also the only job that may delete result_file
916         # from disk.  Note that the original may have been cancelled
917         # if one of the backups finished first; it still must read the
918         # result from disk.
919         if is_original:
920             logger.debug(f"{bundle}: Unpickling {result_file}.")
921             try:
922                 with open(result_file, 'rb') as rb:
923                     serialized = rb.read()
924                 result = cloudpickle.loads(serialized)
925             except Exception as e:
926                 logger.exception(e)
927                 msg = f'Failed to load {result_file}... this is bad news.'
928                 logger.critical(msg)
929                 self.release_worker(bundle)
930
931                 # Re-raise the exception; the code in wait_for_process may
932                 # decide to emergency_retry_nasty_bundle here.
933                 raise Exception(e)
934             logger.debug(f'Removing local (master) {code_file} and {result_file}.')
935             os.remove(f'{result_file}')
936             os.remove(f'{code_file}')
937
938             # Notify any backups that the original is done so they
939             # should stop ASAP.  Do this whether or not we
940             # finished first since there could be more than one
941             # backup.
942             if bundle.backup_bundles is not None:
943                 for backup in bundle.backup_bundles:
944                     logger.debug(
945                         f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
946                     )
947                     backup.is_cancelled.set()
948
949         # This is a backup job and, by now, we have already fetched
950         # the bundle results.
951         else:
952             # Backup results don't matter, they just need to leave the
953             # result file in the right place for their originals to
954             # read/unpickle later.
955             result = None
956
957             # Tell the original to stop if we finished first.
958             if not was_cancelled:
959                 logger.debug(
960                     f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
961                 )
962                 bundle.src_bundle.is_cancelled.set()
963         self.release_worker(bundle, was_cancelled=was_cancelled)
964         return result
965
966     def create_original_bundle(self, pickle, fname: str):
967         from string_utils import generate_uuid
968
969         uuid = generate_uuid(omit_dashes=True)
970         code_file = f'/tmp/{uuid}.code.bin'
971         result_file = f'/tmp/{uuid}.result.bin'
972
973         logger.debug(f'Writing pickled code to {code_file}')
974         with open(f'{code_file}', 'wb') as wb:
975             wb.write(pickle)
976
977         bundle = BundleDetails(
978             pickled_code=pickle,
979             uuid=uuid,
980             fname=fname,
981             worker=None,
982             username=None,
983             machine=None,
984             hostname=platform.node(),
985             code_file=code_file,
986             result_file=result_file,
987             pid=0,
988             start_ts=time.time(),
989             end_ts=0.0,
990             slower_than_local_p95=False,
991             slower_than_global_p95=False,
992             src_bundle=None,
993             is_cancelled=threading.Event(),
994             was_cancelled=False,
995             backup_bundles=[],
996             failure_count=0,
997         )
998         self.status.record_bundle_details(bundle)
999         logger.debug(f'{bundle}: Created an original bundle')
1000         return bundle
1001
1002     def create_backup_bundle(self, src_bundle: BundleDetails):
1003         assert src_bundle.backup_bundles is not None
1004         n = len(src_bundle.backup_bundles)
1005         uuid = src_bundle.uuid + f'_backup#{n}'
1006
1007         backup_bundle = BundleDetails(
1008             pickled_code=src_bundle.pickled_code,
1009             uuid=uuid,
1010             fname=src_bundle.fname,
1011             worker=None,
1012             username=None,
1013             machine=None,
1014             hostname=src_bundle.hostname,
1015             code_file=src_bundle.code_file,
1016             result_file=src_bundle.result_file,
1017             pid=0,
1018             start_ts=time.time(),
1019             end_ts=0.0,
1020             slower_than_local_p95=False,
1021             slower_than_global_p95=False,
1022             src_bundle=src_bundle,
1023             is_cancelled=threading.Event(),
1024             was_cancelled=False,
1025             backup_bundles=None,  # backup backups not allowed
1026             failure_count=0,
1027         )
1028         src_bundle.backup_bundles.append(backup_bundle)
1029         self.status.record_bundle_details_already_locked(backup_bundle)
1030         logger.debug(f'{backup_bundle}: Created a backup bundle')
1031         return backup_bundle
1032
1033     def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1034         assert self.status.lock.locked()
1035         assert src_bundle is not None
1036         backup_bundle = self.create_backup_bundle(src_bundle)
1037         logger.debug(
1038             f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1039         )
1040         self._helper_executor.submit(self.launch, backup_bundle)
1041
1042         # Results from backups don't matter; if they finish first
1043         # they will move the result_file to this machine and let
1044         # the original pick them up and unpickle them.
1045
1046     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
1047         is_original = bundle.src_bundle is None
1048         bundle.worker = None
1049         avoid_last_machine = bundle.machine
1050         bundle.machine = None
1051         bundle.username = None
1052         bundle.failure_count += 1
1053         if is_original:
1054             retry_limit = 3
1055         else:
1056             retry_limit = 2
1057
1058         if bundle.failure_count > retry_limit:
1059             logger.error(
1060                 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1061             )
1062             if is_original:
1063                 raise RemoteExecutorException(
1064                     f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1065                 )
1066             else:
1067                 logger.error(
1068                     f'{bundle}: At least it\'s only a backup; better luck with the others.'
1069                 )
1070             return None
1071         else:
1072             msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1073             logger.warning(msg)
1074             warnings.warn(msg)
1075             return self.launch(bundle, avoid_last_machine)
1076
1077     @overrides
1078     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1079         pickle = make_cloud_pickle(function, *args, **kwargs)
1080         bundle = self.create_original_bundle(pickle, function.__name__)
1081         self.total_bundles_submitted += 1
1082         return self._helper_executor.submit(self.launch, bundle)
1083
1084     @overrides
1085     def shutdown(self, wait=True) -> None:
1086         logger.debug(f'Shutting down RemoteExecutor {self.title}')
1087         self.heartbeat_stop_event.set()
1088         self.heartbeat_thread.join()
1089         self._helper_executor.shutdown(wait)
1090         print(self.histogram)
1091
1092
1093 @singleton
1094 class DefaultExecutors(object):
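    """Lazily constructs and caches singleton thread, process, and remote
    executors.  Minimal usage sketch (some_function is a placeholder):

        executors = DefaultExecutors()
        future = executors.process_pool().submit(some_function, arg)
        result = future.result()
        executors.shutdown()
    """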
1095     def __init__(self):
1096         self.thread_executor: Optional[ThreadExecutor] = None
1097         self.process_executor: Optional[ProcessExecutor] = None
1098         self.remote_executor: Optional[RemoteExecutor] = None
1099
1100     def ping(self, host) -> bool:
1101         logger.debug(f'RUN> ping -c 1 {host}')
1102         try:
1103             x = cmd_with_timeout(
1104                 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1105             )
1106             return x == 0
1107         except Exception:
1108             return False
1109
1110     def thread_pool(self) -> ThreadExecutor:
1111         if self.thread_executor is None:
1112             self.thread_executor = ThreadExecutor()
1113         return self.thread_executor
1114
1115     def process_pool(self) -> ProcessExecutor:
1116         if self.process_executor is None:
1117             self.process_executor = ProcessExecutor()
1118         return self.process_executor
1119
1120     def remote_pool(self) -> RemoteExecutor:
1121         if self.remote_executor is None:
1122             logger.info('Looking for some helper machines...')
1123             pool: List[RemoteWorkerRecord] = []
1124             if self.ping('cheetah.house'):
1125                 logger.info('Found cheetah.house')
1126                 pool.append(
1127                     RemoteWorkerRecord(
1128                         username='scott',
1129                         machine='cheetah.house',
1130                         weight=30,
1131                         count=6,
1132                     ),
1133                 )
1134             if self.ping('meerkat.cabin'):
1135                 logger.info('Found meerkat.cabin')
1136                 pool.append(
1137                     RemoteWorkerRecord(
1138                         username='scott',
1139                         machine='meerkat.cabin',
1140                         weight=12,
1141                         count=2,
1142                     ),
1143                 )
1144             if self.ping('wannabe.house'):
1145                 logger.info('Found wannabe.house')
1146                 pool.append(
1147                     RemoteWorkerRecord(
1148                         username='scott',
1149                         machine='wannabe.house',
1150                         weight=25,
1151                         count=10,
1152                     ),
1153                 )
1154             if self.ping('puma.cabin'):
1155                 logger.info('Found puma.cabin')
1156                 pool.append(
1157                     RemoteWorkerRecord(
1158                         username='scott',
1159                         machine='puma.cabin',
1160                         weight=30,
1161                         count=6,
1162                     ),
1163                 )
1164             if self.ping('backup.house'):
1165                 logger.info('Found backup.house')
1166                 pool.append(
1167                     RemoteWorkerRecord(
1168                         username='scott',
1169                         machine='backup.house',
1170                         weight=8,
1171                         count=2,
1172                     ),
1173                 )
1174
1175             # The controller machine has a lot to do; go easy on it.
1176             for record in pool:
1177                 if record.machine == platform.node() and record.count > 1:
1178                     logger.info(f'Reducing workload for {record.machine}.')
1179                     record.count = 1
1180
1181             policy = WeightedRandomRemoteWorkerSelectionPolicy()
1182             policy.register_worker_pool(pool)
1183             self.remote_executor = RemoteExecutor(pool, policy)
1184         return self.remote_executor
1185
1186     def shutdown(self) -> None:
1187         if self.thread_executor is not None:
1188             self.thread_executor.shutdown()
1189             self.thread_executor = None
1190         if self.process_executor is not None:
1191             self.process_executor.shutdown()
1192             self.process_executor = None
1193         if self.remote_executor is not None:
1194             self.remote_executor.shutdown()
1195             self.remote_executor = None