3 from __future__ import annotations
5 from abc import ABC, abstractmethod
6 import concurrent.futures as fut
7 from collections import defaultdict
8 from dataclasses import dataclass
17 from typing import Any, Callable, Dict, List, Optional, Set
19 import cloudpickle # type: ignore
21 from ansi import bg, fg, underline, reset
25 from decorator_utils import singleton
# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)

# Commandline flags controlling the default pool sizes and whether slow
# remote bundles get duplicative backup work scheduled.
# NOTE(review): several parser.add_argument(...) call lines appear elided
# from this view of the file.
parser = config.add_commandline_args(
    f"Executors ({__file__})",
    "Args related to processing executors."
    '--executors_threadpool_size',
    help='Number of threads in the default threadpool, leave unset for default',
    '--executors_processpool_size',
    help='Number of processes in the default processpool, leave unset for default',
    '--executors_schedule_remote_backups',
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',

# Command prefixes used to copy code/results between machines and to run
# remote work over ssh.
rsync = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
ssh = 'ssh -oForwardX11=no'

# Histogram of bundle execution durations (fed by run_local_bundle below).
hist = histogram.SimpleHistogram(
    histogram.SimpleHistogram.n_evenly_spaced_buckets(
def run_local_bundle(fun, *args, **kwargs):
    """Run fun(*args, **kwargs) in the current process, timing it and
    recording the wall-clock duration in the module-level histogram.

    Returns whatever fun returns.
    """
    logger.debug(f"Running local bundle at {fun.__name__}")
    start = time.time()  # fix: start/end were referenced but never assigned
    result = fun(*args, **kwargs)
    end = time.time()
    duration = end - start
    logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
    hist.add_item(duration)
    return result  # fix: result was computed but never returned to the caller
def run_cloud_pickle(pickle):
    """Deserialize a cloudpickled (fun, args, kwargs) tuple, run it, and
    return its result.

    This is the entry point executed inside pool worker processes, so the
    return value becomes the Future's result for the submitter.
    """
    fun, args, kwargs = cloudpickle.loads(pickle)
    logger.debug(f"Running pickled bundle at {fun.__name__}")
    start = time.time()  # fix: start/end were referenced but never assigned
    result = fun(*args, **kwargs)
    end = time.time()
    duration = end - start
    logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
    return result  # fix: without this the submitter's Future resolves to None
def make_cloud_pickle(fun, *args, **kwargs):
    """Serialize a callable together with its arguments into a single
    cloudpickle byte blob suitable for shipping to another process or
    machine."""
    logger.info(f"Making cloudpickled bundle at {fun.__name__}")
    payload = (fun, args, kwargs)
    return cloudpickle.dumps(payload)
class BaseExecutor(ABC):
    # Abstract interface implemented by the Thread/Process/Remote executors
    # below.  NOTE(review): the abstract method headers are partially elided
    # in this view; the visible signature tails suggest
    # submit(..., **kwargs) -> fut.Future and shutdown(..., wait: bool = True).
            **kwargs) -> fut.Future:
            wait: bool = True) -> None:
class ThreadExecutor(BaseExecutor):
    """Executor that runs submitted work on a local thread pool."""

                 max_workers: Optional[int] = None):
        # An explicit max_workers argument wins; otherwise fall back to the
        # --executors_threadpool_size config flag if it was set.
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug(f'Creating threadpool executor with {workers} workers')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="thread_executor_helper"

                 **kwargs) -> fut.Future:
            f'Submitted work to threadpool; there are now {self.job_count} items.'
        # NOTE(review): looks like the function is prepended to the args
        # before submission — surrounding lines are elided; confirm.
        newargs.append(function)
        return self._thread_pool_executor.submit(

                 wait = True) -> None:
        logger.debug("Shutting down threadpool executor.")
        self._thread_pool_executor.shutdown(wait)
