3 from __future__ import annotations
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
17 from typing import Any, Callable, Dict, List, Optional, Set
20 import cloudpickle # type: ignore
21 from overrides import overrides
23 from ansi import bg, fg, underline, reset
26 from decorator_utils import singleton
27 from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
28 import histogram as hist
29 from thread_utils import background_thread
32 logger = logging.getLogger(__name__)
34 parser = config.add_commandline_args(
35 f"Executors ({__file__})", "Args related to processing executors."
38 '--executors_threadpool_size',
41 help='Number of threads in the default threadpool, leave unset for default',
45 '--executors_processpool_size',
48 help='Number of processes in the default processpool, leave unset for default',
52 '--executors_schedule_remote_backups',
54 action=argparse_utils.ActionNoYes,
55 help='Should we schedule duplicative backup work if a remote bundle is slow',
58 '--executors_max_bundle_failures',
62 help='Maximum number of failures before giving up on a bundle',
65 SSH = '/usr/bin/ssh -oForwardX11=no'
66 SCP = '/usr/bin/scp -C'
69 def make_cloud_pickle(fun, *args, **kwargs):
70 logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
71 return cloudpickle.dumps((fun, args, kwargs))
74 class BaseExecutor(ABC):
75 def __init__(self, *, title=''):
78 self.histogram = hist.SimpleHistogram(
79 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
83 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
87 def shutdown(self, wait: bool = True) -> None:
90 def adjust_task_count(self, delta: int) -> None:
91 self.task_count += delta
92 logger.debug(f'Executor current task count is {self.task_count}')
95 class ThreadExecutor(BaseExecutor):
96 def __init__(self, max_workers: Optional[int] = None):
99 if max_workers is not None:
100 workers = max_workers
101 elif 'executors_threadpool_size' in config.config:
102 workers = config.config['executors_threadpool_size']
103 logger.debug(f'Creating threadpool executor with {workers} workers')
104 self._thread_pool_executor = fut.ThreadPoolExecutor(
105 max_workers=workers, thread_name_prefix="thread_executor_helper"
108 def run_local_bundle(self, fun, *args, **kwargs):
109 logger.debug(f"Running local bundle at {fun.__name__}")
111 result = fun(*args, **kwargs)
113 self.adjust_task_count(-1)
114 duration = end - start
115 logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
116 self.histogram.add_item(duration)
120 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
121 self.adjust_task_count(+1)
123 newargs.append(function)
126 return self._thread_pool_executor.submit(
127 self.run_local_bundle, *newargs, **kwargs
131 def shutdown(self, wait=True) -> None:
132 logger.debug(f'Shutting down threadpool executor {self.title}')
133 print(self.histogram)
134 self._thread_pool_executor.shutdown(wait)
137 class ProcessExecutor(BaseExecutor):
138 def __init__(self, max_workers=None):
141 if max_workers is not None:
142 workers = max_workers
143 elif 'executors_processpool_size' in config.config:
144 workers = config.config['executors_processpool_size']
145 logger.debug(f'Creating processpool executor with {workers} workers.')
146 self._process_executor = fut.ProcessPoolExecutor(
150 def run_cloud_pickle(self, pickle):
151 fun, args, kwargs = cloudpickle.loads(pickle)
152 logger.debug(f"Running pickled bundle at {fun.__name__}")
153 result = fun(*args, **kwargs)
154 self.adjust_task_count(-1)
158 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
160 self.adjust_task_count(+1)
161 pickle = make_cloud_pickle(function, *args, **kwargs)
162 result = self._process_executor.submit(self.run_cloud_pickle, pickle)
163 result.add_done_callback(
164 lambda _: self.histogram.add_item(time.time() - start)
169 def shutdown(self, wait=True) -> None:
170 logger.debug(f'Shutting down processpool executor {self.title}')
171 self._process_executor.shutdown(wait)
172 print(self.histogram)
174 def __getstate__(self):
175 state = self.__dict__.copy()
176 state['_process_executor'] = None
180 class RemoteExecutorException(Exception):
181 """Thrown when a bundle cannot be executed despite several retries."""
187 class RemoteWorkerRecord:
194 return hash((self.username, self.machine))
197 return f'{self.username}@{self.machine}'
205 worker: Optional[RemoteWorkerRecord]
206 username: Optional[str]
207 machine: Optional[str]
214 slower_than_local_p95: bool
215 slower_than_global_p95: bool
216 src_bundle: BundleDetails
217 is_cancelled: threading.Event
219 backup_bundles: Optional[List[BundleDetails]]
224 if uuid[-9:-2] == '_backup':
226 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
236 fg('marigold yellow'),
239 fg('cornflower blue'),
240 fg('turquoise blue'),
242 fg('lavender purple'),
245 c = colorz[int(uuid[-2:], 16) % len(colorz)]
246 fname = self.fname if self.fname is not None else 'nofname'
247 machine = self.machine if self.machine is not None else 'nomachine'
248 return f'{c}{suffix}/{fname}/{machine}{reset()}'
251 class RemoteExecutorStatus:
252 def __init__(self, total_worker_count: int) -> None:
253 self.worker_count: int = total_worker_count
254 self.known_workers: Set[RemoteWorkerRecord] = set()
255 self.start_time: float = time.time()
256 self.start_per_bundle: Dict[str, float] = defaultdict(float)
257 self.end_per_bundle: Dict[str, float] = defaultdict(float)
258 self.finished_bundle_timings_per_worker: Dict[
259 RemoteWorkerRecord, List[float]
261 self.in_flight_bundles_by_worker: Dict[
262 RemoteWorkerRecord, Set[str]
264 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
265 self.finished_bundle_timings: List[float] = []
266 self.last_periodic_dump: Optional[float] = None
267 self.total_bundles_submitted: int = 0
269 # Protects reads and modification using self. Also used
270 # as a memory fence for modifications to bundle.
271 self.lock: threading.Lock = threading.Lock()
273 def record_acquire_worker(
274 self, worker: RemoteWorkerRecord, uuid: str
277 self.record_acquire_worker_already_locked(worker, uuid)
279 def record_acquire_worker_already_locked(
280 self, worker: RemoteWorkerRecord, uuid: str
282 assert self.lock.locked()
283 self.known_workers.add(worker)
284 self.start_per_bundle[uuid] = None
285 x = self.in_flight_bundles_by_worker.get(worker, set())
287 self.in_flight_bundles_by_worker[worker] = x
289 def record_bundle_details(self, details: BundleDetails) -> None:
291 self.record_bundle_details_already_locked(details)
293 def record_bundle_details_already_locked(
294 self, details: BundleDetails
296 assert self.lock.locked()
297 self.bundle_details_by_uuid[details.uuid] = details
299 def record_release_worker(
301 worker: RemoteWorkerRecord,
306 self.record_release_worker_already_locked(
307 worker, uuid, was_cancelled
310 def record_release_worker_already_locked(
312 worker: RemoteWorkerRecord,
316 assert self.lock.locked()
318 self.end_per_bundle[uuid] = ts
319 self.in_flight_bundles_by_worker[worker].remove(uuid)
320 if not was_cancelled:
321 bundle_latency = ts - self.start_per_bundle[uuid]
322 x = self.finished_bundle_timings_per_worker.get(worker, list())
323 x.append(bundle_latency)
324 self.finished_bundle_timings_per_worker[worker] = x
325 self.finished_bundle_timings.append(bundle_latency)
327 def record_processing_began(self, uuid: str):
329 self.start_per_bundle[uuid] = time.time()
331 def total_in_flight(self) -> int:
332 assert self.lock.locked()
334 for worker in self.known_workers:
335 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
336 return total_in_flight
338 def total_idle(self) -> int:
339 assert self.lock.locked()
340 return self.worker_count - self.total_in_flight()
343 assert self.lock.locked()
345 total_finished = len(self.finished_bundle_timings)
346 total_in_flight = self.total_in_flight()
347 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
349 if len(self.finished_bundle_timings) > 1:
350 qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
352 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
353 f'✅={total_finished}/{self.total_bundles_submitted}, '
354 f'💻n={total_in_flight}/{self.worker_count}\n'
358 f'⏱={ts-self.start_time:.1f}s, '
359 f'✅={total_finished}/{self.total_bundles_submitted}, '
360 f'💻n={total_in_flight}/{self.worker_count}\n'
363 for worker in self.known_workers:
364 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
365 timings = self.finished_bundle_timings_per_worker.get(worker, [])
369 qworker = numpy.quantile(timings, [0.5, 0.95])
370 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
374 ret += f' ...finished {count} total bundle(s) so far\n'
375 in_flight = len(self.in_flight_bundles_by_worker[worker])
377 ret += f' ...{in_flight} bundles currently in flight:\n'
378 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
379 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
382 if (details and details.pid != 0)
385 if self.start_per_bundle[bundle_uuid] is not None:
386 sec = ts - self.start_per_bundle[bundle_uuid]
387 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
389 ret += f' {details} setting up / copying data...'
392 if qworker is not None:
394 ret += f'{bg("red")}>💻p95{reset()} '
395 if details is not None:
396 details.slower_than_local_p95 = True
398 if details is not None:
399 details.slower_than_local_p95 = False
403 ret += f'{bg("red")}>∀p95{reset()} '
404 if details is not None:
405 details.slower_than_global_p95 = True
407 details.slower_than_global_p95 = False
411 def periodic_dump(self, total_bundles_submitted: int) -> None:
412 assert self.lock.locked()
413 self.total_bundles_submitted = total_bundles_submitted
416 self.last_periodic_dump is None
417 or ts - self.last_periodic_dump > 5.0
420 self.last_periodic_dump = ts
423 class RemoteWorkerSelectionPolicy(ABC):
424 def register_worker_pool(self, workers):
425 self.workers = workers
428 def is_worker_available(self) -> bool:
433 self, machine_to_avoid=None
434 ) -> Optional[RemoteWorkerRecord]:
438 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
440 def is_worker_available(self) -> bool:
441 for worker in self.workers:
448 self, machine_to_avoid=None
449 ) -> Optional[RemoteWorkerRecord]:
451 for worker in self.workers:
452 for x in range(0, worker.count):
453 for y in range(0, worker.weight):
454 grabbag.append(worker)
456 for _ in range(0, 5):
457 random.shuffle(grabbag)
459 if worker.machine != machine_to_avoid or _ > 2:
462 logger.debug(f'Selected worker {worker}')
464 msg = 'Unexpectedly could not find a worker, retrying...'
469 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
470 def __init__(self) -> None:
474 def is_worker_available(self) -> bool:
475 for worker in self.workers:
482 self, machine_to_avoid: str = None
483 ) -> Optional[RemoteWorkerRecord]:
486 worker = self.workers[x]
490 if x >= len(self.workers):
493 logger.debug(f'Selected worker {worker}')
496 if x >= len(self.workers):
499 msg = 'Unexpectedly could not find a worker, retrying...'
504 class RemoteExecutor(BaseExecutor):
507 workers: List[RemoteWorkerRecord],
508 policy: RemoteWorkerSelectionPolicy,
511 self.workers = workers
513 self.worker_count = 0
514 for worker in self.workers:
515 self.worker_count += worker.count
516 if self.worker_count <= 0:
517 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
519 raise RemoteExecutorException(msg)
520 self.policy.register_worker_pool(self.workers)
521 self.cv = threading.Condition()
523 f'Creating {self.worker_count} local threads, one per remote worker.'
525 self._helper_executor = fut.ThreadPoolExecutor(
526 thread_name_prefix="remote_executor_helper",
527 max_workers=self.worker_count,
529 self.status = RemoteExecutorStatus(self.worker_count)
530 self.total_bundles_submitted = 0
531 self.backup_lock = threading.Lock()
532 self.last_backup = None
534 self.heartbeat_thread,
535 self.heartbeat_stop_event,
536 ) = self.run_periodic_heartbeat()
539 def run_periodic_heartbeat(self, stop_event) -> None:
540 while not stop_event.is_set():
542 logger.debug('Running periodic heartbeat code...')
544 logger.debug('Periodic heartbeat thread shutting down.')
546 def heartbeat(self) -> None:
547 with self.status.lock:
548 # Dump regular progress report
549 self.status.periodic_dump(self.total_bundles_submitted)
551 # Look for bundles to reschedule via executor.submit
552 if config.config['executors_schedule_remote_backups']:
553 self.maybe_schedule_backup_bundles()
555 def maybe_schedule_backup_bundles(self):
556 assert self.status.lock.locked()
557 num_done = len(self.status.finished_bundle_timings)
558 num_idle_workers = self.worker_count - self.task_count
562 and num_idle_workers > 1
563 and (self.last_backup is None or (now - self.last_backup > 6.0))
564 and self.backup_lock.acquire(blocking=False)
567 assert self.backup_lock.locked()
569 bundle_to_backup = None
574 ) in self.status.in_flight_bundles_by_worker.items():
576 # Prefer to schedule backups of bundles running on
579 for record in self.workers:
580 if worker.machine == record.machine:
581 base_score = float(record.weight)
582 base_score = 1.0 / base_score
584 base_score = int(base_score)
587 for uuid in bundle_uuids:
588 bundle = self.status.bundle_details_by_uuid.get(
593 and bundle.src_bundle is None
594 and bundle.backup_bundles is not None
598 # Schedule backups of bundles running
599 # longer; especially those that are
601 start_ts = self.status.start_per_bundle[uuid]
602 if start_ts is not None:
603 runtime = now - start_ts
606 f'score[{bundle}] => {score} # latency boost'
609 if bundle.slower_than_local_p95:
612 f'score[{bundle}] => {score} # >worker p95'
615 if bundle.slower_than_global_p95:
618 f'score[{bundle}] => {score} # >global p95'
621 # Prefer backups of bundles that don't
622 # have backups already.
623 backup_count = len(bundle.backup_bundles)
624 if backup_count == 0:
626 elif backup_count == 1:
628 elif backup_count == 2:
633 f'score[{bundle}] => {score} # {backup_count} dup backup factor'
637 best_score is None or score > best_score
639 bundle_to_backup = bundle
640 assert bundle is not None
641 assert bundle.backup_bundles is not None
642 assert bundle.src_bundle is None
645 # Note: this is all still happening on the heartbeat
646 # runner thread. That's ok because
647 # schedule_backup_for_bundle uses the executor to
648 # submit the bundle again which will cause it to be
649 # picked up by a worker thread and allow this thread
650 # to return to run future heartbeats.
651 if bundle_to_backup is not None:
652 self.last_backup = now
654 f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
656 self.schedule_backup_for_bundle(bundle_to_backup)
658 self.backup_lock.release()
660 def is_worker_available(self) -> bool:
661 return self.policy.is_worker_available()
664 self, machine_to_avoid: str = None
665 ) -> Optional[RemoteWorkerRecord]:
666 return self.policy.acquire_worker(machine_to_avoid)
668 def find_available_worker_or_block(
669 self, machine_to_avoid: str = None
670 ) -> RemoteWorkerRecord:
672 while not self.is_worker_available():
674 worker = self.acquire_worker(machine_to_avoid)
675 if worker is not None:
677 msg = "We should never reach this point in the code"
682 self, bundle: BundleDetails, *, was_cancelled=True
684 worker = bundle.worker
685 assert worker is not None
686 logger.debug(f'Released worker {worker}')
687 self.status.record_release_worker(
695 self.adjust_task_count(-1)
697 def check_if_cancelled(self, bundle: BundleDetails) -> bool:
698 with self.status.lock:
699 if bundle.is_cancelled.wait(timeout=0.0):
700 logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
701 bundle.was_cancelled = True
705 def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
706 """Find a worker for bundle or block until one is available."""
707 self.adjust_task_count(+1)
709 hostname = bundle.hostname
710 avoid_machine = override_avoid_machine
711 is_original = bundle.src_bundle is None
713 # Try not to schedule a backup on the same host as the original.
714 if avoid_machine is None and bundle.src_bundle is not None:
715 avoid_machine = bundle.src_bundle.machine
717 while worker is None:
718 worker = self.find_available_worker_or_block(avoid_machine)
721 # Ok, found a worker.
722 bundle.worker = worker
723 machine = bundle.machine = worker.machine
724 username = bundle.username = worker.username
725 self.status.record_acquire_worker(worker, uuid)
726 logger.debug(f'{bundle}: Running bundle on {worker}...')
728 # Before we do any work, make sure the bundle is still viable.
729 # It may have been some time between when it was submitted and
730 # now due to lack of worker availability and someone else may
731 # have already finished it.
732 if self.check_if_cancelled(bundle):
734 return self.process_work_result(bundle)
735 except Exception as e:
737 f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
739 self.release_worker(bundle)
741 # Weird. We are the original owner of this
742 # bundle. For it to have been cancelled, a backup
743 # must have already started and completed before
744 # we even for started. Moreover, the backup says
745 # it is done but we can't find the results it
746 # should have copied over. Reschedule the whole
750 f'{bundle}: We are the original owner thread and yet there are '
751 + 'no results for this bundle. This is unexpected and bad.'
753 return self.emergency_retry_nasty_bundle(bundle)
755 # Expected(?). We're a backup and our bundle is
756 # cancelled before we even got started. Something
757 # went bad in process_work_result (I acutually don't
758 # see what?) but probably not worth worrying
759 # about. Let the original thread worry about
760 # either finding the results or complaining about
764 # Send input code / data to worker machine if it's not local.
765 if hostname not in machine:
767 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
768 start_ts = time.time()
769 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
771 xfer_latency = time.time() - start_ts
773 f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s."
775 except Exception as e:
776 self.release_worker(bundle)
778 # Weird. We tried to copy the code to the worker and it failed...
779 # And we're the original bundle. We have to retry.
782 f"{bundle}: Failed to send instructions to the worker machine?! "
783 + "This is not expected; we\'re the original bundle so this shouldn\'t "
784 + "be a race condition. Attempting an emergency retry..."
786 return self.emergency_retry_nasty_bundle(bundle)
788 # This is actually expected; we're a backup.
789 # There's a race condition where someone else
790 # already finished the work and removed the source
791 # code file before we could copy it. No biggie.
792 msg = f'{bundle}: Failed to send instructions to the worker machine... '
793 msg += 'We\'re a backup and this may be caused by the original (or some '
794 msg += 'other backup) already finishing this work. Ignoring this.'
798 # Kick off the work. Note that if this fails we let
799 # wait_for_process deal with it.
800 self.status.record_processing_began(uuid)
802 f'{SSH} {bundle.username}@{bundle.machine} '
803 f'"source py38-venv/bin/activate &&'
804 f' /home/scott/lib/python_modules/remote_worker.py'
805 f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
808 f'{bundle}: Executing {cmd} in the background to kick off work...'
810 p = cmd_in_background(cmd, silent=True)
813 f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
815 return self.wait_for_process(p, bundle, 0)
817 def wait_for_process(
818 self, p: subprocess.Popen, bundle: BundleDetails, depth: int
820 machine = bundle.machine
824 f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
827 self.release_worker(bundle)
828 return self.emergency_retry_nasty_bundle(bundle)
830 # Spin until either the ssh job we scheduled finishes the
831 # bundle or some backup worker signals that they finished it
836 except subprocess.TimeoutExpired:
837 if self.check_if_cancelled(bundle):
839 f'{bundle}: looks like another worker finished bundle...'
843 logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
847 # If we get here we believe the bundle is done; either the ssh
848 # subprocess finished (hopefully successfully) or we noticed
849 # that some other worker seems to have completed the bundle
850 # and we're bailing out.
852 ret = self.process_work_result(bundle)
853 if ret is not None and p is not None:
857 # Something went wrong; e.g. we could not copy the results
858 # back, cleanup after ourselves on the remote machine, or
859 # unpickle the results we got from the remove machine. If we
860 # still have an active ssh subprocess, keep waiting on it.
861 # Otherwise, time for an emergency reschedule.
862 except Exception as e:
864 logger.error(f'{bundle}: Something unexpected just happened...')
866 msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
868 return self.wait_for_process(p, bundle, depth + 1)
870 self.release_worker(bundle)
871 return self.emergency_retry_nasty_bundle(bundle)
873 def process_work_result(self, bundle: BundleDetails) -> Any:
874 with self.status.lock:
875 is_original = bundle.src_bundle is None
876 was_cancelled = bundle.was_cancelled
877 username = bundle.username
878 machine = bundle.machine
879 result_file = bundle.result_file
880 code_file = bundle.code_file
882 # Whether original or backup, if we finished first we must
883 # fetch the results if the computation happened on a
885 bundle.end_ts = time.time()
886 if not was_cancelled:
887 assert bundle.machine is not None
888 if bundle.hostname not in bundle.machine:
889 cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
891 f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
894 # If either of these throw they are handled in
900 except Exception as e:
908 f'{SSH} {username}@{machine}'
909 f' "/bin/rm -f {code_file} {result_file}"'
912 f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
914 dur = bundle.end_ts - bundle.start_ts
915 self.histogram.add_item(dur)
917 # Only the original worker should unpickle the file contents
918 # though since it's the only one whose result matters. The
919 # original is also the only job that may delete result_file
920 # from disk. Note that the original may have been cancelled
921 # if one of the backups finished first; it still must read the
924 logger.debug(f"{bundle}: Unpickling {result_file}.")
926 with open(result_file, 'rb') as rb:
927 serialized = rb.read()
928 result = cloudpickle.loads(serialized)
929 except Exception as e:
931 msg = f'Failed to load {result_file}... this is bad news.'
933 self.release_worker(bundle)
935 # Re-raise the exception; the code in wait_for_process may
936 # decide to emergency_retry_nasty_bundle here.
939 f'Removing local (master) {code_file} and {result_file}.'
941 os.remove(f'{result_file}')
942 os.remove(f'{code_file}')
944 # Notify any backups that the original is done so they
945 # should stop ASAP. Do this whether or not we
946 # finished first since there could be more than one
948 if bundle.backup_bundles is not None:
949 for backup in bundle.backup_bundles:
951 f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
953 backup.is_cancelled.set()
955 # This is a backup job and, by now, we have already fetched
956 # the bundle results.
958 # Backup results don't matter, they just need to leave the
959 # result file in the right place for their originals to
960 # read/unpickle later.
963 # Tell the original to stop if we finished first.
964 if not was_cancelled:
966 f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
968 bundle.src_bundle.is_cancelled.set()
969 self.release_worker(bundle, was_cancelled=was_cancelled)
972 def create_original_bundle(self, pickle, fname: str):
973 from string_utils import generate_uuid
975 uuid = generate_uuid(omit_dashes=True)
976 code_file = f'/tmp/{uuid}.code.bin'
977 result_file = f'/tmp/{uuid}.result.bin'
979 logger.debug(f'Writing pickled code to {code_file}')
980 with open(f'{code_file}', 'wb') as wb:
983 bundle = BundleDetails(
990 hostname=platform.node(),
992 result_file=result_file,
994 start_ts=time.time(),
996 slower_than_local_p95=False,
997 slower_than_global_p95=False,
999 is_cancelled=threading.Event(),
1000 was_cancelled=False,
1004 self.status.record_bundle_details(bundle)
1005 logger.debug(f'{bundle}: Created an original bundle')
1008 def create_backup_bundle(self, src_bundle: BundleDetails):
1009 assert src_bundle.backup_bundles is not None
1010 n = len(src_bundle.backup_bundles)
1011 uuid = src_bundle.uuid + f'_backup#{n}'
1013 backup_bundle = BundleDetails(
1014 pickled_code=src_bundle.pickled_code,
1016 fname=src_bundle.fname,
1020 hostname=src_bundle.hostname,
1021 code_file=src_bundle.code_file,
1022 result_file=src_bundle.result_file,
1024 start_ts=time.time(),
1026 slower_than_local_p95=False,
1027 slower_than_global_p95=False,
1028 src_bundle=src_bundle,
1029 is_cancelled=threading.Event(),
1030 was_cancelled=False,
1031 backup_bundles=None, # backup backups not allowed
1034 src_bundle.backup_bundles.append(backup_bundle)
1035 self.status.record_bundle_details_already_locked(backup_bundle)
1036 logger.debug(f'{backup_bundle}: Created a backup bundle')
1037 return backup_bundle
1039 def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1040 assert self.status.lock.locked()
1041 assert src_bundle is not None
1042 backup_bundle = self.create_backup_bundle(src_bundle)
1044 f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
1046 self._helper_executor.submit(self.launch, backup_bundle)
1048 # Results from backups don't matter; if they finish first
1049 # they will move the result_file to this machine and let
1050 # the original pick them up and unpickle them.
1052 def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
1053 is_original = bundle.src_bundle is None
1054 bundle.worker = None
1055 avoid_last_machine = bundle.machine
1056 bundle.machine = None
1057 bundle.username = None
1058 bundle.failure_count += 1
1064 if bundle.failure_count > retry_limit:
1066 f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
1069 raise RemoteExecutorException(
1070 f'{bundle}: This bundle can\'t be completed despite several backups and retries'
1074 f'{bundle}: At least it\'s only a backup; better luck with the others.'
1078 msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
1081 return self.launch(bundle, avoid_last_machine)
1084 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1085 pickle = make_cloud_pickle(function, *args, **kwargs)
1086 bundle = self.create_original_bundle(pickle, function.__name__)
1087 self.total_bundles_submitted += 1
1088 return self._helper_executor.submit(self.launch, bundle)
1091 def shutdown(self, wait=True) -> None:
1092 logging.debug(f'Shutting down RemoteExecutor {self.title}')
1093 self.heartbeat_stop_event.set()
1094 self.heartbeat_thread.join()
1095 self._helper_executor.shutdown(wait)
1096 print(self.histogram)
1100 class DefaultExecutors(object):
1102 self.thread_executor: Optional[ThreadExecutor] = None
1103 self.process_executor: Optional[ProcessExecutor] = None
1104 self.remote_executor: Optional[RemoteExecutor] = None
1106 def ping(self, host) -> bool:
1107 logger.debug(f'RUN> ping -c 1 {host}')
1109 x = cmd_with_timeout(
1110 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1116 def thread_pool(self) -> ThreadExecutor:
1117 if self.thread_executor is None:
1118 self.thread_executor = ThreadExecutor()
1119 return self.thread_executor
1121 def process_pool(self) -> ProcessExecutor:
1122 if self.process_executor is None:
1123 self.process_executor = ProcessExecutor()
1124 return self.process_executor
1126 def remote_pool(self) -> RemoteExecutor:
1127 if self.remote_executor is None:
1128 logger.info('Looking for some helper machines...')
1129 pool: List[RemoteWorkerRecord] = []
1130 if self.ping('cheetah.house'):
1131 logger.info('Found cheetah.house')
1135 machine='cheetah.house',
1140 if self.ping('meerkat.cabin'):
1141 logger.info('Found meerkat.cabin')
1145 machine='meerkat.cabin',
1150 if self.ping('wannabe.house'):
1151 logger.info('Found wannabe.house')
1155 machine='wannabe.house',
1160 if self.ping('puma.cabin'):
1161 logger.info('Found puma.cabin')
1165 machine='puma.cabin',
1170 if self.ping('backup.house'):
1171 logger.info('Found backup.house')
1175 machine='backup.house',
1181 # The controller machine has a lot to do; go easy on it.
1183 if record.machine == platform.node() and record.count > 1:
1184 logger.info(f'Reducing workload for {record.machine}.')
1187 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1188 policy.register_worker_pool(pool)
1189 self.remote_executor = RemoteExecutor(pool, policy)
1190 return self.remote_executor
1192 def shutdown(self) -> None:
1193 if self.thread_executor is not None:
1194 self.thread_executor.shutdown()
1195 self.thread_executor = None
1196 if self.process_executor is not None:
1197 self.process_executor.shutdown()
1198 self.process_executor = None
1199 if self.remote_executor is not None:
1200 self.remote_executor.shutdown()
1201 self.remote_executor = None