2 # -*- coding: utf-8 -*-
3 # pylint: disable=too-many-instance-attributes
4 # pylint: disable=too-many-nested-blocks
6 # © Copyright 2021-2023, Scott Gasch
9 This module defines a :class:`BaseExecutor` interface and three
12 - :class:`ThreadExecutor`
13 - :class:`ProcessExecutor`
14 - :class:`RemoteExecutor`
16 The :class:`ThreadExecutor` is used to dispatch work to background
17 threads in the same Python process for parallelized work. Of course,
18 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
19 is not terribly useful for compute-bound code. But it's good for
20 work that is mostly I/O bound.
22 The :class:`ProcessExecutor` is used to dispatch work to other
23 processes on the same machine and is more useful for compute-bound
26 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
27 the `cloudpickle` dependency, and the `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ script
28 to dispatch work to a set of remote worker machines on your
29 network. You can configure this pool via a JSON configuration file,
30 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
32 Finally, this file defines a :class:`DefaultExecutors` pool that
33 contains a pre-created and ready instance of each of the three
34 executors discussed. It has the added benefit of being automatically
35 cleaned up at process termination time.
37 See instructions in :mod:`pyutils.parallelize.parallelize` for
38 setting up and using the framework.
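For example, a minimal sketch (assuming the framework has been set up
as described there)::

    from pyutils.parallelize import executors

    pool = executors.DefaultExecutors().thread_pool()
    future = pool.submit(lambda x: x * 2, 21)
    print(future.result())  # 42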
41 from __future__ import annotations
43 import concurrent.futures as fut
52 from abc import ABC, abstractmethod
53 from collections import defaultdict
54 from dataclasses import dataclass
55 from typing import Any, Callable, Dict, List, Optional, Set
57 import cloudpickle # type: ignore
58 from overrides import overrides
60 import pyutils.typez.histogram as hist
61 from pyutils import argparse_utils, config, dataclass_utils, math_utils, string_utils
62 from pyutils.ansi import bg, fg, reset, underline
63 from pyutils.decorator_utils import singleton
64 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
65 from pyutils.parallelize.thread_utils import background_thread
66 from pyutils.typez import persistent, type_utils
68 logger = logging.getLogger(__name__)
70 parser = config.add_commandline_args(
71 f"Executors ({__file__})", "Args related to processing executors."
74 '--executors_threadpool_size',
77 help='Number of threads in the default threadpool, leave unset for default',
81 '--executors_processpool_size',
84 help='Number of processes in the default processpool, leave unset for default',
88 '--executors_schedule_remote_backups',
90 action=argparse_utils.ActionNoYes,
91 help='Should we schedule duplicative backup work if a remote bundle is slow',
94 '--executors_max_bundle_failures',
98 help='Maximum number of failures before giving up on a bundle',
101 '--remote_worker_records_file',
104 help='Path of the remote worker records file (JSON)',
105 default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
108 '--remote_worker_helper_path',
110 metavar='PATH_TO_REMOTE_WORKER_PY',
111 help='Path to remote_worker.py on remote machines',
112 default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
116 SSH = '/usr/bin/ssh -oForwardX11=no'
117 SCP = '/usr/bin/scp -C'
120 def _make_cloud_pickle(fun, *args, **kwargs):
121 """Internal helper to create cloud pickles."""
122 logger.debug("Making cloudpickled bundle at %s", fun.__name__)
123 return cloudpickle.dumps((fun, args, kwargs))
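# Illustrative round trip (this mirrors what the remote worker and
# ProcessExecutor._run_cloud_pickle do with the blob; math.sqrt stands
# in for arbitrary user code):
#
#   blob = _make_cloud_pickle(math.sqrt, 16)
#   fun, args, kwargs = cloudpickle.loads(blob)
#   assert fun(*args, **kwargs) == 4.0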
126 class BaseExecutor(ABC):
127 """The base executor interface definition. The interface for
128 :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
129 :class:`ThreadExecutor`.
132 def __init__(self, *, title=''):
135 title: the name of this executor.
138 self.histogram = hist.SimpleHistogram(
139 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
144 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
145 """Submit work for the executor to do.
148 function: the Callable to be executed.
149 *args: the arguments to function
150 **kwargs: the arguments to function
153 A concurrent :class:`Future` representing the result of the
159 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
160 """Shutdown the executor.
163 wait: wait for the shutdown to complete before returning?
164 quiet: keep it quiet, please.
168 def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
169 """Shutdown the executor and return True if the executor is idle
170 (i.e. there are no pending or active tasks). Return False
171 otherwise. Note: this should only be called by the launcher
175 quiet: keep it quiet, please.
178 True if the executor could be shut down because it has no
179 pending work, False otherwise.
181 if self.task_count == 0:
182 self.shutdown(wait=True, quiet=quiet)
186 def adjust_task_count(self, delta: int) -> None:
187 """Change the task count. Note: do not call this method from a
188 worker; it should only be called by the launcher process /
192 delta: the delta value by which to adjust task count.
194 self.task_count += delta
195 logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
197 def get_task_count(self) -> int:
198 """Change the task count. Note: do not call this method from a
199 worker, it should only be called by the launcher process /
203 The executor's current task count.
205 return self.task_count
208 class ThreadExecutor(BaseExecutor):
209 """A threadpool executor. This executor uses Python threads to
210 schedule tasks. Note that, at least as of python3.10, because of
211 the global lock in the interpreter itself, these do not
212 parallelize very well so this class is useful mostly for non-CPU
215 See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
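A minimal sketch; the lambda stands in for hypothetical I/O-bound work::

    executor = ThreadExecutor(max_workers=4)
    f = executor.submit(lambda url: url.startswith('https'), 'https://example.com')
    assert f.result()
    executor.shutdown(wait=True, quiet=True)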
218 def __init__(self, max_workers: Optional[int] = None):
221 max_workers: maximum number of threads to create in the pool.
225 if max_workers is not None:
226 workers = max_workers
227 elif 'executors_threadpool_size' in config.config:
228 workers = config.config['executors_threadpool_size']
229 if workers is not None:
230 logger.debug('Creating threadpool executor with %d workers', workers)
232 logger.debug('Creating a default sized threadpool executor')
233 self._thread_pool_executor = fut.ThreadPoolExecutor(
234 max_workers=workers, thread_name_prefix="thread_executor_helper"
236 self.already_shutdown = False
238 # This is run on a different thread; do not adjust task count here.
240 def _run_local_bundle(fun, *args, **kwargs):
241 logger.debug("Running local bundle at %s", fun.__name__)
242 result = fun(*args, **kwargs)
246 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
249 Exception: executor is shutting down already.
251 if self.already_shutdown:
252 raise Exception('Submitted work after shutdown.')
253 self.adjust_task_count(+1)
255 newargs.append(function)
259 result = self._thread_pool_executor.submit(
260 ThreadExecutor._run_local_bundle, *newargs, **kwargs
262 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
263 result.add_done_callback(lambda _: self.adjust_task_count(-1))
267 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
268 if not self.already_shutdown:
269 logger.debug('Shutting down threadpool executor %s', self.title)
270 self._thread_pool_executor.shutdown(wait)
272 print(self.histogram.__repr__(label_formatter='%ds'))
273 self.already_shutdown = True
276 class ProcessExecutor(BaseExecutor):
277 """An executor which runs tasks in child processes.
279 See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
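A minimal sketch; because work is shipped to the child process via
`cloudpickle`, the submitted callable must be picklable (`fibonacci`
here is a hypothetical CPU-bound helper)::

    def fibonacci(n: int) -> int:
        return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2)

    executor = ProcessExecutor(max_workers=2)
    f = executor.submit(fibonacci, 30)
    print(f.result())
    executor.shutdown(wait=True, quiet=True)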
282 def __init__(self, max_workers=None):
285 max_workers: the max number of worker processes to create.
289 if max_workers is not None:
290 workers = max_workers
291 elif 'executors_processpool_size' in config.config:
292 workers = config.config['executors_processpool_size']
293 if workers is not None:
294 logger.debug('Creating processpool executor with %d workers.', workers)
296 logger.debug('Creating a default sized processpool executor')
297 self._process_executor = fut.ProcessPoolExecutor(
300 self.already_shutdown = False
302 # This is run in another process; do not adjust task count here.
304 def _run_cloud_pickle(pickle):
305 fun, args, kwargs = cloudpickle.loads(pickle)
306 logger.debug("Running pickled bundle at %s", fun.__name__)
307 result = fun(*args, **kwargs)
311 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
314 Exception: executor is shutting down already.
316 if self.already_shutdown:
317 raise Exception('Submitted work after shutdown.')
319 self.adjust_task_count(+1)
320 pickle = _make_cloud_pickle(function, *args, **kwargs)
321 result = self._process_executor.submit(
322 ProcessExecutor._run_cloud_pickle, pickle
324 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
325 result.add_done_callback(lambda _: self.adjust_task_count(-1))
329 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
330 if not self.already_shutdown:
331 logger.debug('Shutting down processpool executor %s', self.title)
332 self._process_executor.shutdown(wait)
334 print(self.histogram.__repr__(label_formatter='%ds'))
335 self.already_shutdown = True
337 def __getstate__(self):
338 state = self.__dict__.copy()
339 state['_process_executor'] = None
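# Note: the underlying ProcessPoolExecutor holds threads and locks and
# is therefore not picklable; dropping it here lets the executor object
# itself survive being (cloud)pickled.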
343 class RemoteExecutorException(Exception):
344 """Thrown when a bundle cannot be executed despite several retries."""
350 class RemoteWorkerRecord:
351 """A record of info about a remote worker."""
354 """Username we can ssh into on this machine to run work."""
357 """Machine address / name."""
360 """Relative probability for the weighted policy to select this
361 machine for scheduling work."""
364 """If this machine is selected, what is the maximum number of task
365 that it can handle?"""
368 return hash((self.username, self.machine))
371 return f'{self.username}@{self.machine}'
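# E.g. (illustrative record; hypothetical host):
#   RemoteWorkerRecord(username='scott', machine='cheetah', weight=24, count=10)
# ...renders as 'scott@cheetah'.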
376 """All info necessary to define some unit of work that needs to be
377 done, where it is being run, its state, whether it is an original
378 bundle or a backup bundle, how many times it has failed, etc.
382 """The code to run, cloud pickled"""
385 """A unique identifier"""
388 """The name of the function we pickled"""
390 worker: Optional[RemoteWorkerRecord]
391 """The remote worker running this bundle or None if none (yet)"""
393 username: Optional[str]
394 """The remote username running this bundle or None if none (yet)"""
396 machine: Optional[str]
397 """The remote machine running this bundle or None if none (yet)"""
400 """The controller machine"""
403 """A unique filename to hold the work to be done"""
406 """Where the results should be placed / read from"""
409 """The process id of the local subprocess watching the ssh connection
410 to the remote machine"""
418 slower_than_local_p95: bool
419 """Currently slower then 95% of other bundles on remote host"""
421 slower_than_global_p95: bool
422 """Currently slower than 95% of other bundles globally"""
424 src_bundle: Optional[BundleDetails]
425 """If this is a backup bundle, this points to the original bundle
426 that it's backing up. None otherwise."""
428 is_cancelled: threading.Event
429 """An event that can be signaled to indicate this bundle is cancelled.
430 This is set when another copy (backup or original) of this work has
431 completed successfully elsewhere."""
434 """True if this bundle was cancelled, False if it finished normally"""
436 backup_bundles: Optional[List[BundleDetails]]
437 """If we've created backups of this bundle, this is the list of them"""
440 """How many times has this bundle failed already?"""
444 if uuid[-9:-2] == '_backup':
446 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
450 # We colorize the uuid based on some bits from it to make them
451 # stand out in the logging and help a reader correlate log messages
452 # related to the same bundle.
459 fg('marigold yellow'),
462 fg('cornflower blue'),
463 fg('turquoise blue'),
465 fg('lavender purple'),
468 c = colorz[int(uuid[-2:], 16) % len(colorz)]
470 self.function_name if self.function_name is not None else 'nofname'
472 machine = self.machine if self.machine is not None else 'nomachine'
473 return f'{c}{suffix}/{function_name}/{machine}{reset()}'
476 class RemoteExecutorStatus:
477 """A status 'scoreboard' for a remote executor tracking various
478 metrics and able to render a periodic dump of global state.
481 def __init__(self, total_worker_count: int) -> None:
484 total_worker_count: number of workers in the pool
486 self.worker_count: int = total_worker_count
487 self.known_workers: Set[RemoteWorkerRecord] = set()
488 self.start_time: float = time.time()
489 self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
490 self.end_per_bundle: Dict[str, float] = defaultdict(float)
491 self.finished_bundle_timings_per_worker: Dict[
492 RemoteWorkerRecord, math_utils.NumericPopulation
494 self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
495 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
496 self.finished_bundle_timings: math_utils.NumericPopulation = (
497 math_utils.NumericPopulation()
499 self.last_periodic_dump: Optional[float] = None
500 self.total_bundles_submitted: int = 0
502 # Protects reads and modifications of self. Also used
503 # as a memory fence for modifications to bundle.
504 self.lock: threading.Lock = threading.Lock()
506 def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
507 """Record that bundle with uuid is assigned to a particular worker.
510 worker: the record of the worker to which uuid is assigned
511 uuid: the uuid of a bundle that has been assigned to a worker
514 self.record_acquire_worker_already_locked(worker, uuid)
516 def record_acquire_worker_already_locked(
517 self, worker: RemoteWorkerRecord, uuid: str
519 """Same as above but an entry point that doesn't acquire the lock
520 for codepaths where it's already held."""
521 assert self.lock.locked()
522 self.known_workers.add(worker)
523 self.start_per_bundle[uuid] = None
524 x = self.in_flight_bundles_by_worker.get(worker, set())
526 self.in_flight_bundles_by_worker[worker] = x
528 def record_bundle_details(self, details: BundleDetails) -> None:
529 """Register the details about a bundle of work."""
531 self.record_bundle_details_already_locked(details)
533 def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
534 """Same as above but for codepaths that already hold the lock."""
535 assert self.lock.locked()
536 self.bundle_details_by_uuid[details.uuid] = details
538 def record_release_worker(
540 worker: RemoteWorkerRecord,
544 """Record that a bundle has released a worker."""
546 self.record_release_worker_already_locked(worker, uuid, was_cancelled)
548 def record_release_worker_already_locked(
550 worker: RemoteWorkerRecord,
554 """Same as above but for codepaths that already hold the lock."""
555 assert self.lock.locked()
557 self.end_per_bundle[uuid] = ts
558 self.in_flight_bundles_by_worker[worker].remove(uuid)
559 if not was_cancelled:
560 start = self.start_per_bundle[uuid]
561 assert start is not None
562 bundle_latency = ts - start
563 x = self.finished_bundle_timings_per_worker.get(
564 worker, math_utils.NumericPopulation()
566 x.add_number(bundle_latency)
567 self.finished_bundle_timings_per_worker[worker] = x
568 self.finished_bundle_timings.add_number(bundle_latency)
570 def record_processing_began(self, uuid: str):
571 """Record when work on a bundle begins."""
573 self.start_per_bundle[uuid] = time.time()
575 def total_in_flight(self) -> int:
576 """How many bundles are in flight currently?"""
577 assert self.lock.locked()
579 for worker in self.known_workers:
580 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
581 return total_in_flight
583 def total_idle(self) -> int:
584 """How many idle workers are there currently?"""
585 assert self.lock.locked()
586 return self.worker_count - self.total_in_flight()
589 assert self.lock.locked()
591 total_finished = len(self.finished_bundle_timings)
592 total_in_flight = self.total_in_flight()
593 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
596 if len(self.finished_bundle_timings) > 1:
597 qall_median = self.finished_bundle_timings.get_median()
598 qall_p95 = self.finished_bundle_timings.get_percentile(95)
600 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
601 f'✅={total_finished}/{self.total_bundles_submitted}, '
602 f'💻n={total_in_flight}/{self.worker_count}\n'
606 f'⏱={ts-self.start_time:.1f}s, '
607 f'✅={total_finished}/{self.total_bundles_submitted}, '
608 f'💻n={total_in_flight}/{self.worker_count}\n'
611 for worker in self.known_workers:
612 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
613 timings = self.finished_bundle_timings_per_worker.get(
614 worker, math_utils.NumericPopulation()
617 qworker_median = None
620 qworker_median = timings.get_median()
621 qworker_p95 = timings.get_percentile(95)
622 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
626 ret += f' ...finished {count} total bundle(s) so far\n'
627 in_flight = len(self.in_flight_bundles_by_worker[worker])
629 ret += f' ...{in_flight} bundles currently in flight:\n'
630 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
631 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
632 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
633 if self.start_per_bundle[bundle_uuid] is not None:
634 sec = ts - self.start_per_bundle[bundle_uuid]
635 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
637 ret += f' {details} setting up / copying data...'
640 if qworker_p95 is not None:
641 if sec > qworker_p95:
642 ret += f'{bg("red")}>💻p95{reset()} '
643 if details is not None:
644 details.slower_than_local_p95 = True
646 if details is not None:
647 details.slower_than_local_p95 = False
649 if qall_p95 is not None:
651 ret += f'{bg("red")}>∀p95{reset()} '
652 if details is not None:
653 details.slower_than_global_p95 = True
655 details.slower_than_global_p95 = False
659 def periodic_dump(self, total_bundles_submitted: int) -> None:
660 assert self.lock.locked()
661 self.total_bundles_submitted = total_bundles_submitted
663 if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
665 self.last_periodic_dump = ts
668 class RemoteWorkerSelectionPolicy(ABC):
669 """An interface definition of a policy for selecting a remote worker."""
672 self.workers: Optional[List[RemoteWorkerRecord]] = None
674 def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
675 self.workers = workers
678 def is_worker_available(self) -> bool:
683 self, machine_to_avoid: Optional[str] = None
684 ) -> Optional[RemoteWorkerRecord]:
688 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
689 """A remote worker selector that uses weighted RNG."""
692 def is_worker_available(self) -> bool:
694 for worker in self.workers:
701 self, machine_to_avoid: Optional[str] = None
702 ) -> Optional[RemoteWorkerRecord]:
705 for worker in self.workers:
706 if worker.machine != machine_to_avoid:
708 for _ in range(worker.count * worker.weight):
709 grabbag.append(worker)
711 if len(grabbag) == 0:
713 'There are no available workers that avoid %s', machine_to_avoid
716 for worker in self.workers:
718 for _ in range(worker.count * worker.weight):
719 grabbag.append(worker)
721 if len(grabbag) == 0:
722 logger.warning('There are no available workers?!')
725 worker = random.sample(grabbag, 1)[0]
726 assert worker.count > 0
728 logger.debug('Selected worker %s', worker)
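# Note (illustrative): a worker with count=2 and weight=3 contributes six
# entries to the grabbag above and is therefore six times more likely to
# be selected than a worker with count=1 and weight=1.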
732 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
733 """A remote worker selector that just round robins."""
735 def __init__(self) -> None:
740 def is_worker_available(self) -> bool:
742 for worker in self.workers:
747 def _increment_index(self, index: int) -> None:
750 if index >= len(self.workers):
756 self, machine_to_avoid: Optional[str] = None
757 ) -> Optional[RemoteWorkerRecord]:
761 worker = self.workers[x]
762 if worker.machine != machine_to_avoid and worker.count > 0:
764 self._increment_index(x)
765 logger.debug('Selected worker %s', worker)
768 if x >= len(self.workers):
771 logger.warning('Unexpectedly could not find a worker, retrying...')
776 class RemoteExecutor(BaseExecutor):
777 """An executor that uses processes on remote machines to do work.
778 To do so, it requires that a pool of remote workers be properly
779 configured. See instructions in
780 :class:`pyutils.parallelize.parallelize`.
782 Each machine in a worker pool has a *weight* and a *count*. A
783 *weight* captures the relative speed of a processor on that worker
784 and a *count* captures the number of synchronous tasks the worker
785 can accept (i.e. the number of cpus on the machine).
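For example, a hypothetical two-machine pool might be described in the
JSON configuration file like this (the field names correspond to
:class:`RemoteWorkerRecord`)::

    {
        "remote_worker_records": [
            {"username": "scott", "machine": "cheetah", "weight": 24, "count": 10},
            {"username": "scott", "machine": "wannabe", "weight": 2, "count": 2}
        ]
    }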
787 To dispatch work to a remote machine, this class pickles the code
788 to be executed remotely using `cloudpickle`. For that to work,
789 the remote machine should be running the same version of Python as
790 this machine, ideally in a virtual environment with the same
791 import libraries installed. Differences in operating system
792 and/or processor architecture don't seem to matter for most code,
797 Mismatches in Python version or in the version numbers of
798 third-party libraries between machines can cause problems
799 when trying to unpickle and run code remotely.
801 Work to be dispatched is represented in this code by creating a
802 "bundle". Each bundle is assigned to a remote worker based on
803 heuristics captured in a :class:`RemoteWorkerSelectionPolicy`. In
804 general, it attempts to load all workers in the pool and maximize
805 throughput. Once assigned to a remote worker, pickled code is
806 copied to that worker via `scp` and a remote command is issued via
807 `ssh` to execute a :file:`remote_worker.py` process on the remote
808 machine. This process unpickles the code, runs it, and produces a
809 result which is then copied back to the local machine (again via
810 `scp`) where it can be processed by local code.
812 You can and probably must override the path of
813 :file:`remote_worker.py` on your pool machines using the
814 `--remote_worker_helper_path` commandline argument (or by just
815 changing the default in code, see above in this file's code).
817 During remote work execution, this local machine acts as a
818 controller dispatching all work to the network, copying pickled
819 tasks out, and copying results back in. It may also be a worker
820 in the pool but do not underestimate the cost of being a
821 controller -- it takes some cpu and a lot of network bandwidth.
822 The work dispatcher logic attempts to detect when a controller is
823 also a worker and reduce its load.
825 Some redundancy and safety provisions are made when scheduling
826 tasks to the worker pool; e.g. slower-than-expected tasks have
827 redundant backup tasks created, especially if there are otherwise
828 idle workers. If a task fails repeatedly, the dispatcher considers
829 it poisoned and gives up on it.
833 This executor probably only makes sense to use with
834 computationally expensive tasks such as jobs that will execute
835 for ~30 seconds or longer.
837 The network overhead and latency of copying work from the
838 controller (local) machine to the remote workers and copying
839 results back again is relatively high. Especially at startup,
840 the network can become a bottleneck. Future versions of this
841 code may attempt to split the responsibility of being a
842 controller (distributing work to pool machines).
844 Instructions for how to set this up are provided in
845 :class:`pyutils.parallelize.parallelize`.
847 See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
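A minimal sketch (assumes a configured worker pool; most callers should
instead use :meth:`DefaultExecutors.remote_pool`, which performs these
steps for you)::

    workers = ConfigRemoteWorkerPoolProvider().get_remote_workers()
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    executor = RemoteExecutor(workers, policy)
    future = executor.submit(some_function, some_arg)  # hypothetical work
    print(future.result())
    executor.shutdown()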
853 workers: List[RemoteWorkerRecord],
854 policy: RemoteWorkerSelectionPolicy,
858 workers: A list of remote workers we can call on to do tasks.
859 policy: A policy for selecting remote workers for tasks.
862 RemoteExecutorException: unable to find a place to schedule work.
866 self.workers = workers
868 self.worker_count = 0
869 for worker in self.workers:
870 self.worker_count += worker.count
871 if self.worker_count <= 0:
872 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
874 raise RemoteExecutorException(msg)
875 self.policy.register_worker_pool(self.workers)
876 self.cv = threading.Condition()
878 'Creating %d local threads, one per remote worker.', self.worker_count
880 self._helper_executor = fut.ThreadPoolExecutor(
881 thread_name_prefix="remote_executor_helper",
882 max_workers=self.worker_count,
884 self.status = RemoteExecutorStatus(self.worker_count)
885 self.total_bundles_submitted = 0
886 self.backup_lock = threading.Lock()
887 self.last_backup = None
889 self.heartbeat_thread,
890 self.heartbeat_stop_event,
891 ) = self._run_periodic_heartbeat()
892 self.already_shutdown = False
895 def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
897 We create a background thread to invoke :meth:`_heartbeat` regularly
898 while we are scheduling work. It does some accounting such as
899 looking for slow bundles to tag for backup creation, checking for
900 unexpected failures, and printing a fancy message on stdout.
902 while not stop_event.is_set():
904 logger.debug('Running periodic heartbeat code...')
906 logger.debug('Periodic heartbeat thread shutting down.')
908 def _heartbeat(self) -> None:
909 # Note: this is invoked on a background thread, not an
910 # executor thread. Be careful what you do with it b/c it
911 # needs to get back and dump status again periodically.
912 with self.status.lock:
913 self.status.periodic_dump(self.total_bundles_submitted)
915 # Look for bundles to reschedule via executor.submit
916 if config.config['executors_schedule_remote_backups']:
917 self._maybe_schedule_backup_bundles()
919 def _maybe_schedule_backup_bundles(self):
920 """Maybe schedule backup bundles if we see a very slow bundle."""
922 assert self.status.lock.locked()
923 num_done = len(self.status.finished_bundle_timings)
924 num_idle_workers = self.worker_count - self.task_count
928 and num_idle_workers > 0
929 and (self.last_backup is None or (now - self.last_backup > 9.0))
930 and self.backup_lock.acquire(blocking=False)
933 assert self.backup_lock.locked()
935 bundle_to_backup = None
940 ) in self.status.in_flight_bundles_by_worker.items():
942 # Prefer to schedule backups of bundles running on
945 for record in self.workers:
946 if worker.machine == record.machine:
947 temp_score = float(record.weight)
948 temp_score = 1.0 / temp_score
950 base_score = int(temp_score)
953 for uuid in bundle_uuids:
954 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
957 and bundle.src_bundle is None
958 and bundle.backup_bundles is not None
962 # Schedule backups of bundles running
963 # longer; especially those that are
965 start_ts = self.status.start_per_bundle[uuid]
966 if start_ts is not None:
967 runtime = now - start_ts
970 'score[%s] => %.1f # latency boost', bundle, score
973 if bundle.slower_than_local_p95:
976 'score[%s] => %.1f # >worker p95',
981 if bundle.slower_than_global_p95:
984 'score[%s] => %.1f # >global p95',
989 # Prefer backups of bundles that don't
990 # have backups already.
991 backup_count = len(bundle.backup_bundles)
992 if backup_count == 0:
994 elif backup_count == 1:
996 elif backup_count == 2:
1001 'score[%s] => %.1f # {backup_count} dup backup factor',
1007 best_score is None or score > best_score
1009 bundle_to_backup = bundle
1010 assert bundle is not None
1011 assert bundle.backup_bundles is not None
1012 assert bundle.src_bundle is None
1015 # Note: this is all still happening on the heartbeat
1016 # runner thread. That's ok because
1017 # _schedule_backup_for_bundle uses the executor to
1018 # submit the bundle again which will cause it to be
1019 # picked up by a worker thread and allow this thread
1020 # to return to run future heartbeats.
1021 if bundle_to_backup is not None:
1022 self.last_backup = now
1024 '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1028 self._schedule_backup_for_bundle(bundle_to_backup)
1030 self.backup_lock.release()
1032 def _is_worker_available(self) -> bool:
1033 """Is there a worker available currently?"""
1034 return self.policy.is_worker_available()
1036 def _acquire_worker(
1037 self, machine_to_avoid: Optional[str] = None
1038 ) -> Optional[RemoteWorkerRecord]:
1039 """Try to acquire a worker."""
1040 return self.policy.acquire_worker(machine_to_avoid)
1042 def _find_available_worker_or_block(
1043 self, machine_to_avoid: Optional[str] = None
1044 ) -> RemoteWorkerRecord:
1045 """Find a worker or block until one becomes available."""
1047 while not self._is_worker_available():
1049 worker = self._acquire_worker(machine_to_avoid)
1050 if worker is not None:
1052 msg = "We should never reach this point in the code"
1053 logger.critical(msg)
1054 raise Exception(msg)
1056 def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1057 """Release a previously acquired worker."""
1058 worker = bundle.worker
1059 assert worker is not None
1060 logger.debug('Released worker %s', worker)
1061 self.status.record_release_worker(
1069 self.adjust_task_count(-1)
1071 def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1072 """See if a particular bundle is cancelled. Do not block."""
1073 with self.status.lock:
1074 if bundle.is_cancelled.wait(timeout=0.0):
1075 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1076 bundle.was_cancelled = True
1080 def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1081 """Find a worker for bundle or block until one is available."""
1083 self.adjust_task_count(+1)
1085 controller = bundle.controller
1086 avoid_machine = override_avoid_machine
1087 is_original = bundle.src_bundle is None
1089 # Try not to schedule a backup on the same host as the original.
1090 if avoid_machine is None and bundle.src_bundle is not None:
1091 avoid_machine = bundle.src_bundle.machine
1093 while worker is None:
1094 worker = self._find_available_worker_or_block(avoid_machine)
1095 assert worker is not None
1097 # Ok, found a worker.
1098 bundle.worker = worker
1099 machine = bundle.machine = worker.machine
1100 username = bundle.username = worker.username
1101 self.status.record_acquire_worker(worker, uuid)
1102 logger.debug('%s: Running bundle on %s...', bundle, worker)
1104 # Before we do any work, make sure the bundle is still viable.
1105 # It may have been some time between when it was submitted and
1106 # now due to lack of worker availability and someone else may
1107 # have already finished it.
1108 if self._check_if_cancelled(bundle):
1110 return self._process_work_result(bundle)
1113 '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1115 self._release_worker(bundle)
1117 # Weird. We are the original owner of this
1118 # bundle. For it to have been cancelled, a backup
1119 # must have already started and completed before
1120 # we even got started. Moreover, the backup says
1121 # it is done but we can't find the results it
1122 # should have copied over. Reschedule the whole
1125 '%s: We are the original owner thread and yet there are '
1126 'no results for this bundle. This is unexpected and bad. '
1127 'Attempting an emergency retry...',
1130 return self._emergency_retry_nasty_bundle(bundle)
1132 # We're a backup and our bundle is cancelled
1133 # before we even got started. Do nothing and let
1134 # the original bundle's thread worry about either
1135 # finding the results or complaining about it.
1138 # Send input code / data to worker machine if it's not local.
1139 if controller not in machine:
1142 f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1144 start_ts = time.time()
1145 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1147 xfer_latency = time.time() - start_ts
1149 "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1152 self._release_worker(bundle)
1154 # Weird. We tried to copy the code to the worker
1155 # and it failed... And we're the original bundle.
1158 "%s: Failed to send instructions to the worker machine?! "
1159 "This is not expected; we\'re the original bundle so this shouldn\'t "
1160 "be a race condition. Attempting an emergency retry...",
1163 return self._emergency_retry_nasty_bundle(bundle)
1165 # This is actually expected; we're a backup.
1166 # There's a race condition where someone else
1167 # already finished the work and removed the source
1168 # code_file before we could copy it. Ignore.
1170 '%s: Failed to send instructions to the worker machine... '
1171 'We\'re a backup and this may be caused by the original (or '
1172 'some other backup) already finishing this work. Ignoring.',
1177 # Kick off the work. Note that if this fails we let
1178 # _wait_for_process deal with it.
1179 self.status.record_processing_began(uuid)
1180 helper_path = config.config['remote_worker_helper_path']
1182 f'{SSH} {bundle.username}@{bundle.machine} '
1183 f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1186 '%s: Executing %s in the background to kick off work...', bundle, cmd
1188 p = cmd_in_background(cmd, silent=True)
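# Illustrative expansion of cmd (hypothetical user/host/uuid, assuming the
# default --remote_worker_helper_path):
#
#   /usr/bin/ssh -oForwardX11=no scott@cheetah "source py39-venv/bin/activate &&
#   /home/scott/pyutils/src/pyutils/remote_worker.py --code_file /tmp/XYZ.code.bin
#   --result_file /tmp/XYZ.result.bin"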
1191 '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1193 return self._wait_for_process(p, bundle, 0)
1195 def _wait_for_process(
1196 self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1198 """At this point we've copied the bundle's pickled code to the remote
1199 worker and started an ssh process that should be invoking the
1200 remote worker to have it execute the user's code. See how
1201 that's going and wait for it to complete or fail. Note that
1202 this code is recursive: there are codepaths where we decide to
1203 stop waiting for an ssh process (because another backup seems
1204 to have finished) but then fail to fetch or parse the results
1205 from that backup and thus call ourselves to continue waiting
1206 on an active ssh process. This is the purpose of the depth
1207 argument: to curtail potential infinite recursion by giving up
1211 p: the Popen record of the ssh job
1212 bundle: the bundle of work being executed remotely
1213 depth: how many retries we've made so far. Starts at zero.
1217 machine = bundle.machine
1218 assert p is not None
1219 pid = p.pid # pid of the ssh process
1222 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1226 self._release_worker(bundle)
1227 return self._emergency_retry_nasty_bundle(bundle)
1229 # Spin until either the ssh job we scheduled finishes the
1230 # bundle or some backup worker signals that they finished it
1234 p.wait(timeout=0.25)
1235 except subprocess.TimeoutExpired:
1236 if self._check_if_cancelled(bundle):
1238 '%s: looks like another worker finished bundle...', bundle
1242 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1246 # If we get here we believe the bundle is done; either the ssh
1247 # subprocess finished (hopefully successfully) or we noticed
1248 # that some other worker seems to have completed the bundle
1249 # before us and we're bailing out.
1251 ret = self._process_work_result(bundle)
1252 if ret is not None and p is not None:
1256 # Something went wrong; e.g. we could not copy the results
1257 # back, cleanup after ourselves on the remote machine, or
1258 # unpickle the results we got from the remote machine. If we
1259 # still have an active ssh subprocess, keep waiting on it.
1260 # Otherwise, time for an emergency reschedule.
1262 logger.exception('%s: Something unexpected just happened...', bundle)
1265 "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1268 return self._wait_for_process(p, bundle, depth + 1)
1270 self._release_worker(bundle)
1271 return self._emergency_retry_nasty_bundle(bundle)
1273 def _process_work_result(self, bundle: BundleDetails) -> Any:
1274 """A bundle seems to be completed. Check on the results."""
1276 with self.status.lock:
1277 is_original = bundle.src_bundle is None
1278 was_cancelled = bundle.was_cancelled
1279 username = bundle.username
1280 machine = bundle.machine
1281 result_file = bundle.result_file
1282 code_file = bundle.code_file
1284 # Whether original or backup, if we finished first we must
1285 # fetch the results if the computation happened on a
1287 bundle.end_ts = time.time()
1288 if not was_cancelled:
1289 assert bundle.machine is not None
1290 if bundle.controller not in bundle.machine:
1291 cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1293 "%s: Fetching results back from %s@%s via %s",
1300 # If either of these throw they are handled in
1301 # _wait_for_process.
1306 except Exception as e:
1313 # Cleanup remote /tmp files.
1315 f'{SSH} {username}@{machine}'
1316 f' "/bin/rm -f {code_file} {result_file}"'
1319 'Fetching results back took %.2fs', time.time() - bundle.end_ts
1321 dur = bundle.end_ts - bundle.start_ts
1322 self.histogram.add_item(dur)
1324 # Only the original worker should unpickle the file contents
1325 # though since it's the only one whose result matters. The
1326 # original is also the only job that may delete result_file
1327 # from disk. Note that the original may have been cancelled
1328 # if one of the backups finished first; it still must read the
1329 # result from disk. It still does that here with is_cancelled
1332 logger.debug("%s: Unpickling %s.", bundle, result_file)
1334 with open(result_file, 'rb') as rb:
1335 serialized = rb.read()
1336 result = cloudpickle.loads(serialized)
1337 except Exception as e:
1338 logger.exception('Failed to load %s... this is bad news.', result_file)
1339 self._release_worker(bundle)
1341 # Re-raise the exception; the code in _wait_for_process may
1342 # decide to _emergency_retry_nasty_bundle here.
1344 logger.debug('Removing local (controller) %s and %s.', code_file, result_file)
1345 os.remove(result_file)
1346 os.remove(code_file)
1348 # Notify any backups that the original is done so they
1349 # should stop ASAP. Do this whether or not we
1350 # finished first since there could be more than one
1352 if bundle.backup_bundles is not None:
1353 for backup in bundle.backup_bundles:
1355 '%s: Notifying backup %s that it\'s cancelled',
1359 backup.is_cancelled.set()
1361 # This is a backup job and, by now, we have already fetched
1362 # the bundle results.
1364 # Backup results don't matter, they just need to leave the
1365 # result file in the right place for their originals to
1366 # read/unpickle later.
1369 # Tell the original to stop if we finished first.
1370 if not was_cancelled:
1371 orig_bundle = bundle.src_bundle
1372 assert orig_bundle is not None
1374 '%s: Notifying original %s we beat them to it.',
1378 orig_bundle.is_cancelled.set()
1379 self._release_worker(bundle, was_cancelled=was_cancelled)
1382 def _create_original_bundle(self, pickle, function_name: str):
1383 """Creates a bundle that is not a backup of any other bundle but
1384 rather represents a user task.
1387 uuid = string_utils.generate_uuid(omit_dashes=True)
1388 code_file = f'/tmp/{uuid}.code.bin'
1389 result_file = f'/tmp/{uuid}.result.bin'
1391 logger.debug('Writing pickled code to %s', code_file)
1392 with open(code_file, 'wb') as wb:
1395 bundle = BundleDetails(
1396 pickled_code=pickle,
1398 function_name=function_name,
1402 controller=platform.node(),
1403 code_file=code_file,
1404 result_file=result_file,
1406 start_ts=time.time(),
1408 slower_than_local_p95=False,
1409 slower_than_global_p95=False,
1411 is_cancelled=threading.Event(),
1412 was_cancelled=False,
1416 self.status.record_bundle_details(bundle)
1417 logger.debug('%s: Created an original bundle', bundle)
1420 def _create_backup_bundle(self, src_bundle: BundleDetails):
1421 """Creates a bundle that is a backup of another bundle that is
1422 running too slowly."""
1424 assert self.status.lock.locked()
1425 assert src_bundle.backup_bundles is not None
1426 n = len(src_bundle.backup_bundles)
1427 uuid = src_bundle.uuid + f'_backup#{n}'
1429 backup_bundle = BundleDetails(
1430 pickled_code=src_bundle.pickled_code,
1432 function_name=src_bundle.function_name,
1436 controller=src_bundle.controller,
1437 code_file=src_bundle.code_file,
1438 result_file=src_bundle.result_file,
1440 start_ts=time.time(),
1442 slower_than_local_p95=False,
1443 slower_than_global_p95=False,
1444 src_bundle=src_bundle,
1445 is_cancelled=threading.Event(),
1446 was_cancelled=False,
1447 backup_bundles=None, # backup backups not allowed
1450 src_bundle.backup_bundles.append(backup_bundle)
1451 self.status.record_bundle_details_already_locked(backup_bundle)
1452 logger.debug('%s: Created a backup bundle', backup_bundle)
1453 return backup_bundle
1455 def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1456 """Schedule a backup of src_bundle."""
1458 assert self.status.lock.locked()
1459 assert src_bundle is not None
1460 backup_bundle = self._create_backup_bundle(src_bundle)
1462 '%s/%s: Scheduling backup for execution...',
1464 backup_bundle.function_name,
1466 self._helper_executor.submit(self._launch, backup_bundle)
1468 # Results from backups don't matter; if they finish first
1469 # they will move the result_file to this machine and let
1470 # the original pick them up and unpickle them (and return
1473 def _emergency_retry_nasty_bundle(
1474 self, bundle: BundleDetails
1475 ) -> Optional[fut.Future]:
1476 """Something unexpectedly failed with bundle. Either retry it
1477 from the beginning or throw in the towel and give up on it.
1480 RemoteExecutorException: a bundle fails repeatedly.
1483 is_original = bundle.src_bundle is None
1484 bundle.worker = None
1485 avoid_last_machine = bundle.machine
1486 bundle.machine = None
1487 bundle.username = None
1488 bundle.failure_count += 1
1494 if bundle.failure_count > retry_limit:
1496 '%s: Tried this bundle too many times already (%dx); giving up.',
1501 raise RemoteExecutorException(
1502 f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1505 '%s: At least it\'s only a backup; better luck with the others.',
1510 msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1513 return self._launch(bundle, avoid_last_machine)
1516 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1517 """Submit work to be done. This is the user entry point of this
1521 Exception: executor is already shutting down.
1523 if self.already_shutdown:
1524 raise Exception('Submitted work after shutdown.')
1525 pickle = _make_cloud_pickle(function, *args, **kwargs)
1526 bundle = self._create_original_bundle(pickle, function.__name__)
1527 self.total_bundles_submitted += 1
1528 return self._helper_executor.submit(self._launch, bundle)
1531 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1532 """Shutdown the executor."""
1533 if not self.already_shutdown:
1534 logger.debug('Shutting down RemoteExecutor %s', self.title)
1535 self.heartbeat_stop_event.set()
1536 self.heartbeat_thread.join()
1537 self._helper_executor.shutdown(wait)
1539 print(self.histogram.__repr__(label_formatter='%ds'))
1540 self.already_shutdown = True
1543 class RemoteWorkerPoolProvider:
1545 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1549 @persistent.persistent_autoloaded_singleton() # type: ignore
1550 class ConfigRemoteWorkerPoolProvider(
1551 RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1553 def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1554 self.remote_worker_pool: List[RemoteWorkerRecord] = []
1555 for record in json_remote_worker_pool['remote_worker_records']:
1556 self.remote_worker_pool.append(
1557 dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
1559 assert len(self.remote_worker_pool) > 0
1562 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1563 return self.remote_worker_pool
1566 def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1567 return self.remote_worker_pool
1571 def get_filename() -> str:
1572 return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
1576 def should_we_load_data(filename: str) -> bool:
1581 def should_we_save_data(filename: str) -> bool:
1586 class DefaultExecutors(object):
1587 """A container for a default thread, process and remote executor.
1588 These are not created until needed and are automatically cleaned
1589 up before process exit for the caller's convenience.
1590 Instead of creating your own executor, consider using the one
1591 from this pool. e.g.::
1593 @par.parallelize(method=par.Method.PROCESS)
1595 solutions: List[Work],
1602 def start_do_work(all_work: List[Work]):
1604 logger.debug('Sharding work into groups of 10.')
1605 for subset in list_utils.shard(all_work, 10):
1606 shards.append([x for x in subset])
1608 logger.debug('Kicking off helper pool.')
1610 for n, shard in enumerate(shards):
1613 shard, n, shared_cache.get_name(), max_letter_pop_per_word
1616 smart_future.wait_all(results)
1618 # Note: if you forget to do this it will clean itself up
1619 # during program termination including tearing down any
1620 # active ssh connections.
1621 executors.DefaultExecutors().process_pool().shutdown()
1625 self.thread_executor: Optional[ThreadExecutor] = None
1626 self.process_executor: Optional[ProcessExecutor] = None
1627 self.remote_executor: Optional[RemoteExecutor] = None
1630 def _ping(host) -> bool:
1631 logger.debug('RUN> ping -c 1 %s', host)
1634 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1640 def thread_pool(self) -> ThreadExecutor:
1641 if self.thread_executor is None:
1642 self.thread_executor = ThreadExecutor()
1643 return self.thread_executor
1645 def process_pool(self) -> ProcessExecutor:
1646 if self.process_executor is None:
1647 self.process_executor = ProcessExecutor()
1648 return self.process_executor
1650 def remote_pool(self) -> RemoteExecutor:
1651 if self.remote_executor is None:
1652 logger.info('Looking for some helper machines...')
1653 provider = ConfigRemoteWorkerPoolProvider()
1654 all_machines = provider.get_remote_workers()
1657 # Make sure we can ping each machine.
1658 for record in all_machines:
1659 if self._ping(record.machine):
1660 logger.info('%s is alive / responding to pings', record.machine)
1663 # The controller machine has a lot to do; go easy on it.
1665 if record.machine == platform.node() and record.count > 1:
1666 logger.info('Reducing workload for %s.', record.machine)
1667 record.count = max(int(record.count / 2), 1)
1669 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1670 policy.register_worker_pool(pool)
1671 self.remote_executor = RemoteExecutor(pool, policy)
1672 return self.remote_executor
1674 def shutdown(self) -> None:
1675 if self.thread_executor is not None:
1676 self.thread_executor.shutdown(wait=True, quiet=True)
1677 self.thread_executor = None
1678 if self.process_executor is not None:
1679 self.process_executor.shutdown(wait=True, quiet=True)
1680 self.process_executor = None
1681 if self.remote_executor is not None:
1682 self.remote_executor.shutdown(wait=True, quiet=True)
1683 self.remote_executor = None