class ProcessExecutor(BaseExecutor):
    """Executor that runs submitted work on a local process pool.  Work is
    serialized with cloudpickle before submission (see comment below)."""

        # An explicit max_workers argument wins; otherwise fall back to the
        # --executors_processpool_size config flag if it was set.
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug(f'Creating processpool executor with {workers} workers.')
        self._process_executor = fut.ProcessPoolExecutor(

                 **kwargs) -> fut.Future:
        # Bundle it up before submitting because pickle sucks.
        pickle = make_cloud_pickle(function, *args, **kwargs)
            f'Submitting work to processpool executor; there are now {self.job_count} items.'
        return self._process_executor.submit(run_cloud_pickle, pickle)

    def shutdown(self, wait=True) -> None:
        logger.debug('Shutting down processpool executor')
        self._process_executor.shutdown(wait)
class RemoteWorkerRecord:
    # Describes one remote worker endpoint.  Identity is (username, machine);
    # NOTE(review): the field declarations / decorator are elided from this
    # view, but callers also reference .count and .weight attributes.
        return hash((self.username, self.machine))

        # Render as the familiar user@host form.
        return f'{self.username}@{self.machine}'
    # Worker currently running this bundle; None until one is acquired.
    worker: Optional[RemoteWorkerRecord]
    # Login/placement info, filled in when a worker is assigned in launch().
    username: Optional[str]
    machine: Optional[str]
    # For a backup bundle, the original bundle it shadows; None on originals.
    src_bundle: BundleDetails
    # Set by whichever sibling (original or backup) finishes first.
    is_cancelled: threading.Event
    # Backup bundles spawned for this one; None means backups not permitted.
    backup_bundles: Optional[List[BundleDetails]]
