from __future__ import annotations

from abc import ABC, abstractmethod
import concurrent.futures as fut
from collections import defaultdict
from dataclasses import dataclass
import logging
import os
import platform
import random
import subprocess
import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
import numpy

from ansi import bg, fg, underline, reset
import argparse_utils
import config
from decorator_utils import singleton
import exec_utils
import histogram
import string_utils

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})",
    "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    help='Number of processes in the default processpool, leave unset for default',
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,  # assumed default
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,  # assumed default
    help='Maximum number of failures before giving up on a bundle',
)

RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
SSH = 'ssh -oForwardX11=no'
HIST = histogram.SimpleHistogram(
    histogram.SimpleHistogram.n_evenly_spaced_buckets(0, 500, 25)  # bucket range/count assumed
)


def run_local_bundle(fun, *args, **kwargs):
    logger.debug(f"Running local bundle at {fun.__name__}")
    start = time.time()
    result = fun(*args, **kwargs)
    end = time.time()
    duration = end - start
    logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
    HIST.add_item(duration)
    return result


def run_cloud_pickle(pickle):
    fun, args, kwargs = cloudpickle.loads(pickle)
    logger.debug(f"Running pickled bundle at {fun.__name__}")
    start = time.time()
    result = fun(*args, **kwargs)
    end = time.time()
    duration = end - start
    logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
    return result


def make_cloud_pickle(fun, *args, **kwargs):
    logger.info(f"Making cloudpickled bundle at {fun.__name__}")
    return cloudpickle.dumps((fun, args, kwargs))
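

# For example (a tiny sketch; `add` is a hypothetical function):
#
#     def add(x: int, y: int) -> int:
#         return x + y
#
#     payload = make_cloud_pickle(add, 1, 2)    # opaque bytes, safe to ship around
#     run_cloud_pickle(payload)                 # unpickles and calls add(1, 2) -> 3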


class BaseExecutor(ABC):
    def __init__(self):
        pass

    @abstractmethod
    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self,
                 wait: bool = True) -> None:
        pass


class ThreadExecutor(BaseExecutor):
    def __init__(self,
                 max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers,
            thread_name_prefix="thread_executor_helper"
        )
        self.job_count = 0

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        self.job_count += 1
        logger.debug(
            f'Submitted work to threadpool; there are now {self.job_count} items.'
        )
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        return self._thread_pool_executor.submit(
            run_local_bundle, *newargs, **kwargs)

    def shutdown(self,
                 wait = True) -> None:
        logger.debug("Shutting down threadpool executor.")
        self._thread_pool_executor.shutdown(wait)


class ProcessExecutor(BaseExecutor):
    def __init__(self,
                 max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.job_count = 0

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        # Bundle it up before submitting because pickle sucks.
        pickle = make_cloud_pickle(function, *args, **kwargs)
        self.job_count += 1
        logger.debug(
            f'Submitting work to processpool executor; there are now {self.job_count} items.'
        )
        return self._process_executor.submit(run_cloud_pickle, pickle)

    def shutdown(self, wait=True) -> None:
        logger.debug('Shutting down processpool executor')
        self._process_executor.shutdown(wait)
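

# Why the cloudpickle indirection above: ProcessPoolExecutor ships work to the
# child process with the stdlib pickle, which cannot serialize lambdas,
# closures, or interactively-defined functions.  Cloudpickling the callable
# into opaque bytes first sidesteps that, so something like this works
# (a hedged sketch; run under an `if __name__ == "__main__":` guard on
# platforms that spawn worker processes):
#
#     executor = ProcessExecutor()
#     future = executor.submit(lambda x: x + 1, 41)
#     assert future.result() == 42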


@dataclass
class RemoteWorkerRecord:
    username: str
    machine: str
    weight: int
    count: int

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    pickled_code: bytes
    uuid: str
    worker: Optional[RemoteWorkerRecord]
    username: Optional[str]
    machine: Optional[str]
    hostname: str
    code_file: str
    result_file: str
    pid: int
    start_ts: float
    end_ts: float
    too_slow: bool
    super_slow: bool
    src_bundle: BundleDetails
    is_cancelled: threading.Event
    was_cancelled: bool
    backup_bundles: Optional[List[BundleDetails]]
    failure_count: int


class RemoteExecutorStatus:
    def __init__(self, total_worker_count: int) -> None:
        self.worker_count = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_per_bundle: Dict[str, float] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord,
            List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[
            RemoteWorkerRecord,
            Set[str]
        ] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted = 0

        # Protects reads and modification using self.  Also used
        # as a memory fence for modifications to bundle.
        self.lock = threading.Lock()
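
    # Naming convention below: the public record_* methods take self.lock
    # themselves, while the *_already_locked variants assert that the caller
    # already holds it (RemoteExecutor does this when it records a worker
    # release while holding the status lock).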

    def record_acquire_worker(
            self,
            worker: RemoteWorkerRecord,
            uuid: str) -> None:
        with self.lock:
            self.record_acquire_worker_already_locked(
                worker,
                uuid)

    def record_acquire_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str) -> None:
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = time.time()
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(
            self,
            details: BundleDetails) -> None:
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(
            self,
            details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker_already_locked(
            self,
            worker: RemoteWorkerRecord,
            uuid: str,
            was_cancelled: bool) -> None:
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            bundle_latency = ts - self.start_per_bundle[uuid]
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def total_in_flight(self) -> int:
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f' ✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(
                        bundle_uuid,
                        None
                    )
                    pid = str(details.pid) if details is not None else "TBD"
                    sec = ts - self.start_per_bundle[bundle_uuid]
                    ret += f'       (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                        elif sec > qworker[0]:
                            ret += f'{fg("red")}>💻p50{reset()} '
                    if qall is not None:
                        if sec > qall[1] * 1.5:
                            ret += f'{bg("red")}!!!{reset()}'
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for another backup')
                                details.super_slow = True
                        elif sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                logger.debug(f'Flagging {details.uuid} for a backup')
                                details.too_slow = True
                        elif sec > qall[0]:
                            ret += f'{fg("red")}>∀p50{reset()}'
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if (
                self.last_periodic_dump is None
                or ts - self.last_periodic_dump > 5.0
        ):
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    def register_worker_pool(self, workers):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(
            self,
            machine_to_avoid = None
    ) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid = None
    ) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        for worker in self.workers:
            for x in range(0, worker.count):
                for y in range(0, worker.weight):
                    grabbag.append(worker)

        for _ in range(0, 5):
            random.shuffle(grabbag)
            worker = grabbag[0]
            if worker.machine != machine_to_avoid or _ > 2:
                if worker.count > 0:
                    worker.count -= 1
                    logger.debug(f'Selected worker {worker}')
                    return worker
        logger.warning("Couldn't find a worker; go fish.")
        return None
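

# A worked illustration of the weighted draw above: a worker with weight=2 and
# count=3 contributes 2*3 = 6 entries to the grabbag while a worker with
# weight=1 and count=1 contributes a single entry, so the former is roughly six
# times as likely to be picked on any given shuffle.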


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    def __init__(self) -> None:
        self.index = 0

    def is_worker_available(self) -> bool:
        for worker in self.workers:
            if worker.count > 0:
                return True
        return False

    def acquire_worker(
            self,
            machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        x = self.index
        while True:
            worker = self.workers[x]
            if worker.count > 0:
                worker.count -= 1
                x += 1
                if x >= len(self.workers):
                    x = 0
                self.index = x
                logger.debug(f'Selected worker {worker}')
                return worker
            x += 1
            if x >= len(self.workers):
                x = 0
            if x == self.index:
                logger.warning("Couldn't find a worker; go fish.")
                return None


class RemoteExecutor(BaseExecutor):
    def __init__(self,
                 workers: List[RemoteWorkerRecord],
                 policy: RemoteWorkerSelectionPolicy) -> None:
        super().__init__()
        self.workers = workers
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise Exception(msg)
        self.policy = policy
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        logger.debug(
            f'Creating remote processpool with {self.worker_count} remote endpoints.'
        )

    def is_worker_available(self) -> bool:
        return self.policy.is_worker_available()

    def acquire_worker(
            self,
            machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
            self,
            machine_to_avoid: str = None
    ) -> RemoteWorkerRecord:
        with self.cv:
            while not self.is_worker_available():
                self.cv.wait()
            worker = self.acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def release_worker(self, worker: RemoteWorkerRecord) -> None:
        logger.debug(f'Released worker {worker}')
        with self.cv:
            worker.count += 1
            self.cv.notify()

    def heartbeat(self) -> None:
        with self.status.lock:
            # Regular progress report
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule
            if len(self.status.finished_bundle_timings) > 7:
                for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                                bundle is not None and
                                bundle.too_slow and
                                bundle.src_bundle is None and
                                config.config['executors_schedule_remote_backups']
                        ):
                            self.consider_backup_for_bundle(bundle)

    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
        assert self.status.lock.locked()
        if (
                bundle.too_slow
                and len(bundle.backup_bundles) == 0    # one backup per
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return
        elif (
                bundle.super_slow
                and len(bundle.backup_bundles) < 2     # two backups in dire situations
                and self.status.total_idle() > 4
        ):
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            logger.debug(msg)
            self.schedule_backup_for_bundle(bundle)
            return

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True
                return True
        return False

    def launch(self, bundle: BundleDetails) -> Any:
        """Find a worker for bundle or block until one is available."""
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = None

        # Try not to schedule a backup on the same host as the original.
        if bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'Running bundle {uuid} on {worker}...')

        # Before we do any work, make sure the bundle is still viable.
        if self.check_if_cancelled(bundle):
            try:
                return self.post_launch_work(bundle)
            except Exception as e:
                logger.exception(e)
                logger.info(f"Bundle {uuid} seems to have failed?!")
                if bundle.failure_count < config.config['executors_max_bundle_failures']:
                    return self.launch(bundle)
                else:
                    logger.info(f"Bundle {uuid} is poison, giving up on it.")
                    return None

        # Send input to machine if it's not local.
        if hostname not in machine:
            cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
            logger.info(f"Copying work to {worker} via {cmd}")
            exec_utils.run_silently(cmd)

        # Kick off the work on the remote machine and poll for completion.
        cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
               f'"source remote-execution/bin/activate &&'
               f' /home/scott/lib/python_modules/remote_worker.py'
               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
        p = exec_utils.cmd_in_background(cmd, silent=True)
        bundle.pid = pid = p.pid
        logger.info(f"Running {cmd} in the background as process {pid}")

        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                self.heartbeat()

                # Both source and backup bundles can be cancelled by
                # the other depending on which finishes first.
                if self.check_if_cancelled(bundle):
                    p.terminate()
                    break
            else:
                logger.debug(
                    f"{pid}/{bundle.uuid} has finished its work normally."
                )
                break

        try:
            return self.post_launch_work(bundle)
        except Exception as e:
            logger.exception(e)
            logger.info(f"Bundle {uuid} seems to have failed?!")
            if bundle.failure_count < config.config['executors_max_bundle_failures']:
                return self.launch(bundle)
            logger.info(f"Bundle {uuid} is poison, giving up on it.")
            return None

    def post_launch_work(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        f"Fetching results from {username}@{machine} via {cmd}"
                    )
                    try:
                        exec_utils.run_silently(cmd)
                    except subprocess.CalledProcessError:
                        pass
                    exec_utils.run_silently(f'{SSH} {username}@{machine}'
                                            f' "/bin/rm -f {code_file} {result_file}"')
            bundle.end_ts = time.time()
            assert bundle.worker is not None
            self.status.record_release_worker_already_locked(
                bundle.worker,
                bundle.uuid,
                was_cancelled
            )
            if not was_cancelled:
                dur = bundle.end_ts - bundle.start_ts
                HIST.add_item(dur)

        # Original or not, the results should be back on the local
        # machine.  Are they?
        if not os.path.exists(result_file):
            msg = f'{result_file} unexpectedly missing, wtf?!'
            logger.critical(msg)
            bundle.failure_count += 1
            self.release_worker(bundle.worker)
            raise Exception(msg)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.
        if is_original:
            logger.debug(f"Unpickling {result_file}.")
            try:
                with open(f'{result_file}', 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                msg = f'Failed to load {result_file}'
                logger.critical(msg)
                bundle.failure_count += 1
                self.release_worker(bundle.worker)
                raise Exception(e)
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        f'Notifying backup {backup.uuid} that it is cancelled'
                    )
                    backup.is_cancelled.set()

        # This is a backup bundle.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their original to
            # read later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                logger.debug(
                    f'Notifying original {bundle.src_bundle.uuid} that it is cancelled'
                )
                bundle.src_bundle.is_cancelled.set()

        assert bundle.worker is not None
        self.release_worker(bundle.worker)
        return result

    def create_original_bundle(self, pickle):
        uuid = string_utils.generate_uuid(as_hex=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code = pickle,
            uuid = uuid,
            worker = None,
            username = None,
            machine = None,
            hostname = platform.node(),
            code_file = code_file,
            result_file = result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = None,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = [],
            failure_count = 0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug(f'Created original bundle {uuid}')
        return bundle

    def create_backup_bundle(self, src_bundle: BundleDetails):
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code = src_bundle.pickled_code,
            uuid = uuid,
            worker = None,
            username = None,
            machine = None,
            hostname = src_bundle.hostname,
            code_file = src_bundle.code_file,
            result_file = src_bundle.result_file,
            pid = 0,
            start_ts = time.time(),
            end_ts = 0.0,
            too_slow = False,
            super_slow = False,
            src_bundle = src_bundle,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = None,    # backup backups not allowed
            failure_count = 0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'Created backup bundle {uuid}')
        return backup_bundle

    def schedule_backup_for_bundle(self,
                                   src_bundle: BundleDetails):
        assert self.status.lock.locked()
        backup_bundle = self.create_backup_bundle(src_bundle)
        logger.debug(
            f'Scheduling backup bundle {backup_bundle.uuid} for execution'
        )
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.

    def submit(self,
               function: Callable,
               *args,
               **kwargs) -> fut.Future:
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle)
        self.total_bundles_submitted += 1
        logger.debug(
            f'Submitted work to remote executor; {self.total_bundles_submitted} items now submitted'
        )
        return self._helper_executor.submit(self.launch, bundle)

    def shutdown(self, wait=True) -> None:
        self._helper_executor.shutdown(wait)
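

# A minimal usage sketch for RemoteExecutor (hostname, username, and weights
# are assumed placeholders, and `some_function` is hypothetical; normally you
# would go through DefaultExecutors below instead of building this by hand):
#
#     workers = [RemoteWorkerRecord(username='scott', machine='cheetah.house',
#                                   weight=10, count=4)]
#     executor = RemoteExecutor(workers, WeightedRandomRemoteWorkerSelectionPolicy())
#     future = executor.submit(some_function, arg1, arg2)
#     print(future.result())
#     executor.shutdown()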


@singleton
class DefaultExecutors(object):
    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        command = ['ping', '-c', '1', host]
        return subprocess.call(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        ) == 0

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            pool: List[RemoteWorkerRecord] = []

            # The username and the weight/count values below are illustrative
            # placeholders; tune them to each machine's actual capacity.
            for machine, weight, count in [
                ('cheetah.house', 10, 4),
                ('video.house', 10, 4),
                ('wannabe.house', 10, 4),
                ('meerkat.cabin', 5, 2),
                ('backup.house', 5, 2),
                ('puma.cabin', 10, 4),
            ]:
                if self.ping(machine):
                    pool.append(
                        RemoteWorkerRecord(
                            username = 'scott',
                            machine = machine,
                            weight = weight,
                            count = count,
                        ),
                    )
            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor
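

# A minimal usage sketch, assuming this module is importable as `executors`
# (the module name and `work` function are placeholders):
#
#     from executors import DefaultExecutors
#
#     def work(x: int) -> int:
#         return x * x
#
#     pool = DefaultExecutors().thread_pool()   # or .process_pool() / .remote_pool()
#     future = pool.submit(work, 7)
#     print(future.result())                    # 49
#     pool.shutdown()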