#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# © Copyright 2021-2022, Scott Gasch

"""Defines three executors: a thread executor for doing work using a
threadpool, a process executor for doing work in other processes on
the same machine and a remote executor for farming out work to other
machines.

Also defines DefaultExecutors which is a container for references to
global executors / worker pools with automatic shutdown semantics."""

from __future__ import annotations

import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass, fields
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
import numpy
from overrides import overrides

import pyutils.typez.histogram as hist
from pyutils import argparse_utils, config, persistent, string_utils
from pyutils.ansi import bg, fg, reset, underline
from pyutils.decorator_utils import singleton
from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
from pyutils.parallelize.thread_utils import background_thread

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    default=None,
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    default=None,
    help='Number of processes in the default processpool, leave unset for default',
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    help='Maximum number of failures before giving up on a bundle',
)
parser.add_argument(
    '--remote_worker_records_file',
    type=str,
    help='Path of the remote worker records file (JSON)',
    default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
)

SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'


def _make_cloud_pickle(fun, *args, **kwargs):
    """Internal helper to create cloud pickles."""
    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
    return cloudpickle.dumps((fun, args, kwargs))
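

# A minimal sketch (illustrative, not part of the library's API) of the
# round trip that _make_cloud_pickle enables: the (function, args, kwargs)
# tuple is serialized with cloudpickle and later deserialized and invoked
# on the other side (in a child process or on a remote worker).
def _demo_cloud_pickle_round_trip() -> None:
    payload = _make_cloud_pickle(max, 1, 2, 3)
    fun, args, kwargs = cloudpickle.loads(payload)
    assert fun(*args, **kwargs) == 3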


class BaseExecutor(ABC):
    """The base executor interface definition.  The interface for
    :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
    :class:`ThreadExecutor`.
    """

    def __init__(self, *, title=''):
        self.title = title
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        pass

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shutdown the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks).  Return False
        otherwise.  Note: this should only be called by the launcher
        process.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count.  Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread / machine.
        """
        self.task_count += delta
        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)

    def get_task_count(self) -> int:
        """Return the current task count.  Note: do not call this method
        from a worker, it should only be called by the launcher process /
        thread / machine.
        """
        return self.task_count


class ThreadExecutor(BaseExecutor):
    """A threadpool executor.  This executor uses python threads to
    schedule tasks.  Note that, at least as of python3.10, because of
    the global lock in the interpreter itself, these do not
    parallelize very well so this class is useful mostly for non-CPU
    intensive tasks.

    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
    """

    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        if workers is not None:
            logger.debug('Creating threadpool executor with %d workers', workers)
        else:
            logger.debug('Creating a default sized threadpool executor')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False

    # This is run on a different thread; do not adjust task count here.
    @staticmethod
    def run_local_bundle(fun, *args, **kwargs):
        logger.debug("Running local bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        start = time.time()
        result = self._thread_pool_executor.submit(
            ThreadExecutor.run_local_bundle, *newargs, **kwargs
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down threadpool executor %s', self.title)
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
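

# A minimal usage sketch (illustrative only): tasks submitted to a
# ThreadExecutor are plain callables plus their arguments; each submit
# returns a concurrent.futures.Future.
def _demo_thread_executor() -> None:
    executor = ThreadExecutor(max_workers=4)
    futures = [executor.submit(pow, 2, n) for n in range(8)]
    assert [f.result() for f in futures] == [2**n for n in range(8)]
    executor.shutdown(wait=True, quiet=True)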


class ProcessExecutor(BaseExecutor):
    """An executor which runs tasks in child processes.

    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
    """

    def __init__(self, max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        if workers is not None:
            logger.debug('Creating processpool executor with %d workers.', workers)
        else:
            logger.debug('Creating a default sized processpool executor')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False

    # This is run in another process; do not adjust task count here.
    @staticmethod
    def run_cloud_pickle(pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug("Running pickled bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        self.adjust_task_count(+1)
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down processpool executor %s', self.title)
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        # Don't try to pickle the underlying pool across process boundaries.
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state
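

# A minimal usage sketch (illustrative only): ProcessExecutor has the same
# submit/shutdown surface as ThreadExecutor but ships work to child
# processes; cloudpickle lets even lambdas cross the process boundary.
def _demo_process_executor() -> None:
    executor = ProcessExecutor(max_workers=2)
    future = executor.submit(lambda a, b: a + b, 2, 3)
    assert future.result() == 5
    executor.shutdown(wait=True, quiet=True)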


class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""

    pass


@dataclass
class RemoteWorkerRecord:
    """A record of info about a remote worker."""

    username: str
    """Username we can ssh into on this machine to run work."""

    machine: str
    """Machine address / name."""

    weight: int
    """Relative probability for the weighted policy to select this
    machine for scheduling work."""

    count: int
    """If this machine is selected, what is the maximum number of tasks
    that it can handle?"""

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'
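

# An illustrative record (hypothetical username and hostname): the weighted
# selection policy below would pick this machine twice as often as a
# weight=1 peer and allow at most four of its tasks in flight at once.
def _example_remote_worker_record() -> RemoteWorkerRecord:
    return RemoteWorkerRecord(
        username='worker_user',        # ssh user on the remote machine
        machine='host1.example.com',   # hypothetical hostname
        weight=2,                      # relative scheduling probability
        count=4,                       # max concurrent tasks
    )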


@dataclass
class BundleDetails:
    """All info necessary to define some unit of work that needs to be
    done, where it is being run, its state, whether it is an original
    bundle or a backup bundle, how many times it has failed, etc...
    """

    pickled_code: bytes
    """The code to run, cloud pickled"""

    uuid: str
    """A unique identifier"""

    function_name: str
    """The name of the function we pickled"""

    worker: Optional[RemoteWorkerRecord]
    """The remote worker running this bundle or None if none (yet)"""

    username: Optional[str]
    """The remote username running this bundle or None if none (yet)"""

    machine: Optional[str]
    """The remote machine running this bundle or None if none (yet)"""

    hostname: str
    """The controller machine"""

    code_file: str
    """A unique filename to hold the work to be done"""

    result_file: str
    """Where the results should be placed / read from"""

    pid: int
    """The process id of the local subprocess watching the ssh connection
    to the remote machine"""

    start_ts: float
    """Starting time"""

    end_ts: float
    """Ending time"""

    slower_than_local_p95: bool
    """Currently slower than 95% of other bundles on the remote host"""

    slower_than_global_p95: bool
    """Currently slower than 95% of other bundles globally"""

    src_bundle: Optional[BundleDetails]
    """If this is a backup bundle, this points to the original bundle
    that it's backing up.  None otherwise."""

    is_cancelled: threading.Event
    """An event that can be signaled to indicate this bundle is cancelled.
    This is set when another copy (backup or original) of this work has
    completed successfully elsewhere."""

    was_cancelled: bool
    """True if this bundle was cancelled, False if it finished normally"""

    backup_bundles: Optional[List[BundleDetails]]
    """If we've created backups of this bundle, this is the list of them"""

    failure_count: int
    """How many times has this bundle failed already?"""

    def __repr__(self):
        uuid = self.uuid
        if uuid[-9:-2] == '_backup':
            uuid = uuid[:-9]
            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
        else:
            suffix = uuid[-6:]

        # We colorize the uuid based on some bits from it to make them
        # stand out in the logging and help a reader correlate log messages
        # related to the same bundle.
        colorz = [
            fg('marigold yellow'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('lavender purple'),
        ]
        c = colorz[int(uuid[-2:], 16) % len(colorz)]
        function_name = (
            self.function_name if self.function_name is not None else 'nofname'
        )
        machine = self.machine if self.machine is not None else 'nomachine'
        return f'{c}{suffix}/{function_name}/{machine}{reset()}'


class RemoteExecutorStatus:
    """A status 'scoreboard' for a remote executor tracking various
    metrics and able to render a periodic dump of global state.
    """

    def __init__(self, total_worker_count: int) -> None:
        """
        Args:
            total_worker_count: number of workers in the pool
        """
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord, List[float]
        ] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modification using self.  Also used
        # as a memory fence for modifications to bundle.
        self.lock: threading.Lock = threading.Lock()

    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        """Record that bundle with uuid is assigned to a particular worker.

        Args:
            worker: the record of the worker to which uuid is assigned
            uuid: the uuid of a bundle that has been assigned to a worker
        """
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(
        self, worker: RemoteWorkerRecord, uuid: str
    ) -> None:
        """Same as above but an entry point that doesn't acquire the lock
        for codepaths where it's already held."""
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        """Register the details about a bundle of work."""
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        """Same as above but for codepaths that already hold the lock."""
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Record that a bundle has released a worker."""
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Same as above but for codepaths that already hold the lock."""
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            x = self.finished_bundle_timings_per_worker.get(worker, [])
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def record_processing_began(self, uuid: str):
        """Record when work on a bundle begins."""
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        """How many bundles are in flight currently?"""
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        """How many idle workers are there currently?"""
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f'       {details} setting up / copying data...'
                        sec = 0.0

                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                            if details is not None:
                                details.slower_than_local_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_local_p95 = False

                    if qall is not None:
                        if sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                details.slower_than_global_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_global_p95 = False
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    """Base class for a policy that selects a remote worker."""

    def __init__(self):
        self.workers: Optional[List[RemoteWorkerRecord]] = None

    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that uses weighted RNG."""

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        if self.workers:
            for worker in self.workers:
                if worker.machine != machine_to_avoid:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.debug(
                'There are no available workers that avoid %s', machine_to_avoid
            )
            if self.workers:
                for worker in self.workers:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.warning('There are no available workers?!')
            return None

        worker = random.sample(grabbag, 1)[0]
        assert worker.count > 0
        worker.count -= 1
        logger.debug('Selected worker %s', worker)
        return worker
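

# A worked example (illustrative): given A(weight=1, count=2) and
# B(weight=3, count=2), the grabbag holds A twice and B six times, so B
# wins about 75% of draws until its count is exhausted.
def _demo_weighted_selection() -> None:
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    policy.register_worker_pool(
        [
            RemoteWorkerRecord(username='u', machine='A', weight=1, count=2),
            RemoteWorkerRecord(username='u', machine='B', weight=3, count=2),
        ]
    )
    while policy.is_worker_available():
        print(policy.acquire_worker())  # decrements the chosen worker's count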


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that just round robins."""

    def __init__(self) -> None:
        super().__init__()
        self.index = 0

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(
        self, machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        if self.workers:
            x = self.index
            while True:
                worker = self.workers[x]
                if worker.count > 0:
                    worker.count -= 1
                    x += 1
                    if x >= len(self.workers):
                        x = 0
                    self.index = x
                    logger.debug('Selected worker %s', worker)
                    return worker
                x += 1
                if x >= len(self.workers):
                    x = 0
                if x == self.index:
                    logger.warning('Unexpectedly could not find a worker, retrying...')
                    return None
        return None


class RemoteExecutor(BaseExecutor):
    """An executor that uses processes on remote machines to do work.  This
    works by creating "bundles" of work with pickled code in each to be
    executed.  Each bundle is assigned a remote worker based on some policy
    heuristics.  Once assigned to a remote worker, a local subprocess is
    created.  It copies the pickled code to the remote machine via ssh/scp
    and then starts up work on the remote machine again using ssh.  When
    the work is complete it copies the results back to the local machine.

    So there is essentially one "controller" machine (which may also be
    in the remote executor pool and therefore do task work in addition to
    controlling) and N worker machines.  This code runs on the controller
    whereas on the worker machines we invoke pickled user code via a
    shim in :file:`remote_worker.py`.

    Some redundancy and safety provisions are made; e.g. slower than
    expected tasks have redundant backups created and if a task fails
    repeatedly we consider it poisoned and give up on it.

    .. warning::

        The network overhead / latency of copying work from the
        controller machine to the remote workers is relatively high.
        This executor probably only makes sense to use with
        computationally expensive tasks such as jobs that will execute
        for ~30 seconds or longer.

    See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
    """

    def __init__(
        self,
        workers: List[RemoteWorkerRecord],
        policy: RemoteWorkerSelectionPolicy,
    ) -> None:
        """
        Args:
            workers: A list of remote workers we can call on to do tasks.
            policy: A policy for selecting remote workers for tasks.
        """
        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug(
            'Creating %d local threads, one per remote worker.', self.worker_count
        )
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self._run_periodic_heartbeat()
        self.already_shutdown = False

    @background_thread
    def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
        """We create a background thread to invoke :meth:`_heartbeat` regularly
        while we are scheduling work.  It does some accounting such as
        looking for slow bundles to tag for backup creation, checking for
        unexpected failures, and printing a fancy message on stdout.
        """
        while not stop_event.is_set():
            time.sleep(5.0)
            logger.debug('Running periodic heartbeat code...')
            self._heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def _heartbeat(self) -> None:
        # Note: this is invoked on a background thread, not an
        # executor thread.  Be careful what you do with it b/c it
        # needs to get back and dump status again periodically.
        with self.status.lock:
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self._maybe_schedule_backup_bundles()
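
    # A worked example of the backup scoring performed by the method below
    # (illustrative numbers): a bundle on a weight=2 machine
    # (base_score = int(200 / 2) = 100) that has been running for 60s (+60),
    # is slower than its worker's p95 (+60/2 = 30) and the global p95
    # (+60/4 = 15), and has no backups yet (doubled), scores
    # (100 + 60 + 30 + 15) * 2 = 410.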

    def _maybe_schedule_backup_bundles(self):
        """Maybe schedule backup bundles if we see a very slow bundle."""

        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_done >= 2
            and num_idle_workers > 0
            and (self.last_backup is None or (now - self.last_backup > 9.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            try:
                assert self.backup_lock.locked()

                bundle_to_backup = None
                best_score = None
                for (
                    worker,
                    bundle_uuids,
                ) in self.status.in_flight_bundles_by_worker.items():

                    # Prefer to schedule backups of bundles running on
                    # slower machines.
                    base_score = 0
                    for record in self.workers:
                        if worker.machine == record.machine:
                            base_score = float(record.weight)
                            base_score = 1.0 / base_score
                            base_score *= 200.0
                            base_score = int(base_score)
                            break

                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                            bundle is not None
                            and bundle.src_bundle is None
                            and bundle.backup_bundles is not None
                        ):
                            score = base_score

                            # Schedule backups of bundles running
                            # longer; especially those that are
                            # unexpectedly slow.
                            start_ts = self.status.start_per_bundle[uuid]
                            if start_ts is not None:
                                runtime = now - start_ts
                                score += runtime
                                logger.debug(
                                    'score[%s] => %.1f  # latency boost', bundle, score
                                )

                                if bundle.slower_than_local_p95:
                                    score += runtime / 2
                                    logger.debug(
                                        'score[%s] => %.1f  # >worker p95',
                                        bundle,
                                        score,
                                    )

                                if bundle.slower_than_global_p95:
                                    score += runtime / 4
                                    logger.debug(
                                        'score[%s] => %.1f  # >global p95',
                                        bundle,
                                        score,
                                    )

                            # Prefer backups of bundles that don't
                            # have backups already.
                            backup_count = len(bundle.backup_bundles)
                            if backup_count == 0:
                                score *= 2
                            elif backup_count == 1:
                                score /= 2
                            elif backup_count == 2:
                                score /= 8
                            else:
                                score = 0
                            logger.debug(
                                'score[%s] => %.1f  # %d dup backup factor',
                                bundle,
                                score,
                                backup_count,
                            )

                            if score != 0 and (
                                best_score is None or score > best_score
                            ):
                                bundle_to_backup = bundle
                                assert bundle is not None
                                assert bundle.backup_bundles is not None
                                assert bundle.src_bundle is None
                                best_score = score

                # Note: this is all still happening on the heartbeat
                # runner thread.  That's ok because
                # _schedule_backup_for_bundle uses the executor to
                # submit the bundle again which will cause it to be
                # picked up by a worker thread and allow this thread
                # to return to run future heartbeats.
                if bundle_to_backup is not None:
                    self.last_backup = now
                    logger.info(
                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
                        bundle_to_backup,
                        best_score,
                    )
                    self._schedule_backup_for_bundle(bundle_to_backup)
            finally:
                self.backup_lock.release()

    def _is_worker_available(self) -> bool:
        """Is there a worker available currently?"""
        return self.policy.is_worker_available()

    def _acquire_worker(
        self, machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        """Try to acquire a worker."""
        return self.policy.acquire_worker(machine_to_avoid)

    def _find_available_worker_or_block(
        self, machine_to_avoid: Optional[str] = None
    ) -> RemoteWorkerRecord:
        """Find a worker or block until one becomes available."""
        with self.cv:
            while not self._is_worker_available():
                self.cv.wait()
            worker = self._acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        """Release a previously acquired worker."""
        worker = bundle.worker
        assert worker is not None
        logger.debug('Released worker %s', worker)
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        with self.cv:
            worker.count += 1
            self.cv.notify()
        self.adjust_task_count(-1)

    def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
        """See if a particular bundle is cancelled.  Do not block."""
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                bundle.was_cancelled = True
                return True
        return False

    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""

        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self._find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug('%s: Running bundle on %s...', bundle, worker)

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self._check_if_cancelled(bundle):
            try:
                return self._process_work_result(bundle)
            except Exception as e:
                logger.warning(
                    '%s: bundle says it\'s cancelled upfront but no results?!', bundle
                )
                self._release_worker(bundle)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started.  Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over.  Reschedule the whole
                    # thing.
                    logger.exception(e)
                    logger.error(
                        '%s: We are the original owner thread and yet there are '
                        'no results for this bundle.  This is unexpected and bad.',
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # We're a backup and our bundle is cancelled
                    # before we even got started.  Do nothing and let
                    # the original bundle's thread worry about either
                    # finding the results or complaining about it.
                    return None

        # Send input code / data to worker machine if it's not local.
        if hostname not in machine:
            try:
                cmd = (
                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                )
                start_ts = time.time()
                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug(
                    "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
                )
            except Exception as e:
                self._release_worker(bundle)
                if is_original:
                    # Weird.  We tried to copy the code to the worker
                    # and it failed...  And we're the original bundle.
                    # We have to retry.
                    logger.exception(e)
                    logger.error(
                        "%s: Failed to send instructions to the worker machine?! "
                        "This is not expected; we're the original bundle so this shouldn't "
                        "be a race condition.  Attempting an emergency retry...",
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code_file before we could copy it.  Ignore.
                    logger.warning(
                        '%s: Failed to send instructions to the worker machine... '
                        'We\'re a backup and this may be caused by the original (or '
                        'some other backup) already finishing this work.  Ignoring.',
                        bundle,
                    )
                    return None

        # Kick off the work.  Note that if this fails we let
        # _wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"source py39-venv/bin/activate &&'
            f' /home/scott/lib/python_modules/remote_worker.py'
            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug(
            '%s: Executing %s in the background to kick off work...', bundle, cmd
        )
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug(
            '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
        )
        return self._wait_for_process(p, bundle, 0)

    def _wait_for_process(
        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
    ) -> Any:
        """At this point we've copied the bundle's pickled code to the remote
        worker and started an ssh process that should be invoking the
        remote worker to have it execute the user's code.  See how
        that's going and wait for it to complete or fail.  Note that
        this code is recursive: there are codepaths where we decide to
        stop waiting for an ssh process (because another backup seems
        to have finished) but then fail to fetch or parse the results
        from that backup and thus call ourselves to continue waiting
        on an active ssh process.  This is the purpose of the depth
        argument: to curtail potential infinite recursion by giving up
        eventually.

        Args:
            p: the Popen record of the ssh job
            bundle: the bundle of work being executed remotely
            depth: how many retries we've made so far.  Starts at zero.
        """

        machine = bundle.machine
        assert p is not None
        pid = p.pid  # pid of the ssh process
        if depth > 3:
            logger.error(
                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
                pid,
            )
            p.terminate()
            self._release_worker(bundle)
            return self._emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before the original worker did.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self._check_if_cancelled(bundle):
                    logger.info(
                        '%s: looks like another worker finished bundle...', bundle
                    )
                    break
            else:
                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # before us and we're bailing out.
        try:
            ret = self._process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, cleanup after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine.  If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception as e:
            logger.exception(e)
            logger.error('%s: Something unexpected just happened...', bundle)
            if p is not None:
                logger.warning(
                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
                    bundle,
                )
                return self._wait_for_process(p, bundle, depth + 1)
            else:
                self._release_worker(bundle)
                return self._emergency_retry_nasty_bundle(bundle)

    def _process_work_result(self, bundle: BundleDetails) -> Any:
        """A bundle seems to be completed.  Check on the results."""

        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        "%s: Fetching results back from %s@%s via %s",
                        bundle,
                        username,
                        machine,
                        cmd,
                    )

                    # If either of these throw they are handled in
                    # _wait_for_process.
                    attempts = 0
                    while True:
                        try:
                            run_silently(cmd)
                        except Exception as e:
                            attempts += 1
                            if attempts >= 3:
                                raise e
                        else:
                            break

                    # Cleanup remote /tmp files.
                    run_silently(
                        f'{SSH} {username}@{machine}'
                        f' "/bin/rm -f {code_file} {result_file}"'
                    )
                    logger.debug(
                        'Fetching results back took %.2fs', time.time() - bundle.end_ts
                    )
            dur = bundle.end_ts - bundle.start_ts
            self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.  It still does that here with is_cancelled
        # set.
        if is_original:
            logger.debug("%s: Unpickling %s.", bundle, result_file)
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                logger.exception(e)
                logger.error('Failed to load %s... this is bad news.', result_file)
                self._release_worker(bundle)

                # Re-raise the exception; the code in _wait_for_process may
                # decide to _emergency_retry_nasty_bundle here.
                raise e
            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
            os.remove(result_file)
            os.remove(code_file)

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        '%s: Notifying backup %s that it\'s cancelled',
                        bundle,
                        backup.uuid,
                    )
                    backup.is_cancelled.set()

        # This is a backup job and, by now, we have already fetched
        # the bundle results.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                orig_bundle = bundle.src_bundle
                assert orig_bundle is not None
                logger.debug(
                    '%s: Notifying original %s we beat them to it.',
                    bundle,
                    orig_bundle.uuid,
                )
                orig_bundle.is_cancelled.set()
        self._release_worker(bundle, was_cancelled=was_cancelled)
        return result

    def _create_original_bundle(self, pickle, function_name: str):
        """Creates a bundle that is not a backup of any other bundle but
        rather represents a user task.
        """

        uuid = string_utils.generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug('Writing pickled code to %s', code_file)
        with open(code_file, 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            function_name=function_name,
            worker=None,
            username=None,
            machine=None,
            hostname=platform.node(),
            code_file=code_file,
            result_file=result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
            failure_count=0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug('%s: Created an original bundle', bundle)
        return bundle

    def _create_backup_bundle(self, src_bundle: BundleDetails):
        """Creates a bundle that is a backup of another bundle that is
        running too slowly."""

        assert self.status.lock.locked()
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            function_name=src_bundle.function_name,
            worker=None,
            username=None,
            machine=None,
            hostname=src_bundle.hostname,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
            failure_count=0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug('%s: Created a backup bundle', backup_bundle)
        return backup_bundle

    def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        """Schedule a backup of src_bundle."""

        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self._create_backup_bundle(src_bundle)
        logger.debug(
            '%s/%s: Scheduling backup for execution...',
            backup_bundle.uuid,
            backup_bundle.function_name,
        )
        self._helper_executor.submit(self._launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them (and return
        # a result).

    def _emergency_retry_nasty_bundle(
        self, bundle: BundleDetails
    ) -> Optional[fut.Future]:
        """Something unexpectedly failed with bundle.  Either retry it
        from the beginning or throw in the towel and give up on it."""

        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        if is_original:
            retry_limit = 3
        else:
            retry_limit = 2

        if bundle.failure_count > retry_limit:
            logger.error(
                '%s: Tried this bundle too many times already (%dx); giving up.',
                bundle,
                retry_limit,
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                )
            else:
                logger.error(
                    '%s: At least it\'s only a backup; better luck with the others.',
                    bundle,
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            warnings.warn(msg)
            return self._launch(bundle, avoid_last_machine)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Submit work to be done.  This is the user entry point of this
        class."""
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        bundle = self._create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self._launch, bundle)

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shutdown the executor."""
        if not self.already_shutdown:
            logger.debug('Shutting down RemoteExecutor %s', self.title)
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
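

# A minimal usage sketch (illustrative; the username and hostname are
# hypothetical and a real remote_worker.py deployment is assumed).  Most
# callers should prefer the shared pool returned by
# DefaultExecutors().remote_pool(), defined below, which reads worker
# records from a config file and health-checks the machines first.
def _demo_remote_executor() -> None:
    workers = [
        RemoteWorkerRecord(
            username='worker_user',
            machine='host1.example.com',
            weight=1,
            count=4,
        )
    ]
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    executor = RemoteExecutor(workers, policy)
    future = executor.submit(sum, [1, 2, 3])
    print(future.result())
    executor.shutdown()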


class RemoteWorkerPoolProvider:
    @abstractmethod
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        pass


@persistent.persistent_autoloaded_singleton()  # type: ignore
class ConfigRemoteWorkerPoolProvider(
    RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
):
    def __init__(self, json_remote_worker_pool: Dict[str, Any]):
        self.remote_worker_pool = []
        for record in json_remote_worker_pool['remote_worker_records']:
            self.remote_worker_pool.append(
                self.dataclassFromDict(RemoteWorkerRecord, record)
            )
        assert len(self.remote_worker_pool) > 0

    @staticmethod
    def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
        fieldSet = {f.name for f in fields(clsName) if f.init}
        filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
        return clsName(**filteredArgDict)
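
    # A small worked example (illustrative): unknown keys in a JSON record
    # are silently dropped, so the records file can carry extra metadata
    # without breaking deserialization.  e.g.::
    #
    #     dataclassFromDict(
    #         RemoteWorkerRecord,
    #         {'username': 'u', 'machine': 'm', 'weight': 1, 'count': 2, 'extra': '?'},
    #     )  # == RemoteWorkerRecord('u', 'm', 1, 2)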

    @overrides
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        return self.remote_worker_pool

    @overrides
    def get_persistent_data(self) -> List[RemoteWorkerRecord]:
        return self.remote_worker_pool

    @staticmethod
    @overrides
    def get_filename() -> str:
        return config.config['remote_worker_records_file']

    @staticmethod
    @overrides
    def should_we_load_data(filename: str) -> bool:
        return True

    @staticmethod
    @overrides
    def should_we_save_data(filename: str) -> bool:
        return False
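
    # The records file named by --remote_worker_records_file is expected to
    # contain JSON shaped roughly like this (hostnames and numbers are
    # illustrative):
    #
    #     {
    #         "remote_worker_records": [
    #             {"username": "wuser", "machine": "host1.example.com",
    #              "weight": 2, "count": 4},
    #             {"username": "wuser", "machine": "host2.example.com",
    #              "weight": 1, "count": 2}
    #         ]
    #     }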


@singleton
class DefaultExecutors(object):
    """A container for a default thread, process and remote executor.
    These are not created until needed and we take care to clean up
    before process exit automatically for the caller's convenience.
    Instead of creating your own executor, consider using the one
    from this pool.  e.g.::

        @par.parallelize(method=par.Method.PROCESS)
        def do_work(
            solutions: List[Work],
            shard_num: int,
            ...
        ):
            ...

        def start_do_work(all_work: List[Work]):
            shards = []
            logger.debug('Sharding work into groups of 10.')
            for subset in list_utils.shard(all_work, 10):
                shards.append([x for x in subset])
            results = []
            logger.debug('Kicking off helper pool.')
            try:
                for n, shard in enumerate(shards):
                    results.append(
                        do_work(
                            shard, n, shared_cache.get_name(), max_letter_pop_per_word
                        )
                    )
                smart_future.wait_all(results)
            finally:
                # Note: if you forget to do this it will clean itself up
                # during program termination including tearing down any
                # active ssh connections.
                executors.DefaultExecutors().process_pool().shutdown()
    """

    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def _ping(host) -> bool:
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            x = cmd_exitcode(
                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
            )
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            provider = ConfigRemoteWorkerPoolProvider()
            all_machines = provider.get_remote_workers()
            pool = []

            # Make sure we can ping each machine.
            for record in all_machines:
                if self._ping(record.machine):
                    logger.info('%s is alive / responding to pings', record.machine)
                    pool.append(record)

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info('Reducing workload for %s.', record.machine)
                    record.count = max(int(record.count / 2), 1)

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None
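

# A minimal end-to-end sketch (illustrative): grab the shared process
# pool, submit some work, and shut everything down.  A real pyutils
# program would normally initialize config / bootstrap first so the
# --executors_* arguments above get parsed.
if __name__ == '__main__':
    default_pool = DefaultExecutors().process_pool()
    result_future = default_pool.submit(sum, [1, 2, 3])
    print(result_future.result())  # prints: 6
    DefaultExecutors().shutdown()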