2 # -*- coding: utf-8 -*-
4 # © Copyright 2021-2022, Scott Gasch
7 This module defines a :class:`BaseExecutor` interface and three
10 - :class:`ThreadExecutor`
11 - :class:`ProcessExecutor`
12 - :class:`RemoteExecutor`
14 The :class:`ThreadExecutor` is used to dispatch work to background
15 threads in the same Python process for parallelized work. Of course,
16 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
17 is not terribly useful for compute-bound code. But it's good for
18 work that is mostly I/O bound.
20 The :class:`ProcessExecutor` is used to dispatch work to other
21 processes on the same machine and is more useful for compute-bound
24 The :class:`RemoteExecutor` is used in conjunection with `ssh`,
25 the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
26 to dispatch work to a set of remote worker machines on your
27 network. You can configure this pool via a JSON configuration file,
28 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
30 Finally, this file defines a :class:`DefaultExecutors` pool that
31 contains a pre-created and ready instance of each of the three
32 executors discussed. It has the added benefit of being automatically
33 cleaned up at process termination time.
35 See instructions in :mod:`pyutils.parallelize.parallelize` for
36 setting up and using the framework.
39 from __future__ import annotations
41 import concurrent.futures as fut
50 from abc import ABC, abstractmethod
51 from collections import defaultdict
52 from dataclasses import dataclass
53 from typing import Any, Callable, Dict, List, Optional, Set
55 import cloudpickle # type: ignore
56 from overrides import overrides
58 import pyutils.types.histogram as hist
67 from pyutils.ansi import bg, fg, reset, underline
68 from pyutils.decorator_utils import singleton
69 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
70 from pyutils.parallelize.thread_utils import background_thread
71 from pyutils.types import type_utils
73 logger = logging.getLogger(__name__)
75 parser = config.add_commandline_args(
76 f"Executors ({__file__})", "Args related to processing executors."
79 '--executors_threadpool_size',
82 help='Number of threads in the default threadpool, leave unset for default',
86 '--executors_processpool_size',
89 help='Number of processes in the default processpool, leave unset for default',
93 '--executors_schedule_remote_backups',
95 action=argparse_utils.ActionNoYes,
96 help='Should we schedule duplicative backup work if a remote bundle is slow',
99 '--executors_max_bundle_failures',
103 help='Maximum number of failures before giving up on a bundle',
106 '--remote_worker_records_file',
109 help='Path of the remote worker records file (JSON)',
110 default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
113 '--remote_worker_helper_path',
115 metavar='PATH_TO_REMOTE_WORKER_PY',
116 help='Path to remote_worker.py on remote machines',
117 default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
121 SSH = '/usr/bin/ssh -oForwardX11=no'
122 SCP = '/usr/bin/scp -C'
125 def _make_cloud_pickle(fun, *args, **kwargs):
126 """Internal helper to create cloud pickles."""
127 logger.debug("Making cloudpickled bundle at %s", fun.__name__)
128 return cloudpickle.dumps((fun, args, kwargs))
131 class BaseExecutor(ABC):
132 """The base executor interface definition. The interface for
133 :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
134 :class:`ThreadExecutor`.
137 def __init__(self, *, title=''):
140 title: the name of this executor.
143 self.histogram = hist.SimpleHistogram(
144 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
149 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
150 """Submit work for the executor to do.
153 function: the Callable to be executed.
154 *args: the arguments to function
155 **kwargs: the arguments to function
158 A concurrent :class:`Future` representing the result of the
164 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
165 """Shutdown the executor.
168 wait: wait for the shutdown to complete before returning?
169 quiet: keep it quiet, please.
173 def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
174 """Shutdown the executor and return True if the executor is idle
175 (i.e. there are no pending or active tasks). Return False
176 otherwise. Note: this should only be called by the launcher
180 quiet: keep it quiet, please.
183 True if the executor could be shut down because it has no
184 pending work, False otherwise.
186 if self.task_count == 0:
187 self.shutdown(wait=True, quiet=quiet)
191 def adjust_task_count(self, delta: int) -> None:
192 """Change the task count. Note: do not call this method from a
193 worker, it should only be called by the launcher process /
197 delta: the delta value by which to adjust task count.
199 self.task_count += delta
200 logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
202 def get_task_count(self) -> int:
203 """Change the task count. Note: do not call this method from a
204 worker, it should only be called by the launcher process /
208 The executor's current task count.
210 return self.task_count
213 class ThreadExecutor(BaseExecutor):
214 """A threadpool executor. This executor uses Python threads to
215 schedule tasks. Note that, at least as of python3.10, because of
216 the global lock in the interpreter itself, these do not
217 parallelize very well so this class is useful mostly for non-CPU
220 See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
223 def __init__(self, max_workers: Optional[int] = None):
226 max_workers: maximum number of threads to create in the pool.
230 if max_workers is not None:
231 workers = max_workers
232 elif 'executors_threadpool_size' in config.config:
233 workers = config.config['executors_threadpool_size']
234 if workers is not None:
235 logger.debug('Creating threadpool executor with %d workers', workers)
237 logger.debug('Creating a default sized threadpool executor')
238 self._thread_pool_executor = fut.ThreadPoolExecutor(
239 max_workers=workers, thread_name_prefix="thread_executor_helper"
241 self.already_shutdown = False
243 # This is run on a different thread; do not adjust task count here.
245 def _run_local_bundle(fun, *args, **kwargs):
246 logger.debug("Running local bundle at %s", fun.__name__)
247 result = fun(*args, **kwargs)
251 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
252 if self.already_shutdown:
253 raise Exception('Submitted work after shutdown.')
254 self.adjust_task_count(+1)
256 newargs.append(function)
260 result = self._thread_pool_executor.submit(
261 ThreadExecutor._run_local_bundle, *newargs, **kwargs
263 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
264 result.add_done_callback(lambda _: self.adjust_task_count(-1))
268 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
269 if not self.already_shutdown:
270 logger.debug('Shutting down threadpool executor %s', self.title)
271 self._thread_pool_executor.shutdown(wait)
273 print(self.histogram.__repr__(label_formatter='%ds'))
274 self.already_shutdown = True
277 class ProcessExecutor(BaseExecutor):
278 """An executor which runs tasks in child processes.
280 See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
283 def __init__(self, max_workers=None):
286 max_workers: the max number of worker processes to create.
290 if max_workers is not None:
291 workers = max_workers
292 elif 'executors_processpool_size' in config.config:
293 workers = config.config['executors_processpool_size']
294 if workers is not None:
295 logger.debug('Creating processpool executor with %d workers.', workers)
297 logger.debug('Creating a default sized processpool executor')
298 self._process_executor = fut.ProcessPoolExecutor(
301 self.already_shutdown = False
303 # This is run in another process; do not adjust task count here.
305 def _run_cloud_pickle(pickle):
306 fun, args, kwargs = cloudpickle.loads(pickle)
307 logger.debug("Running pickled bundle at %s", fun.__name__)
308 result = fun(*args, **kwargs)
312 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
313 if self.already_shutdown:
314 raise Exception('Submitted work after shutdown.')
316 self.adjust_task_count(+1)
317 pickle = _make_cloud_pickle(function, *args, **kwargs)
318 result = self._process_executor.submit(
319 ProcessExecutor._run_cloud_pickle, pickle
321 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
322 result.add_done_callback(lambda _: self.adjust_task_count(-1))
326 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
327 if not self.already_shutdown:
328 logger.debug('Shutting down processpool executor %s', self.title)
329 self._process_executor.shutdown(wait)
331 print(self.histogram.__repr__(label_formatter='%ds'))
332 self.already_shutdown = True
334 def __getstate__(self):
335 state = self.__dict__.copy()
336 state['_process_executor'] = None
340 class RemoteExecutorException(Exception):
341 """Thrown when a bundle cannot be executed despite several retries."""
347 class RemoteWorkerRecord:
348 """A record of info about a remote worker."""
351 """Username we can ssh into on this machine to run work."""
354 """Machine address / name."""
357 """Relative probability for the weighted policy to select this
358 machine for scheduling work."""
361 """If this machine is selected, what is the maximum number of task
362 that it can handle?"""
365 return hash((self.username, self.machine))
368 return f'{self.username}@{self.machine}'
373 """All info necessary to define some unit of work that needs to be
374 done, where it is being run, its state, whether it is an original
375 bundle of a backup bundle, how many times it has failed, etc...
379 """The code to run, cloud pickled"""
382 """A unique identifier"""
385 """The name of the function we pickled"""
387 worker: Optional[RemoteWorkerRecord]
388 """The remote worker running this bundle or None if none (yet)"""
390 username: Optional[str]
391 """The remote username running this bundle or None if none (yet)"""
393 machine: Optional[str]
394 """The remote machine running this bundle or None if none (yet)"""
397 """The controller machine"""
400 """A unique filename to hold the work to be done"""
403 """Where the results should be placed / read from"""
406 """The process id of the local subprocess watching the ssh connection
407 to the remote machine"""
415 slower_than_local_p95: bool
416 """Currently slower then 95% of other bundles on remote host"""
418 slower_than_global_p95: bool
419 """Currently slower than 95% of other bundles globally"""
421 src_bundle: Optional[BundleDetails]
422 """If this is a backup bundle, this points to the original bundle
423 that it's backing up. None otherwise."""
425 is_cancelled: threading.Event
426 """An event that can be signaled to indicate this bundle is cancelled.
427 This is set when another copy (backup or original) of this work has
428 completed successfully elsewhere."""
431 """True if this bundle was cancelled, False if it finished normally"""
433 backup_bundles: Optional[List[BundleDetails]]
434 """If we've created backups of this bundle, this is the list of them"""
437 """How many times has this bundle failed already?"""
441 if uuid[-9:-2] == '_backup':
443 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
447 # We colorize the uuid based on some bits from it to make them
448 # stand out in the logging and help a reader correlate log messages
449 # related to the same bundle.
456 fg('marigold yellow'),
459 fg('cornflower blue'),
460 fg('turquoise blue'),
462 fg('lavender purple'),
465 c = colorz[int(uuid[-2:], 16) % len(colorz)]
467 self.function_name if self.function_name is not None else 'nofname'
469 machine = self.machine if self.machine is not None else 'nomachine'
470 return f'{c}{suffix}/{function_name}/{machine}{reset()}'
473 class RemoteExecutorStatus:
474 """A status 'scoreboard' for a remote executor tracking various
475 metrics and able to render a periodic dump of global state.
478 def __init__(self, total_worker_count: int) -> None:
481 total_worker_count: number of workers in the pool
483 self.worker_count: int = total_worker_count
484 self.known_workers: Set[RemoteWorkerRecord] = set()
485 self.start_time: float = time.time()
486 self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
487 self.end_per_bundle: Dict[str, float] = defaultdict(float)
488 self.finished_bundle_timings_per_worker: Dict[
489 RemoteWorkerRecord, math_utils.NumericPopulation
491 self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
492 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
493 self.finished_bundle_timings: math_utils.NumericPopulation = (
494 math_utils.NumericPopulation()
496 self.last_periodic_dump: Optional[float] = None
497 self.total_bundles_submitted: int = 0
499 # Protects reads and modification using self. Also used
500 # as a memory fence for modifications to bundle.
501 self.lock: threading.Lock = threading.Lock()
503 def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
504 """Record that bundle with uuid is assigned to a particular worker.
507 worker: the record of the worker to which uuid is assigned
508 uuid: the uuid of a bundle that has been assigned to a worker
511 self.record_acquire_worker_already_locked(worker, uuid)
513 def record_acquire_worker_already_locked(
514 self, worker: RemoteWorkerRecord, uuid: str
516 """Same as above but an entry point that doesn't acquire the lock
517 for codepaths where it's already held."""
518 assert self.lock.locked()
519 self.known_workers.add(worker)
520 self.start_per_bundle[uuid] = None
521 x = self.in_flight_bundles_by_worker.get(worker, set())
523 self.in_flight_bundles_by_worker[worker] = x
525 def record_bundle_details(self, details: BundleDetails) -> None:
526 """Register the details about a bundle of work."""
528 self.record_bundle_details_already_locked(details)
530 def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
531 """Same as above but for codepaths that already hold the lock."""
532 assert self.lock.locked()
533 self.bundle_details_by_uuid[details.uuid] = details
535 def record_release_worker(
537 worker: RemoteWorkerRecord,
541 """Record that a bundle has released a worker."""
543 self.record_release_worker_already_locked(worker, uuid, was_cancelled)
545 def record_release_worker_already_locked(
547 worker: RemoteWorkerRecord,
551 """Same as above but for codepaths that already hold the lock."""
552 assert self.lock.locked()
554 self.end_per_bundle[uuid] = ts
555 self.in_flight_bundles_by_worker[worker].remove(uuid)
556 if not was_cancelled:
557 start = self.start_per_bundle[uuid]
558 assert start is not None
559 bundle_latency = ts - start
560 x = self.finished_bundle_timings_per_worker.get(
561 worker, math_utils.NumericPopulation()
563 x.add_number(bundle_latency)
564 self.finished_bundle_timings_per_worker[worker] = x
565 self.finished_bundle_timings.add_number(bundle_latency)
567 def record_processing_began(self, uuid: str):
568 """Record when work on a bundle begins."""
570 self.start_per_bundle[uuid] = time.time()
572 def total_in_flight(self) -> int:
573 """How many bundles are in flight currently?"""
574 assert self.lock.locked()
576 for worker in self.known_workers:
577 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
578 return total_in_flight
580 def total_idle(self) -> int:
581 """How many idle workers are there currently?"""
582 assert self.lock.locked()
583 return self.worker_count - self.total_in_flight()
586 assert self.lock.locked()
588 total_finished = len(self.finished_bundle_timings)
589 total_in_flight = self.total_in_flight()
590 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
593 if len(self.finished_bundle_timings) > 1:
594 qall_median = self.finished_bundle_timings.get_median()
595 qall_p95 = self.finished_bundle_timings.get_percentile(95)
597 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
598 f'✅={total_finished}/{self.total_bundles_submitted}, '
599 f'💻n={total_in_flight}/{self.worker_count}\n'
603 f'⏱={ts-self.start_time:.1f}s, '
604 f'✅={total_finished}/{self.total_bundles_submitted}, '
605 f'💻n={total_in_flight}/{self.worker_count}\n'
608 for worker in self.known_workers:
609 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
610 timings = self.finished_bundle_timings_per_worker.get(
611 worker, math_utils.NumericPopulation()
614 qworker_median = None
617 qworker_median = timings.get_median()
618 qworker_p95 = timings.get_percentile(95)
619 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
623 ret += f' ...finished {count} total bundle(s) so far\n'
624 in_flight = len(self.in_flight_bundles_by_worker[worker])
626 ret += f' ...{in_flight} bundles currently in flight:\n'
627 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
628 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
629 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
630 if self.start_per_bundle[bundle_uuid] is not None:
631 sec = ts - self.start_per_bundle[bundle_uuid]
632 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
634 ret += f' {details} setting up / copying data...'
637 if qworker_p95 is not None:
638 if sec > qworker_p95:
639 ret += f'{bg("red")}>💻p95{reset()} '
640 if details is not None:
641 details.slower_than_local_p95 = True
643 if details is not None:
644 details.slower_than_local_p95 = False
646 if qall_p95 is not None:
648 ret += f'{bg("red")}>∀p95{reset()} '
649 if details is not None:
650 details.slower_than_global_p95 = True
652 details.slower_than_global_p95 = False
656 def periodic_dump(self, total_bundles_submitted: int) -> None:
657 assert self.lock.locked()
658 self.total_bundles_submitted = total_bundles_submitted
660 if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
662 self.last_periodic_dump = ts
665 class RemoteWorkerSelectionPolicy(ABC):
666 """An interface definition of a policy for selecting a remote worker."""
669 self.workers: Optional[List[RemoteWorkerRecord]] = None
671 def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
672 self.workers = workers
675 def is_worker_available(self) -> bool:
679 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
683 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
684 """A remote worker selector that uses weighted RNG."""
687 def is_worker_available(self) -> bool:
689 for worker in self.workers:
695 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
698 for worker in self.workers:
699 if worker.machine != machine_to_avoid:
701 for _ in range(worker.count * worker.weight):
702 grabbag.append(worker)
704 if len(grabbag) == 0:
706 'There are no available workers that avoid %s', machine_to_avoid
709 for worker in self.workers:
711 for _ in range(worker.count * worker.weight):
712 grabbag.append(worker)
714 if len(grabbag) == 0:
715 logger.warning('There are no available workers?!')
718 worker = random.sample(grabbag, 1)[0]
719 assert worker.count > 0
721 logger.debug('Selected worker %s', worker)
725 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
726 """A remote worker selector that just round robins."""
728 def __init__(self) -> None:
733 def is_worker_available(self) -> bool:
735 for worker in self.workers:
742 self, machine_to_avoid: str = None
743 ) -> Optional[RemoteWorkerRecord]:
747 worker = self.workers[x]
751 if x >= len(self.workers):
754 logger.debug('Selected worker %s', worker)
757 if x >= len(self.workers):
760 logger.warning('Unexpectedly could not find a worker, retrying...')
765 class RemoteExecutor(BaseExecutor):
766 """An executor that uses processes on remote machines to do work.
767 To do so, it requires that a pool of remote workers to be properly
768 configured. See instructions in
769 :class:`pyutils.parallelize.parallelize`.
771 Each machine in a worker pool has a *weight* and a *count*. A
772 *weight* captures the relative speed of a processor on that worker
773 and a *count* captures the number of synchronous tasks the worker
774 can accept (i.e. the number of cpus on the machine).
776 To dispatch work to a remote machine, this class pickles the code
777 to be executed remotely using `cloudpickle`. For that to work,
778 the remote machine should be running the same version of Python as
779 this machine, ideally in a virtual environment with the same
780 import libraries installed. Differences in operating system
781 and/or processor architecture don't seem to matter for most code,
786 Mismatches in Python version or in the version numbers of
787 third-party libraries between machines can cause problems
788 when trying to unpickle and run code remotely.
790 Work to be dispatched is represented in this code by creating a
791 "bundle". Each bundle is assigned to a remote worker based on
792 heuristics captured in a :class:`RemoteWorkerSelectionPolicy`. In
793 general, it attempts to load all workers in the pool and maximize
794 throughput. Once assigned to a remote worker, pickled code is
795 copied to that worker via `scp` and a remote command is issued via
796 `ssh` to execute a :file:`remote_worker.py` process on the remote
797 machine. This process unpickles the code, runs it, and produces a
798 result which is then copied back to the local machine (again via
799 `scp`) where it can be processed by local code.
801 You can and probably must override the path of
802 :file:`remote_worker.py` on your pool machines using the
803 `--remote_worker_helper_path` commandline argument (or by just
804 changing the default in code, see above in this file's code).
806 During remote work execution, this local machine acts as a
807 controller dispatching all work to the network, copying pickled
808 tasks out, and copying results back in. It may also be a worker
809 in the pool but do not underestimate the cost of being a
810 controller -- it takes some cpu and a lot of network bandwidth.
811 The work dispatcher logic attempts to detect when a controller is
812 also a worker and reduce its load.
814 Some redundancy and safety provisions are made when scheduling
815 tasks to the worker pool; e.g. slower than expected tasks have
816 redundant backups tasks created, especially if there are otherwise
817 idle workers. If a task fails repeatedly, the dispatcher consider
818 it poisoned and give up on it.
822 This executor probably only makes sense to use with
823 computationally expensive tasks such as jobs that will execute
824 for ~30 seconds or longer.
826 The network overhead and latency of copying work from the
827 controller (local) machine to the remote workers and copying
828 results back again is relatively high. Especially at startup,
829 the network can become a bottleneck. Future versions of this
830 code may attempt to split the responsibility of being a
831 controller (distributing work to pool machines).
833 Instructions for how to set this up are provided in
834 :class:`pyutils.parallelize.parallelize`.
836 See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
842 workers: List[RemoteWorkerRecord],
843 policy: RemoteWorkerSelectionPolicy,
847 workers: A list of remote workers we can call on to do tasks.
848 policy: A policy for selecting remote workers for tasks.
852 self.workers = workers
854 self.worker_count = 0
855 for worker in self.workers:
856 self.worker_count += worker.count
857 if self.worker_count <= 0:
858 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
860 raise RemoteExecutorException(msg)
861 self.policy.register_worker_pool(self.workers)
862 self.cv = threading.Condition()
864 'Creating %d local threads, one per remote worker.', self.worker_count
866 self._helper_executor = fut.ThreadPoolExecutor(
867 thread_name_prefix="remote_executor_helper",
868 max_workers=self.worker_count,
870 self.status = RemoteExecutorStatus(self.worker_count)
871 self.total_bundles_submitted = 0
872 self.backup_lock = threading.Lock()
873 self.last_backup = None
875 self.heartbeat_thread,
876 self.heartbeat_stop_event,
877 ) = self._run_periodic_heartbeat()
878 self.already_shutdown = False
881 def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
883 We create a background thread to invoke :meth:`_heartbeat` regularly
884 while we are scheduling work. It does some accounting such as
885 looking for slow bundles to tag for backup creation, checking for
886 unexpected failures, and printing a fancy message on stdout.
888 while not stop_event.is_set():
890 logger.debug('Running periodic heartbeat code...')
892 logger.debug('Periodic heartbeat thread shutting down.')
894 def _heartbeat(self) -> None:
895 # Note: this is invoked on a background thread, not an
896 # executor thread. Be careful what you do with it b/c it
897 # needs to get back and dump status again periodically.
898 with self.status.lock:
899 self.status.periodic_dump(self.total_bundles_submitted)
901 # Look for bundles to reschedule via executor.submit
902 if config.config['executors_schedule_remote_backups']:
903 self._maybe_schedule_backup_bundles()
905 def _maybe_schedule_backup_bundles(self):
906 """Maybe schedule backup bundles if we see a very slow bundle."""
908 assert self.status.lock.locked()
909 num_done = len(self.status.finished_bundle_timings)
910 num_idle_workers = self.worker_count - self.task_count
914 and num_idle_workers > 0
915 and (self.last_backup is None or (now - self.last_backup > 9.0))
916 and self.backup_lock.acquire(blocking=False)
919 assert self.backup_lock.locked()
921 bundle_to_backup = None
926 ) in self.status.in_flight_bundles_by_worker.items():
928 # Prefer to schedule backups of bundles running on
931 for record in self.workers:
932 if worker.machine == record.machine:
933 base_score = float(record.weight)
934 base_score = 1.0 / base_score
936 base_score = int(base_score)
939 for uuid in bundle_uuids:
940 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
943 and bundle.src_bundle is None
944 and bundle.backup_bundles is not None
948 # Schedule backups of bundles running
949 # longer; especially those that are
951 start_ts = self.status.start_per_bundle[uuid]
952 if start_ts is not None:
953 runtime = now - start_ts
956 'score[%s] => %.1f # latency boost', bundle, score
959 if bundle.slower_than_local_p95:
962 'score[%s] => %.1f # >worker p95',
967 if bundle.slower_than_global_p95:
970 'score[%s] => %.1f # >global p95',
975 # Prefer backups of bundles that don't
976 # have backups already.
977 backup_count = len(bundle.backup_bundles)
978 if backup_count == 0:
980 elif backup_count == 1:
982 elif backup_count == 2:
987 'score[%s] => %.1f # {backup_count} dup backup factor',
993 best_score is None or score > best_score
995 bundle_to_backup = bundle
996 assert bundle is not None
997 assert bundle.backup_bundles is not None
998 assert bundle.src_bundle is None
1001 # Note: this is all still happening on the heartbeat
1002 # runner thread. That's ok because
1003 # _schedule_backup_for_bundle uses the executor to
1004 # submit the bundle again which will cause it to be
1005 # picked up by a worker thread and allow this thread
1006 # to return to run future heartbeats.
1007 if bundle_to_backup is not None:
1008 self.last_backup = now
1010 '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1014 self._schedule_backup_for_bundle(bundle_to_backup)
1016 self.backup_lock.release()
1018 def _is_worker_available(self) -> bool:
1019 """Is there a worker available currently?"""
1020 return self.policy.is_worker_available()
1022 def _acquire_worker(
1023 self, machine_to_avoid: str = None
1024 ) -> Optional[RemoteWorkerRecord]:
1025 """Try to acquire a worker."""
1026 return self.policy.acquire_worker(machine_to_avoid)
1028 def _find_available_worker_or_block(
1029 self, machine_to_avoid: str = None
1030 ) -> RemoteWorkerRecord:
1031 """Find a worker or block until one becomes available."""
1033 while not self._is_worker_available():
1035 worker = self._acquire_worker(machine_to_avoid)
1036 if worker is not None:
1038 msg = "We should never reach this point in the code"
1039 logger.critical(msg)
1040 raise Exception(msg)
1042 def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1043 """Release a previously acquired worker."""
1044 worker = bundle.worker
1045 assert worker is not None
1046 logger.debug('Released worker %s', worker)
1047 self.status.record_release_worker(
1055 self.adjust_task_count(-1)
1057 def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1058 """See if a particular bundle is cancelled. Do not block."""
1059 with self.status.lock:
1060 if bundle.is_cancelled.wait(timeout=0.0):
1061 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1062 bundle.was_cancelled = True
1066 def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1067 """Find a worker for bundle or block until one is available."""
1069 self.adjust_task_count(+1)
1071 controller = bundle.controller
1072 avoid_machine = override_avoid_machine
1073 is_original = bundle.src_bundle is None
1075 # Try not to schedule a backup on the same host as the original.
1076 if avoid_machine is None and bundle.src_bundle is not None:
1077 avoid_machine = bundle.src_bundle.machine
1079 while worker is None:
1080 worker = self._find_available_worker_or_block(avoid_machine)
1081 assert worker is not None
1083 # Ok, found a worker.
1084 bundle.worker = worker
1085 machine = bundle.machine = worker.machine
1086 username = bundle.username = worker.username
1087 self.status.record_acquire_worker(worker, uuid)
1088 logger.debug('%s: Running bundle on %s...', bundle, worker)
1090 # Before we do any work, make sure the bundle is still viable.
1091 # It may have been some time between when it was submitted and
1092 # now due to lack of worker availability and someone else may
1093 # have already finished it.
1094 if self._check_if_cancelled(bundle):
1096 return self._process_work_result(bundle)
1099 '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1101 self._release_worker(bundle)
1103 # Weird. We are the original owner of this
1104 # bundle. For it to have been cancelled, a backup
1105 # must have already started and completed before
1106 # we even for started. Moreover, the backup says
1107 # it is done but we can't find the results it
1108 # should have copied over. Reschedule the whole
1111 '%s: We are the original owner thread and yet there are '
1112 'no results for this bundle. This is unexpected and bad. '
1113 'Attempting an emergency retry...',
1116 return self._emergency_retry_nasty_bundle(bundle)
1118 # We're a backup and our bundle is cancelled
1119 # before we even got started. Do nothing and let
1120 # the original bundle's thread worry about either
1121 # finding the results or complaining about it.
1124 # Send input code / data to worker machine if it's not local.
1125 if controller not in machine:
1128 f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1130 start_ts = time.time()
1131 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1133 xfer_latency = time.time() - start_ts
1135 "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1138 self._release_worker(bundle)
1140 # Weird. We tried to copy the code to the worker
1141 # and it failed... And we're the original bundle.
1144 "%s: Failed to send instructions to the worker machine?! "
1145 "This is not expected; we\'re the original bundle so this shouldn\'t "
1146 "be a race condition. Attempting an emergency retry...",
1149 return self._emergency_retry_nasty_bundle(bundle)
1151 # This is actually expected; we're a backup.
1152 # There's a race condition where someone else
1153 # already finished the work and removed the source
1154 # code_file before we could copy it. Ignore.
1156 '%s: Failed to send instructions to the worker machine... '
1157 'We\'re a backup and this may be caused by the original (or '
1158 'some other backup) already finishing this work. Ignoring.',
1163 # Kick off the work. Note that if this fails we let
1164 # _wait_for_process deal with it.
1165 self.status.record_processing_began(uuid)
1166 helper_path = config.config['remote_worker_helper_path']
1168 f'{SSH} {bundle.username}@{bundle.machine} '
1169 f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1172 '%s: Executing %s in the background to kick off work...', bundle, cmd
1174 p = cmd_in_background(cmd, silent=True)
1177 '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1179 return self._wait_for_process(p, bundle, 0)
1181 def _wait_for_process(
1182 self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1184 """At this point we've copied the bundle's pickled code to the remote
1185 worker and started an ssh process that should be invoking the
1186 remote worker to have it execute the user's code. See how
1187 that's going and wait for it to complete or fail. Note that
1188 this code is recursive: there are codepaths where we decide to
1189 stop waiting for an ssh process (because another backup seems
1190 to have finished) but then fail to fetch or parse the results
1191 from that backup and thus call ourselves to continue waiting
1192 on an active ssh process. This is the purpose of the depth
1193 argument: to curtail potential infinite recursion by giving up
1197 p: the Popen record of the ssh job
1198 bundle: the bundle of work being executed remotely
1199 depth: how many retries we've made so far. Starts at zero.
1203 machine = bundle.machine
1204 assert p is not None
1205 pid = p.pid # pid of the ssh process
1208 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1212 self._release_worker(bundle)
1213 return self._emergency_retry_nasty_bundle(bundle)
1215 # Spin until either the ssh job we scheduled finishes the
1216 # bundle or some backup worker signals that they finished it
1220 p.wait(timeout=0.25)
1221 except subprocess.TimeoutExpired:
1222 if self._check_if_cancelled(bundle):
1224 '%s: looks like another worker finished bundle...', bundle
1228 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1232 # If we get here we believe the bundle is done; either the ssh
1233 # subprocess finished (hopefully successfully) or we noticed
1234 # that some other worker seems to have completed the bundle
1235 # before us and we're bailing out.
1237 ret = self._process_work_result(bundle)
1238 if ret is not None and p is not None:
1242 # Something went wrong; e.g. we could not copy the results
1243 # back, cleanup after ourselves on the remote machine, or
1244 # unpickle the results we got from the remove machine. If we
1245 # still have an active ssh subprocess, keep waiting on it.
1246 # Otherwise, time for an emergency reschedule.
1248 logger.exception('%s: Something unexpected just happened...', bundle)
1251 "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1254 return self._wait_for_process(p, bundle, depth + 1)
1256 self._release_worker(bundle)
1257 return self._emergency_retry_nasty_bundle(bundle)
1259 def _process_work_result(self, bundle: BundleDetails) -> Any:
1260 """A bundle seems to be completed. Check on the results."""
1262 with self.status.lock:
1263 is_original = bundle.src_bundle is None
1264 was_cancelled = bundle.was_cancelled
1265 username = bundle.username
1266 machine = bundle.machine
1267 result_file = bundle.result_file
1268 code_file = bundle.code_file
1270 # Whether original or backup, if we finished first we must
1271 # fetch the results if the computation happened on a
1273 bundle.end_ts = time.time()
1274 if not was_cancelled:
1275 assert bundle.machine is not None
1276 if bundle.controller not in bundle.machine:
1277 cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1279 "%s: Fetching results back from %s@%s via %s",
1286 # If either of these throw they are handled in
1287 # _wait_for_process.
1292 except Exception as e:
1299 # Cleanup remote /tmp files.
1301 f'{SSH} {username}@{machine}'
1302 f' "/bin/rm -f {code_file} {result_file}"'
1305 'Fetching results back took %.2fs', time.time() - bundle.end_ts
1307 dur = bundle.end_ts - bundle.start_ts
1308 self.histogram.add_item(dur)
1310 # Only the original worker should unpickle the file contents
1311 # though since it's the only one whose result matters. The
1312 # original is also the only job that may delete result_file
1313 # from disk. Note that the original may have been cancelled
1314 # if one of the backups finished first; it still must read the
1315 # result from disk. It still does that here with is_cancelled
1318 logger.debug("%s: Unpickling %s.", bundle, result_file)
1320 with open(result_file, 'rb') as rb:
1321 serialized = rb.read()
1322 result = cloudpickle.loads(serialized)
1323 except Exception as e:
1324 logger.exception('Failed to load %s... this is bad news.', result_file)
1325 self._release_worker(bundle)
1327 # Re-raise the exception; the code in _wait_for_process may
1328 # decide to _emergency_retry_nasty_bundle here.
1330 logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1331 os.remove(result_file)
1332 os.remove(code_file)
1334 # Notify any backups that the original is done so they
1335 # should stop ASAP. Do this whether or not we
1336 # finished first since there could be more than one
1338 if bundle.backup_bundles is not None:
1339 for backup in bundle.backup_bundles:
1341 '%s: Notifying backup %s that it\'s cancelled',
1345 backup.is_cancelled.set()
1347 # This is a backup job and, by now, we have already fetched
1348 # the bundle results.
1350 # Backup results don't matter, they just need to leave the
1351 # result file in the right place for their originals to
1352 # read/unpickle later.
1355 # Tell the original to stop if we finished first.
1356 if not was_cancelled:
1357 orig_bundle = bundle.src_bundle
1358 assert orig_bundle is not None
1360 '%s: Notifying original %s we beat them to it.',
1364 orig_bundle.is_cancelled.set()
1365 self._release_worker(bundle, was_cancelled=was_cancelled)
1368 def _create_original_bundle(self, pickle, function_name: str):
1369 """Creates a bundle that is not a backup of any other bundle but
1370 rather represents a user task.
1373 uuid = string_utils.generate_uuid(omit_dashes=True)
1374 code_file = f'/tmp/{uuid}.code.bin'
1375 result_file = f'/tmp/{uuid}.result.bin'
1377 logger.debug('Writing pickled code to %s', code_file)
1378 with open(code_file, 'wb') as wb:
1381 bundle = BundleDetails(
1382 pickled_code=pickle,
1384 function_name=function_name,
1388 controller=platform.node(),
1389 code_file=code_file,
1390 result_file=result_file,
1392 start_ts=time.time(),
1394 slower_than_local_p95=False,
1395 slower_than_global_p95=False,
1397 is_cancelled=threading.Event(),
1398 was_cancelled=False,
1402 self.status.record_bundle_details(bundle)
1403 logger.debug('%s: Created an original bundle', bundle)
1406 def _create_backup_bundle(self, src_bundle: BundleDetails):
1407 """Creates a bundle that is a backup of another bundle that is
1408 running too slowly."""
1410 assert self.status.lock.locked()
1411 assert src_bundle.backup_bundles is not None
1412 n = len(src_bundle.backup_bundles)
1413 uuid = src_bundle.uuid + f'_backup#{n}'
1415 backup_bundle = BundleDetails(
1416 pickled_code=src_bundle.pickled_code,
1418 function_name=src_bundle.function_name,
1422 controller=src_bundle.controller,
1423 code_file=src_bundle.code_file,
1424 result_file=src_bundle.result_file,
1426 start_ts=time.time(),
1428 slower_than_local_p95=False,
1429 slower_than_global_p95=False,
1430 src_bundle=src_bundle,
1431 is_cancelled=threading.Event(),
1432 was_cancelled=False,
1433 backup_bundles=None, # backup backups not allowed
1436 src_bundle.backup_bundles.append(backup_bundle)
1437 self.status.record_bundle_details_already_locked(backup_bundle)
1438 logger.debug('%s: Created a backup bundle', backup_bundle)
1439 return backup_bundle
1441 def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1442 """Schedule a backup of src_bundle."""
1444 assert self.status.lock.locked()
1445 assert src_bundle is not None
1446 backup_bundle = self._create_backup_bundle(src_bundle)
1448 '%s/%s: Scheduling backup for execution...',
1450 backup_bundle.function_name,
1452 self._helper_executor.submit(self._launch, backup_bundle)
1454 # Results from backups don't matter; if they finish first
1455 # they will move the result_file to this machine and let
1456 # the original pick them up and unpickle them (and return
1459 def _emergency_retry_nasty_bundle(
1460 self, bundle: BundleDetails
1461 ) -> Optional[fut.Future]:
1462 """Something unexpectedly failed with bundle. Either retry it
1463 from the beginning or throw in the towel and give up on it."""
1465 is_original = bundle.src_bundle is None
1466 bundle.worker = None
1467 avoid_last_machine = bundle.machine
1468 bundle.machine = None
1469 bundle.username = None
1470 bundle.failure_count += 1
1476 if bundle.failure_count > retry_limit:
1478 '%s: Tried this bundle too many times already (%dx); giving up.',
1483 raise RemoteExecutorException(
1484 f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1487 '%s: At least it\'s only a backup; better luck with the others.',
1492 msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
1495 return self._launch(bundle, avoid_last_machine)
1498 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1499 """Submit work to be done. This is the user entry point of this
1501 if self.already_shutdown:
1502 raise Exception('Submitted work after shutdown.')
1503 pickle = _make_cloud_pickle(function, *args, **kwargs)
1504 bundle = self._create_original_bundle(pickle, function.__name__)
1505 self.total_bundles_submitted += 1
1506 return self._helper_executor.submit(self._launch, bundle)
1509 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1510 """Shutdown the executor."""
1511 if not self.already_shutdown:
1512 logging.debug('Shutting down RemoteExecutor %s', self.title)
1513 self.heartbeat_stop_event.set()
1514 self.heartbeat_thread.join()
1515 self._helper_executor.shutdown(wait)
1517 print(self.histogram.__repr__(label_formatter='%ds'))
1518 self.already_shutdown = True
1521 class RemoteWorkerPoolProvider:
1523 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1527 @persistent.persistent_autoloaded_singleton() # type: ignore
1528 class ConfigRemoteWorkerPoolProvider(
1529 RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1531 def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1532 self.remote_worker_pool = []
1533 for record in json_remote_worker_pool['remote_worker_records']:
1534 self.remote_worker_pool.append(
1535 dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
1537 assert len(self.remote_worker_pool) > 0
1540 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1541 return self.remote_worker_pool
1544 def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1545 return self.remote_worker_pool
1549 def get_filename() -> str:
1550 return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
1554 def should_we_load_data(filename: str) -> bool:
1559 def should_we_save_data(filename: str) -> bool:
1564 class DefaultExecutors(object):
1565 """A container for a default thread, process and remote executor.
1566 These are not created until needed and we take care to clean up
1567 before process exit automatically for the caller's convenience.
1568 Instead of creating your own executor, consider using the one
1569 from this pool. e.g.::
1571 @par.parallelize(method=par.Method.PROCESS)
1573 solutions: List[Work],
1580 def start_do_work(all_work: List[Work]):
1582 logger.debug('Sharding work into groups of 10.')
1583 for subset in list_utils.shard(all_work, 10):
1584 shards.append([x for x in subset])
1586 logger.debug('Kicking off helper pool.')
1588 for n, shard in enumerate(shards):
1591 shard, n, shared_cache.get_name(), max_letter_pop_per_word
1594 smart_future.wait_all(results)
1596 # Note: if you forget to do this it will clean itself up
1597 # during program termination including tearing down any
1598 # active ssh connections.
1599 executors.DefaultExecutors().process_pool().shutdown()
1603 self.thread_executor: Optional[ThreadExecutor] = None
1604 self.process_executor: Optional[ProcessExecutor] = None
1605 self.remote_executor: Optional[RemoteExecutor] = None
1608 def _ping(host) -> bool:
1609 logger.debug('RUN> ping -c 1 %s', host)
1612 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1618 def thread_pool(self) -> ThreadExecutor:
1619 if self.thread_executor is None:
1620 self.thread_executor = ThreadExecutor()
1621 return self.thread_executor
1623 def process_pool(self) -> ProcessExecutor:
1624 if self.process_executor is None:
1625 self.process_executor = ProcessExecutor()
1626 return self.process_executor
1628 def remote_pool(self) -> RemoteExecutor:
1629 if self.remote_executor is None:
1630 logger.info('Looking for some helper machines...')
1631 provider = ConfigRemoteWorkerPoolProvider()
1632 all_machines = provider.get_remote_workers()
1635 # Make sure we can ping each machine.
1636 for record in all_machines:
1637 if self._ping(record.machine):
1638 logger.info('%s is alive / responding to pings', record.machine)
1641 # The controller machine has a lot to do; go easy on it.
1643 if record.machine == platform.node() and record.count > 1:
1644 logger.info('Reducing workload for %s.', record.machine)
1645 record.count = max(int(record.count / 2), 1)
1647 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1648 policy.register_worker_pool(pool)
1649 self.remote_executor = RemoteExecutor(pool, policy)
1650 return self.remote_executor
1652 def shutdown(self) -> None:
1653 if self.thread_executor is not None:
1654 self.thread_executor.shutdown(wait=True, quiet=True)
1655 self.thread_executor = None
1656 if self.process_executor is not None:
1657 self.process_executor.shutdown(wait=True, quiet=True)
1658 self.process_executor = None
1659 if self.remote_executor is not None:
1660 self.remote_executor.shutdown(wait=True, quiet=True)
1661 self.remote_executor = None