3 from __future__ import annotations
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
17 from typing import Any, Callable, Dict, List, Optional, Set
19 import cloudpickle # type: ignore
20 from overrides import overrides
22 from ansi import bg, fg, underline, reset
25 from decorator_utils import singleton
26 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
27 import histogram as hist
29 logger = logging.getLogger(__name__)
31 parser = config.add_commandline_args(
32 f"Executors ({__file__})",
33 "Args related to processing executors."
36 '--executors_threadpool_size',
39 help='Number of threads in the default threadpool, leave unset for default',
43 '--executors_processpool_size',
46 help='Number of processes in the default processpool, leave unset for default',
50 '--executors_schedule_remote_backups',
52 action=argparse_utils.ActionNoYes,
53 help='Should we schedule duplicative backup work if a remote bundle is slow',
56 '--executors_max_bundle_failures',
60 help='Maximum number of failures before giving up on a bundle',
# Base rsync command line used to copy bundle code/result files between
# this machine and a remote worker.  Adjacent literals are concatenated
# by the parser, so the resulting value is a single string.
RSYNC = (
    'rsync'
    ' -q'                  # quiet: suppress non-error messages
    ' --no-motd'           # do not print the daemon's message of the day
    ' -W'                  # copy whole files (no delta-transfer algorithm)
    ' --ignore-existing'   # skip files that already exist on the receiver
    ' --timeout=60'        # I/O timeout, in seconds
    ' --size-only'         # skip files whose sizes already match
    ' -z'                  # compress file data during the transfer
)

# Base ssh command line used to run commands on a remote worker; X11
# forwarding is explicitly turned off.
SSH = 'ssh -oForwardX11=no'
def make_cloud_pickle(fun, *args, **kwargs):
    """Serialize a callable together with its arguments via cloudpickle.

    Args:
        fun: the callable to bundle.
        *args: positional arguments to bundle alongside ``fun``.
        **kwargs: keyword arguments to bundle alongside ``fun``.

    Returns:
        Whatever ``cloudpickle.dumps`` returns for the tuple
        ``(fun, args, kwargs)``.
    """
    # Not every callable carries a __name__ (functools.partial objects
    # and instances that define __call__ do not).  Fall back to repr()
    # so that a debug log line can never prevent the pickling itself.
    name = getattr(fun, '__name__', repr(fun))
    logger.debug(f"Making cloudpickled bundle at {name}")
    return cloudpickle.dumps((fun, args, kwargs))
72 class BaseExecutor(ABC):
73 def __init__(self, *, title=''):
76 self.histogram = hist.SimpleHistogram(
77 hist.SimpleHistogram.n_evenly_spaced_buckets(
86 **kwargs) -> fut.Future:
91 wait: bool = True) -> None:
def adjust_task_count(self, delta: int) -> None:
    """Add ``delta`` to ``self.task_count`` and log the new value.

    Args:
        delta: signed amount to add; callers in this file pass +1 when
            work is submitted and -1 when it completes.
    """
    updated = self.task_count + delta
    self.task_count = updated
    logger.debug(f'Executor current task count is {self.task_count}')
99 class ThreadExecutor(BaseExecutor):
101 max_workers: Optional[int] = None):
104 if max_workers is not None:
105 workers = max_workers
106 elif 'executors_threadpool_size' in config.config:
107 workers = config.config['executors_threadpool_size']
108 logger.debug(f'Creating threadpool executor with {workers} workers')
109 self._thread_pool_executor = fut.ThreadPoolExecutor(
111 thread_name_prefix="thread_executor_helper"
114 def run_local_bundle(self, fun, *args, **kwargs):
115 logger.debug(f"Running local bundle at {fun.__name__}")
117 result = fun(*args, **kwargs)
119 self.adjust_task_count(-1)
120 duration = end - start
121 logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
122 self.histogram.add_item(duration)
129 **kwargs) -> fut.Future:
130 self.adjust_task_count(+1)
132 newargs.append(function)
135 return self._thread_pool_executor.submit(
136 self.run_local_bundle,
142 wait = True) -> None:
143 logger.debug(f'Shutting down threadpool executor {self.title}')
144 print(self.histogram)
145 self._thread_pool_executor.shutdown(wait)
148 class ProcessExecutor(BaseExecutor):
153 if max_workers is not None:
154 workers = max_workers
155 elif 'executors_processpool_size' in config.config:
156 workers = config.config['executors_processpool_size']
157 logger.debug(f'Creating processpool executor with {workers} workers.')
158 self._process_executor = fut.ProcessPoolExecutor(
162 def run_cloud_pickle(self, pickle):
163 fun, args, kwargs = cloudpickle.loads(pickle)
164 logger.debug(f"Running pickled bundle at {fun.__name__}")
165 result = fun(*args, **kwargs)
166 self.adjust_task_count(-1)
173 **kwargs) -> fut.Future:
175 self.adjust_task_count(+1)
176 pickle = make_cloud_pickle(function, *args, **kwargs)
177 result = self._process_executor.submit(
178 self.run_cloud_pickle,
181 result.add_done_callback(
182 lambda _: self.histogram.add_item(
def shutdown(self, wait=True) -> None:
    """Shut down the process pool and print this executor's histogram.

    Args:
        wait: passed through unchanged to the underlying process pool's
            ``shutdown`` call.
    """
    logger.debug(f'Shutting down processpool executor {self.title}')
    pool = self._process_executor
    pool.shutdown(wait)
    # The histogram is printed only after the pool has been shut down.
    print(self.histogram)
194 def __getstate__(self):
195 state = self.__dict__.copy()
196 state['_process_executor'] = None
class RemoteExecutorException(Exception):
    """Raised when a bundle still cannot be executed after several retries."""
206 class RemoteWorkerRecord:
213 return hash((self.username, self.machine))
216 return f'{self.username}@{self.machine}'
224 worker: Optional[RemoteWorkerRecord]
225 username: Optional[str]
226 machine: Optional[str]
233 slower_than_local_p95: bool
234 slower_than_global_p95: bool
235 src_bundle: BundleDetails
236 is_cancelled: threading.Event
238 backup_bundles: Optional[List[BundleDetails]]
243 if uuid[-9:-2] == '_backup':
245 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
255 fg('marigold yellow'),
258 fg('cornflower blue'),
259 fg('turquoise blue'),
261 fg('lavender purple'),
264 c = colorz[int(uuid[-2:], 16) % len(colorz)]
265 fname = self.fname if self.fname is not None else 'nofname'
266 machine = self.machine if self.machine is not None else 'nomachine'
267 return f'{c}{suffix}/{fname}/{machine}{reset()}'
270 class RemoteExecutorStatus:
271 def __init__(self, total_worker_count: int) -> None:
272 self.worker_count = total_worker_count
273 self.known_workers: Set[RemoteWorkerRecord] = set()
274 self.start_per_bundle: Dict[str, float] = defaultdict(float)
275 self.end_per_bundle: Dict[str, float] = defaultdict(float)
276 self.finished_bundle_timings_per_worker: Dict[
280 self.in_flight_bundles_by_worker: Dict[
284 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
285 self.finished_bundle_timings: List[float] = []
286 self.last_periodic_dump: Optional[float] = None
287 self.total_bundles_submitted = 0
289 # Protects reads and modification using self. Also used
290 # as a memory fence for modifications to bundle.
291 self.lock = threading.Lock()
293 def record_acquire_worker(
295 worker: RemoteWorkerRecord,
299 self.record_acquire_worker_already_locked(
304 def record_acquire_worker_already_locked(
306 worker: RemoteWorkerRecord,
309 assert self.lock.locked()
310 self.known_workers.add(worker)
311 self.start_per_bundle[uuid] = None
312 x = self.in_flight_bundles_by_worker.get(worker, set())
314 self.in_flight_bundles_by_worker[worker] = x
316 def record_bundle_details(
318 details: BundleDetails) -> None:
320 self.record_bundle_details_already_locked(details)
322 def record_bundle_details_already_locked(
324 details: BundleDetails) -> None:
325 assert self.lock.locked()
326 self.bundle_details_by_uuid[details.uuid] = details
328 def record_release_worker(
330 worker: RemoteWorkerRecord,
335 self.record_release_worker_already_locked(
336 worker, uuid, was_cancelled
339 def record_release_worker_already_locked(
341 worker: RemoteWorkerRecord,
345 assert self.lock.locked()
347 self.end_per_bundle[uuid] = ts
348 self.in_flight_bundles_by_worker[worker].remove(uuid)
349 if not was_cancelled:
350 bundle_latency = ts - self.start_per_bundle[uuid]
351 x = self.finished_bundle_timings_per_worker.get(worker, list())
352 x.append(bundle_latency)
353 self.finished_bundle_timings_per_worker[worker] = x
354 self.finished_bundle_timings.append(bundle_latency)
356 def record_processing_began(self, uuid: str):
358 self.start_per_bundle[uuid] = time.time()
360 def total_in_flight(self) -> int:
361 assert self.lock.locked()
363 for worker in self.known_workers:
364 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
365 return total_in_flight
def total_idle(self) -> int:
    """Return ``worker_count`` minus the number of in-flight bundles.

    The caller must already hold ``self.lock``; this is asserted.
    """
    assert self.lock.locked()
    capacity = self.worker_count
    busy = self.total_in_flight()
    return capacity - busy
372 assert self.lock.locked()
374 total_finished = len(self.finished_bundle_timings)
375 total_in_flight = self.total_in_flight()
376 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
378 if len(self.finished_bundle_timings) > 1:
379 qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
381 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
382 f'✅={total_finished}/{self.total_bundles_submitted}, '
383 f'💻n={total_in_flight}/{self.worker_count}\n'
387 f' ✅={total_finished}/{self.total_bundles_submitted}, '
388 f'💻n={total_in_flight}/{self.worker_count}\n'
391 for worker in self.known_workers:
392 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
393 timings = self.finished_bundle_timings_per_worker.get(worker, [])
397 qworker = numpy.quantile(timings, [0.5, 0.95])
398 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
402 ret += f' ...finished {count} total bundle(s) so far\n'
403 in_flight = len(self.in_flight_bundles_by_worker[worker])
405 ret += f' ...{in_flight} bundles currently in flight:\n'
406 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
407 details = self.bundle_details_by_uuid.get(
411 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
412 if self.start_per_bundle[bundle_uuid] is not None:
413 sec = ts - self.start_per_bundle[bundle_uuid]
414 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
416 ret += f' {details} setting up / copying data...'
419 if qworker is not None:
421 ret += f'{bg("red")}>💻p95{reset()} '
422 if details is not None:
423 details.slower_than_local_p95 = True
425 if details is not None:
426 details.slower_than_local_p95 = False
430 ret += f'{bg("red")}>∀p95{reset()} '
431 if details is not None:
432 details.slower_than_global_p95 = True
434 details.slower_than_global_p95 = False
438 def periodic_dump(self, total_bundles_submitted: int) -> None:
439 assert self.lock.locked()
440 self.total_bundles_submitted = total_bundles_submitted
443 self.last_periodic_dump is None
444 or ts - self.last_periodic_dump > 5.0
447 self.last_periodic_dump = ts
450 class RemoteWorkerSelectionPolicy(ABC):
def register_worker_pool(self, workers):
    """Remember ``workers`` as the pool this policy selects from.

    Args:
        workers: the worker collection; it is stored by reference on
            ``self.workers`` (no copy is made).
    """
    pool = workers
    self.workers = pool
455 def is_worker_available(self) -> bool:
461 machine_to_avoid = None
462 ) -> Optional[RemoteWorkerRecord]:
466 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
468 def is_worker_available(self) -> bool:
469 for worker in self.workers:
477 machine_to_avoid = None
478 ) -> Optional[RemoteWorkerRecord]:
480 for worker in self.workers:
481 for x in range(0, worker.count):
482 for y in range(0, worker.weight):
483 grabbag.append(worker)
485 for _ in range(0, 5):
486 random.shuffle(grabbag)
488 if worker.machine != machine_to_avoid or _ > 2:
491 logger.debug(f'Selected worker {worker}')
493 logger.warning("Couldn't find a worker; go fish.")
497 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
498 def __init__(self) -> None:
502 def is_worker_available(self) -> bool:
503 for worker in self.workers:
511 machine_to_avoid: str = None
512 ) -> Optional[RemoteWorkerRecord]:
515 worker = self.workers[x]
519 if x >= len(self.workers):
522 logger.debug(f'Selected worker {worker}')
525 if x >= len(self.workers):
528 logger.warning("Couldn't find a worker; go fish.")
532 class RemoteExecutor(BaseExecutor):
534 workers: List[RemoteWorkerRecord],
535 policy: RemoteWorkerSelectionPolicy) -> None:
537 self.workers = workers
539 self.worker_count = 0
540 for worker in self.workers:
541 self.worker_count += worker.count
542 if self.worker_count <= 0:
543 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
545 raise RemoteExecutorException(msg)
546 self.policy.register_worker_pool(self.workers)
547 self.cv = threading.Condition()
548 logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
549 self._helper_executor = fut.ThreadPoolExecutor(
550 thread_name_prefix="remote_executor_helper",
551 max_workers=self.worker_count,
553 self.status = RemoteExecutorStatus(self.worker_count)
554 self.total_bundles_submitted = 0
555 self.backup_lock = threading.Lock()
556 self.last_backup = None
def is_worker_available(self) -> bool:
    """Return the selection policy's answer to ``is_worker_available()``."""
    policy = self.policy
    return policy.is_worker_available()
563 machine_to_avoid: str = None
564 ) -> Optional[RemoteWorkerRecord]:
565 return self.policy.acquire_worker(machine_to_avoid)
567 def find_available_worker_or_block(
569 machine_to_avoid: str = None
570 ) -> RemoteWorkerRecord:
572 while not self.is_worker_available():
574 worker = self.acquire_worker(machine_to_avoid)
575 if worker is not None:
577 msg = "We should never reach this point in the code"
581 def release_worker(self, worker: RemoteWorkerRecord) -> None:
582 logger.debug(f'Released worker {worker}')
587 def heartbeat(self) -> None:
588 with self.status.lock:
589 # Regular progress report
590 self.status.periodic_dump(self.total_bundles_submitted)
592 # Look for bundles to reschedule.
593 num_done = len(self.status.finished_bundle_timings)
594 num_idle_workers = self.worker_count - self.task_count
597 config.config['executors_schedule_remote_backups']
599 and num_idle_workers > 1
600 and (self.last_backup is None or (now - self.last_backup > 1.0))
601 and self.backup_lock.acquire(blocking=False)
604 assert self.backup_lock.locked()
606 bundle_to_backup = None
608 for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
609 # Prefer to schedule backups of bundles on slower machines.
611 for record in self.workers:
612 if worker.machine == record.machine:
613 base_score = float(record.weight)
614 base_score = 1.0 / base_score
616 base_score = int(base_score)
619 for uuid in bundle_uuids:
620 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
623 and bundle.src_bundle is None
624 and bundle.backup_bundles is not None
628 # Schedule backups of bundles running longer; especially those
629 # that are unexpectedly slow.
630 start_ts = self.status.start_per_bundle[uuid]
631 if start_ts is not None:
632 runtime = now - start_ts
634 logger.debug(f'score[{bundle}] => {score} # latency boost')
636 if bundle.slower_than_local_p95:
638 logger.debug(f'score[{bundle}] => {score} # >worker p95')
640 if bundle.slower_than_global_p95:
642 logger.debug(f'score[{bundle}] => {score} # >global p95')
644 # Prefer backups of bundles that don't have backups already.
645 backup_count = len(bundle.backup_bundles)
646 if backup_count == 0:
648 elif backup_count == 1:
650 elif backup_count == 2:
654 logger.debug(f'score[{bundle}] => {score} # {backup_count} dup backup factor')
658 and (best_score is None or score > best_score)
660 bundle_to_backup = bundle
661 assert bundle is not None
662 assert bundle.backup_bundles is not None
663 assert bundle.src_bundle is None
666 if bundle_to_backup is not None:
667 self.last_backup = now
668 logger.info(f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <=====')
669 self.schedule_backup_for_bundle(bundle_to_backup)
671 self.backup_lock.release()
673 def check_if_cancelled(self, bundle: BundleDetails) -> bool:
674 with self.status.lock:
675 if bundle.is_cancelled.wait(timeout=0.0):
676 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
677 bundle.was_cancelled = True
681 def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
682 """Find a worker for bundle or block until one is available."""
683 self.adjust_task_count(+1)
685 hostname = bundle.hostname
686 avoid_machine = override_avoid_machine
687 is_original = bundle.src_bundle is None
689 # Try not to schedule a backup on the same host as the original.
690 if avoid_machine is None and bundle.src_bundle is not None:
691 avoid_machine = bundle.src_bundle.machine
693 while worker is None:
694 worker = self.find_available_worker_or_block(avoid_machine)
696 # Ok, found a worker.
697 bundle.worker = worker
698 machine = bundle.machine = worker.machine
699 username = bundle.username = worker.username
701 self.status.record_acquire_worker(worker, uuid)
702 logger.debug(f'{bundle}: Running bundle on {worker}...')
704 # Before we do any work, make sure the bundle is still viable.
705 if self.check_if_cancelled(bundle):
707 return self.post_launch_work(bundle)
708 except Exception as e:
711 f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
713 assert bundle.worker is not None
714 self.status.record_release_worker(
719 self.release_worker(bundle.worker)
720 self.adjust_task_count(-1)
722 # Weird. We are the original owner of this
723 # bundle. For it to have been cancelled, a backup
724 # must have already started and completed before
725 # we even got started. Moreover, the backup says
726 # it is done but we can't find the results it
727 # should have copied over. Reschedule the whole
729 return self.emergency_retry_nasty_bundle(bundle)
731 # Expected(?). We're a backup and our bundle is
732 # cancelled before we even got started. Something
733 # went bad in post_launch_work (I actually don't
734 # see what?) but probably not worth worrying
738 # Send input code / data to worker machine if it's not local.
739 if hostname not in machine:
741 cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
742 start_ts = time.time()
743 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
745 xfer_latency = time.time() - start_ts
746 logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.")
747 except Exception as e:
750 f'{bundle}: failed to send instructions to worker machine?!?'
752 assert bundle.worker is not None
753 self.status.record_release_worker(
758 self.release_worker(bundle.worker)
759 self.adjust_task_count(-1)
761 # Weird. We tried to copy the code to the worker and it failed...
762 # And we're the original bundle. We have to retry.
763 return self.emergency_retry_nasty_bundle(bundle)
765 # This is actually expected; we're a backup.
766 # There's a race condition where someone else
767 # already finished the work and removed the source
768 # code file before we could copy it. No biggie.
771 # Kick off the work. Note that if this fails we let
772 # wait_for_process deal with it.
773 self.status.record_processing_began(uuid)
774 cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
775 f'"source py39-venv/bin/activate &&'
776 f' /home/scott/lib/python_modules/remote_worker.py'
777 f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
778 logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
779 p = cmd_in_background(cmd, silent=True)
780 bundle.pid = pid = p.pid
781 logger.debug(f'{bundle}: Local ssh process pid={pid}; remote worker is {machine}.')
782 return self.wait_for_process(p, bundle, 0)
784 def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
785 machine = bundle.machine
789 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
792 self.status.record_release_worker(
797 self.release_worker(bundle.worker)
798 self.adjust_task_count(-1)
799 return self.emergency_retry_nasty_bundle(bundle)
801 # Spin until either the ssh job we scheduled finishes the
802 # bundle or some backup worker signals that they finished it
807 except subprocess.TimeoutExpired:
809 if self.check_if_cancelled(bundle):
811 f'{bundle}: another worker finished bundle, checking it out...'
816 f"{bundle}: pid {pid} ({machine}) our ssh finished, checking it out..."
821 # If we get here we believe the bundle is done; either the ssh
822 # subprocess finished (hopefully successfully) or we noticed
823 # that some other worker seems to have completed the bundle
824 # and we're bailing out.
826 ret = self.post_launch_work(bundle)
827 if ret is not None and p is not None:
831 # Something went wrong; e.g. we could not copy the results
832 # back, cleanup after ourselves on the remote machine, or
833 # unpickle the results we got from the remote machine. If we
834 # still have an active ssh subprocess, keep waiting on it.
835 # Otherwise, time for an emergency reschedule.
836 except Exception as e:
838 logger.error(f'{bundle}: Something unexpected just happened...')
841 f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
843 return self.wait_for_process(p, bundle, depth + 1)
845 self.status.record_release_worker(
850 self.release_worker(bundle.worker)
851 self.adjust_task_count(-1)
852 return self.emergency_retry_nasty_bundle(bundle)
854 def post_launch_work(self, bundle: BundleDetails) -> Any:
855 with self.status.lock:
856 is_original = bundle.src_bundle is None
857 was_cancelled = bundle.was_cancelled
858 username = bundle.username
859 machine = bundle.machine
860 result_file = bundle.result_file
861 code_file = bundle.code_file
863 # Whether original or backup, if we finished first we must
864 # fetch the results if the computation happened on a
866 bundle.end_ts = time.time()
867 if not was_cancelled:
868 assert bundle.machine is not None
869 if bundle.hostname not in bundle.machine:
870 cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
872 f"{bundle}: Fetching results from {username}@{machine} via {cmd}"
875 # If either of these throw they are handled in
878 run_silently(f'{SSH} {username}@{machine}'
879 f' "/bin/rm -f {code_file} {result_file}"')
880 dur = bundle.end_ts - bundle.start_ts
881 self.histogram.add_item(dur)
883 # Only the original worker should unpickle the file contents
884 # though since it's the only one whose result matters. The
885 # original is also the only job that may delete result_file
886 # from disk. Note that the original may have been cancelled
887 # if one of the backups finished first; it still must read the
890 logger.debug(f"{bundle}: Unpickling {result_file}.")
892 with open(f'{result_file}', 'rb') as rb:
893 serialized = rb.read()
894 result = cloudpickle.loads(serialized)
895 except Exception as e:
896 msg = f'Failed to load {result_file}, this is bad news.'
898 self.status.record_release_worker(
903 self.release_worker(bundle.worker)
905 # Re-raise the exception; the code in wait_for_process may
906 # decide to emergency_retry_nasty_bundle here.
910 f'Removing local (master) {code_file} and {result_file}.'
912 os.remove(f'{result_file}')
913 os.remove(f'{code_file}')
915 # Notify any backups that the original is done so they
916 # should stop ASAP. Do this whether or not we
917 # finished first since there could be more than one
919 if bundle.backup_bundles is not None:
920 for backup in bundle.backup_bundles:
922 f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
924 backup.is_cancelled.set()
926 # This is a backup job and, by now, we have already fetched
927 # the bundle results.
929 # Backup results don't matter, they just need to leave the
930 # result file in the right place for their originals to
931 # read/unpickle later.
934 # Tell the original to stop if we finished first.
935 if not was_cancelled:
937 f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
939 bundle.src_bundle.is_cancelled.set()
941 assert bundle.worker is not None
942 self.status.record_release_worker(
947 self.release_worker(bundle.worker)
948 self.adjust_task_count(-1)
951 def create_original_bundle(self, pickle, fname: str):
952 from string_utils import generate_uuid
953 uuid = generate_uuid(as_hex=True)
954 code_file = f'/tmp/{uuid}.code.bin'
955 result_file = f'/tmp/{uuid}.result.bin'
957 logger.debug(f'Writing pickled code to {code_file}')
958 with open(f'{code_file}', 'wb') as wb:
961 bundle = BundleDetails(
962 pickled_code = pickle,
968 hostname = platform.node(),
969 code_file = code_file,
970 result_file = result_file,
972 start_ts = time.time(),
974 slower_than_local_p95 = False,
975 slower_than_global_p95 = False,
977 is_cancelled = threading.Event(),
978 was_cancelled = False,
982 self.status.record_bundle_details(bundle)
983 logger.debug(f'{bundle}: Created an original bundle')
986 def create_backup_bundle(self, src_bundle: BundleDetails):
987 assert src_bundle.backup_bundles is not None
988 n = len(src_bundle.backup_bundles)
989 uuid = src_bundle.uuid + f'_backup#{n}'
991 backup_bundle = BundleDetails(
992 pickled_code = src_bundle.pickled_code,
994 fname = src_bundle.fname,
998 hostname = src_bundle.hostname,
999 code_file = src_bundle.code_file,
1000 result_file = src_bundle.result_file,
1002 start_ts = time.time(),
1004 slower_than_local_p95 = False,
1005 slower_than_global_p95 = False,
1006 src_bundle = src_bundle,
1007 is_cancelled = threading.Event(),
1008 was_cancelled = False,
1009 backup_bundles = None, # backup backups not allowed
1012 src_bundle.backup_bundles.append(backup_bundle)
1013 self.status.record_bundle_details_already_locked(backup_bundle)
1014 logger.debug(f'{backup_bundle}: Created a backup bundle')
1015 return backup_bundle
1017 def schedule_backup_for_bundle(self,
1018 src_bundle: BundleDetails):
1019 assert self.status.lock.locked()
1020 assert src_bundle is not None
1021 backup_bundle = self.create_backup_bundle(src_bundle)
1023 f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1025 self._helper_executor.submit(self.launch, backup_bundle)
1027 # Results from backups don't matter; if they finish first
1028 # they will move the result_file to this machine and let
1029 # the original pick them up and unpickle them.
1031 def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
1032 is_original = bundle.src_bundle is None
1033 bundle.worker = None
1034 avoid_last_machine = bundle.machine
1035 bundle.machine = None
1036 bundle.username = None
1037 bundle.failure_count += 1
1043 if bundle.failure_count > retry_limit:
1045 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1048 raise RemoteExecutorException(
1049 f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1052 logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
1056 f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
1058 return self.launch(bundle, avoid_last_machine)
1064 **kwargs) -> fut.Future:
1065 pickle = make_cloud_pickle(function, *args, **kwargs)
1066 bundle = self.create_original_bundle(pickle, function.__name__)
1067 self.total_bundles_submitted += 1
1068 return self._helper_executor.submit(self.launch, bundle)
def shutdown(self, wait=True) -> None:
    """Shut down the helper thread pool and print this executor's histogram.

    Args:
        wait: passed through unchanged to the helper thread pool's
            ``shutdown`` call.
    """
    # Log through the module-level ``logger`` (not the root ``logging``
    # functions) so this message honours the same logger configuration
    # as the rest of the file, and emit it *before* shutting down, the
    # way the other executors' shutdown methods do.
    logger.debug(f'Shutting down RemoteExecutor {self.title}')
    self._helper_executor.shutdown(wait)
    print(self.histogram)
1078 class DefaultExecutors(object):
1080 self.thread_executor: Optional[ThreadExecutor] = None
1081 self.process_executor: Optional[ProcessExecutor] = None
1082 self.remote_executor: Optional[RemoteExecutor] = None
1084 def ping(self, host) -> bool:
1085 logger.debug(f'RUN> ping -c 1 {host}')
1087 x = cmd_with_timeout(
1088 f'ping -c 1 {host} >/dev/null 2>/dev/null',
def thread_pool(self) -> ThreadExecutor:
    """Return the shared ThreadExecutor, creating it on first use."""
    executor = self.thread_executor
    if executor is not None:
        return executor
    # First request: build the executor and cache it on the instance.
    self.thread_executor = ThreadExecutor()
    return self.thread_executor
def process_pool(self) -> ProcessExecutor:
    """Return the shared ProcessExecutor, creating it on first use."""
    executor = self.process_executor
    if executor is not None:
        return executor
    # First request: build the executor and cache it on the instance.
    self.process_executor = ProcessExecutor()
    return self.process_executor
1105 def remote_pool(self) -> RemoteExecutor:
1106 if self.remote_executor is None:
1107 logger.info('Looking for some helper machines...')
1108 pool: List[RemoteWorkerRecord] = []
1109 if self.ping('cheetah.house'):
1110 logger.info('Found cheetah.house')
1114 machine = 'cheetah.house',
1119 if self.ping('video.house'):
1120 logger.info('Found video.house')
1124 machine = 'video.house',
1129 if self.ping('gorilla.house'):
1130 logger.info('Found gorilla.house')
1134 machine = 'gorilla.house',
1139 if self.ping('meerkat.cabin'):
1140 logger.info('Found meerkat.cabin')
1144 machine = 'meerkat.cabin',
1149 if self.ping('kiosk.house'):
1150 logger.info('Found kiosk.house')
1154 machine = 'kiosk.house',
1159 if self.ping('hero.house'):
1160 logger.info('Found hero.house')
1164 machine = 'hero.house',
1169 if self.ping('puma.cabin'):
1170 logger.info('Found puma.cabin')
1174 machine = 'puma.cabin',
1179 if self.ping('puma.house'):
1180 logger.info('Found puma.house')
1184 machine = 'puma.house',
1190 # The controller machine has a lot to do; go easy on it.
1192 if record.machine == platform.node() and record.count > 1:
1193 logger.info(f'Reducing workload for {record.machine}.')
1196 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1197 policy.register_worker_pool(pool)
1198 self.remote_executor = RemoteExecutor(pool, policy)
1199 return self.remote_executor