2 # -*- coding: utf-8 -*-
4 # © Copyright 2021-2022, Scott Gasch
7 This module defines a :class:`BaseExecutor` interface and three
10 - :class:`ThreadExecutor`
11 - :class:`ProcessExecutor`
12 - :class:`RemoteExecutor`
14 The :class:`ThreadExecutor` is used to dispatch work to background
15 threads in the same Python process for parallelized work. Of course,
16 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
17 is not terribly useful for compute-bound code. But it's good for
18 work that is mostly I/O bound.
20 The :class:`ProcessExecutor` is used to dispatch work to other
21 processes on the same machine and is more useful for compute-bound
24 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
25 the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
26 to dispatch work to a set of remote worker machines on your
27 network. You can configure this pool via a JSON configuration file,
28 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
30 Finally, this file defines a :class:`DefaultExecutors` pool that
31 contains a pre-created and ready instance of each of the three
32 executors discussed. It has the added benefit of being automatically
33 cleaned up at process termination time.
35 See instructions in :mod:`pyutils.parallelize.parallelize` for
36 setting up and using the framework.
39 from __future__ import annotations
41 import concurrent.futures as fut
50 from abc import ABC, abstractmethod
51 from collections import defaultdict
52 from dataclasses import dataclass
53 from typing import Any, Callable, Dict, List, Optional, Set
55 import cloudpickle # type: ignore
56 from overrides import overrides
58 import pyutils.types.histogram as hist
67 from pyutils.ansi import bg, fg, reset, underline
68 from pyutils.decorator_utils import singleton
69 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
70 from pyutils.parallelize.thread_utils import background_thread
71 from pyutils.types import type_utils
73 logger = logging.getLogger(__name__)
75 parser = config.add_commandline_args(
76 f"Executors ({__file__})", "Args related to processing executors."
79 '--executors_threadpool_size',
82 help='Number of threads in the default threadpool, leave unset for default',
86 '--executors_processpool_size',
89 help='Number of processes in the default processpool, leave unset for default',
93 '--executors_schedule_remote_backups',
95 action=argparse_utils.ActionNoYes,
96 help='Should we schedule duplicative backup work if a remote bundle is slow',
99 '--executors_max_bundle_failures',
103 help='Maximum number of failures before giving up on a bundle',
106 '--remote_worker_records_file',
109 help='Path of the remote worker records file (JSON)',
110 default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
113 '--remote_worker_helper_path',
115 metavar='PATH_TO_REMOTE_WORKER_PY',
116 help='Path to remote_worker.py on remote machines',
117 default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
121 SSH = '/usr/bin/ssh -oForwardX11=no'
122 SCP = '/usr/bin/scp -C'
def _make_cloud_pickle(fun, *args, **kwargs):
    """Internal helper to create cloud pickles.

    Args:
        fun: the callable to be bundled.
        *args: positional arguments to bundle with fun.
        **kwargs: keyword arguments to bundle with fun.

    Returns:
        The result of ``cloudpickle.dumps`` applied to the tuple
        ``(fun, args, kwargs)``.
    """
    # Not every callable carries a __name__ (functools.partial objects and
    # instances defining __call__ do not); a debug message must not be the
    # reason a submission blows up with AttributeError.
    name = getattr(fun, '__name__', None)
    if name is None:
        name = repr(fun)
    logger.debug("Making cloudpickled bundle at %s", name)
    return cloudpickle.dumps((fun, args, kwargs))
131 class BaseExecutor(ABC):
132 """The base executor interface definition. The interface for
133 :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
134 :class:`ThreadExecutor`.
137 def __init__(self, *, title=''):
140 title: the name of this executor.
143 self.histogram = hist.SimpleHistogram(
144 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
149 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
150 """Submit work for the executor to do.
153 function: the Callable to be executed.
154 *args: the arguments to function
155 **kwargs: the arguments to function
158 A concurrent :class:`Future` representing the result of the
164 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
165 """Shutdown the executor.
168 wait: wait for the shutdown to complete before returning?
169 quiet: keep it quiet, please.
173 def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
174 """Shutdown the executor and return True if the executor is idle
175 (i.e. there are no pending or active tasks). Return False
176 otherwise. Note: this should only be called by the launcher
180 quiet: keep it quiet, please.
183 True if the executor could be shut down because it has no
184 pending work, False otherwise.
186 if self.task_count == 0:
187 self.shutdown(wait=True, quiet=quiet)
191 def adjust_task_count(self, delta: int) -> None:
192 """Change the task count. Note: do not call this method from a
193 worker, it should only be called by the launcher process /
197 delta: the delta value by which to adjust task count.
199 self.task_count += delta
200 logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
202 def get_task_count(self) -> int:
203 """Return the task count. Note: do not call this method from a
204 worker, it should only be called by the launcher process /
208 The executor's current task count.
210 return self.task_count
213 class ThreadExecutor(BaseExecutor):
214 """A threadpool executor. This executor uses Python threads to
215 schedule tasks. Note that, at least as of python3.10, because of
216 the global lock in the interpreter itself, these do not
217 parallelize very well so this class is useful mostly for non-CPU
220 See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
223 def __init__(self, max_workers: Optional[int] = None):
226 max_workers: maximum number of threads to create in the pool.
230 if max_workers is not None:
231 workers = max_workers
232 elif 'executors_threadpool_size' in config.config:
233 workers = config.config['executors_threadpool_size']
234 if workers is not None:
235 logger.debug('Creating threadpool executor with %d workers', workers)
237 logger.debug('Creating a default sized threadpool executor')
238 self._thread_pool_executor = fut.ThreadPoolExecutor(
239 max_workers=workers, thread_name_prefix="thread_executor_helper"
241 self.already_shutdown = False
243 # This is run on a different thread; do not adjust task count here.
245 def _run_local_bundle(fun, *args, **kwargs):
246 logger.debug("Running local bundle at %s", fun.__name__)
247 result = fun(*args, **kwargs)
251 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
252 if self.already_shutdown:
253 raise Exception('Submitted work after shutdown.')
254 self.adjust_task_count(+1)
256 newargs.append(function)
260 result = self._thread_pool_executor.submit(
261 ThreadExecutor._run_local_bundle, *newargs, **kwargs
263 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
264 result.add_done_callback(lambda _: self.adjust_task_count(-1))
268 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
269 if not self.already_shutdown:
270 logger.debug('Shutting down threadpool executor %s', self.title)
271 self._thread_pool_executor.shutdown(wait)
273 print(self.histogram.__repr__(label_formatter='%ds'))
274 self.already_shutdown = True
277 class ProcessExecutor(BaseExecutor):
278 """An executor which runs tasks in child processes.
280 See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
283 def __init__(self, max_workers=None):
286 max_workers: the max number of worker processes to create.
290 if max_workers is not None:
291 workers = max_workers
292 elif 'executors_processpool_size' in config.config:
293 workers = config.config['executors_processpool_size']
294 if workers is not None:
295 logger.debug('Creating processpool executor with %d workers.', workers)
297 logger.debug('Creating a default sized processpool executor')
298 self._process_executor = fut.ProcessPoolExecutor(
301 self.already_shutdown = False
303 # This is run in another process; do not adjust task count here.
305 def _run_cloud_pickle(pickle):
306 fun, args, kwargs = cloudpickle.loads(pickle)
307 logger.debug("Running pickled bundle at %s", fun.__name__)
308 result = fun(*args, **kwargs)
312 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
313 if self.already_shutdown:
314 raise Exception('Submitted work after shutdown.')
316 self.adjust_task_count(+1)
317 pickle = _make_cloud_pickle(function, *args, **kwargs)
318 result = self._process_executor.submit(
319 ProcessExecutor._run_cloud_pickle, pickle
321 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
322 result.add_done_callback(lambda _: self.adjust_task_count(-1))
326 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
327 if not self.already_shutdown:
328 logger.debug('Shutting down processpool executor %s', self.title)
329 self._process_executor.shutdown(wait)
331 print(self.histogram.__repr__(label_formatter='%ds'))
332 self.already_shutdown = True
334 def __getstate__(self):
335 state = self.__dict__.copy()
336 state['_process_executor'] = None
340 class RemoteExecutorException(Exception):
341 """Thrown when a bundle cannot be executed despite several retries."""
347 class RemoteWorkerRecord:
348 """A record of info about a remote worker."""
351 """Username we can ssh into on this machine to run work."""
354 """Machine address / name."""
357 """Relative probability for the weighted policy to select this
358 machine for scheduling work."""
361 """If this machine is selected, what is the maximum number of tasks
362 that it can handle?"""
365 return hash((self.username, self.machine))
368 return f'{self.username}@{self.machine}'
373 """All info necessary to define some unit of work that needs to be
374 done, where it is being run, its state, whether it is an original
375 bundle or a backup bundle, how many times it has failed, etc...
379 """The code to run, cloud pickled"""
382 """A unique identifier"""
385 """The name of the function we pickled"""
387 worker: Optional[RemoteWorkerRecord]
388 """The remote worker running this bundle or None if none (yet)"""
390 username: Optional[str]
391 """The remote username running this bundle or None if none (yet)"""
393 machine: Optional[str]
394 """The remote machine running this bundle or None if none (yet)"""
397 """The controller machine"""
400 """A unique filename to hold the work to be done"""
403 """Where the results should be placed / read from"""
406 """The process id of the local subprocess watching the ssh connection
407 to the remote machine"""
415 slower_than_local_p95: bool
416 """Currently slower than 95% of other bundles on remote host"""
418 slower_than_global_p95: bool
419 """Currently slower than 95% of other bundles globally"""
421 src_bundle: Optional[BundleDetails]
422 """If this is a backup bundle, this points to the original bundle
423 that it's backing up. None otherwise."""
425 is_cancelled: threading.Event
426 """An event that can be signaled to indicate this bundle is cancelled.
427 This is set when another copy (backup or original) of this work has
428 completed successfully elsewhere."""
431 """True if this bundle was cancelled, False if it finished normally"""
433 backup_bundles: Optional[List[BundleDetails]]
434 """If we've created backups of this bundle, this is the list of them"""
437 """How many times has this bundle failed already?"""
441 if uuid[-9:-2] == '_backup':
443 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
447 # We colorize the uuid based on some bits from it to make them
448 # stand out in the logging and help a reader correlate log messages
449 # related to the same bundle.
456 fg('marigold yellow'),
459 fg('cornflower blue'),
460 fg('turquoise blue'),
462 fg('lavender purple'),
465 c = colorz[int(uuid[-2:], 16) % len(colorz)]
467 self.function_name if self.function_name is not None else 'nofname'
469 machine = self.machine if self.machine is not None else 'nomachine'
470 return f'{c}{suffix}/{function_name}/{machine}{reset()}'
473 class RemoteExecutorStatus:
474 """A status 'scoreboard' for a remote executor tracking various
475 metrics and able to render a periodic dump of global state.
478 def __init__(self, total_worker_count: int) -> None:
481 total_worker_count: number of workers in the pool
483 self.worker_count: int = total_worker_count
484 self.known_workers: Set[RemoteWorkerRecord] = set()
485 self.start_time: float = time.time()
486 self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
487 self.end_per_bundle: Dict[str, float] = defaultdict(float)
488 self.finished_bundle_timings_per_worker: Dict[
489 RemoteWorkerRecord, math_utils.NumericPopulation
491 self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
492 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
493 self.finished_bundle_timings: math_utils.NumericPopulation = (
494 math_utils.NumericPopulation()
496 self.last_periodic_dump: Optional[float] = None
497 self.total_bundles_submitted: int = 0
499 # Protects reads and modification using self. Also used
500 # as a memory fence for modifications to bundle.
501 self.lock: threading.Lock = threading.Lock()
503 def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
504 """Record that bundle with uuid is assigned to a particular worker.
507 worker: the record of the worker to which uuid is assigned
508 uuid: the uuid of a bundle that has been assigned to a worker
511 self.record_acquire_worker_already_locked(worker, uuid)
513 def record_acquire_worker_already_locked(
514 self, worker: RemoteWorkerRecord, uuid: str
516 """Same as above but an entry point that doesn't acquire the lock
517 for codepaths where it's already held."""
518 assert self.lock.locked()
519 self.known_workers.add(worker)
520 self.start_per_bundle[uuid] = None
521 x = self.in_flight_bundles_by_worker.get(worker, set())
523 self.in_flight_bundles_by_worker[worker] = x
525 def record_bundle_details(self, details: BundleDetails) -> None:
526 """Register the details about a bundle of work."""
528 self.record_bundle_details_already_locked(details)
def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
    """Register a bundle's details, keyed by its uuid.

    This is the entry point for codepaths that already hold
    ``self.lock``; it asserts that the lock is currently locked and
    does not acquire it.

    Args:
        details: the bundle record to store under ``details.uuid``.
    """
    assert self.lock.locked()
    registry = self.bundle_details_by_uuid
    registry[details.uuid] = details
535 def record_release_worker(
537 worker: RemoteWorkerRecord,
541 """Record that a bundle has released a worker."""
543 self.record_release_worker_already_locked(worker, uuid, was_cancelled)
545 def record_release_worker_already_locked(
547 worker: RemoteWorkerRecord,
551 """Same as above but for codepaths that already hold the lock."""
552 assert self.lock.locked()
554 self.end_per_bundle[uuid] = ts
555 self.in_flight_bundles_by_worker[worker].remove(uuid)
556 if not was_cancelled:
557 start = self.start_per_bundle[uuid]
558 assert start is not None
559 bundle_latency = ts - start
560 x = self.finished_bundle_timings_per_worker.get(
561 worker, math_utils.NumericPopulation()
563 x.add_number(bundle_latency)
564 self.finished_bundle_timings_per_worker[worker] = x
565 self.finished_bundle_timings.add_number(bundle_latency)
567 def record_processing_began(self, uuid: str):
568 """Record when work on a bundle begins."""
570 self.start_per_bundle[uuid] = time.time()
572 def total_in_flight(self) -> int:
573 """How many bundles are in flight currently?"""
574 assert self.lock.locked()
576 for worker in self.known_workers:
577 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
578 return total_in_flight
def total_idle(self) -> int:
    """Return the pool's worker count minus the number of in-flight bundles.

    The caller must already hold ``self.lock``; this is asserted.
    """
    assert self.lock.locked()
    busy = self.total_in_flight()
    return self.worker_count - busy
586 assert self.lock.locked()
588 total_finished = len(self.finished_bundle_timings)
589 total_in_flight = self.total_in_flight()
590 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
593 if len(self.finished_bundle_timings) > 1:
594 qall_median = self.finished_bundle_timings.get_median()
595 qall_p95 = self.finished_bundle_timings.get_percentile(95)
597 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
598 f'✅={total_finished}/{self.total_bundles_submitted}, '
599 f'💻n={total_in_flight}/{self.worker_count}\n'
603 f'⏱={ts-self.start_time:.1f}s, '
604 f'✅={total_finished}/{self.total_bundles_submitted}, '
605 f'💻n={total_in_flight}/{self.worker_count}\n'
608 for worker in self.known_workers:
609 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
610 timings = self.finished_bundle_timings_per_worker.get(
611 worker, math_utils.NumericPopulation()
614 qworker_median = None
617 qworker_median = timings.get_median()
618 qworker_p95 = timings.get_percentile(95)
619 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
623 ret += f' ...finished {count} total bundle(s) so far\n'
624 in_flight = len(self.in_flight_bundles_by_worker[worker])
626 ret += f' ...{in_flight} bundles currently in flight:\n'
627 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
628 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
629 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
630 if self.start_per_bundle[bundle_uuid] is not None:
631 sec = ts - self.start_per_bundle[bundle_uuid]
632 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
634 ret += f' {details} setting up / copying data...'
637 if qworker_p95 is not None:
638 if sec > qworker_p95:
639 ret += f'{bg("red")}>💻p95{reset()} '
640 if details is not None:
641 details.slower_than_local_p95 = True
643 if details is not None:
644 details.slower_than_local_p95 = False
646 if qall_p95 is not None:
648 ret += f'{bg("red")}>∀p95{reset()} '
649 if details is not None:
650 details.slower_than_global_p95 = True
652 details.slower_than_global_p95 = False
656 def periodic_dump(self, total_bundles_submitted: int) -> None:
657 assert self.lock.locked()
658 self.total_bundles_submitted = total_bundles_submitted
660 if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
662 self.last_periodic_dump = ts
665 class RemoteWorkerSelectionPolicy(ABC):
666 """An interface definition of a policy for selecting a remote worker."""
669 self.workers: Optional[List[RemoteWorkerRecord]] = None
671 def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
672 self.workers = workers
675 def is_worker_available(self) -> bool:
679 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
683 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
684 """A remote worker selector that uses weighted RNG."""
687 def is_worker_available(self) -> bool:
689 for worker in self.workers:
695 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
698 for worker in self.workers:
699 if worker.machine != machine_to_avoid:
701 for _ in range(worker.count * worker.weight):
702 grabbag.append(worker)
704 if len(grabbag) == 0:
706 'There are no available workers that avoid %s', machine_to_avoid
709 for worker in self.workers:
711 for _ in range(worker.count * worker.weight):
712 grabbag.append(worker)
714 if len(grabbag) == 0:
715 logger.warning('There are no available workers?!')
718 worker = random.sample(grabbag, 1)[0]
719 assert worker.count > 0
721 logger.debug('Selected worker %s', worker)
725 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
726 """A remote worker selector that just round robins."""
728 def __init__(self) -> None:
733 def is_worker_available(self) -> bool:
735 for worker in self.workers:
742 self, machine_to_avoid: str = None
743 ) -> Optional[RemoteWorkerRecord]:
747 worker = self.workers[x]
751 if x >= len(self.workers):
754 logger.debug('Selected worker %s', worker)
757 if x >= len(self.workers):
760 logger.warning('Unexpectedly could not find a worker, retrying...')
765 class RemoteExecutor(BaseExecutor):
766 """An executor that uses processes on remote machines to do work.
767 To do so, it requires a pool of remote workers to be properly
768 configured. See instructions in
769 :class:`pyutils.parallelize.parallelize`.
771 Each machine in a worker pool has a *weight* and a *count*. A
772 *weight* captures the relative speed of a processor on that worker
773 and a *count* captures the number of synchronous tasks the worker
774 can accept (i.e. the number of cpus on the machine).
776 To dispatch work to a remote machine, this class pickles the code
777 to be executed remotely using `cloudpickle`. For that to work,
778 the remote machine should be running the same version of Python as
779 this machine, ideally in a virtual environment with the same
780 import libraries installed. Differences in operating system
781 and/or processor architecture don't seem to matter for most code,
786 Mismatches in Python version or in the version numbers of
787 third-party libraries between machines can cause problems
788 when trying to unpickle and run code remotely.
790 Work to be dispatched is represented in this code by creating a
791 "bundle". Each bundle is assigned to a remote worker based on
792 heuristics captured in a :class:`RemoteWorkerSelectionPolicy`. In
793 general, it attempts to load all workers in the pool and maximize
794 throughput. Once assigned to a remote worker, pickled code is
795 copied to that worker via `scp` and a remote command is issued via
796 `ssh` to execute a :file:`remote_worker.py` process on the remote
797 machine. This process unpickles the code, runs it, and produces a
798 result which is then copied back to the local machine (again via
799 `scp`) where it can be processed by local code.
801 You can and probably must override the path of
802 :file:`remote_worker.py` on your pool machines using the
803 `--remote_worker_helper_path` commandline argument (or by just
804 changing the default in code, see above in this file's code).
806 During remote work execution, this local machine acts as a
807 controller dispatching all work to the network, copying pickled
808 tasks out, and copying results back in. It may also be a worker
809 in the pool but do not underestimate the cost of being a
810 controller -- it takes some cpu and a lot of network bandwidth.
811 The work dispatcher logic attempts to detect when a controller is
812 also a worker and reduce its load.
814 Some redundancy and safety provisions are made when scheduling
815 tasks to the worker pool; e.g. slower than expected tasks have
816 redundant backup tasks created, especially if there are otherwise
817 idle workers. If a task fails repeatedly, the dispatcher considers
818 it poisoned and gives up on it.
822 This executor probably only makes sense to use with
823 computationally expensive tasks such as jobs that will execute
824 for ~30 seconds or longer.
826 The network overhead and latency of copying work from the
827 controller (local) machine to the remote workers and copying
828 results back again is relatively high. Especially at startup,
829 the network can become a bottleneck. Future versions of this
830 code may attempt to split the responsibility of being a
831 controller (distributing work to pool machines).
833 Instructions for how to set this up are provided in
834 :class:`pyutils.parallelize.parallelize`.
836 See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
842 workers: List[RemoteWorkerRecord],
843 policy: RemoteWorkerSelectionPolicy,
847 workers: A list of remote workers we can call on to do tasks.
848 policy: A policy for selecting remote workers for tasks.
852 self.workers = workers
854 self.worker_count = 0
855 for worker in self.workers:
856 self.worker_count += worker.count
857 if self.worker_count <= 0:
858 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
860 raise RemoteExecutorException(msg)
861 self.policy.register_worker_pool(self.workers)
862 self.cv = threading.Condition()
864 'Creating %d local threads, one per remote worker.', self.worker_count
866 self._helper_executor = fut.ThreadPoolExecutor(
867 thread_name_prefix="remote_executor_helper",
868 max_workers=self.worker_count,
870 self.status = RemoteExecutorStatus(self.worker_count)
871 self.total_bundles_submitted = 0
872 self.backup_lock = threading.Lock()
873 self.last_backup = None
875 self.heartbeat_thread,
876 self.heartbeat_stop_event,
877 ) = self._run_periodic_heartbeat()
878 self.already_shutdown = False
881 def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
883 We create a background thread to invoke :meth:`_heartbeat` regularly
884 while we are scheduling work. It does some accounting such as
885 looking for slow bundles to tag for backup creation, checking for
886 unexpected failures, and printing a fancy message on stdout.
888 while not stop_event.is_set():
890 logger.debug('Running periodic heartbeat code...')
892 logger.debug('Periodic heartbeat thread shutting down.')
def _heartbeat(self) -> None:
    """Run one heartbeat: dump status and, if enabled, look for backups.

    Note: this is invoked on a background thread, not an executor
    thread.  Be careful what you do with it because it needs to get
    back and dump status again periodically.
    """
    status = self.status
    with status.lock:
        status.periodic_dump(self.total_bundles_submitted)

        # Backup scheduling (rescheduling via executor.submit) is opt-in
        # and runs while the status lock is still held.
        backups_enabled = config.config['executors_schedule_remote_backups']
        if backups_enabled:
            self._maybe_schedule_backup_bundles()
905 def _maybe_schedule_backup_bundles(self):
906 """Maybe schedule backup bundles if we see a very slow bundle."""
908 assert self.status.lock.locked()
909 num_done = len(self.status.finished_bundle_timings)
910 num_idle_workers = self.worker_count - self.task_count
914 and num_idle_workers > 0
915 and (self.last_backup is None or (now - self.last_backup > 9.0))
916 and self.backup_lock.acquire(blocking=False)
919 assert self.backup_lock.locked()
921 bundle_to_backup = None
926 ) in self.status.in_flight_bundles_by_worker.items():
928 # Prefer to schedule backups of bundles running on
931 for record in self.workers:
932 if worker.machine == record.machine:
933 base_score = float(record.weight)
934 base_score = 1.0 / base_score
936 base_score = int(base_score)
939 for uuid in bundle_uuids:
940 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
943 and bundle.src_bundle is None
944 and bundle.backup_bundles is not None
948 # Schedule backups of bundles running
949 # longer; especially those that are
951 start_ts = self.status.start_per_bundle[uuid]
952 if start_ts is not None:
953 runtime = now - start_ts
956 'score[%s] => %.1f # latency boost', bundle, score
959 if bundle.slower_than_local_p95:
962 'score[%s] => %.1f # >worker p95',
967 if bundle.slower_than_global_p95:
970 'score[%s] => %.1f # >global p95',
975 # Prefer backups of bundles that don't
976 # have backups already.
977 backup_count = len(bundle.backup_bundles)
978 if backup_count == 0:
980 elif backup_count == 1:
982 elif backup_count == 2:
987 'score[%s] => %.1f # {backup_count} dup backup factor',
993 best_score is None or score > best_score
995 bundle_to_backup = bundle
996 assert bundle is not None
997 assert bundle.backup_bundles is not None
998 assert bundle.src_bundle is None
1001 # Note: this is all still happening on the heartbeat
1002 # runner thread. That's ok because
1003 # _schedule_backup_for_bundle uses the executor to
1004 # submit the bundle again which will cause it to be
1005 # picked up by a worker thread and allow this thread
1006 # to return to run future heartbeats.
1007 if bundle_to_backup is not None:
1008 self.last_backup = now
1010 '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1014 self._schedule_backup_for_bundle(bundle_to_backup)
1016 self.backup_lock.release()
1018 def _is_worker_available(self) -> bool:
1019 """Is there a worker available currently?"""
1020 return self.policy.is_worker_available()
1022 def _acquire_worker(
1023 self, machine_to_avoid: str = None
1024 ) -> Optional[RemoteWorkerRecord]:
1025 """Try to acquire a worker."""
1026 return self.policy.acquire_worker(machine_to_avoid)
1028 def _find_available_worker_or_block(
1029 self, machine_to_avoid: str = None
1030 ) -> RemoteWorkerRecord:
1031 """Find a worker or block until one becomes available."""
1033 while not self._is_worker_available():
1035 worker = self._acquire_worker(machine_to_avoid)
1036 if worker is not None:
1038 msg = "We should never reach this point in the code"
1039 logger.critical(msg)
1040 raise Exception(msg)
1042 def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1043 """Release a previously acquired worker."""
1044 worker = bundle.worker
1045 assert worker is not None
1046 logger.debug('Released worker %s', worker)
1047 self.status.record_release_worker(
1055 self.adjust_task_count(-1)
1057 def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1058 """See if a particular bundle is cancelled. Do not block."""
1059 with self.status.lock:
1060 if bundle.is_cancelled.wait(timeout=0.0):
1061 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1062 bundle.was_cancelled = True
1066 def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1067 """Find a worker for bundle or block until one is available."""
1069 self.adjust_task_count(+1)
1071 controller = bundle.controller
1072 avoid_machine = override_avoid_machine
1073 is_original = bundle.src_bundle is None
1075 # Try not to schedule a backup on the same host as the original.
1076 if avoid_machine is None and bundle.src_bundle is not None:
1077 avoid_machine = bundle.src_bundle.machine
1079 while worker is None:
1080 worker = self._find_available_worker_or_block(avoid_machine)
1081 assert worker is not None
1083 # Ok, found a worker.
1084 bundle.worker = worker
1085 machine = bundle.machine = worker.machine
1086 username = bundle.username = worker.username
1087 self.status.record_acquire_worker(worker, uuid)
1088 logger.debug('%s: Running bundle on %s...', bundle, worker)
1090 # Before we do any work, make sure the bundle is still viable.
1091 # It may have been some time between when it was submitted and
1092 # now due to lack of worker availability and someone else may
1093 # have already finished it.
1094 if self._check_if_cancelled(bundle):
1096 return self._process_work_result(bundle)
1097 except Exception as e:
1099 '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1101 self._release_worker(bundle)
1103 # Weird. We are the original owner of this
1104 # bundle. For it to have been cancelled, a backup
1105 # must have already started and completed before
1106 # we even got started. Moreover, the backup says
1107 # it is done but we can't find the results it
1108 # should have copied over. Reschedule the whole
1112 '%s: We are the original owner thread and yet there are '
1113 'no results for this bundle. This is unexpected and bad.',
1116 return self._emergency_retry_nasty_bundle(bundle)
1118 # We're a backup and our bundle is cancelled
1119 # before we even got started. Do nothing and let
1120 # the original bundle's thread worry about either
1121 # finding the results or complaining about it.
1124 # Send input code / data to worker machine if it's not local.
1125 if controller not in machine:
1128 f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1130 start_ts = time.time()
1131 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1133 xfer_latency = time.time() - start_ts
1135 "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1137 except Exception as e:
1138 self._release_worker(bundle)
1140 # Weird. We tried to copy the code to the worker
1141 # and it failed... And we're the original bundle.
1145 "%s: Failed to send instructions to the worker machine?! "
1146 "This is not expected; we\'re the original bundle so this shouldn\'t "
1147 "be a race condition. Attempting an emergency retry...",
1150 return self._emergency_retry_nasty_bundle(bundle)
1152 # This is actually expected; we're a backup.
1153 # There's a race condition where someone else
1154 # already finished the work and removed the source
1155 # code_file before we could copy it. Ignore.
1157 '%s: Failed to send instructions to the worker machine... '
1158 'We\'re a backup and this may be caused by the original (or '
1159 'some other backup) already finishing this work. Ignoring.',
1164 # Kick off the work. Note that if this fails we let
1165 # _wait_for_process deal with it.
1166 self.status.record_processing_began(uuid)
1167 helper_path = config.config['remote_worker_helper_path']
1169 f'{SSH} {bundle.username}@{bundle.machine} '
1170 f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1173 '%s: Executing %s in the background to kick off work...', bundle, cmd
1175 p = cmd_in_background(cmd, silent=True)
1178 '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1180 return self._wait_for_process(p, bundle, 0)
def _wait_for_process(
    self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
    """Wait for a bundle's ssh subprocess (or a competing worker) to
    finish and then collect the bundle's result.

    The caller has already started an ssh process that is expected to
    run the bundle on the remote worker.  This method polls that
    process while also watching for the bundle to be cancelled, which
    signals that some other worker finished it first.  Once either
    happens the result is collected via :meth:`_process_work_result`.

    The method is recursive: if we stopped waiting because the bundle
    looked cancelled but collecting the result then failed, we call
    ourselves again to keep waiting on the still-active ssh process.
    The ``depth`` argument bounds that recursion.

    Args:
        p: the Popen record of the ssh job; must not be None.
        bundle: the bundle of work being executed remotely.
        depth: how many times we have already re-entered this method
            for this bundle.  Starts at zero.

    Returns:
        The value produced by :meth:`_process_work_result`, or the
        outcome of :meth:`_emergency_retry_nasty_bundle` when we give up.
    """

    assert p is not None
    machine = bundle.machine
    pid = p.pid  # pid of the local ssh process

    # Too many failed attempts to wrap this bundle up: kill the ssh
    # process, give the worker back and start over from scratch.
    if depth > 3:
        logger.error(
            "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
            pid,
        )
        p.terminate()
        self._release_worker(bundle)
        return self._emergency_retry_nasty_bundle(bundle)

    # Poll until either our ssh job exits or someone else signals
    # that the bundle has already been completed.
    while True:
        try:
            p.wait(timeout=0.25)
        except subprocess.TimeoutExpired:
            if not self._check_if_cancelled(bundle):
                continue
            logger.info('%s: looks like another worker finished bundle...', bundle)
            break
        logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
        # The process has exited, so there is nothing left to wait on
        # or terminate below.
        p = None
        break

    # At this point we believe the bundle is done, either because our
    # ssh subprocess exited (hopefully successfully) or because another
    # worker appears to have beaten us to it.
    try:
        ret = self._process_work_result(bundle)
        if p is not None and ret is not None:
            p.terminate()
        return ret

    # Something went wrong: e.g. the results could not be copied back,
    # the remote cleanup failed, or the results from the remote
    # machine could not be unpickled.
    except Exception as e:
        logger.exception(e)
        logger.error('%s: Something unexpected just happened...', bundle)
        if p is None:
            # No ssh process left to wait on; emergency reschedule.
            self._release_worker(bundle)
            return self._emergency_retry_nasty_bundle(bundle)

        # Our ssh process is still active, so keep waiting on it.
        logger.warning(
            "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
            bundle,
        )
        return self._wait_for_process(p, bundle, depth + 1)
def _process_work_result(self, bundle: BundleDetails) -> Any:
    """Collect the outcome of a bundle that appears to be finished.

    If this bundle was not cancelled (i.e. it finished first) and it
    ran on a machine other than the controller, the result file is
    copied back and the remote temporary files are removed.  An
    original bundle then unpickles the result file, deletes the local
    code and result files and cancels its backups.  A backup bundle
    only tells its original to stop (if the backup finished first).

    Args:
        bundle: the bundle whose work seems to be complete.

    Returns:
        The unpickled result for an original bundle; None for a backup.

    Raises:
        Exception: whatever the copy, cleanup or unpickle step raised.
            These are expected to be handled by the caller.
    """

    with self.status.lock:
        was_cancelled = bundle.was_cancelled
        is_original = bundle.src_bundle is None
        username, machine = bundle.username, bundle.machine
        code_file, result_file = bundle.code_file, bundle.result_file

        # Whether original or backup, whoever finished first must
        # fetch the results if the computation happened remotely.
        bundle.end_ts = time.time()
        if not was_cancelled:
            assert bundle.machine is not None
            if bundle.controller not in bundle.machine:
                cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                logger.info(
                    "%s: Fetching results back from %s@%s via %s",
                    bundle,
                    username,
                    machine,
                    cmd,
                )

                # Try the copy up to three times.  If the last attempt
                # (or the cleanup below) throws, the caller handles it.
                for attempt in range(1, 4):
                    try:
                        run_silently(cmd)
                    except Exception:
                        if attempt >= 3:
                            raise
                    else:
                        break

                # Cleanup remote /tmp files.
                run_silently(
                    f'{SSH} {username}@{machine}'
                    f' "/bin/rm -f {code_file} {result_file}"'
                )
                logger.debug(
                    'Fetching results back took %.2fs', time.time() - bundle.end_ts
                )
            dur = bundle.end_ts - bundle.start_ts
            self.histogram.add_item(dur)

    if not is_original:
        # This is a backup job and, by now, the results have already
        # been fetched.  A backup's own return value does not matter;
        # it only needs to leave the result file where the original
        # will read and unpickle it later.
        result = None

        # Tell the original to stop if we finished first.
        if not was_cancelled:
            orig_bundle = bundle.src_bundle
            assert orig_bundle is not None
            logger.debug(
                '%s: Notifying original %s we beat them to it.',
                bundle,
                orig_bundle.uuid,
            )
            orig_bundle.is_cancelled.set()
    else:
        # Only the original unpickles the file contents since it is the
        # only one whose result matters, and it is the only job that
        # may delete result_file from disk.  The original may itself
        # have been cancelled because a backup finished first; it still
        # has to read the result from disk here.
        logger.debug("%s: Unpickling %s.", bundle, result_file)
        try:
            with open(result_file, 'rb') as rb:
                serialized = rb.read()
            result = cloudpickle.loads(serialized)
        except Exception as e:
            logger.exception(e)
            logger.error('Failed to load %s... this is bad news.', result_file)
            self._release_worker(bundle)

            # Re-raise; the caller may decide to do an emergency retry.
            raise
        logger.debug('Removing local (master) %s and %s.', code_file, result_file)
        os.remove(result_file)
        os.remove(code_file)

        # Notify any backups that the original is done so they stop
        # ASAP.  Do this whether or not we finished first since there
        # could be more than one backup.
        if bundle.backup_bundles is not None:
            for backup in bundle.backup_bundles:
                logger.debug(
                    '%s: Notifying backup %s that it\'s cancelled',
                    bundle,
                    backup.uuid,
                )
                backup.is_cancelled.set()

    self._release_worker(bundle, was_cancelled=was_cancelled)
    return result
def _create_original_bundle(self, pickle, function_name: str):
    """Build the bundle that represents a user task (as opposed to a
    backup of some other bundle).

    The pickled code is written to a uniquely named file under /tmp
    and a matching result file name is reserved next to it.  The new
    bundle is recorded with ``self.status`` before being returned.

    Args:
        pickle: the serialized code and arguments to execute.
        function_name: name of the function being run.

    Returns:
        The newly created original bundle.
    """

    uuid = string_utils.generate_uuid(omit_dashes=True)
    result_file = f'/tmp/{uuid}.result.bin'
    code_file = f'/tmp/{uuid}.code.bin'

    logger.debug('Writing pickled code to %s', code_file)
    with open(code_file, 'wb') as wb:
        wb.write(pickle)

    bundle = BundleDetails(
        pickled_code=pickle,
        uuid=uuid,
        function_name=function_name,
        # No worker has been chosen yet.
        worker=None,
        username=None,
        machine=None,
        controller=platform.node(),
        code_file=code_file,
        result_file=result_file,
        pid=0,
        start_ts=time.time(),
        end_ts=0.0,
        slower_than_local_p95=False,
        slower_than_global_p95=False,
        # An original has no source bundle and starts with no backups.
        src_bundle=None,
        is_cancelled=threading.Event(),
        was_cancelled=False,
        backup_bundles=[],
        failure_count=0,
    )
    self.status.record_bundle_details(bundle)
    logger.debug('%s: Created an original bundle', bundle)
    return bundle
def _create_backup_bundle(self, src_bundle: BundleDetails):
    """Build a backup bundle for a source bundle that is running too
    slowly.

    The backup reuses the source's pickled code, function name,
    controller, code file and result file, but has its own uuid,
    start time and cancellation event.  It is appended to the
    source's list of backups and recorded with ``self.status``.

    The caller must already hold ``self.status.lock``.

    Args:
        src_bundle: the bundle to back up; it must be able to have
            backups (its ``backup_bundles`` must not be None).

    Returns:
        The newly created backup bundle.
    """

    assert self.status.lock.locked()
    assert src_bundle.backup_bundles is not None

    # Number the backup by how many the source already has.
    n = len(src_bundle.backup_bundles)
    backup_uuid = src_bundle.uuid + f'_backup#{n}'

    backup_bundle = BundleDetails(
        pickled_code=src_bundle.pickled_code,
        uuid=backup_uuid,
        function_name=src_bundle.function_name,
        worker=None,
        username=None,
        machine=None,
        controller=src_bundle.controller,
        code_file=src_bundle.code_file,
        result_file=src_bundle.result_file,
        pid=0,
        start_ts=time.time(),
        end_ts=0.0,
        slower_than_local_p95=False,
        slower_than_global_p95=False,
        src_bundle=src_bundle,
        is_cancelled=threading.Event(),
        was_cancelled=False,
        backup_bundles=None,  # backup backups not allowed
        failure_count=0,
    )

    src_bundle.backup_bundles.append(backup_bundle)
    self.status.record_bundle_details_already_locked(backup_bundle)
    logger.debug('%s: Created a backup bundle', backup_bundle)
    return backup_bundle
def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
    """Create a backup of src_bundle and queue it for execution.

    The caller must already hold ``self.status.lock``.

    Args:
        src_bundle: the bundle to schedule a backup for; must not be
            None.
    """

    assert self.status.lock.locked()
    assert src_bundle is not None

    backup = self._create_backup_bundle(src_bundle)
    logger.debug(
        '%s/%s: Scheduling backup for execution...',
        backup.uuid,
        backup.function_name,
    )

    # The returned future is deliberately dropped.  Results from
    # backups don't matter; if a backup finishes first it moves the
    # result_file to this machine and lets the original pick it up,
    # unpickle it and return the result.
    self._helper_executor.submit(self._launch, backup)
def _emergency_retry_nasty_bundle(
    self, bundle: BundleDetails
) -> Optional[fut.Future]:
    """Something unexpectedly failed with bundle.  Either retry it
    from the beginning or throw in the towel and give up on it.

    The bundle's worker, machine and username are cleared and its
    failure count is incremented.  If the failure count is still
    within the retry limit the bundle is relaunched, asking the
    launcher to avoid the machine it last ran on.

    Args:
        bundle: the bundle that failed.

    Returns:
        Whatever relaunching the bundle returns, or None when a backup
        bundle has exhausted its retries.

    Raises:
        RemoteExecutorException: an original bundle has exhausted its
            retries and cannot be completed.
    """

    is_original = bundle.src_bundle is None

    # Remember where the bundle last ran before wiping its placement.
    avoid_last_machine = bundle.machine
    bundle.worker = None
    bundle.machine = None
    bundle.username = None
    bundle.failure_count += 1

    # Originals get one more attempt than backups do.
    retry_limit = 3 if is_original else 2

    if bundle.failure_count > retry_limit:
        logger.error(
            '%s: Tried this bundle too many times already (%dx); giving up.',
            bundle,
            retry_limit,
        )
        if is_original:
            raise RemoteExecutorException(
                f'{bundle}: This bundle can\'t be completed despite several backups and retries',
            )
        logger.error(
            '%s: At least it\'s only a backup; better luck with the others.',
            bundle,
        )
        return None

    msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
    logger.warning(msg)
    warnings.warn(msg)
    return self._launch(bundle, avoid_last_machine)
@overrides
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
    """Submit work to be done.  This is the user entry point of this
    class.

    Args:
        function: the callable to execute.
        *args: positional arguments to pass to function.
        **kwargs: keyword arguments to pass to function.

    Returns:
        A future from the helper executor that tracks the launched
        bundle.

    Raises:
        Exception: work was submitted after the executor was shut down.
    """
    if self.already_shutdown:
        raise Exception('Submitted work after shutdown.')
    pickle = _make_cloud_pickle(function, *args, **kwargs)

    # Not every callable has a __name__ (e.g. functools.partial objects
    # or instances defining __call__); fall back to the type's name
    # rather than failing with AttributeError.
    function_name = getattr(function, '__name__', type(function).__name__)
    bundle = self._create_original_bundle(pickle, function_name)
    self.total_bundles_submitted += 1
    return self._helper_executor.submit(self._launch, bundle)
@overrides
def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
    """Shutdown the executor.

    Stops and joins the heartbeat thread, shuts down the helper
    executor and, unless quiet is set, prints the latency histogram.
    Calling this again after a completed shutdown does nothing.

    Args:
        wait: passed through to the helper executor's shutdown.
        quiet: if True, do not print the histogram.
    """
    if self.already_shutdown:
        return

    # Use the module-level logger like the rest of this file rather
    # than the root logger.
    logger.debug('Shutting down RemoteExecutor %s', self.title)
    self.heartbeat_stop_event.set()
    self.heartbeat_thread.join()
    self._helper_executor.shutdown(wait)
    if not quiet:
        print(self.histogram.__repr__(label_formatter='%ds'))
    self.already_shutdown = True
class RemoteWorkerPoolProvider:
    """Interface for objects that can supply the pool of remote
    workers available to a remote executor."""

    @abstractmethod
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        """Return the records describing the available remote workers.

        This base implementation does nothing and returns None;
        subclasses are expected to override it.
        """
@persistent.persistent_autoloaded_singleton()  # type: ignore
class ConfigRemoteWorkerPoolProvider(
    RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
):
    """A remote worker pool provider backed by a JSON file whose
    location comes from the ``remote_worker_records_file`` config
    setting."""

    def __init__(self, json_remote_worker_pool: Dict[str, Any]):
        """Build the worker pool from parsed JSON.

        Args:
            json_remote_worker_pool: a dict whose
                ``remote_worker_records`` entry is a sequence of
                dicts, each describing one :class:`RemoteWorkerRecord`.
                At least one record is required.
        """
        self.remote_worker_pool = [
            dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
            for record in json_remote_worker_pool['remote_worker_records']
        ]
        assert len(self.remote_worker_pool) > 0

    @overrides
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        """Return the worker records loaded from the JSON file."""
        return self.remote_worker_pool

    @overrides
    def get_persistent_data(self) -> List[RemoteWorkerRecord]:
        """Return the data to persist: the worker records themselves."""
        return self.remote_worker_pool

    @staticmethod
    @overrides
    def get_filename() -> str:
        """Return the path of the JSON file holding the worker records."""
        records_file = config.config['remote_worker_records_file']
        return type_utils.unwrap_optional(records_file)

    @staticmethod
    @overrides
    def should_we_load_data(filename: str) -> bool:
        """Always load the worker records from the file."""
        return True

    @staticmethod
    @overrides
    def should_we_save_data(filename: str) -> bool:
        """Never write the worker records back to the file."""
        return False
@singleton
class DefaultExecutors(object):
    """A container for a default thread, process and remote executor.

    None of the executors is created until it is first asked for, and
    we take care to clean up automatically before process exit for the
    caller's convenience.  Instead of creating your own executor,
    consider using the one from this pool.  e.g.::

        @par.parallelize(method=par.Method.PROCESS)
        def do_work(
            solutions: List[Work],
            shard_num: int,
            ...
        ):
            <do the work>


        def start_do_work(all_work: List[Work]):
            shards = []
            logger.debug('Sharding work into groups of 10.')
            for subset in list_utils.shard(all_work, 10):
                shards.append([x for x in subset])

            logger.debug('Kicking off helper pool.')
            try:
                for n, shard in enumerate(shards):
                    results.append(
                        do_work(
                            shard, n, shared_cache.get_name(), max_letter_pop_per_word
                        )
                    )
                smart_future.wait_all(results)
            finally:
                # Note: if you forget to do this it will clean itself up
                # during program termination including tearing down any
                # active ssh connections.
                executors.DefaultExecutors().process_pool().shutdown()
    """

    def __init__(self) -> None:
        # Each executor is created lazily by its accessor below.
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def _ping(host) -> bool:
        """Return True if a single ping to host succeeds within a
        one second timeout; any failure counts as unreachable."""
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            exit_code = cmd_exitcode(
                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
            )
        except Exception:
            return False
        return exit_code == 0

    def thread_pool(self) -> ThreadExecutor:
        """Return the shared thread executor, creating it on first use."""
        if self.thread_executor is not None:
            return self.thread_executor
        self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        """Return the shared process executor, creating it on first use."""
        if self.process_executor is not None:
            return self.process_executor
        self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        """Return the shared remote executor, creating it on first use.

        On creation, the configured worker machines are pinged and only
        those that respond are put into the pool.
        """
        if self.remote_executor is not None:
            return self.remote_executor

        logger.info('Looking for some helper machines...')
        provider = ConfigRemoteWorkerPoolProvider()
        candidates = provider.get_remote_workers()

        # Keep only the machines that answer a ping.
        pool = []
        for record in candidates:
            if self._ping(record.machine):
                logger.info('%s is alive / responding to pings', record.machine)
                pool.append(record)

        # The controller machine has a lot to do; go easy on it by
        # halving its worker count (but never below one).
        for record in pool:
            if record.machine == platform.node() and record.count > 1:
                logger.info('Reducing workload for %s.', record.machine)
                record.count = max(int(record.count / 2), 1)

        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        policy.register_worker_pool(pool)
        self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        """Shut down and forget every executor that has been created."""
        for attr in ('thread_executor', 'process_executor', 'remote_executor'):
            executor = getattr(self, attr)
            if executor is None:
                continue
            executor.shutdown(wait=True, quiet=True)
            setattr(self, attr, None)