# -*- coding: utf-8 -*-

# © Copyright 2021-2022, Scott Gasch

"""Defines three executors: a thread executor for doing work using a
threadpool, a process executor for doing work in other processes on
the same machine and a remote executor for farming out work to other
machines.

Also defines DefaultExecutors which is a container for references to
global executors / worker pools with automatic shutdown semantics."""
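
# Illustrative usage sketch (not part of the original module).  It assumes
# this file is importable as `executors`; `square` is a hypothetical
# caller-supplied function.
#
#     from executors import DefaultExecutors
#
#     def square(n: int) -> int:
#         return n * n
#
#     defaults = DefaultExecutors()
#     pool = defaults.thread_pool()        # or process_pool() / remote_pool()
#     future = pool.submit(square, 7)
#     assert future.result() == 49
#     defaults.shutdown()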

from __future__ import annotations

import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
import numpy
from overrides import overrides

import argparse_utils
import config
import histogram as hist
import string_utils
from ansi import bg, fg, reset, underline
from decorator_utils import singleton
from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    default=None,
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    default=None,
    help='Number of processes in the default processpool, leave unset for default',
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    help='Maximum number of failures before giving up on a bundle',
)

SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'


def make_cloud_pickle(fun, *args, **kwargs):
    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
    return cloudpickle.dumps((fun, args, kwargs))
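
# The (fun, args, kwargs) tuple pickled above is the only thing that crosses
# process / machine boundaries; run_cloud_pickle() below unpacks it the same
# way (and the remote_worker.py helper presumably does likewise).  A minimal
# round trip looks like this (sketch, not part of the original module; `add`
# is a hypothetical function):
#
#     blob = make_cloud_pickle(add, 2, 3)          # bytes
#     fun, args, kwargs = cloudpickle.loads(blob)
#     assert fun(*args, **kwargs) == 5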


class BaseExecutor(ABC):
    """The base executor interface definition."""

    def __init__(self, *, title=''):
        self.title = title
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        pass

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shutdown the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks).  Return False
        otherwise.  Note: this should only be called by the launcher
        process.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count.  Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread / machine.
        """
        self.task_count += delta
        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)

    def get_task_count(self) -> int:
        """Return the current task count.  Note: do not call this method
        from a worker, it should only be called by the launcher process /
        thread / machine.
        """
        return self.task_count


class ThreadExecutor(BaseExecutor):
    """A threadpool executor instance."""

    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug('Creating threadpool executor with %d workers', workers)
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False

    # This is run on a different thread; do not adjust task count here.
    @staticmethod
    def run_local_bundle(fun, *args, **kwargs):
        logger.debug("Running local bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        start = time.time()
        result = self._thread_pool_executor.submit(
            ThreadExecutor.run_local_bundle, *newargs, **kwargs
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down threadpool executor %s', self.title)
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
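
# Direct-usage sketch (illustrative, not from the original file; normally one
# would obtain this via DefaultExecutors().thread_pool() defined below, and
# `work` is a hypothetical callable):
#
#     executor = ThreadExecutor(max_workers=4)
#     f = executor.submit(work, 1, 2)
#     print(f.result())
#     executor.shutdown(wait=True, quiet=True)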


class ProcessExecutor(BaseExecutor):
    """A processpool executor."""

    def __init__(self, max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug('Creating processpool executor with %d workers.', workers)
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False

    # This is run in another process; do not adjust task count here.
    @staticmethod
    def run_cloud_pickle(pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug("Running pickled bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        self.adjust_task_count(+1)
        pickle = make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down processpool executor %s', self.title)
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state
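
# Note on __getstate__ above: a concurrent.futures.ProcessPoolExecutor holds
# live worker processes, queues and locks and cannot itself be pickled.
# Dropping the `_process_executor` reference from the copied __dict__ keeps a
# ProcessExecutor instance picklable (e.g. if it ends up captured by work that
# gets cloudpickled) without trying to serialize the underlying pool.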


class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""

    pass


@dataclass
class RemoteWorkerRecord:
    """A record of info about a remote worker."""

    username: str
    machine: str
    weight: int
    count: int

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    """All info necessary to define some unit of work that needs to be
    done, where it is being run, its state, whether it is an original
    bundle or a backup bundle, how many times it has failed, etc...
    """

    pickled_code: bytes
    uuid: str
    fname: Optional[str]
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    hostname: str
    code_file: str
    result_file: str
    pid: int
    start_ts: float
    end_ts: float
    slower_than_local_p95: bool
    slower_than_global_p95: bool
    src_bundle: Optional[BundleDetails]
    is_cancelled: threading.Event
    was_cancelled: bool
    backup_bundles: Optional[List[BundleDetails]]
    failure_count: int

    def __repr__(self):
        uuid = self.uuid
        if uuid[-9:-2] == '_backup':
            uuid = uuid[:-9]
            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
        else:
            suffix = uuid[-6:]

        colorz = [
            fg('marigold yellow'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('lavender purple'),
        ]
        c = colorz[int(uuid[-2:], 16) % len(colorz)]
        fname = self.fname if self.fname is not None else 'nofname'
        machine = self.machine if self.machine is not None else 'nomachine'
        return f'{c}{suffix}/{fname}/{machine}{reset()}'


class RemoteExecutorStatus:
    """A status 'scoreboard' for a remote executor."""

    def __init__(self, total_worker_count: int) -> None:
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modification using self.  Also used
        # as a memory fence for modifications to bundle.
        self.lock: threading.Lock = threading.Lock()

    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            x = self.finished_bundle_timings_per_worker.get(worker, [])
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def record_processing_began(self, uuid: str):
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f'       {details} setting up / copying data...'
                        sec = 0.0

                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                            if details is not None:
                                details.slower_than_local_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_local_p95 = False

                    if qall is not None:
                        if sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                details.slower_than_global_p95 = True
                        else:
                            details.slower_than_global_p95 = False
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts
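
# Locking-convention sketch (illustrative, not from the original file): the
# plain record_* methods above take self.lock themselves, while the
# *_already_locked variants assert that the caller already holds it.  Assuming
# `details` is an existing BundleDetails:
#
#     status = RemoteExecutorStatus(total_worker_count=4)
#     status.record_bundle_details(details)               # acquires status.lock
#     with status.lock:
#         status.record_bundle_details_already_locked(details)
#         status.periodic_dump(total_bundles_submitted=1)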


class RemoteWorkerSelectionPolicy(ABC):
    """Base class for policies that select a remote worker for a bundle."""

    def __init__(self):
        self.workers: Optional[List[RemoteWorkerRecord]] = None

    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that uses weighted RNG."""

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        if self.workers:
            for worker in self.workers:
                if worker.machine != machine_to_avoid:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.debug('There are no available workers that avoid %s', machine_to_avoid)
            if self.workers:
                for worker in self.workers:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.warning('There are no available workers?!')
            return None

        worker = random.sample(grabbag, 1)[0]
        assert worker.count > 0
        worker.count -= 1
        logger.debug('Selected worker %s', worker)
        return worker


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that just round robins."""

    def __init__(self) -> None:
        super().__init__()
        self.index = 0

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
        if self.workers:
            x = self.index
            while True:
                worker = self.workers[x]
                if worker.count > 0:
                    worker.count -= 1
                    x += 1
                    if x >= len(self.workers):
                        x = 0
                    self.index = x
                    logger.debug('Selected worker %s', worker)
                    return worker
                x += 1
                if x >= len(self.workers):
                    x = 0
                if x == self.index:
                    logger.warning('Unexpectedly could not find a worker, retrying...')
                    return None
        return None
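
# Worker-selection usage sketch (illustrative, not from the original file;
# the record field values are made up):
#
#     workers = [
#         RemoteWorkerRecord(username='me', machine='host1', weight=2, count=4),
#         RemoteWorkerRecord(username='me', machine='host2', weight=1, count=2),
#     ]
#     policy = WeightedRandomRemoteWorkerSelectionPolicy()   # or RoundRobin...
#     policy.register_worker_pool(workers)
#     if policy.is_worker_available():
#         worker = policy.acquire_worker(machine_to_avoid='host1')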


class RemoteExecutor(BaseExecutor):
    """A remote work executor."""

    def __init__(
        self,
        workers: List[RemoteWorkerRecord],
        policy: RemoteWorkerSelectionPolicy,
    ) -> None:
        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self.run_periodic_heartbeat()
        self.already_shutdown = False

    @background_thread
    def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
        while not stop_event.is_set():
            time.sleep(5.0)
            logger.debug('Running periodic heartbeat code...')
            self.heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def heartbeat(self) -> None:
        # Note: this is invoked on a background thread, not an
        # executor thread.  Be careful what you do with it b/c it
        # needs to get back and dump status again periodically.
        with self.status.lock:
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self.maybe_schedule_backup_bundles()

    def maybe_schedule_backup_bundles(self):
        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_done >= 2
            and num_idle_workers > 0
            and (self.last_backup is None or (now - self.last_backup > 9.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            try:
                assert self.backup_lock.locked()

                bundle_to_backup = None
                best_score = None
                for (
                    worker,
                    bundle_uuids,
                ) in self.status.in_flight_bundles_by_worker.items():

                    # Prefer to schedule backups of bundles running on
                    # slower machines.
                    base_score = 0
                    for record in self.workers:
                        if worker.machine == record.machine:
                            base_score = float(record.weight)
                            base_score = 1.0 / base_score
                            base_score *= 200.0
                            base_score = int(base_score)
                            break

                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                            bundle is not None
                            and bundle.src_bundle is None
                            and bundle.backup_bundles is not None
                        ):
                            score = base_score

                            # Schedule backups of bundles running
                            # longer; especially those that are
                            # unexpectedly slow.
                            start_ts = self.status.start_per_bundle[uuid]
                            if start_ts is not None:
                                runtime = now - start_ts
                                score += runtime
                                logger.debug('score[%s] => %.1f  # latency boost', bundle, score)

                                if bundle.slower_than_local_p95:
                                    score += runtime / 2
                                    logger.debug('score[%s] => %.1f  # >worker p95', bundle, score)

                                if bundle.slower_than_global_p95:
                                    score += runtime / 4
                                    logger.debug('score[%s] => %.1f  # >global p95', bundle, score)

                            # Prefer backups of bundles that don't
                            # have backups already.
                            backup_count = len(bundle.backup_bundles)
                            if backup_count == 0:
                                score *= 2
                            elif backup_count == 1:
                                score /= 2
                            elif backup_count == 2:
                                score /= 8
                            else:
                                score = 0
                            logger.debug(
                                'score[%s] => %.1f  # %d dup backup factor',
                                bundle,
                                score,
                                backup_count,
                            )

                            if score != 0 and (best_score is None or score > best_score):
                                bundle_to_backup = bundle
                                assert bundle is not None
                                assert bundle.backup_bundles is not None
                                assert bundle.src_bundle is None
                                best_score = score

                # Note: this is all still happening on the heartbeat
                # runner thread.  That's ok because
                # schedule_backup_for_bundle uses the executor to
                # submit the bundle again which will cause it to be
                # picked up by a worker thread and allow this thread
                # to return to run future heartbeats.
                if bundle_to_backup is not None:
                    self.last_backup = now
                    logger.info(
                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
                        bundle_to_backup,
                        best_score,
                    )
                    self.schedule_backup_for_bundle(bundle_to_backup)
            finally:
                self.backup_lock.release()

    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        worker = bundle.worker
        assert worker is not None
        logger.debug('Released worker %s', worker)
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        with self.cv:
            worker.count += 1
            self.cv.notify()
        self.adjust_task_count(-1)

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                bundle.was_cancelled = True
                return True
        return False

    def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""

        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug('%s: Running bundle on %s...', bundle, worker)

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self.check_if_cancelled(bundle):
            try:
                return self.process_work_result(bundle)
            except Exception as e:
                logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started.  Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over.  Reschedule the whole
                    # thing.
                    logger.exception(e)
                    logger.error(
                        '%s: We are the original owner thread and yet there are '
                        'no results for this bundle.  This is unexpected and bad.',
                        bundle,
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # We're a backup and our bundle is cancelled
                    # before we even got started.  Do nothing and let
                    # the original bundle's thread worry about either
                    # finding the results or complaining about it.
                    return None

        # Send input code / data to worker machine if it's not local.
        if hostname not in machine:
            try:
                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                start_ts = time.time()
                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
            except Exception as e:
                self.release_worker(bundle)
                if is_original:
                    # Weird.  We tried to copy the code to the worker
                    # and it failed...  And we're the original bundle.
                    # We have to retry.
                    logger.exception(e)
                    logger.error(
                        "%s: Failed to send instructions to the worker machine?! "
                        "This is not expected; we're the original bundle so this shouldn't "
                        "be a race condition.  Attempting an emergency retry...",
                        bundle,
                    )
                    return self.emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code_file before we could copy it.  Ignore.
                    logger.warning(
                        '%s: Failed to send instructions to the worker machine... '
                        'We\'re a backup and this may be caused by the original (or '
                        'some other backup) already finishing this work.  Ignoring.',
                        bundle,
                    )
                    return None

        # Kick off the work.  Note that if this fails we let
        # wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"source py38-venv/bin/activate &&'
            f' /home/scott/lib/python_modules/remote_worker.py'
            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
        return self.wait_for_process(p, bundle, 0)

    def wait_for_process(
        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
    ) -> Any:
        machine = bundle.machine
        assert p is not None
        pid = p.pid
        if depth > 3:
            logger.error(
                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
            )
            p.terminate()
            self.release_worker(bundle)
            return self.emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self.check_if_cancelled(bundle):
                    logger.info('%s: looks like another worker finished bundle...', bundle)
                    break
            else:
                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # and we're bailing out.
        try:
            ret = self.process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, cleanup after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine.  If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception as e:
            logger.exception(e)
            logger.error('%s: Something unexpected just happened...', bundle)
            if p is not None:
                logger.warning(
                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
                )
                return self.wait_for_process(p, bundle, depth + 1)
            else:
                self.release_worker(bundle)
                return self.emergency_retry_nasty_bundle(bundle)

    def process_work_result(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        "%s: Fetching results back from %s@%s via %s",
                        bundle,
                        username,
                        machine,
                        cmd,
                    )

                    # If either of these throw they are handled in
                    # wait_for_process.
                    attempts = 0
                    while True:
                        try:
                            run_silently(cmd)
                        except Exception as e:
                            attempts += 1
                            if attempts >= 3:
                                raise e
                        else:
                            break

                    # Cleanup remote /tmp files.
                    run_silently(
                        f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                    )
                    logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.  It still does that here with is_cancelled
        # set.
        if is_original:
            logger.debug("%s: Unpickling %s.", bundle, result_file)
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                logger.exception(e)
                logger.error('Failed to load %s... this is bad news.', result_file)
                self.release_worker(bundle)

                # Re-raise the exception; the code in wait_for_process may
                # decide to emergency_retry_nasty_bundle here.
                raise e
            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
            os.remove(result_file)
            os.remove(code_file)

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
                    )
                    backup.is_cancelled.set()

        # This is a backup job and, by now, we have already fetched
        # the bundle results.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                orig_bundle = bundle.src_bundle
                assert orig_bundle is not None
                logger.debug(
                    '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
                )
                orig_bundle.is_cancelled.set()
        self.release_worker(bundle, was_cancelled=was_cancelled)
        return result
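
    # Life cycle of a bundle, in terms of the methods above and below (summary
    # comment, not from the original file): submit() cloudpickles the callable
    # into an original bundle, launch() finds a worker and scp's the code file
    # over, an ssh'd remote_worker.py invocation runs it and leaves a result
    # file, wait_for_process() watches the local ssh subprocess (or a backup's
    # cancellation signal), and process_work_result() copies the result back,
    # unpickles it (originals only) and cancels any now-redundant backups.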

    def create_original_bundle(self, pickle, fname: str):
        uuid = string_utils.generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug('Writing pickled code to %s', code_file)
        with open(code_file, 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            fname=fname,
            worker=None,
            username=None,
            machine=None,
            hostname=platform.node(),
            code_file=code_file,
            result_file=result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
            failure_count=0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug('%s: Created an original bundle', bundle)
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert self.status.lock.locked()
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            fname=src_bundle.fname,
            worker=None,
            username=None,
            machine=None,
            hostname=src_bundle.hostname,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
            failure_count=0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug('%s: Created a backup bundle', backup_bundle)
        return backup_bundle

    def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them (and return
        # a result).

    def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        if is_original:
            retry_limit = 3
        else:
            retry_limit = 2

        if bundle.failure_count > retry_limit:
            logger.error(
                '%s: Tried this bundle too many times already (%dx); giving up.',
                bundle,
                retry_limit,
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                )
            else:
                logger.error(
                    '%s: At least it\'s only a backup; better luck with the others.', bundle
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            return self.launch(bundle, avoid_last_machine)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self.launch, bundle)

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down RemoteExecutor %s', self.title)
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
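
# Manual-wiring sketch (illustrative, not from the original file; the worker
# record values are made up and `work` is a hypothetical callable).  Note that
# RemoteExecutor.__init__ registers the pool with the policy itself:
#
#     workers = [RemoteWorkerRecord(username='me', machine='host1', weight=1, count=2)]
#     rexec = RemoteExecutor(workers, WeightedRandomRemoteWorkerSelectionPolicy())
#     future = rexec.submit(work, 123)
#     result = future.result()
#     rexec.shutdown()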


@singleton
class DefaultExecutors(object):
    """A container for a default thread, process and remote executor.
    These are not created until needed and we take care to clean up
    before process exit.
    """

    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def ping(host) -> bool:
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            pool: List[RemoteWorkerRecord] = []

            # NB: per-host username / weight / count values are elided below.
            if self.ping('cheetah.house'):
                logger.info('Found cheetah.house')
                pool.append(
                    RemoteWorkerRecord(
                        username=...,
                        machine='cheetah.house',
                        weight=...,
                        count=...,
                    )
                )
            if self.ping('meerkat.cabin'):
                logger.info('Found meerkat.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username=...,
                        machine='meerkat.cabin',
                        weight=...,
                        count=...,
                    )
                )
            if self.ping('wannabe.house'):
                logger.info('Found wannabe.house')
                pool.append(
                    RemoteWorkerRecord(
                        username=...,
                        machine='wannabe.house',
                        weight=...,
                        count=...,
                    )
                )
            if self.ping('puma.cabin'):
                logger.info('Found puma.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username=...,
                        machine='puma.cabin',
                        weight=...,
                        count=...,
                    )
                )
            if self.ping('backup.house'):
                logger.info('Found backup.house')
                pool.append(
                    RemoteWorkerRecord(
                        username=...,
                        machine='backup.house',
                        weight=...,
                        count=...,
                    )
                )

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info('Reducing workload for %s.', record.machine)
                    record.count = max(int(record.count / 2), 1)

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None