from __future__ import annotations

import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
import numpy
from overrides import overrides

import argparse_utils
import config
import histogram as hist
from ansi import bg, fg, reset, underline
from decorator_utils import singleton
from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread

logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    help='Number of processes in the default processpool, leave unset for default',
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    help='Maximum number of failures before giving up on a bundle',
)
SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'


def make_cloud_pickle(fun, *args, **kwargs):
    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
    return cloudpickle.dumps((fun, args, kwargs))
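
# A minimal round-trip sketch (illustrative): the payload produced here is a
# cloudpickled (function, args, kwargs) triple, which is exactly how
# ProcessExecutor.run_cloud_pickle() unpacks it.
#
#   payload = make_cloud_pickle(sum, [1, 2, 3])
#   fun, args, kwargs = cloudpickle.loads(payload)
#   assert fun(*args, **kwargs) == 6
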
class BaseExecutor(ABC):
    def __init__(self, *, title=''):
        self.title = title
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        pass

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shutdown the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks).  Return False
        otherwise.  Note: this should only be called by the launcher
        process.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count.  Note: do not call this method from a
        worker; it should only be called by the launcher process /
        thread / machine.
        """
        self.task_count += delta
        logger.debug(f'Adjusted task count by {delta} to {self.task_count}')

    def get_task_count(self) -> int:
        """Return the current task count.  Note: do not call this method
        from a worker; it should only be called by the launcher process /
        thread / machine.
        """
        return self.task_count
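
# Editorial note: task_count is launcher-side bookkeeping only.  The
# worker-side entry points in the subclasses below (run_local_bundle,
# run_cloud_pickle) deliberately do not touch it; instead each submit()
# registers Future done-callbacks that decrement the count when work finishes.
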
class ThreadExecutor(BaseExecutor):
    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False
    # This is run on a different thread; do not adjust task count here.
    def run_local_bundle(self, fun, *args, **kwargs):
        logger.debug(f"Running local bundle at {fun.__name__}")
        result = fun(*args, **kwargs)
        return result

    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        start = time.time()
        result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug(f'Shutting down threadpool executor {self.title}')
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
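
# A minimal usage sketch (illustrative only; the worker count may also come
# from the --executors_threadpool_size flag via the project's config module):
#
#   pool = ThreadExecutor(max_workers=4)
#   futures = [pool.submit(len, s) for s in ('a', 'bb', 'ccc')]
#   print(sorted(f.result() for f in futures))   # [1, 2, 3]
#   pool.shutdown(wait=True, quiet=True)
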
class ProcessExecutor(BaseExecutor):
    def __init__(self, max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False
    # This is run in another process; do not adjust task count here.
    def run_cloud_pickle(self, pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug(f"Running pickled bundle at {fun.__name__}")
        result = fun(*args, **kwargs)
        return result

    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        self.adjust_task_count(+1)
        pickle = make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(self.run_cloud_pickle, pickle)
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug(f'Shutting down processpool executor {self.title}')
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state
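
# Design note (editorial): concurrent.futures pool objects hold OS resources
# (pipes, worker processes) and cannot themselves be pickled.  __getstate__
# above therefore drops the _process_executor reference so that a
# ProcessExecutor captured incidentally inside a cloudpickled closure does not
# drag the live pool across a process boundary.
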
class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""


@dataclass
class RemoteWorkerRecord:
    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'
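
# Editorial note: RemoteWorkerRecord is a dataclass whose fields (username,
# machine, weight, count) are inferred here from how records are consumed by
# the selection policies and DefaultExecutors.remote_pool() below.  A
# hypothetical record might look like:
#
#   RemoteWorkerRecord(username='me', machine='cheetah.house', weight=10, count=4)
#
# Higher weight and count make a worker proportionally more likely to be
# chosen by WeightedRandomRemoteWorkerSelectionPolicy.
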
@dataclass
class BundleDetails:
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    slower_than_local_p95: bool
    slower_than_global_p95: bool
    src_bundle: Optional[BundleDetails]
    is_cancelled: threading.Event
    backup_bundles: Optional[List[BundleDetails]]

    def __repr__(self):
        uuid = self.uuid
        if uuid[-9:-2] == '_backup':
            uuid = uuid[:-9]
            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
        else:
            suffix = uuid[-6:]

        colorz = [
            fg('marigold yellow'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('lavender purple'),
        ]
        c = colorz[int(uuid[-2:], 16) % len(colorz)]
        fname = self.fname if self.fname is not None else 'nofname'
        machine = self.machine if self.machine is not None else 'nomachine'
        return f'{c}{suffix}/{fname}/{machine}{reset()}'
class RemoteExecutorStatus:
    def __init__(self, total_worker_count: int) -> None:
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modifications of self.  Also used as a
        # memory fence for modifications to bundles.
        self.lock: threading.Lock = threading.Lock()
    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details
    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)
    def record_processing_began(self, uuid: str):
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()
    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        for worker in self.known_workers:
            ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f'       {details} setting up / copying data...'
                        sec = 0.0

                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                            if details is not None:
                                details.slower_than_local_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_local_p95 = False

                    if qall is not None:
                        if sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                details.slower_than_global_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_global_p95 = False
        return ret
    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts
class RemoteWorkerSelectionPolicy(ABC):
    def register_worker_pool(self, workers):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        pass
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        for worker in self.workers:
            if worker.machine != machine_to_avoid:
                if worker.count > 0:
                    for _ in range(worker.count * worker.weight):
                        grabbag.append(worker)

        if len(grabbag) == 0:
            logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
            for worker in self.workers:
                if worker.count > 0:
                    for _ in range(worker.count * worker.weight):
                        grabbag.append(worker)

        if len(grabbag) == 0:
            logger.warning('There are no available workers?!')
            return None

        worker = random.sample(grabbag, 1)[0]
        assert worker.count > 0
        worker.count -= 1
        logger.debug(f'Chose worker {worker}')
        return worker
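
# Worked example (illustrative): with workers A(count=2, weight=3) and
# B(count=1, weight=1), the grab bag holds 2*3=6 copies of A and 1*1=1 copy
# of B, so A is picked ~86% of the time.  Passing machine_to_avoid='A' skips
# A entirely unless doing so would leave the bag empty, in which case all
# available workers are reconsidered.
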
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def __init__(self) -> None:
        self.index = 0

    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
        x = self.index
        while True:
            worker = self.workers[x]
            if worker.count > 0:
                worker.count -= 1
                x += 1
                if x >= len(self.workers):
                    x = 0
                self.index = x
                logger.debug(f'Selected worker {worker}')
                return worker
            x += 1
            if x >= len(self.workers):
                x = 0
            if x == self.index:
                msg = 'Unexpectedly could not find a worker, retrying...'
                logger.warning(msg)
                return None
class RemoteExecutor(BaseExecutor):
    def __init__(
        self,
        workers: List[RemoteWorkerRecord],
        policy: RemoteWorkerSelectionPolicy,
    ) -> None:
        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self.run_periodic_heartbeat()
        self.already_shutdown = False
    @background_thread
    def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
        while not stop_event.is_set():
            time.sleep(5.0)
            logger.debug('Running periodic heartbeat code...')
            self.heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def heartbeat(self) -> None:
        # Note: this is invoked on a background thread, not an
        # executor thread.  Be careful what you do with it b/c it
        # needs to get back and dump status again periodically.
        with self.status.lock:
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self.maybe_schedule_backup_bundles()
    def maybe_schedule_backup_bundles(self):
        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_done > 2
            and num_idle_workers > 1
            and (self.last_backup is None or (now - self.last_backup > 9.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            assert self.backup_lock.locked()
            bundle_to_backup = None
            best_score = None
            for (
                worker,
                bundle_uuids,
            ) in self.status.in_flight_bundles_by_worker.items():

                # Prefer to schedule backups of bundles running on
                # slower machines.
                base_score = 0
                for record in self.workers:
                    if worker.machine == record.machine:
                        base_score = float(record.weight)
                        base_score = 1.0 / base_score
                        base_score = int(base_score)

                for uuid in bundle_uuids:
                    bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                    if (
                        bundle is not None
                        and bundle.src_bundle is None
                        and bundle.backup_bundles is not None
                    ):
                        score = base_score

                        # Schedule backups of bundles running
                        # longer; especially those that are
                        # unexpectedly slow.
                        start_ts = self.status.start_per_bundle[uuid]
                        if start_ts is not None:
                            runtime = now - start_ts
                            score += runtime
                            logger.debug(f'score[{bundle}] => {score}  # latency boost')

                            if bundle.slower_than_local_p95:
                                score += runtime / 2
                                logger.debug(f'score[{bundle}] => {score}  # >worker p95')

                            if bundle.slower_than_global_p95:
                                score += runtime / 2
                                logger.debug(f'score[{bundle}] => {score}  # >global p95')

                        # Prefer backups of bundles that don't
                        # have backups already.
                        backup_count = len(bundle.backup_bundles)
                        if backup_count == 0:
                            score *= 2
                        elif backup_count == 1:
                            score /= 2
                        elif backup_count == 2:
                            score /= 8
                        else:
                            score = 0
                        logger.debug(
                            f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
                        )

                        if score != 0 and (best_score is None or score > best_score):
                            bundle_to_backup = bundle
                            assert bundle is not None
                            assert bundle.backup_bundles is not None
                            assert bundle.src_bundle is None
                            best_score = score

            # Note: this is all still happening on the heartbeat
            # runner thread.  That's ok because
            # schedule_backup_for_bundle uses the executor to
            # submit the bundle again which will cause it to be
            # picked up by a worker thread and allow this thread
            # to return to run future heartbeats.
            if bundle_to_backup is not None:
                self.last_backup = now
                logger.info(
                    f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
                )
                self.schedule_backup_for_bundle(bundle_to_backup)
            self.backup_lock.release()
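
    # Rough intuition for the scoring above (editorial summary, not exact
    # arithmetic): bundles running on lower-weight (slower) machines start
    # with a higher base score, bundles that have been running longer
    # (especially ones already slower than the worker-local or global p95)
    # score higher still, and bundles that already have one or more backups
    # are discounted.  The single highest-scoring bundle, if any scored above
    # zero, is the one that gets a duplicative backup scheduled.
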
    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        raise Exception(msg)
    def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        worker = bundle.worker
        assert worker is not None
        logger.debug(f'Released worker {worker}')
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        with self.cv:
            worker.count += 1
            self.cv.notify()
        self.adjust_task_count(-1)

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False
    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""
        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'{bundle}: Running bundle on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self.check_if_cancelled(bundle):
            try:
                return self.process_work_result(bundle)
            except Exception as e:
                logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started.  Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over.  Reschedule the whole
                    # thing.
                    logger.error(
                        f'{bundle}: We are the original owner thread and yet there are '
                        + 'no results for this bundle.  This is unexpected and bad.'
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # Expected(?).  We're a backup and our bundle is
                    # cancelled before we even got started.  Something
                    # went bad in process_work_result (I actually don't
                    # see what?) but probably not worth worrying
                    # about.  Let the original thread worry about
                    # either finding the results or complaining about
                    # it.
                    return None
        # Send input code / data to worker machine if it's not local.
        if hostname not in machine:
            try:
                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                start_ts = time.time()
                logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
            except Exception as e:
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We tried to copy the code to the worker and it failed...
                    # And we're the original bundle.  We have to retry.
                    logger.error(
                        f"{bundle}: Failed to send instructions to the worker machine?! "
                        + "This is not expected; we\'re the original bundle so this shouldn\'t "
                        + "be a race condition.  Attempting an emergency retry..."
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code file before we could copy it.  No biggie.
                    msg = f'{bundle}: Failed to send instructions to the worker machine... '
                    msg += 'We\'re a backup and this may be caused by the original (or some '
                    msg += 'other backup) already finishing this work.  Ignoring this.'
                    logger.warning(msg)
                    return None

        # Kick off the work.  Note that if this fails we let
        # wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"source py38-venv/bin/activate &&'
            f' /home/scott/lib/python_modules/remote_worker.py'
            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
        return self.wait_for_process(p, bundle, 0)
    def wait_for_process(
        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
    ) -> Any:
        machine = bundle.machine
        pid = p.pid
        if depth > 3:
            logger.error(
                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
            )
            self.release_worker(bundle)
            return self.emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self.check_if_cancelled(bundle):
                    logger.info(f'{bundle}: looks like another worker finished bundle...')
                    break
            else:
                logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # and we're bailing out.
        try:
            ret = self.process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, cleanup after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine.  If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception as e:
            logger.exception(e)
            logger.error(f'{bundle}: Something unexpected just happened...')
            if p is not None:
                msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
                logger.warning(msg)
                return self.wait_for_process(p, bundle, depth + 1)
            else:
                self.release_worker(bundle)
                return self.emergency_retry_nasty_bundle(bundle)
    def process_work_result(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
                    )

                    # If either of these throw they are handled in
                    # wait_for_process.
                    try:
                        run_silently(cmd)
                    except Exception as e:
                        logger.exception(e)
                        raise e
                    run_silently(
                        f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                    )
                    logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # results from disk anyway.
        if is_original:
            logger.debug(f"{bundle}: Unpickling {result_file}.")
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                logger.exception(e)
                msg = f'Failed to load {result_file}... this is bad news.'
                logger.critical(msg)
                self.release_worker(bundle)

                # Re-raise the exception; the code in wait_for_process may
                # decide to emergency_retry_nasty_bundle here.
                raise e
            logger.debug(f'Removing local (master) {code_file} and {result_file}.')
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
                    backup.is_cancelled.set()

        # This is a backup job and, by now, we have already fetched
        # the bundle results.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                orig_bundle = bundle.src_bundle
                assert orig_bundle is not None
                logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
                orig_bundle.is_cancelled.set()
        self.release_worker(bundle, was_cancelled=was_cancelled)
        return result
    def create_original_bundle(self, pickle, fname: str):
        from string_utils import generate_uuid

        uuid = generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            fname=fname,
            hostname=platform.node(),
            code_file=code_file,
            result_file=result_file,
            start_ts=time.time(),
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'{bundle}: Created an original bundle')
        return bundle
    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            fname=src_bundle.fname,
            hostname=src_bundle.hostname,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            start_ts=time.time(),
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'{backup_bundle}: Created a backup bundle')
        return backup_bundle
    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.
    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        retry_limit = config.config['executors_max_bundle_failures']

        if bundle.failure_count > retry_limit:
            logger.error(
                f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries'
                )
            else:
                logger.error(
                    f'{bundle}: At least it\'s only a backup; better luck with the others.'
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            return self.launch(bundle, avoid_last_machine)
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self.launch, bundle)

    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug(f'Shutting down RemoteExecutor {self.title}')
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
@singleton
class DefaultExecutors(object):
    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        logger.debug(f'RUN> ping -c 1 {host}')
        try:
            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
            return x == 0
        except Exception:
            return False
    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor
    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            pool: List[RemoteWorkerRecord] = []
            if self.ping('cheetah.house'):
                logger.info('Found cheetah.house')
                pool.append(RemoteWorkerRecord(
                    machine='cheetah.house',
                ))
            if self.ping('meerkat.cabin'):
                logger.info('Found meerkat.cabin')
                pool.append(RemoteWorkerRecord(
                    machine='meerkat.cabin',
                ))
            if self.ping('wannabe.house'):
                logger.info('Found wannabe.house')
                pool.append(RemoteWorkerRecord(
                    machine='wannabe.house',
                ))
            if self.ping('puma.cabin'):
                logger.info('Found puma.cabin')
                pool.append(RemoteWorkerRecord(
                    machine='puma.cabin',
                ))
            if self.ping('backup.house'):
                logger.info('Found backup.house')
                pool.append(RemoteWorkerRecord(
                    machine='backup.house',
                ))

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info(f'Reducing workload for {record.machine}.')

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor
    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None
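
# A minimal usage sketch (illustrative only; assumes the project's
# command-line config has already been parsed so config.config is populated):
#
#   executors = DefaultExecutors()
#   future = executors.thread_pool().submit(sum, [1, 2, 3])
#   assert future.result() == 6
#   executors.shutdown()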