3 from __future__ import annotations
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
17 from typing import Any, Callable, Dict, List, Optional, Set
20 import cloudpickle # type: ignore
21 from overrides import overrides
23 from ansi import bg, fg, underline, reset
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
32 logger = logging.getLogger(__name__)
34 parser = config.add_commandline_args(
35 f"Executors ({__file__})", "Args related to processing executors."
38 '--executors_threadpool_size',
41 help='Number of threads in the default threadpool, leave unset for default',
45 '--executors_processpool_size',
48 help='Number of processes in the default processpool, leave unset for default',
52 '--executors_schedule_remote_backups',
54 action=argparse_utils.ActionNoYes,
55 help='Should we schedule duplicative backup work if a remote bundle is slow',
58 '--executors_max_bundle_failures',
62 help='Maximum number of failures before giving up on a bundle',
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp -C'
69 def make_cloud_pickle(fun, *args, **kwargs):
70 logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71 return cloudpickle.dumps((fun, args, kwargs))
74 class BaseExecutor(ABC):
75 def __init__(self, *, title=''):
77 self.histogram = hist.SimpleHistogram(
78 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
83 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
87 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
90 def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
91 """Shutdown the executor and return True if the executor is idle
92 (i.e. there are no pending or active tasks). Return False
93 otherwise. Note: this should only be called by the launcher
97 if self.task_count == 0:
98 self.shutdown(wait=True, quiet=quiet)
102 def adjust_task_count(self, delta: int) -> None:
103 """Change the task count. Note: do not call this method from a
104 worker, it should only be called by the launcher process /
108 self.task_count += delta
109 logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
111 def get_task_count(self) -> int:
112 """Change the task count. Note: do not call this method from a
113 worker, it should only be called by the launcher process /
117 return self.task_count
120 class ThreadExecutor(BaseExecutor):
121 def __init__(self, max_workers: Optional[int] = None):
124 if max_workers is not None:
125 workers = max_workers
126 elif 'executors_threadpool_size' in config.config:
127 workers = config.config['executors_threadpool_size']
128 logger.debug(f'Creating threadpool executor with {workers} workers')
129 self._thread_pool_executor = fut.ThreadPoolExecutor(
130 max_workers=workers, thread_name_prefix="thread_executor_helper"
132 self.already_shutdown = False
134 # This is run on a different thread; do not adjust task count here.
135 def run_local_bundle(self, fun, *args, **kwargs):
136 logger.debug(f"Running local bundle at {fun.__name__}")
137 result = fun(*args, **kwargs)
141 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
142 if self.already_shutdown:
143 raise Exception('Submitted work after shutdown.')
144 self.adjust_task_count(+1)
146 newargs.append(function)
150 result = self._thread_pool_executor.submit(
151 self.run_local_bundle, *newargs, **kwargs
153 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
154 result.add_done_callback(lambda _: self.adjust_task_count(-1))
158 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
159 if not self.already_shutdown:
160 logger.debug(f'Shutting down threadpool executor {self.title}')
161 self._thread_pool_executor.shutdown(wait)
163 print(self.histogram.__repr__(label_formatter='%ds'))
164 self.already_shutdown = True
167 class ProcessExecutor(BaseExecutor):
168 def __init__(self, max_workers=None):
171 if max_workers is not None:
172 workers = max_workers
173 elif 'executors_processpool_size' in config.config:
174 workers = config.config['executors_processpool_size']
175 logger.debug(f'Creating processpool executor with {workers} workers.')
176 self._process_executor = fut.ProcessPoolExecutor(
179 self.already_shutdown = False
181 # This is run in another process; do not adjust task count here.
182 def run_cloud_pickle(self, pickle):
183 fun, args, kwargs = cloudpickle.loads(pickle)
184 logger.debug(f"Running pickled bundle at {fun.__name__}")
185 result = fun(*args, **kwargs)
189 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
190 if self.already_shutdown:
191 raise Exception('Submitted work after shutdown.')
193 self.adjust_task_count(+1)
194 pickle = make_cloud_pickle(function, *args, **kwargs)
195 result = self._process_executor.submit(self.run_cloud_pickle, pickle)
196 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
197 result.add_done_callback(lambda _: self.adjust_task_count(-1))
201 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
202 if not self.already_shutdown:
203 logger.debug(f'Shutting down processpool executor {self.title}')
204 self._process_executor.shutdown(wait)
206 print(self.histogram.__repr__(label_formatter='%ds'))
207 self.already_shutdown = True
209 def __getstate__(self):
210 state = self.__dict__.copy()
211 state['_process_executor'] = None
215 class RemoteExecutorException(Exception):
216 """Thrown when a bundle cannot be executed despite several retries."""
222 class RemoteWorkerRecord:
229 return hash((self.username, self.machine))
232 return f'{self.username}@{self.machine}'
240 worker: Optional[RemoteWorkerRecord]
241 username: Optional[str]
242 machine: Optional[str]
249 slower_than_local_p95: bool
250 slower_than_global_p95: bool
251 src_bundle: Optional[BundleDetails]
252 is_cancelled: threading.Event
254 backup_bundles: Optional[List[BundleDetails]]
259 if uuid[-9:-2] == '_backup':
261 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
271 fg('marigold yellow'),
274 fg('cornflower blue'),
275 fg('turquoise blue'),
277 fg('lavender purple'),
280 c = colorz[int(uuid[-2:], 16) % len(colorz)]
281 fname = self.fname if self.fname is not None else 'nofname'
282 machine = self.machine if self.machine is not None else 'nomachine'
283 return f'{c}{suffix}/{fname}/{machine}{reset()}'
286 class RemoteExecutorStatus:
287 def __init__(self, total_worker_count: int) -> None:
288 self.worker_count: int = total_worker_count
289 self.known_workers: Set[RemoteWorkerRecord] = set()
290 self.start_time: float = time.time()
291 self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
292 self.end_per_bundle: Dict[str, float] = defaultdict(float)
293 self.finished_bundle_timings_per_worker: Dict[
294 RemoteWorkerRecord, List[float]
296 self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
297 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
298 self.finished_bundle_timings: List[float] = []
299 self.last_periodic_dump: Optional[float] = None
300 self.total_bundles_submitted: int = 0
302 # Protects reads and modification using self. Also used
303 # as a memory fence for modifications to bundle.
304 self.lock: threading.Lock = threading.Lock()
306 def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
308 self.record_acquire_worker_already_locked(worker, uuid)
310 def record_acquire_worker_already_locked(
311 self, worker: RemoteWorkerRecord, uuid: str
313 assert self.lock.locked()
314 self.known_workers.add(worker)
315 self.start_per_bundle[uuid] = None
316 x = self.in_flight_bundles_by_worker.get(worker, set())
318 self.in_flight_bundles_by_worker[worker] = x
320 def record_bundle_details(self, details: BundleDetails) -> None:
322 self.record_bundle_details_already_locked(details)
324 def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
325 assert self.lock.locked()
326 self.bundle_details_by_uuid[details.uuid] = details
328 def record_release_worker(
330 worker: RemoteWorkerRecord,
335 self.record_release_worker_already_locked(worker, uuid, was_cancelled)
337 def record_release_worker_already_locked(
339 worker: RemoteWorkerRecord,
343 assert self.lock.locked()
345 self.end_per_bundle[uuid] = ts
346 self.in_flight_bundles_by_worker[worker].remove(uuid)
347 if not was_cancelled:
348 start = self.start_per_bundle[uuid]
350 bundle_latency = ts - start
351 x = self.finished_bundle_timings_per_worker.get(worker, list())
352 x.append(bundle_latency)
353 self.finished_bundle_timings_per_worker[worker] = x
354 self.finished_bundle_timings.append(bundle_latency)
356 def record_processing_began(self, uuid: str):
358 self.start_per_bundle[uuid] = time.time()
360 def total_in_flight(self) -> int:
361 assert self.lock.locked()
363 for worker in self.known_workers:
364 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
365 return total_in_flight
367 def total_idle(self) -> int:
368 assert self.lock.locked()
369 return self.worker_count - self.total_in_flight()
372 assert self.lock.locked()
374 total_finished = len(self.finished_bundle_timings)
375 total_in_flight = self.total_in_flight()
376 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
378 if len(self.finished_bundle_timings) > 1:
379 qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
381 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
382 f'✅={total_finished}/{self.total_bundles_submitted}, '
383 f'💻n={total_in_flight}/{self.worker_count}\n'
387 f'⏱={ts-self.start_time:.1f}s, '
388 f'✅={total_finished}/{self.total_bundles_submitted}, '
389 f'💻n={total_in_flight}/{self.worker_count}\n'
392 for worker in self.known_workers:
393 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
394 timings = self.finished_bundle_timings_per_worker.get(worker, [])
398 qworker = numpy.quantile(timings, [0.5, 0.95])
399 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
403 ret += f' ...finished {count} total bundle(s) so far\n'
404 in_flight = len(self.in_flight_bundles_by_worker[worker])
406 ret += f' ...{in_flight} bundles currently in flight:\n'
407 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
408 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
409 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
410 if self.start_per_bundle[bundle_uuid] is not None:
411 sec = ts - self.start_per_bundle[bundle_uuid]
412 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
414 ret += f' {details} setting up / copying data...'
417 if qworker is not None:
419 ret += f'{bg("red")}>💻p95{reset()} '
420 if details is not None:
421 details.slower_than_local_p95 = True
423 if details is not None:
424 details.slower_than_local_p95 = False
428 ret += f'{bg("red")}>∀p95{reset()} '
429 if details is not None:
430 details.slower_than_global_p95 = True
432 details.slower_than_global_p95 = False
436 def periodic_dump(self, total_bundles_submitted: int) -> None:
437 assert self.lock.locked()
438 self.total_bundles_submitted = total_bundles_submitted
440 if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
442 self.last_periodic_dump = ts
445 class RemoteWorkerSelectionPolicy(ABC):
446 def register_worker_pool(self, workers):
447 self.workers = workers
450 def is_worker_available(self) -> bool:
454 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
458 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
460 def is_worker_available(self) -> bool:
461 for worker in self.workers:
467 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
469 for worker in self.workers:
470 if worker.machine != machine_to_avoid:
472 for _ in range(worker.count * worker.weight):
473 grabbag.append(worker)
475 if len(grabbag) == 0:
477 f'There are no available workers that avoid {machine_to_avoid}...'
479 for worker in self.workers:
481 for _ in range(worker.count * worker.weight):
482 grabbag.append(worker)
484 if len(grabbag) == 0:
485 logger.warning('There are no available workers?!')
488 worker = random.sample(grabbag, 1)[0]
489 assert worker.count > 0
491 logger.debug(f'Chose worker {worker}')
495 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
496 def __init__(self) -> None:
500 def is_worker_available(self) -> bool:
501 for worker in self.workers:
508 self, machine_to_avoid: str = None
509 ) -> Optional[RemoteWorkerRecord]:
512 worker = self.workers[x]
516 if x >= len(self.workers):
519 logger.debug(f'Selected worker {worker}')
522 if x >= len(self.workers):
525 msg = 'Unexpectedly could not find a worker, retrying...'
530 class RemoteExecutor(BaseExecutor):
533 workers: List[RemoteWorkerRecord],
534 policy: RemoteWorkerSelectionPolicy,
537 self.workers = workers
539 self.worker_count = 0
540 for worker in self.workers:
541 self.worker_count += worker.count
542 if self.worker_count <= 0:
543 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
545 raise RemoteExecutorException(msg)
546 self.policy.register_worker_pool(self.workers)
547 self.cv = threading.Condition()
549 f'Creating {self.worker_count} local threads, one per remote worker.'
551 self._helper_executor = fut.ThreadPoolExecutor(
552 thread_name_prefix="remote_executor_helper",
553 max_workers=self.worker_count,
555 self.status = RemoteExecutorStatus(self.worker_count)
556 self.total_bundles_submitted = 0
557 self.backup_lock = threading.Lock()
558 self.last_backup = None
560 self.heartbeat_thread,
561 self.heartbeat_stop_event,
562 ) = self.run_periodic_heartbeat()
563 self.already_shutdown = False
566 def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
567 while not stop_event.is_set():
569 logger.debug('Running periodic heartbeat code...')
571 logger.debug('Periodic heartbeat thread shutting down.')
573 def heartbeat(self) -> None:
574 # Note: this is invoked on a background thread, not an
575 # executor thread. Be careful what you do with it b/c it
576 # needs to get back and dump status again periodically.
577 with self.status.lock:
578 self.status.periodic_dump(self.total_bundles_submitted)
580 # Look for bundles to reschedule via executor.submit
581 if config.config['executors_schedule_remote_backups']:
582 self.maybe_schedule_backup_bundles()
584 def maybe_schedule_backup_bundles(self):
585 assert self.status.lock.locked()
586 num_done = len(self.status.finished_bundle_timings)
587 num_idle_workers = self.worker_count - self.task_count
591 and num_idle_workers > 1
592 and (self.last_backup is None or (now - self.last_backup > 9.0))
593 and self.backup_lock.acquire(blocking=False)
596 assert self.backup_lock.locked()
598 bundle_to_backup = None
603 ) in self.status.in_flight_bundles_by_worker.items():
605 # Prefer to schedule backups of bundles running on
608 for record in self.workers:
609 if worker.machine == record.machine:
610 base_score = float(record.weight)
611 base_score = 1.0 / base_score
613 base_score = int(base_score)
616 for uuid in bundle_uuids:
617 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
620 and bundle.src_bundle is None
621 and bundle.backup_bundles is not None
625 # Schedule backups of bundles running
626 # longer; especially those that are
628 start_ts = self.status.start_per_bundle[uuid]
629 if start_ts is not None:
630 runtime = now - start_ts
633 f'score[{bundle}] => {score} # latency boost'
636 if bundle.slower_than_local_p95:
639 f'score[{bundle}] => {score} # >worker p95'
642 if bundle.slower_than_global_p95:
645 f'score[{bundle}] => {score} # >global p95'
648 # Prefer backups of bundles that don't
649 # have backups already.
650 backup_count = len(bundle.backup_bundles)
651 if backup_count == 0:
653 elif backup_count == 1:
655 elif backup_count == 2:
660 f'score[{bundle}] => {score} # {backup_count} dup backup factor'
664 best_score is None or score > best_score
666 bundle_to_backup = bundle
667 assert bundle is not None
668 assert bundle.backup_bundles is not None
669 assert bundle.src_bundle is None
672 # Note: this is all still happening on the heartbeat
673 # runner thread. That's ok because
674 # schedule_backup_for_bundle uses the executor to
675 # submit the bundle again which will cause it to be
676 # picked up by a worker thread and allow this thread
677 # to return to run future heartbeats.
678 if bundle_to_backup is not None:
679 self.last_backup = now
681 f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
683 self.schedule_backup_for_bundle(bundle_to_backup)
685 self.backup_lock.release()
687 def is_worker_available(self) -> bool:
688 return self.policy.is_worker_available()
691 self, machine_to_avoid: str = None
692 ) -> Optional[RemoteWorkerRecord]:
693 return self.policy.acquire_worker(machine_to_avoid)
695 def find_available_worker_or_block(
696 self, machine_to_avoid: str = None
697 ) -> RemoteWorkerRecord:
699 while not self.is_worker_available():
701 worker = self.acquire_worker(machine_to_avoid)
702 if worker is not None:
704 msg = "We should never reach this point in the code"
708 def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
709 worker = bundle.worker
710 assert worker is not None
711 logger.debug(f'Released worker {worker}')
712 self.status.record_release_worker(
720 self.adjust_task_count(-1)
722 def check_if_cancelled(self, bundle: BundleDetails) -> bool:
723 with self.status.lock:
724 if bundle.is_cancelled.wait(timeout=0.0):
725 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
726 bundle.was_cancelled = True
730 def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
731 """Find a worker for bundle or block until one is available."""
732 self.adjust_task_count(+1)
734 hostname = bundle.hostname
735 avoid_machine = override_avoid_machine
736 is_original = bundle.src_bundle is None
738 # Try not to schedule a backup on the same host as the original.
739 if avoid_machine is None and bundle.src_bundle is not None:
740 avoid_machine = bundle.src_bundle.machine
742 while worker is None:
743 worker = self.find_available_worker_or_block(avoid_machine)
746 # Ok, found a worker.
747 bundle.worker = worker
748 machine = bundle.machine = worker.machine
749 username = bundle.username = worker.username
750 self.status.record_acquire_worker(worker, uuid)
751 logger.debug(f'{bundle}: Running bundle on {worker}...')
753 # Before we do any work, make sure the bundle is still viable.
754 # It may have been some time between when it was submitted and
755 # now due to lack of worker availability and someone else may
756 # have already finished it.
757 if self.check_if_cancelled(bundle):
759 return self.process_work_result(bundle)
760 except Exception as e:
762 f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
764 self.release_worker(bundle)
766 # Weird. We are the original owner of this
767 # bundle. For it to have been cancelled, a backup
768 # must have already started and completed before
769 # we even for started. Moreover, the backup says
770 # it is done but we can't find the results it
771 # should have copied over. Reschedule the whole
775 f'{bundle}: We are the original owner thread and yet there are '
776 + 'no results for this bundle. This is unexpected and bad.'
778 return self.emergency_retry_nasty_bundle(bundle)
780 # Expected(?). We're a backup and our bundle is
781 # cancelled before we even got started. Something
782 # went bad in process_work_result (I acutually don't
783 # see what?) but probably not worth worrying
784 # about. Let the original thread worry about
785 # either finding the results or complaining about
789 # Send input code / data to worker machine if it's not local.
790 if hostname not in machine:
793 f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
795 start_ts = time.time()
796 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
798 xfer_latency = time.time() - start_ts
799 logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
800 except Exception as e:
801 self.release_worker(bundle)
803 # Weird. We tried to copy the code to the worker and it failed...
804 # And we're the original bundle. We have to retry.
807 f"{bundle}: Failed to send instructions to the worker machine?! "
808 + "This is not expected; we\'re the original bundle so this shouldn\'t "
809 + "be a race condition. Attempting an emergency retry..."
811 return self.emergency_retry_nasty_bundle(bundle)
813 # This is actually expected; we're a backup.
814 # There's a race condition where someone else
815 # already finished the work and removed the source
816 # code file before we could copy it. No biggie.
817 msg = f'{bundle}: Failed to send instructions to the worker machine... '
818 msg += 'We\'re a backup and this may be caused by the original (or some '
819 msg += 'other backup) already finishing this work. Ignoring this.'
823 # Kick off the work. Note that if this fails we let
824 # wait_for_process deal with it.
825 self.status.record_processing_began(uuid)
827 f'{SSH} {bundle.username}@{bundle.machine} '
828 f'"source py38-venv/bin/activate &&'
829 f' /home/scott/lib/python_modules/remote_worker.py'
830 f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
832 logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
833 p = cmd_in_background(cmd, silent=True)
836 f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
838 return self.wait_for_process(p, bundle, 0)
840 def wait_for_process(
841 self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
843 machine = bundle.machine
848 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
851 self.release_worker(bundle)
852 return self.emergency_retry_nasty_bundle(bundle)
854 # Spin until either the ssh job we scheduled finishes the
855 # bundle or some backup worker signals that they finished it
860 except subprocess.TimeoutExpired:
861 if self.check_if_cancelled(bundle):
863 f'{bundle}: looks like another worker finished bundle...'
867 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
871 # If we get here we believe the bundle is done; either the ssh
872 # subprocess finished (hopefully successfully) or we noticed
873 # that some other worker seems to have completed the bundle
874 # and we're bailing out.
876 ret = self.process_work_result(bundle)
877 if ret is not None and p is not None:
881 # Something went wrong; e.g. we could not copy the results
882 # back, cleanup after ourselves on the remote machine, or
883 # unpickle the results we got from the remove machine. If we
884 # still have an active ssh subprocess, keep waiting on it.
885 # Otherwise, time for an emergency reschedule.
886 except Exception as e:
888 logger.error(f'{bundle}: Something unexpected just happened...')
890 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
892 return self.wait_for_process(p, bundle, depth + 1)
894 self.release_worker(bundle)
895 return self.emergency_retry_nasty_bundle(bundle)
897 def process_work_result(self, bundle: BundleDetails) -> Any:
898 with self.status.lock:
899 is_original = bundle.src_bundle is None
900 was_cancelled = bundle.was_cancelled
901 username = bundle.username
902 machine = bundle.machine
903 result_file = bundle.result_file
904 code_file = bundle.code_file
906 # Whether original or backup, if we finished first we must
907 # fetch the results if the computation happened on a
909 bundle.end_ts = time.time()
910 if not was_cancelled:
911 assert bundle.machine is not None
912 if bundle.hostname not in bundle.machine:
913 cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
915 f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
918 # If either of these throw they are handled in
924 except Exception as e:
932 f'{SSH} {username}@{machine}'
933 f' "/bin/rm -f {code_file} {result_file}"'
936 f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
938 dur = bundle.end_ts - bundle.start_ts
939 self.histogram.add_item(dur)
941 # Only the original worker should unpickle the file contents
942 # though since it's the only one whose result matters. The
943 # original is also the only job that may delete result_file
944 # from disk. Note that the original may have been cancelled
945 # if one of the backups finished first; it still must read the
948 logger.debug(f"{bundle}: Unpickling {result_file}.")
950 with open(result_file, 'rb') as rb:
951 serialized = rb.read()
952 result = cloudpickle.loads(serialized)
953 except Exception as e:
955 msg = f'Failed to load {result_file}... this is bad news.'
957 self.release_worker(bundle)
959 # Re-raise the exception; the code in wait_for_process may
960 # decide to emergency_retry_nasty_bundle here.
962 logger.debug(f'Removing local (master) {code_file} and {result_file}.')
963 os.remove(f'{result_file}')
964 os.remove(f'{code_file}')
966 # Notify any backups that the original is done so they
967 # should stop ASAP. Do this whether or not we
968 # finished first since there could be more than one
970 if bundle.backup_bundles is not None:
971 for backup in bundle.backup_bundles:
973 f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
975 backup.is_cancelled.set()
977 # This is a backup job and, by now, we have already fetched
978 # the bundle results.
980 # Backup results don't matter, they just need to leave the
981 # result file in the right place for their originals to
982 # read/unpickle later.
985 # Tell the original to stop if we finished first.
986 if not was_cancelled:
987 orig_bundle = bundle.src_bundle
990 f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
992 orig_bundle.is_cancelled.set()
993 self.release_worker(bundle, was_cancelled=was_cancelled)
996 def create_original_bundle(self, pickle, fname: str):
997 from string_utils import generate_uuid
999 uuid = generate_uuid(omit_dashes=True)
1000 code_file = f'/tmp/{uuid}.code.bin'
1001 result_file = f'/tmp/{uuid}.result.bin'
1003 logger.debug(f'Writing pickled code to {code_file}')
1004 with open(f'{code_file}', 'wb') as wb:
1007 bundle = BundleDetails(
1008 pickled_code=pickle,
1014 hostname=platform.node(),
1015 code_file=code_file,
1016 result_file=result_file,
1018 start_ts=time.time(),
1020 slower_than_local_p95=False,
1021 slower_than_global_p95=False,
1023 is_cancelled=threading.Event(),
1024 was_cancelled=False,
1028 self.status.record_bundle_details(bundle)
1029 logger.debug(f'{bundle}: Created an original bundle')
1032 def create_backup_bundle(self, src_bundle: BundleDetails):
1033 assert src_bundle.backup_bundles is not None
1034 n = len(src_bundle.backup_bundles)
1035 uuid = src_bundle.uuid + f'_backup#{n}'
1037 backup_bundle = BundleDetails(
1038 pickled_code=src_bundle.pickled_code,
1040 fname=src_bundle.fname,
1044 hostname=src_bundle.hostname,
1045 code_file=src_bundle.code_file,
1046 result_file=src_bundle.result_file,
1048 start_ts=time.time(),
1050 slower_than_local_p95=False,
1051 slower_than_global_p95=False,
1052 src_bundle=src_bundle,
1053 is_cancelled=threading.Event(),
1054 was_cancelled=False,
1055 backup_bundles=None, # backup backups not allowed
1058 src_bundle.backup_bundles.append(backup_bundle)
1059 self.status.record_bundle_details_already_locked(backup_bundle)
1060 logger.debug(f'{backup_bundle}: Created a backup bundle')
1061 return backup_bundle
1063 def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1064 assert self.status.lock.locked()
1065 assert src_bundle is not None
1066 backup_bundle = self.create_backup_bundle(src_bundle)
1068 f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1070 self._helper_executor.submit(self.launch, backup_bundle)
1072 # Results from backups don't matter; if they finish first
1073 # they will move the result_file to this machine and let
1074 # the original pick them up and unpickle them.
1076 def emergency_retry_nasty_bundle(
1077 self, bundle: BundleDetails
1078 ) -> Optional[fut.Future]:
1079 is_original = bundle.src_bundle is None
1080 bundle.worker = None
1081 avoid_last_machine = bundle.machine
1082 bundle.machine = None
1083 bundle.username = None
1084 bundle.failure_count += 1
1090 if bundle.failure_count > retry_limit:
1092 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1095 raise RemoteExecutorException(
1096 f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1100 f'{bundle}: At least it\'s only a backup; better luck with the others.'
1104 msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
1107 return self.launch(bundle, avoid_last_machine)
1110 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1111 if self.already_shutdown:
1112 raise Exception('Submitted work after shutdown.')
1113 pickle = make_cloud_pickle(function, *args, **kwargs)
1114 bundle = self.create_original_bundle(pickle, function.__name__)
1115 self.total_bundles_submitted += 1
1116 return self._helper_executor.submit(self.launch, bundle)
1119 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1120 if not self.already_shutdown:
1121 logging.debug(f'Shutting down RemoteExecutor {self.title}')
1122 self.heartbeat_stop_event.set()
1123 self.heartbeat_thread.join()
1124 self._helper_executor.shutdown(wait)
1126 print(self.histogram.__repr__(label_formatter='%ds'))
1127 self.already_shutdown = True
1131 class DefaultExecutors(object):
1133 self.thread_executor: Optional[ThreadExecutor] = None
1134 self.process_executor: Optional[ProcessExecutor] = None
1135 self.remote_executor: Optional[RemoteExecutor] = None
1137 def ping(self, host) -> bool:
1138 logger.debug(f'RUN> ping -c 1 {host}')
1140 x = cmd_with_timeout(
1141 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1147 def thread_pool(self) -> ThreadExecutor:
1148 if self.thread_executor is None:
1149 self.thread_executor = ThreadExecutor()
1150 return self.thread_executor
1152 def process_pool(self) -> ProcessExecutor:
1153 if self.process_executor is None:
1154 self.process_executor = ProcessExecutor()
1155 return self.process_executor
1157 def remote_pool(self) -> RemoteExecutor:
1158 if self.remote_executor is None:
1159 logger.info('Looking for some helper machines...')
1160 pool: List[RemoteWorkerRecord] = []
1161 if self.ping('cheetah.house'):
1162 logger.info('Found cheetah.house')
1166 machine='cheetah.house',
1171 if self.ping('meerkat.cabin'):
1172 logger.info('Found meerkat.cabin')
1176 machine='meerkat.cabin',
1181 if self.ping('wannabe.house'):
1182 logger.info('Found wannabe.house')
1186 machine='wannabe.house',
1191 if self.ping('puma.cabin'):
1192 logger.info('Found puma.cabin')
1196 machine='puma.cabin',
1201 if self.ping('backup.house'):
1202 logger.info('Found backup.house')
1206 machine='backup.house',
1212 # The controller machine has a lot to do; go easy on it.
1214 if record.machine == platform.node() and record.count > 1:
1215 logger.info(f'Reducing workload for {record.machine}.')
1218 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1219 policy.register_worker_pool(pool)
1220 self.remote_executor = RemoteExecutor(pool, policy)
1221 return self.remote_executor
1223 def shutdown(self) -> None:
1224 if self.thread_executor is not None:
1225 self.thread_executor.shutdown(wait=True, quiet=True)
1226 self.thread_executor = None
1227 if self.process_executor is not None:
1228 self.process_executor.shutdown(wait=True, quiet=True)
1229 self.process_executor = None
1230 if self.remote_executor is not None:
1231 self.remote_executor.shutdown(wait=True, quiet=True)
1232 self.remote_executor = None