# -*- coding: utf-8 -*-

from __future__ import annotations

import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
import numpy
from overrides import overrides

import argparse_utils
import config
import histogram as hist
from ansi import bg, fg, reset, underline
from decorator_utils import singleton
from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread

logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    help='Number of processes in the default processpool, leave unset for default',
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    help='Maximum number of failures before giving up on a bundle',
)
SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'


def make_cloud_pickle(fun, *args, **kwargs):
    logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
    return cloudpickle.dumps((fun, args, kwargs))
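

# The tuple pickled here is later unpacked with cloudpickle.loads(), either by
# run_cloud_pickle() in the ProcessExecutor below or by the remote worker
# process, both of which then invoke fun(*args, **kwargs).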
class BaseExecutor(ABC):
    def __init__(self, *, title=''):
        self.title = title
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        pass

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shutdown the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks).  Return False
        otherwise.  Note: this should only be called by the launcher
        process.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count.  Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread.
        """
        self.task_count += delta
        logger.debug(f'Adjusted task count by {delta} to {self.task_count}')

    def get_task_count(self) -> int:
        """Return the current task count.  Note: do not call this method
        from a worker, it should only be called by the launcher process /
        thread.
        """
        return self.task_count
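

# Subclasses supply submit() and shutdown(); BaseExecutor keeps the
# launcher-side task count and the 0..500s / 50-bucket latency histogram so
# that shutdown_if_idle() and the end-of-run histogram dump behave the same
# for the thread, process, and remote pools defined below.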
class ThreadExecutor(BaseExecutor):
    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False

    # This is run on a different thread; do not adjust task count here.
    def run_local_bundle(self, fun, *args, **kwargs):
        logger.debug(f"Running local bundle at {fun.__name__}")
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        newargs = [function, *args]
        start = time.time()
        result = self._thread_pool_executor.submit(
            self.run_local_bundle, *newargs, **kwargs
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug(f'Shutting down threadpool executor {self.title}')
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
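

# Usage sketch (the callable and its argument here are hypothetical):
#
#   executor = ThreadExecutor(max_workers=4)
#   future = executor.submit(fetch_url, 'http://example.com')  # IO-bound work
#   print(future.result())
#   executor.shutdown(wait=True)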
class ProcessExecutor(BaseExecutor):
    def __init__(self, max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False

    # This is run in another process; do not adjust task count here.
    def run_cloud_pickle(self, pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug(f"Running pickled bundle at {fun.__name__}")
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        self.adjust_task_count(+1)
        pickle = make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(self.run_cloud_pickle, pickle)
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug(f'Shutting down processpool executor {self.title}')
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
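        # A live ProcessPoolExecutor is not picklable, so it is dropped from
        # the serialized state here; a ProcessExecutor restored from a pickle
        # therefore cannot submit new work.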
class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""


class RemoteWorkerRecord:
    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    slower_than_local_p95: bool
    slower_than_global_p95: bool
    src_bundle: Optional[BundleDetails]
    is_cancelled: threading.Event
    backup_bundles: Optional[List[BundleDetails]]

    def __repr__(self):
        uuid = self.uuid
        if uuid[-9:-2] == '_backup':
            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
        else:
            suffix = uuid[-6:]
        colorz = [
            fg('marigold yellow'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('lavender purple'),
        ]
        c = colorz[int(uuid[-2:], 16) % len(colorz)]
        fname = self.fname if self.fname is not None else 'nofname'
        machine = self.machine if self.machine is not None else 'nomachine'
        return f'{c}{suffix}/{fname}/{machine}{reset()}'
class RemoteExecutorStatus:
    def __init__(self, total_worker_count: int) -> None:
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modifications of this status object.  Also used
        # as a memory fence for changes to bundle state.
        self.lock: threading.Lock = threading.Lock()
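
    # Locking discipline: the public record_*() methods below take self.lock
    # themselves, while the *_already_locked variants assert that the caller
    # already holds it (note the assert self.lock.locked() checks).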
    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)
    def record_processing_began(self, uuid: str):
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            ret += f' ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            ret += f' ...{in_flight} bundles currently in flight:\n'
            for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                sec = 0.0
                if self.start_per_bundle[bundle_uuid] is not None:
                    sec = ts - self.start_per_bundle[bundle_uuid]
                    ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
                else:
                    ret += f' {details} setting up / copying data...'
                if qworker is not None:
                    if sec > qworker[1]:
                        ret += f'{bg("red")}>💻p95{reset()} '
                        if details is not None:
                            details.slower_than_local_p95 = True
                    else:
                        if details is not None:
                            details.slower_than_local_p95 = False
                if qall is not None and sec > qall[1]:
                    ret += f'{bg("red")}>∀p95{reset()} '
                    if details is not None:
                        details.slower_than_global_p95 = True
                elif details is not None:
                    details.slower_than_global_p95 = False
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            self.last_periodic_dump = ts
class RemoteWorkerSelectionPolicy(ABC):
    def register_worker_pool(self, workers):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        for worker in self.workers:
            if worker.machine != machine_to_avoid:
                for _ in range(worker.count * worker.weight):
                    grabbag.append(worker)
        if len(grabbag) == 0:
            logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
            for worker in self.workers:
                for _ in range(worker.count * worker.weight):
                    grabbag.append(worker)
        if len(grabbag) == 0:
            logger.warning('There are no available workers?!')
            return None
        worker = random.sample(grabbag, 1)[0]
        assert worker.count > 0
        logger.debug(f'Chose worker {worker}')
        return worker
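

# With the weighted random policy, a worker's chance of being picked is
# proportional to count * weight: e.g. a worker with count=4 and weight=2
# lands in the grabbag 8 times and is twice as likely to be chosen as a
# count=4, weight=1 worker (the numbers here are illustrative, not taken from
# any particular configuration).
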
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def __init__(self) -> None:
        self.index = 0

    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
        x = self.index
        while True:
            worker = self.workers[x]
            if worker.count > 0:
                x += 1
                if x >= len(self.workers):
                    x = 0
                self.index = x
                logger.debug(f'Selected worker {worker}')
                return worker
            x += 1
            if x >= len(self.workers):
                x = 0
            if x == self.index:
                msg = 'Unexpectedly could not find a worker, retrying...'
                logger.warning(msg)
                return None
class RemoteExecutor(BaseExecutor):
    def __init__(
        self,
        workers: List[RemoteWorkerRecord],
        policy: RemoteWorkerSelectionPolicy,
    ) -> None:
        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self.run_periodic_heartbeat()
        self.already_shutdown = False

    @background_thread
    def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
        while not stop_event.is_set():
            logger.debug('Running periodic heartbeat code...')
            self.heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def heartbeat(self) -> None:
        # Note: this is invoked on a background thread, not an
        # executor thread.  Be careful what you do with it because it
        # needs to return and dump status again periodically.
        with self.status.lock:
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self.maybe_schedule_backup_bundles()
    def maybe_schedule_backup_bundles(self):
        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_idle_workers > 1
            and (self.last_backup is None or (now - self.last_backup > 9.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            try:
                assert self.backup_lock.locked()
                bundle_to_backup = None
                best_score = None
                for (
                    worker,
                    bundle_uuids,
                ) in self.status.in_flight_bundles_by_worker.items():

                    # Prefer to schedule backups of bundles running on
                    # slower machines.
                    base_score = 0
                    for record in self.workers:
                        if worker.machine == record.machine:
                            base_score = float(record.weight)
                            base_score = 1.0 / base_score
                            base_score = int(base_score)

                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                            bundle is not None
                            and bundle.src_bundle is None
                            and bundle.backup_bundles is not None
                        ):
                            score = base_score
                            # Schedule backups of bundles running
                            # longer; especially those that are
                            # unexpectedly slow.
                            start_ts = self.status.start_per_bundle[uuid]
                            if start_ts is not None:
                                runtime = now - start_ts
                                score += runtime
                                logger.debug(f'score[{bundle}] => {score}  # latency boost')

                                if bundle.slower_than_local_p95:
                                    score += runtime / 2
                                    logger.debug(f'score[{bundle}] => {score}  # >worker p95')

                                if bundle.slower_than_global_p95:
                                    score += runtime / 4
                                    logger.debug(f'score[{bundle}] => {score}  # >global p95')

                            # Prefer backups of bundles that don't
                            # have backups already.
                            backup_count = len(bundle.backup_bundles)
                            if backup_count == 0:
                                score *= 2
                            elif backup_count == 1:
                                score /= 2
                            elif backup_count == 2:
                                score /= 8
                            else:
                                score = 0
                            logger.debug(
                                f'score[{bundle}] => {score}  # {backup_count} dup backup factor'
                            )

                            if score != 0 and (best_score is None or score > best_score):
                                bundle_to_backup = bundle
                                assert bundle is not None
                                assert bundle.backup_bundles is not None
                                assert bundle.src_bundle is None
                                best_score = score
                # Note: this is all still happening on the heartbeat
                # runner thread.  That's ok because
                # schedule_backup_for_bundle uses the executor to
                # submit the bundle again which will cause it to be
                # picked up by a worker thread and allow this thread
                # to return to run future heartbeats.
                if bundle_to_backup is not None:
                    self.last_backup = now
                    logger.info(
                        f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
                    )
                    self.schedule_backup_for_bundle(bundle_to_backup)
            finally:
                self.backup_lock.release()
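
    # Scoring recap: a bundle's backup priority grows the longer it has been
    # running and when it is slower than its worker's p95 or the global p95,
    # and shrinks as it accumulates backups; at most one backup is scheduled
    # per heartbeat, and only while idle workers are available.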
    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        raise Exception(msg)

    def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        worker = bundle.worker
        assert worker is not None
        logger.debug(f'Released worker {worker}')
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        self.adjust_task_count(-1)

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False
    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""
        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'{bundle}: Running bundle on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self.check_if_cancelled(bundle):
            try:
                return self.process_work_result(bundle)
            except Exception as e:
                logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started.  Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over.  Reschedule the whole
                    # thing.
                    logger.error(
                        f'{bundle}: We are the original owner thread and yet there are '
                        + 'no results for this bundle.  This is unexpected and bad.'
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # Expected(?).  We're a backup and our bundle is
                    # cancelled before we even got started.  Something
                    # went bad in process_work_result (I actually don't
                    # see what?) but probably not worth worrying
                    # about.  Let the original thread worry about
                    # either finding the results or complaining about
                    # it.
                    return None
        # Send input code / data to worker machine if it's not local.
        if hostname not in machine:
            try:
                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                start_ts = time.time()
                logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
            except Exception as e:
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We tried to copy the code to the worker and it failed...
                    # And we're the original bundle.  We have to retry.
                    logger.error(
                        f"{bundle}: Failed to send instructions to the worker machine?! "
                        + "This is not expected; we're the original bundle so this shouldn't "
                        + "be a race condition.  Attempting an emergency retry..."
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code file before we could copy it.  No biggie.
                    msg = f'{bundle}: Failed to send instructions to the worker machine... '
                    msg += 'We\'re a backup and this may be caused by the original (or some '
                    msg += 'other backup) already finishing this work.  Ignoring this.'
                    logger.warning(msg)
                    return None

        # Kick off the work.  Note that if this fails we let
        # wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"source py38-venv/bin/activate &&'
            f' /home/scott/lib/python_modules/remote_worker.py'
            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
        p = cmd_in_background(cmd, silent=True)
        logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
        return self.wait_for_process(p, bundle, 0)
    def wait_for_process(
        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
    ) -> Any:
        machine = bundle.machine
        pid = p.pid
        if depth > 3:
            logger.error(
                f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
            )
            self.release_worker(bundle)
            return self.emergency_retry_nasty_bundle(bundle)
        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self.check_if_cancelled(bundle):
                    logger.info(f'{bundle}: looks like another worker finished bundle...')
                    break
            else:
                logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
                p = None
                break
        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # and we're bailing out.
        try:
            ret = self.process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, cleanup after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine.  If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception as e:
            logger.error(f'{bundle}: Something unexpected just happened...')
            if p is not None:
                msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
                logger.warning(msg)
                return self.wait_for_process(p, bundle, depth + 1)
            else:
                self.release_worker(bundle)
                return self.emergency_retry_nasty_bundle(bundle)
    def process_work_result(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
                    )

                    # If either of these throw they are handled in
                    # wait_for_process.
                    try:
                        run_silently(cmd)
                    except Exception as e:
                        raise e

                    run_silently(
                        f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                    )
                    logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.
        if is_original:
            logger.debug(f"{bundle}: Unpickling {result_file}.")
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                msg = f'Failed to load {result_file}... this is bad news.'
                logger.critical(msg)
                self.release_worker(bundle)

                # Re-raise the exception; the code in wait_for_process may
                # decide to emergency_retry_nasty_bundle here.
                raise e
            logger.debug(f'Removing local (master) {code_file} and {result_file}.')
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
                    backup.is_cancelled.set()

        # This is a backup job and, by now, we have already fetched
        # the bundle results.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                orig_bundle = bundle.src_bundle
                assert orig_bundle is not None
                logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
                orig_bundle.is_cancelled.set()
        self.release_worker(bundle, was_cancelled=was_cancelled)
        return result
    def create_original_bundle(self, pickle, fname: str):
        from string_utils import generate_uuid

        uuid = generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            hostname=platform.node(),
            result_file=result_file,
            start_ts=time.time(),
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            is_cancelled=threading.Event(),
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'{bundle}: Created an original bundle')
        return bundle
    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            fname=src_bundle.fname,
            hostname=src_bundle.hostname,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            start_ts=time.time(),
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'{backup_bundle}: Created a backup bundle')
        return backup_bundle
    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.
    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1

        if bundle.failure_count > retry_limit:
            logger.error(
                f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries'
                )
            logger.error(
                f'{bundle}: At least it\'s only a backup; better luck with the others.'
            )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            return self.launch(bundle, avoid_last_machine)
    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self.launch, bundle)

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug(f'Shutting down RemoteExecutor {self.title}')
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
class DefaultExecutors(object):
    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        logger.debug(f'RUN> ping -c 1 {host}')
        x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
        return x == 0

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            pool: List[RemoteWorkerRecord] = []
            if self.ping('cheetah.house'):
                logger.info('Found cheetah.house')
                pool.append(RemoteWorkerRecord(machine='cheetah.house'))
            if self.ping('meerkat.cabin'):
                logger.info('Found meerkat.cabin')
                pool.append(RemoteWorkerRecord(machine='meerkat.cabin'))
            if self.ping('wannabe.house'):
                logger.info('Found wannabe.house')
                pool.append(RemoteWorkerRecord(machine='wannabe.house'))
            if self.ping('puma.cabin'):
                logger.info('Found puma.cabin')
                pool.append(RemoteWorkerRecord(machine='puma.cabin'))
            if self.ping('backup.house'):
                logger.info('Found backup.house')
                pool.append(RemoteWorkerRecord(machine='backup.house'))

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info(f'Reducing workload for {record.machine}.')
                    record.count = 1

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None
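

# Usage sketch (assumes DefaultExecutors is wrapped with @singleton from
# decorator_utils, per the import above; expensive() is a hypothetical
# caller-supplied function, and remote_pool() additionally assumes
# passwordless ssh/scp access to the helper machines probed above):
#
#   def expensive(x: int) -> int:
#       return x * x
#
#   pool = DefaultExecutors().process_pool()
#   future = pool.submit(expensive, 42)
#   print(future.result())
#   DefaultExecutors().shutdown()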