3 from __future__ import annotations
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
17 from typing import Any, Callable, Dict, List, Optional, Set
20 import cloudpickle # type: ignore
21 from overrides import overrides
23 from ansi import bg, fg, underline, reset
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
30 logger = logging.getLogger(__name__)
32 parser = config.add_commandline_args(
33 f"Executors ({__file__})",
34 "Args related to processing executors."
37 '--executors_threadpool_size',
40 help='Number of threads in the default threadpool, leave unset for default',
44 '--executors_processpool_size',
47 help='Number of processes in the default processpool, leave unset for default',
51 '--executors_schedule_remote_backups',
53 action=argparse_utils.ActionNoYes,
54 help='Should we schedule duplicative backup work if a remote bundle is slow',
57 '--executors_max_bundle_failures',
61 help='Maximum number of failures before giving up on a bundle',
64 RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
65 SSH = 'ssh -oForwardX11=no'
68 def make_cloud_pickle(fun, *args, **kwargs):
69 logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
70 return cloudpickle.dumps((fun, args, kwargs))
73 class BaseExecutor(ABC):
74 def __init__(self, *, title=''):
77 self.histogram = hist.SimpleHistogram(
78 hist.SimpleHistogram.n_evenly_spaced_buckets(
87 **kwargs) -> fut.Future:
92 wait: bool = True) -> None:
95 def adjust_task_count(self, delta: int) -> None:
96 self.task_count += delta
97 logger.debug(f'Executor current task count is {self.task_count}')
100 class ThreadExecutor(BaseExecutor):
102 max_workers: Optional[int] = None):
105 if max_workers is not None:
106 workers = max_workers
107 elif 'executors_threadpool_size' in config.config:
108 workers = config.config['executors_threadpool_size']
109 logger.debug(f'Creating threadpool executor with {workers} workers')
110 self._thread_pool_executor = fut.ThreadPoolExecutor(
112 thread_name_prefix="thread_executor_helper"
115 def run_local_bundle(self, fun, *args, **kwargs):
116 logger.debug(f"Running local bundle at {fun.__name__}")
118 result = fun(*args, **kwargs)
120 self.adjust_task_count(-1)
121 duration = end - start
122 logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
123 self.histogram.add_item(duration)
130 **kwargs) -> fut.Future:
131 self.adjust_task_count(+1)
133 newargs.append(function)
136 return self._thread_pool_executor.submit(
137 self.run_local_bundle,
143 wait = True) -> None:
144 logger.debug(f'Shutting down threadpool executor {self.title}')
145 print(self.histogram)
146 self._thread_pool_executor.shutdown(wait)
149 class ProcessExecutor(BaseExecutor):
154 if max_workers is not None:
155 workers = max_workers
156 elif 'executors_processpool_size' in config.config:
157 workers = config.config['executors_processpool_size']
158 logger.debug(f'Creating processpool executor with {workers} workers.')
159 self._process_executor = fut.ProcessPoolExecutor(
163 def run_cloud_pickle(self, pickle):
164 fun, args, kwargs = cloudpickle.loads(pickle)
165 logger.debug(f"Running pickled bundle at {fun.__name__}")
166 result = fun(*args, **kwargs)
167 self.adjust_task_count(-1)
174 **kwargs) -> fut.Future:
176 self.adjust_task_count(+1)
177 pickle = make_cloud_pickle(function, *args, **kwargs)
178 result = self._process_executor.submit(
179 self.run_cloud_pickle,
182 result.add_done_callback(
183 lambda _: self.histogram.add_item(
190 def shutdown(self, wait=True) -> None:
191 logger.debug(f'Shutting down processpool executor {self.title}')
192 self._process_executor.shutdown(wait)
193 print(self.histogram)
195 def __getstate__(self):
196 state = self.__dict__.copy()
197 state['_process_executor'] = None
201 class RemoteExecutorException(Exception):
202 """Thrown when a bundle cannot be executed despite several retries."""
207 class RemoteWorkerRecord:
214 return hash((self.username, self.machine))
217 return f'{self.username}@{self.machine}'
225 worker: Optional[RemoteWorkerRecord]
226 username: Optional[str]
227 machine: Optional[str]
234 slower_than_local_p95: bool
235 slower_than_global_p95: bool
236 src_bundle: BundleDetails
237 is_cancelled: threading.Event
239 backup_bundles: Optional[List[BundleDetails]]
244 if uuid[-9:-2] == '_backup':
246 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
256 fg('marigold yellow'),
259 fg('cornflower blue'),
260 fg('turquoise blue'),
262 fg('lavender purple'),
265 c = colorz[int(uuid[-2:], 16) % len(colorz)]
266 fname = self.fname if self.fname is not None else 'nofname'
267 machine = self.machine if self.machine is not None else 'nomachine'
268 return f'{c}{suffix}/{fname}/{machine}{reset()}'
271 class RemoteExecutorStatus:
272 def __init__(self, total_worker_count: int) -> None:
273 self.worker_count = total_worker_count
274 self.known_workers: Set[RemoteWorkerRecord] = set()
275 self.start_per_bundle: Dict[str, float] = defaultdict(float)
276 self.end_per_bundle: Dict[str, float] = defaultdict(float)
277 self.finished_bundle_timings_per_worker: Dict[
281 self.in_flight_bundles_by_worker: Dict[
285 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
286 self.finished_bundle_timings: List[float] = []
287 self.last_periodic_dump: Optional[float] = None
288 self.total_bundles_submitted = 0
290 # Protects reads and modification using self. Also used
291 # as a memory fence for modifications to bundle.
292 self.lock = threading.Lock()
294 def record_acquire_worker(
296 worker: RemoteWorkerRecord,
300 self.record_acquire_worker_already_locked(
305 def record_acquire_worker_already_locked(
307 worker: RemoteWorkerRecord,
310 assert self.lock.locked()
311 self.known_workers.add(worker)
312 self.start_per_bundle[uuid] = None
313 x = self.in_flight_bundles_by_worker.get(worker, set())
315 self.in_flight_bundles_by_worker[worker] = x
317 def record_bundle_details(
319 details: BundleDetails) -> None:
321 self.record_bundle_details_already_locked(details)
323 def record_bundle_details_already_locked(
325 details: BundleDetails) -> None:
326 assert self.lock.locked()
327 self.bundle_details_by_uuid[details.uuid] = details
329 def record_release_worker(
331 worker: RemoteWorkerRecord,
336 self.record_release_worker_already_locked(
337 worker, uuid, was_cancelled
340 def record_release_worker_already_locked(
342 worker: RemoteWorkerRecord,
346 assert self.lock.locked()
348 self.end_per_bundle[uuid] = ts
349 self.in_flight_bundles_by_worker[worker].remove(uuid)
350 if not was_cancelled:
351 bundle_latency = ts - self.start_per_bundle[uuid]
352 x = self.finished_bundle_timings_per_worker.get(worker, list())
353 x.append(bundle_latency)
354 self.finished_bundle_timings_per_worker[worker] = x
355 self.finished_bundle_timings.append(bundle_latency)
357 def record_processing_began(self, uuid: str):
359 self.start_per_bundle[uuid] = time.time()
361 def total_in_flight(self) -> int:
362 assert self.lock.locked()
364 for worker in self.known_workers:
365 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
366 return total_in_flight
368 def total_idle(self) -> int:
369 assert self.lock.locked()
370 return self.worker_count - self.total_in_flight()
373 assert self.lock.locked()
375 total_finished = len(self.finished_bundle_timings)
376 total_in_flight = self.total_in_flight()
377 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
379 if len(self.finished_bundle_timings) > 1:
380 qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
382 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
383 f'✅={total_finished}/{self.total_bundles_submitted}, '
384 f'💻n={total_in_flight}/{self.worker_count}\n'
388 f' ✅={total_finished}/{self.total_bundles_submitted}, '
389 f'💻n={total_in_flight}/{self.worker_count}\n'
392 for worker in self.known_workers:
393 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
394 timings = self.finished_bundle_timings_per_worker.get(worker, [])
398 qworker = numpy.quantile(timings, [0.5, 0.95])
399 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
403 ret += f' ...finished {count} total bundle(s) so far\n'
404 in_flight = len(self.in_flight_bundles_by_worker[worker])
406 ret += f' ...{in_flight} bundles currently in flight:\n'
407 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
408 details = self.bundle_details_by_uuid.get(
412 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
413 if self.start_per_bundle[bundle_uuid] is not None:
414 sec = ts - self.start_per_bundle[bundle_uuid]
415 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
417 ret += f' {details} setting up / copying data...'
420 if qworker is not None:
422 ret += f'{bg("red")}>💻p95{reset()} '
423 if details is not None:
424 details.slower_than_local_p95 = True
426 if details is not None:
427 details.slower_than_local_p95 = False
431 ret += f'{bg("red")}>∀p95{reset()} '
432 if details is not None:
433 details.slower_than_global_p95 = True
435 details.slower_than_global_p95 = False
439 def periodic_dump(self, total_bundles_submitted: int) -> None:
440 assert self.lock.locked()
441 self.total_bundles_submitted = total_bundles_submitted
444 self.last_periodic_dump is None
445 or ts - self.last_periodic_dump > 5.0
448 self.last_periodic_dump = ts
451 class RemoteWorkerSelectionPolicy(ABC):
452 def register_worker_pool(self, workers):
453 self.workers = workers
456 def is_worker_available(self) -> bool:
462 machine_to_avoid = None
463 ) -> Optional[RemoteWorkerRecord]:
467 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
469 def is_worker_available(self) -> bool:
470 for worker in self.workers:
478 machine_to_avoid = None
479 ) -> Optional[RemoteWorkerRecord]:
481 for worker in self.workers:
482 for x in range(0, worker.count):
483 for y in range(0, worker.weight):
484 grabbag.append(worker)
486 for _ in range(0, 5):
487 random.shuffle(grabbag)
489 if worker.machine != machine_to_avoid or _ > 2:
492 logger.debug(f'Selected worker {worker}')
494 msg = 'Unexpectedly could not find a worker, retrying...'
500 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
501 def __init__(self) -> None:
505 def is_worker_available(self) -> bool:
506 for worker in self.workers:
514 machine_to_avoid: str = None
515 ) -> Optional[RemoteWorkerRecord]:
518 worker = self.workers[x]
522 if x >= len(self.workers):
525 logger.debug(f'Selected worker {worker}')
528 if x >= len(self.workers):
531 msg = 'Unexpectedly could not find a worker, retrying...'
537 class RemoteExecutor(BaseExecutor):
539 workers: List[RemoteWorkerRecord],
540 policy: RemoteWorkerSelectionPolicy) -> None:
542 self.workers = workers
544 self.worker_count = 0
545 for worker in self.workers:
546 self.worker_count += worker.count
547 if self.worker_count <= 0:
548 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
550 raise RemoteExecutorException(msg)
551 self.policy.register_worker_pool(self.workers)
552 self.cv = threading.Condition()
553 logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
554 self._helper_executor = fut.ThreadPoolExecutor(
555 thread_name_prefix="remote_executor_helper",
556 max_workers=self.worker_count,
558 self.status = RemoteExecutorStatus(self.worker_count)
559 self.total_bundles_submitted = 0
560 self.backup_lock = threading.Lock()
561 self.last_backup = None
563 def is_worker_available(self) -> bool:
564 return self.policy.is_worker_available()
568 machine_to_avoid: str = None
569 ) -> Optional[RemoteWorkerRecord]:
570 return self.policy.acquire_worker(machine_to_avoid)
572 def find_available_worker_or_block(
574 machine_to_avoid: str = None
575 ) -> RemoteWorkerRecord:
577 while not self.is_worker_available():
579 worker = self.acquire_worker(machine_to_avoid)
580 if worker is not None:
582 msg = "We should never reach this point in the code"
586 def release_worker(self, worker: RemoteWorkerRecord) -> None:
587 logger.debug(f'Released worker {worker}')
592 def heartbeat(self) -> None:
593 with self.status.lock:
594 # Regular progress report
595 self.status.periodic_dump(self.total_bundles_submitted)
597 # Look for bundles to reschedule.
598 num_done = len(self.status.finished_bundle_timings)
599 num_idle_workers = self.worker_count - self.task_count
602 config.config['executors_schedule_remote_backups']
604 and num_idle_workers > 1
605 and (self.last_backup is None or (now - self.last_backup > 1.0))
606 and self.backup_lock.acquire(blocking=False)
609 assert self.backup_lock.locked()
611 bundle_to_backup = None
613 for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
614 # Prefer to schedule backups of bundles on slower machines.
616 for record in self.workers:
617 if worker.machine == record.machine:
618 base_score = float(record.weight)
619 base_score = 1.0 / base_score
621 base_score = int(base_score)
624 for uuid in bundle_uuids:
625 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
628 and bundle.src_bundle is None
629 and bundle.backup_bundles is not None
633 # Schedule backups of bundles running longer; especially those
634 # that are unexpectedly slow.
635 start_ts = self.status.start_per_bundle[uuid]
636 if start_ts is not None:
637 runtime = now - start_ts
639 logger.debug(f'score[{bundle}] => {score} # latency boost')
641 if bundle.slower_than_local_p95:
643 logger.debug(f'score[{bundle}] => {score} # >worker p95')
645 if bundle.slower_than_global_p95:
647 logger.debug(f'score[{bundle}] => {score} # >global p95')
649 # Prefer backups of bundles that don't have backups already.
650 backup_count = len(bundle.backup_bundles)
651 if backup_count == 0:
653 elif backup_count == 1:
655 elif backup_count == 2:
659 logger.debug(f'score[{bundle}] => {score} # {backup_count} dup backup factor')
663 and (best_score is None or score > best_score)
665 bundle_to_backup = bundle
666 assert bundle is not None
667 assert bundle.backup_bundles is not None
668 assert bundle.src_bundle is None
671 if bundle_to_backup is not None:
672 self.last_backup = now
673 logger.info(f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <=====')
674 self.schedule_backup_for_bundle(bundle_to_backup)
676 self.backup_lock.release()
678 def check_if_cancelled(self, bundle: BundleDetails) -> bool:
679 with self.status.lock:
680 if bundle.is_cancelled.wait(timeout=0.0):
681 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
682 bundle.was_cancelled = True
686 def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
687 """Find a worker for bundle or block until one is available."""
688 self.adjust_task_count(+1)
690 hostname = bundle.hostname
691 avoid_machine = override_avoid_machine
692 is_original = bundle.src_bundle is None
694 # Try not to schedule a backup on the same host as the original.
695 if avoid_machine is None and bundle.src_bundle is not None:
696 avoid_machine = bundle.src_bundle.machine
698 while worker is None:
699 worker = self.find_available_worker_or_block(avoid_machine)
701 # Ok, found a worker.
702 bundle.worker = worker
703 machine = bundle.machine = worker.machine
704 username = bundle.username = worker.username
706 self.status.record_acquire_worker(worker, uuid)
707 logger.debug(f'{bundle}: Running bundle on {worker}...')
709 # Before we do any work, make sure the bundle is still viable.
710 if self.check_if_cancelled(bundle):
712 return self.post_launch_work(bundle)
713 except Exception as e:
716 f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
718 assert bundle.worker is not None
719 self.status.record_release_worker(
724 self.release_worker(bundle.worker)
725 self.adjust_task_count(-1)
727 # Weird. We are the original owner of this
728 # bundle. For it to have been cancelled, a backup
729 # must have already started and completed before
730 # we even for started. Moreover, the backup says
731 # it is done but we can't find the results it
732 # should have copied over. Reschedule the whole
734 return self.emergency_retry_nasty_bundle(bundle)
736 # Expected(?). We're a backup and our bundle is
737 # cancelled before we even got started. Something
738 # went bad in post_launch_work (I acutually don't
739 # see what?) but probably not worth worrying
743 # Send input code / data to worker machine if it's not local.
744 if hostname not in machine:
746 cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
747 start_ts = time.time()
748 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
750 xfer_latency = time.time() - start_ts
751 logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.")
752 except Exception as e:
753 assert bundle.worker is not None
754 self.status.record_release_worker(
759 self.release_worker(bundle.worker)
760 self.adjust_task_count(-1)
762 # Weird. We tried to copy the code to the worker and it failed...
763 # And we're the original bundle. We have to retry.
766 f'{bundle}: Failed to send instructions to the worker machine?! ' +
767 'This is not expected; we\'re the original bundle so this shouldn\'t ' +
768 'be a race condition. Attempting an emergency retry...'
770 return self.emergency_retry_nasty_bundle(bundle)
772 # This is actually expected; we're a backup.
773 # There's a race condition where someone else
774 # already finished the work and removed the source
775 # code file before we could copy it. No biggie.
776 msg = f'{bundle}: Failed to send instructions to the worker machine... '
777 msg += 'We\'re a backup and this may be caused by the original (or some '
778 msg += 'other backup) already finishing this work. Ignoring this.'
783 # Kick off the work. Note that if this fails we let
784 # wait_for_process deal with it.
785 self.status.record_processing_began(uuid)
786 cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
787 f'"source py38-venv/bin/activate &&'
788 f' /home/scott/lib/python_modules/remote_worker.py'
789 f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
790 logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
791 p = cmd_in_background(cmd, silent=True)
792 bundle.pid = pid = p.pid
793 logger.debug(f'{bundle}: Local ssh process pid={pid}; remote worker is {machine}.')
794 return self.wait_for_process(p, bundle, 0)
796 def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
797 machine = bundle.machine
801 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
804 self.status.record_release_worker(
809 self.release_worker(bundle.worker)
810 self.adjust_task_count(-1)
811 return self.emergency_retry_nasty_bundle(bundle)
813 # Spin until either the ssh job we scheduled finishes the
814 # bundle or some backup worker signals that they finished it
819 except subprocess.TimeoutExpired:
821 if self.check_if_cancelled(bundle):
823 f'{bundle}: another worker finished bundle, checking it out...'
828 f"{bundle}: pid {pid} ({machine}) our ssh finished, checking it out..."
833 # If we get here we believe the bundle is done; either the ssh
834 # subprocess finished (hopefully successfully) or we noticed
835 # that some other worker seems to have completed the bundle
836 # and we're bailing out.
838 ret = self.post_launch_work(bundle)
839 if ret is not None and p is not None:
843 # Something went wrong; e.g. we could not copy the results
844 # back, cleanup after ourselves on the remote machine, or
845 # unpickle the results we got from the remove machine. If we
846 # still have an active ssh subprocess, keep waiting on it.
847 # Otherwise, time for an emergency reschedule.
848 except Exception as e:
850 logger.error(f'{bundle}: Something unexpected just happened...')
852 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
855 return self.wait_for_process(p, bundle, depth + 1)
857 self.status.record_release_worker(
862 self.release_worker(bundle.worker)
863 self.adjust_task_count(-1)
864 return self.emergency_retry_nasty_bundle(bundle)
866 def post_launch_work(self, bundle: BundleDetails) -> Any:
867 with self.status.lock:
868 is_original = bundle.src_bundle is None
869 was_cancelled = bundle.was_cancelled
870 username = bundle.username
871 machine = bundle.machine
872 result_file = bundle.result_file
873 code_file = bundle.code_file
875 # Whether original or backup, if we finished first we must
876 # fetch the results if the computation happened on a
878 bundle.end_ts = time.time()
879 if not was_cancelled:
880 assert bundle.machine is not None
881 if bundle.hostname not in bundle.machine:
882 cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
884 f"{bundle}: Fetching results from {username}@{machine} via {cmd}"
887 # If either of these throw they are handled in
890 run_silently(f'{SSH} {username}@{machine}'
891 f' "/bin/rm -f {code_file} {result_file}"')
892 dur = bundle.end_ts - bundle.start_ts
893 self.histogram.add_item(dur)
895 # Only the original worker should unpickle the file contents
896 # though since it's the only one whose result matters. The
897 # original is also the only job that may delete result_file
898 # from disk. Note that the original may have been cancelled
899 # if one of the backups finished first; it still must read the
902 logger.debug(f"{bundle}: Unpickling {result_file}.")
904 with open(result_file, 'rb') as rb:
905 serialized = rb.read()
906 result = cloudpickle.loads(serialized)
907 except Exception as e:
908 msg = f'Failed to load {result_file}, this is bad news.'
910 self.status.record_release_worker(
915 self.release_worker(bundle.worker)
917 # Re-raise the exception; the code in wait_for_process may
918 # decide to emergency_retry_nasty_bundle here.
922 f'Removing local (master) {code_file} and {result_file}.'
924 os.remove(f'{result_file}')
925 os.remove(f'{code_file}')
927 # Notify any backups that the original is done so they
928 # should stop ASAP. Do this whether or not we
929 # finished first since there could be more than one
931 if bundle.backup_bundles is not None:
932 for backup in bundle.backup_bundles:
934 f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
936 backup.is_cancelled.set()
938 # This is a backup job and, by now, we have already fetched
939 # the bundle results.
941 # Backup results don't matter, they just need to leave the
942 # result file in the right place for their originals to
943 # read/unpickle later.
946 # Tell the original to stop if we finished first.
947 if not was_cancelled:
949 f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
951 bundle.src_bundle.is_cancelled.set()
953 assert bundle.worker is not None
954 self.status.record_release_worker(
959 self.release_worker(bundle.worker)
960 self.adjust_task_count(-1)
963 def create_original_bundle(self, pickle, fname: str):
964 from string_utils import generate_uuid
965 uuid = generate_uuid(omit_dashes=True)
966 code_file = f'/tmp/{uuid}.code.bin'
967 result_file = f'/tmp/{uuid}.result.bin'
969 logger.debug(f'Writing pickled code to {code_file}')
970 with open(f'{code_file}', 'wb') as wb:
973 bundle = BundleDetails(
974 pickled_code = pickle,
980 hostname = platform.node(),
981 code_file = code_file,
982 result_file = result_file,
984 start_ts = time.time(),
986 slower_than_local_p95 = False,
987 slower_than_global_p95 = False,
989 is_cancelled = threading.Event(),
990 was_cancelled = False,
994 self.status.record_bundle_details(bundle)
995 logger.debug(f'{bundle}: Created an original bundle')
998 def create_backup_bundle(self, src_bundle: BundleDetails):
999 assert src_bundle.backup_bundles is not None
1000 n = len(src_bundle.backup_bundles)
1001 uuid = src_bundle.uuid + f'_backup#{n}'
1003 backup_bundle = BundleDetails(
1004 pickled_code = src_bundle.pickled_code,
1006 fname = src_bundle.fname,
1010 hostname = src_bundle.hostname,
1011 code_file = src_bundle.code_file,
1012 result_file = src_bundle.result_file,
1014 start_ts = time.time(),
1016 slower_than_local_p95 = False,
1017 slower_than_global_p95 = False,
1018 src_bundle = src_bundle,
1019 is_cancelled = threading.Event(),
1020 was_cancelled = False,
1021 backup_bundles = None, # backup backups not allowed
1024 src_bundle.backup_bundles.append(backup_bundle)
1025 self.status.record_bundle_details_already_locked(backup_bundle)
1026 logger.debug(f'{backup_bundle}: Created a backup bundle')
1027 return backup_bundle
1029 def schedule_backup_for_bundle(self,
1030 src_bundle: BundleDetails):
1031 assert self.status.lock.locked()
1032 assert src_bundle is not None
1033 backup_bundle = self.create_backup_bundle(src_bundle)
1035 f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1037 self._helper_executor.submit(self.launch, backup_bundle)
1039 # Results from backups don't matter; if they finish first
1040 # they will move the result_file to this machine and let
1041 # the original pick them up and unpickle them.
1043 def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
1044 is_original = bundle.src_bundle is None
1045 bundle.worker = None
1046 avoid_last_machine = bundle.machine
1047 bundle.machine = None
1048 bundle.username = None
1049 bundle.failure_count += 1
1055 if bundle.failure_count > retry_limit:
1057 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1060 raise RemoteExecutorException(
1061 f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1064 logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
1067 msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
1070 return self.launch(bundle, avoid_last_machine)
1076 **kwargs) -> fut.Future:
1077 pickle = make_cloud_pickle(function, *args, **kwargs)
1078 bundle = self.create_original_bundle(pickle, function.__name__)
1079 self.total_bundles_submitted += 1
1080 return self._helper_executor.submit(self.launch, bundle)
1083 def shutdown(self, wait=True) -> None:
1084 self._helper_executor.shutdown(wait)
1085 logging.debug(f'Shutting down RemoteExecutor {self.title}')
1086 print(self.histogram)
1090 class DefaultExecutors(object):
1092 self.thread_executor: Optional[ThreadExecutor] = None
1093 self.process_executor: Optional[ProcessExecutor] = None
1094 self.remote_executor: Optional[RemoteExecutor] = None
1096 def ping(self, host) -> bool:
1097 logger.debug(f'RUN> ping -c 1 {host}')
1099 x = cmd_with_timeout(
1100 f'ping -c 1 {host} >/dev/null 2>/dev/null',
1107 def thread_pool(self) -> ThreadExecutor:
1108 if self.thread_executor is None:
1109 self.thread_executor = ThreadExecutor()
1110 return self.thread_executor
1112 def process_pool(self) -> ProcessExecutor:
1113 if self.process_executor is None:
1114 self.process_executor = ProcessExecutor()
1115 return self.process_executor
1117 def remote_pool(self) -> RemoteExecutor:
1118 if self.remote_executor is None:
1119 logger.info('Looking for some helper machines...')
1120 pool: List[RemoteWorkerRecord] = []
1121 if self.ping('cheetah.house'):
1122 logger.info('Found cheetah.house')
1126 machine = 'cheetah.house',
1131 if self.ping('meerkat.cabin'):
1132 logger.info('Found meerkat.cabin')
1136 machine = 'meerkat.cabin',
1141 # if self.ping('kiosk.house'):
1142 # logger.info('Found kiosk.house')
1144 # RemoteWorkerRecord(
1146 # machine = 'kiosk.house',
1151 if self.ping('hero.house'):
1152 logger.info('Found hero.house')
1156 machine = 'hero.house',
1161 if self.ping('puma.cabin'):
1162 logger.info('Found puma.cabin')
1166 machine = 'puma.cabin',
1171 if self.ping('backup.house'):
1172 logger.info('Found backup.house')
1176 machine = 'backup.house',
1182 # The controller machine has a lot to do; go easy on it.
1184 if record.machine == platform.node() and record.count > 1:
1185 logger.info(f'Reducing workload for {record.machine}.')
1188 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1189 policy.register_worker_pool(pool)
1190 self.remote_executor = RemoteExecutor(pool, policy)
1191 return self.remote_executor