class RemoteExecutorStatus:
    """Bookkeeping for the remote executor: which bundles are in flight on
    which worker, per-bundle start/end timestamps, and finished-bundle
    latency histories used to spot slow bundles.  All state is guarded by
    self.lock; the *_already_locked methods assert the caller holds it."""

    def __init__(self, total_worker_count: int) -> None:
        self.worker_count = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        # Wall-clock start/end timestamps keyed by bundle uuid.
        self.start_per_bundle: Dict[str, float] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
        self.in_flight_bundles_by_worker: Dict[
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        # Flat list of every finished bundle's latency (global quantiles).
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted = 0

        # Protects reads and modification using self. Also used
        # as a memory fence for modifications to bundle.
        self.lock = threading.Lock()

    def record_acquire_worker(
            worker: RemoteWorkerRecord,
        # Locking wrapper around the *_already_locked variant below.
        self.record_acquire_worker_already_locked(

    def record_acquire_worker_already_locked(
            worker: RemoteWorkerRecord,
        # Caller must already hold self.lock.
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = time.time()
        x = self.in_flight_bundles_by_worker.get(worker, set())
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(
            details: BundleDetails) -> None:
        # Locking wrapper around the *_already_locked variant below.
        self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(
            details: BundleDetails) -> None:
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker_already_locked(
            worker: RemoteWorkerRecord,
        assert self.lock.locked()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            # Only bundles that ran to completion contribute latency data.
            bundle_latency = ts - self.start_per_bundle[uuid]
            x = self.finished_bundle_timings_per_worker.get(worker, list())
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def total_in_flight(self) -> int:
        # Number of bundles currently running across all known workers.
        assert self.lock.locked()
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        # Free worker slots = total slots minus bundles in flight.
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

        # NOTE(review): the enclosing def for the status rendering below is
        # elided; it builds the multi-line human-readable status string.
        assert self.lock.locked()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        if len(self.finished_bundle_timings) > 1:
            # Global p50/p95 across every finished bundle.
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
            f'✅={total_finished}/{self.total_bundles_submitted}, '
            f'💻n={total_in_flight}/{self.worker_count}\n'
            f' ✅={total_finished}/{self.total_bundles_submitted}, '
            f'💻n={total_in_flight}/{self.worker_count}\n'
        for worker in self.known_workers:
            ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            # Per-worker p50/p95 latencies.
            qworker = numpy.quantile(timings, [0.5, 0.95])
            ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            ret += f' ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            ret += f' ...{in_flight} bundles currently in flight:\n'
            for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                details = self.bundle_details_by_uuid.get(
                pid = str(details.pid) if details is not None else "TBD"
                sec = ts - self.start_per_bundle[bundle_uuid]
                ret += f' (pid={pid}): {bundle_uuid} for {sec:.1f}s so far '
                # Annotate bundles running beyond worker/global quantiles.
                if qworker is not None:
                    ret += f'{bg("red")}>💻p95{reset()} '
                elif sec > qworker[0]:
                    ret += f'{fg("red")}>💻p50{reset()} '
                if sec > qall[1] * 1.5:
                    # Way past the global p95: flag super slow so another
                    # backup may be scheduled.
                    ret += f'{bg("red")}!!!{reset()}'
                    if details is not None:
                        logger.debug(f'Flagging {details.uuid} for another backup')
                        details.super_slow = True
                    ret += f'{bg("red")}>∀p95{reset()} '
                    if details is not None:
                        logger.debug(f'Flagging {details.uuid} for a backup')
                        details.too_slow = True
                    ret += f'{fg("red")}>∀p50{reset()}'

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        # Emit the status report at most once every ~5 seconds.
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
            self.last_periodic_dump is None
            or ts - self.last_periodic_dump > 5.0
            self.last_periodic_dump = ts
class RemoteWorkerSelectionPolicy(ABC):
    """Strategy interface deciding which remote worker gets a bundle."""

    def register_worker_pool(self, workers):
        # Remember the candidate worker pool for later selection calls.
        self.workers = workers

    def is_worker_available(self) -> bool:
        # NOTE(review): abstract method bodies/decorators elided from view.
            machine_to_avoid = None
    ) -> Optional[RemoteWorkerRecord]:
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Pick a worker at random, weighted by each worker's count x weight."""

    def is_worker_available(self) -> bool:
        for worker in self.workers:

            machine_to_avoid = None
    ) -> Optional[RemoteWorkerRecord]:
        # Build a grab bag with one entry per (count x weight) slot so that
        # a uniform random draw is effectively weighted.
        for worker in self.workers:
            for x in range(0, worker.count):
                for y in range(0, worker.weight):
                    grabbag.append(worker)
        for _ in range(0, 5):
            random.shuffle(grabbag)
            # After a few failed tries, stop avoiding machine_to_avoid.
            if worker.machine != machine_to_avoid or _ > 2:
                logger.debug(f'Selected worker {worker}')
        logger.warning("Couldn't find a worker; go fish.")
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """Hand out workers in simple round-robin order."""

    def __init__(self) -> None:

    def is_worker_available(self) -> bool:
        for worker in self.workers:

            machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        worker = self.workers[x]
        # Wrap the rotating index around the end of the pool.
        if x >= len(self.workers):
        logger.debug(f'Selected worker {worker}')
        if x >= len(self.workers):
        logger.warning("Couldn't find a worker; go fish.")
class RemoteExecutor(BaseExecutor):
    """Executor that farms cloudpickled bundles out to a pool of remote
    machines via ssh/rsync, and may schedule duplicate "backup" bundles
    for slow work on other machines; whichever copy finishes first wins
    and cancels its sibling."""

            workers: List[RemoteWorkerRecord],
            policy: RemoteWorkerSelectionPolicy) -> None:
        self.workers = workers
        # Total schedulable slots = sum of each worker's count.
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        # Local thread pool whose threads drive the remote launches.
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
            f'Creating remote processpool with {self.worker_count} remote endpoints.'

    def is_worker_available(self) -> bool:
        # Delegated to the selection policy.
        return self.policy.is_worker_available()

            machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        return self.policy.acquire_worker(machine_to_avoid)

    def find_available_worker_or_block(
            machine_to_avoid: str = None
    ) -> RemoteWorkerRecord:
        # Block until the policy says a worker can be acquired.
        while not self.is_worker_available():
        worker = self.acquire_worker(machine_to_avoid)
        if worker is not None:
        msg = "We should never reach this point in the code"

    def release_worker(self, worker: RemoteWorkerRecord) -> None:
        logger.debug(f'Released worker {worker}')

    def heartbeat(self) -> None:
        with self.status.lock:
            # Regular progress report
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule, but only once there is
            # enough timing data to judge what "slow" means.
            if len(self.status.finished_bundle_timings) > 7:
                for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                            # Only original (non-backup) bundles get backups.
                            bundle is not None and
                            bundle.src_bundle is None and
                            config.config['executors_schedule_remote_backups']
                            self.consider_backup_for_bundle(bundle)

    def consider_backup_for_bundle(self, bundle: BundleDetails) -> None:
        # At most one backup for a "too slow" bundle; a second backup for a
        # "super slow" one only when enough workers are idle.
        assert self.status.lock.locked()
            and len(bundle.backup_bundles) == 0 # one backup per
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            self.schedule_backup_for_bundle(bundle)
            and len(bundle.backup_bundles) < 2 # two backups in dire situations
            and self.status.total_idle() > 4
            msg = f"*** Rescheduling {bundle.pid}/{bundle.uuid} ***"
            self.schedule_backup_for_bundle(bundle)

    def check_if_cancelled(self, bundle: BundleDetails) -> bool:
        # Non-blocking poll of the bundle's cancellation event.
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug(f'Bundle {bundle.uuid} is cancelled, bail out.')
                bundle.was_cancelled = True

    def launch(self, bundle: BundleDetails) -> Any:
        # Find a worker for bundle or block until one is available.
        hostname = bundle.hostname
        # A backup should avoid the machine already running its original.
        if bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        while worker is None:
            worker = self.find_available_worker_or_block(avoid_machine)
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug(f'Running bundle {uuid} on {worker}...')

        # Before we do work, make sure it's still viable.
        if self.check_if_cancelled(bundle):
            return self.post_launch_work(bundle)

        # Send input to machine if it's not local.
        if hostname not in machine:
            cmd = f'{rsync} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
            logger.debug(f"Copying work to {worker} via {cmd}")
            exec_utils.run_silently(cmd)

        # Before we do more work, make sure it's still viable.
        if self.check_if_cancelled(bundle):
            return self.post_launch_work(bundle)

        # macOS ships a python3 binary in /usr/sbin that is not the one
        # we want and is protected by the OS, so make sure that
        # /usr/local/bin is early in the path.
        cmd = (f'{ssh} {bundle.username}@{bundle.machine} '
               f'"export PATH=/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/local/sbin:/home/scott/bin:/home/scott/.local/bin; /home/scott/lib/python_modules/remote_worker.py'
               f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
        p = exec_utils.cmd_in_background(cmd, silent=True)
        bundle.pid = pid = p.pid
        logger.debug(f"Running {cmd} in the background as process {pid}")
            except subprocess.TimeoutExpired:
                # Both source and backup bundles can be cancelled by
                # the other depending on which finishes first.
                if self.check_if_cancelled(bundle):
                    f"{pid}/{bundle.uuid} has finished its work normally."
        return self.post_launch_work(bundle)

    def post_launch_work(self, bundle: BundleDetails) -> Any:
        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{rsync} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                        f"Fetching results from {username}@{machine} via {cmd}"
                        exec_utils.run_silently(cmd)
                    except subprocess.CalledProcessError:
                    # Best-effort cleanup of the remote copies of the files.
                    exec_utils.run_silently(f'{ssh} {username}@{machine}'
                                            f' "/bin/rm -f {code_file} {result_file}"')
            bundle.end_ts = time.time()
            assert bundle.worker is not None
            self.status.record_release_worker_already_locked(
        if not was_cancelled:
            dur = bundle.end_ts - bundle.start_ts

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.
            logger.debug(f"Unpickling {result_file}.")
            with open(f'{result_file}', 'rb') as rb:
                serialized = rb.read()
            result = cloudpickle.loads(serialized)
            os.remove(f'{result_file}')
            os.remove(f'{code_file}')

            # Notify any backups that the original is done so they
            # should stop ASAP. Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                        f'Notifying backup {backup.uuid} that it is cancelled'
                    backup.is_cancelled.set()

        # Backup results don't matter, they just need to leave the
        # result file in the right place for their original to
        # pick up later.

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                    f'Notifying original {bundle.src_bundle.uuid} that it is cancelled'
                bundle.src_bundle.is_cancelled.set()

        assert bundle.worker is not None
        self.release_worker(bundle.worker)

    def create_original_bundle(self, pickle):
        # Wrap pickled work in a fresh BundleDetails with a new uuid and
        # temp file paths for the code and its result.
        uuid = string_utils.generate_uuid(as_hex=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'
        logger.debug(f'Writing pickled code to {code_file}')
        with open(f'{code_file}', 'wb') as wb:
        bundle = BundleDetails(
            pickled_code = pickle,
            hostname = platform.node(),
            code_file = code_file,
            result_file = result_file,
            start_ts = time.time(),
            is_cancelled = threading.Event(),
            was_cancelled = False,
        self.status.record_bundle_details(bundle)
        logger.debug(f'Created original bundle {uuid}')

    def create_backup_bundle(self, src_bundle: BundleDetails):
        # Clone an original bundle; the backup shares the original's code
        # and result paths so whichever finishes first leaves the result
        # where the original expects it.
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'
        backup_bundle = BundleDetails(
            pickled_code = src_bundle.pickled_code,
            hostname = src_bundle.hostname,
            code_file = src_bundle.code_file,
            result_file = src_bundle.result_file,
            start_ts = time.time(),
            src_bundle = src_bundle,
            is_cancelled = threading.Event(),
            was_cancelled = False,
            backup_bundles = None, # backup backups not allowed
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug(f'Created backup bundle {uuid}')

    def schedule_backup_for_bundle(self,
                                   src_bundle: BundleDetails):
        # Caller must hold the status lock (see create/record *_already_locked).
        assert self.status.lock.locked()
        backup_bundle = self.create_backup_bundle(src_bundle)
            f'Scheduling backup bundle {backup_bundle.uuid} for execution'
        self._helper_executor.submit(self.launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them.

            **kwargs) -> fut.Future:
        # Public submit: pickle the work, wrap it in a bundle, and launch
        # it on a helper thread.
        pickle = make_cloud_pickle(function, *args, **kwargs)
        bundle = self.create_original_bundle(pickle)
        self.total_bundles_submitted += 1
            f'Submitted work to remote executor; {self.total_bundles_submitted} items now submitted'
        return self._helper_executor.submit(self.launch, bundle)

    def shutdown(self, wait=True) -> None:
        self._helper_executor.shutdown(wait)
class DefaultExecutors(object):
    """Lazily-constructed holder for the default thread, process, and
    remote executors (one shared instance of each)."""

        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    def ping(self, host) -> bool:
        # True iff host answers a single ICMP ping; used to select live
        # remote workers below.
        command = ['ping', '-c', '1', host]
        return subprocess.call(
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,

    def thread_pool(self) -> ThreadExecutor:
        # Create on first use; reuse thereafter.
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        # Create on first use; reuse thereafter.
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        # Probe a hard-coded list of hosts and build the worker pool from
        # the ones that respond.  NOTE(review): the RemoteWorkerRecord
        # constructor arguments are largely elided from this view.
        if self.remote_executor is None:
            pool: List[RemoteWorkerRecord] = []
            if self.ping('cheetah.house'):
                    machine = 'cheetah.house',
            if self.ping('video.house'):
                    machine = 'video.house',
            if self.ping('wannabe.house'):
                    machine = 'wannabe.house',
            if self.ping('meerkat.cabin'):
                    machine = 'meerkat.cabin',
            if self.ping('backup.house'):
                    machine = 'backup.house',
            if self.ping('puma.cabin'):
                    machine = 'puma.cabin',
            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor