2 # -*- coding: utf-8 -*-
4 # © Copyright 2021-2022, Scott Gasch
7 This module defines a :class:`BaseExecutor` interface and three implementations of it:
10 - :class:`ThreadExecutor`
11 - :class:`ProcessExecutor`
12 - :class:`RemoteExecutor`
14 The :class:`ThreadExecutor` is used to dispatch work to background
15 threads in the same Python process for parallelized work. Of course,
16 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
17 is not terribly useful for compute-bound code. But it's good for
18 work that is mostly I/O bound.
20 The :class:`ProcessExecutor` is used to dispatch work to other
21 processes on the same machine and is more useful for compute-bound work.
24 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
25 the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
26 to dispatch work to a set of remote worker machines on your
27 network. You can configure this pool via a JSON configuration file,
28 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
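A minimal sketch of what such a configuration might contain (the
hostnames and numbers below are invented; the field names mirror
:class:`RemoteWorkerRecord`)::

    {
        "remote_worker_records": [
            { "username": "scott", "machine": "machine1", "weight": 2, "count": 4 },
            { "username": "scott", "machine": "machine2", "weight": 1, "count": 2 }
        ]
    }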
30 Finally, this file defines a :class:`DefaultExecutors` pool that
31 contains a pre-created and ready instance of each of the three
32 executors discussed. It has the added benefit of being automatically
33 cleaned up at process termination time.
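For example, assuming this module is importable as
:mod:`pyutils.parallelize.executors` and that the program has already
parsed its config (e.g. via :mod:`pyutils.bootstrap`), a minimal sketch
of borrowing the default thread pool directly looks like::

    from pyutils.parallelize import executors

    pool = executors.DefaultExecutors().thread_pool()
    future = pool.submit(lambda x, y: x + y, 1, 2)
    print(future.result())    # blocks until the work completes
    executors.DefaultExecutors().shutdown()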
35 See instructions in :mod:`pyutils.parallelize.parallelize` for
36 setting up and using the framework.
39 from __future__ import annotations
41 import concurrent.futures as fut
50 from abc import ABC, abstractmethod
51 from collections import defaultdict
52 from dataclasses import dataclass
53 from typing import Any, Callable, Dict, List, Optional, Set
55 import cloudpickle # type: ignore
56 from overrides import overrides
58 import pyutils.types.histogram as hist
67 from pyutils.ansi import bg, fg, reset, underline
68 from pyutils.decorator_utils import singleton
69 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
70 from pyutils.parallelize.thread_utils import background_thread
71 from pyutils.types import type_utils
73 logger = logging.getLogger(__name__)
75 parser = config.add_commandline_args(
76 f"Executors ({__file__})", "Args related to processing executors."
79 '--executors_threadpool_size',
82 help='Number of threads in the default threadpool, leave unset for default',
86 '--executors_processpool_size',
89 help='Number of processes in the default processpool, leave unset for default',
93 '--executors_schedule_remote_backups',
95 action=argparse_utils.ActionNoYes,
96 help='Should we schedule duplicative backup work if a remote bundle is slow',
99 '--executors_max_bundle_failures',
103 help='Maximum number of failures before giving up on a bundle',
106 '--remote_worker_records_file',
109 help='Path of the remote worker records file (JSON)',
110 default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
113 '--remote_worker_helper_path',
115 metavar='PATH_TO_REMOTE_WORKER_PY',
116 help='Path to remote_worker.py on remote machines',
117 default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
121 SSH = '/usr/bin/ssh -oForwardX11=no'
122 SCP = '/usr/bin/scp -C'
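# As a rough illustration (program name hypothetical), any program that
# imports this module can tune the flags defined above from its command
# line, e.g.:
#
#   ./my_program.py --executors_threadpool_size 10 \
#                   --executors_processpool_size 4 \
#                   --remote_worker_records_file ~/.remote_worker_records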
125 def _make_cloud_pickle(fun, *args, **kwargs):
126 """Internal helper to create cloud pickles."""
127 logger.debug("Making cloudpickled bundle at %s", fun.__name__)
128 return cloudpickle.dumps((fun, args, kwargs))
131 class BaseExecutor(ABC):
132 """The base executor interface definition. The interface for
133 :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
134 :class:`ThreadExecutor`.
137 def __init__(self, *, title=''):
140 title: the name of this executor.
143 self.histogram = hist.SimpleHistogram(
144 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
149 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
150 """Submit work for the executor to do.
153 function: the Callable to be executed.
154 *args: the positional arguments to pass to function
155 **kwargs: the keyword arguments to pass to function
158 A concurrent :class:`Future` representing the result of the
164 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
165 """Shutdown the executor.
168 wait: wait for the shutdown to complete before returning?
169 quiet: keep it quiet, please.
173 def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
174 """Shutdown the executor and return True if the executor is idle
175 (i.e. there are no pending or active tasks). Return False
176 otherwise. Note: this should only be called by the launcher
180 quiet: keep it quiet, please.
183 True if the executor could be shut down because it has no
184 pending work, False otherwise.
186 if self.task_count == 0:
187 self.shutdown(wait=True, quiet=quiet)
191 def adjust_task_count(self, delta: int) -> None:
192 """Change the task count. Note: do not call this method from a
193 worker, it should only be called by the launcher process /
197 delta: the delta value by which to adjust task count.
199 self.task_count += delta
200 logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
202 def get_task_count(self) -> int:
203 """Change the task count. Note: do not call this method from a
204 worker, it should only be called by the launcher process /
208 The executor's current task count.
210 return self.task_count
213 class ThreadExecutor(BaseExecutor):
214 """A threadpool executor. This executor uses Python threads to
215 schedule tasks. Note that, at least as of Python 3.10, because of
216 the global lock in the interpreter itself, these do not
217 parallelize very well, so this class is useful mostly for non-CPU
220 See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
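A small sketch of direct use for I/O-bound work (``fetch_url`` and
``urls`` here are hypothetical; any blocking callable works)::

    executor = ThreadExecutor(max_workers=4)
    futures = [executor.submit(fetch_url, url) for url in urls]
    results = [f.result() for f in futures]
    executor.shutdown(wait=True)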
223 def __init__(self, max_workers: Optional[int] = None):
226 max_workers: maximum number of threads to create in the pool.
230 if max_workers is not None:
231 workers = max_workers
232 elif 'executors_threadpool_size' in config.config:
233 workers = config.config['executors_threadpool_size']
234 if workers is not None:
235 logger.debug('Creating threadpool executor with %d workers', workers)
237 logger.debug('Creating a default sized threadpool executor')
238 self._thread_pool_executor = fut.ThreadPoolExecutor(
239 max_workers=workers, thread_name_prefix="thread_executor_helper"
241 self.already_shutdown = False
243 # This is run on a different thread; do not adjust task count here.
245 def _run_local_bundle(fun, *args, **kwargs):
246 logger.debug("Running local bundle at %s", fun.__name__)
247 result = fun(*args, **kwargs)
251 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
252 if self.already_shutdown:
253 raise Exception('Submitted work after shutdown.')
254 self.adjust_task_count(+1)
256 newargs.append(function)
260 result = self._thread_pool_executor.submit(
261 ThreadExecutor._run_local_bundle, *newargs, **kwargs
263 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
264 result.add_done_callback(lambda _: self.adjust_task_count(-1))
268 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
269 if not self.already_shutdown:
270 logger.debug('Shutting down threadpool executor %s', self.title)
271 self._thread_pool_executor.shutdown(wait)
273 print(self.histogram.__repr__(label_formatter='%ds'))
274 self.already_shutdown = True
277 class ProcessExecutor(BaseExecutor):
278 """An executor which runs tasks in child processes.
280 See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
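Because submitted work is serialized with `cloudpickle` rather than the
standard library pickle, closures and lambdas can be dispatched to the
child processes; a brief sketch::

    executor = ProcessExecutor(max_workers=2)
    base = 10
    future = executor.submit(lambda x: x + base, 32)  # the closure over base survives pickling
    assert future.result() == 42
    executor.shutdown(wait=True)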
283 def __init__(self, max_workers=None):
286 max_workers: the max number of worker processes to create.
290 if max_workers is not None:
291 workers = max_workers
292 elif 'executors_processpool_size' in config.config:
293 workers = config.config['executors_processpool_size']
294 if workers is not None:
295 logger.debug('Creating processpool executor with %d workers.', workers)
297 logger.debug('Creating a default sized processpool executor')
298 self._process_executor = fut.ProcessPoolExecutor(
301 self.already_shutdown = False
303 # This is run in another process; do not adjust task count here.
305 def _run_cloud_pickle(pickle):
306 fun, args, kwargs = cloudpickle.loads(pickle)
307 logger.debug("Running pickled bundle at %s", fun.__name__)
308 result = fun(*args, **kwargs)
312 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
313 if self.already_shutdown:
314 raise Exception('Submitted work after shutdown.')
316 self.adjust_task_count(+1)
317 pickle = _make_cloud_pickle(function, *args, **kwargs)
318 result = self._process_executor.submit(
319 ProcessExecutor._run_cloud_pickle, pickle
321 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
322 result.add_done_callback(lambda _: self.adjust_task_count(-1))
326 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
327 if not self.already_shutdown:
328 logger.debug('Shutting down processpool executor %s', self.title)
329 self._process_executor.shutdown(wait)
331 print(self.histogram.__repr__(label_formatter='%ds'))
332 self.already_shutdown = True
334 def __getstate__(self):
335 state = self.__dict__.copy()
336 state['_process_executor'] = None
340 class RemoteExecutorException(Exception):
341 """Thrown when a bundle cannot be executed despite several retries."""
347 class RemoteWorkerRecord:
348 """A record of info about a remote worker."""
351 """Username we can ssh into on this machine to run work."""
354 """Machine address / name."""
357 """Relative probability for the weighted policy to select this
358 machine for scheduling work."""
361 """If this machine is selected, what is the maximum number of task
362 that it can handle?"""
365 return hash((self.username, self.machine))
368 return f'{self.username}@{self.machine}'
373 """All info necessary to define some unit of work that needs to be
374 done, where it is being run, its state, whether it is an original
375 bundle or a backup bundle, how many times it has failed, etc...
379 """The code to run, cloud pickled"""
382 """A unique identifier"""
385 """The name of the function we pickled"""
387 worker: Optional[RemoteWorkerRecord]
388 """The remote worker running this bundle or None if none (yet)"""
390 username: Optional[str]
391 """The remote username running this bundle or None if none (yet)"""
393 machine: Optional[str]
394 """The remote machine running this bundle or None if none (yet)"""
397 """The controller machine"""
400 """A unique filename to hold the work to be done"""
403 """Where the results should be placed / read from"""
406 """The process id of the local subprocess watching the ssh connection
407 to the remote machine"""
415 slower_than_local_p95: bool
416 """Currently slower then 95% of other bundles on remote host"""
418 slower_than_global_p95: bool
419 """Currently slower than 95% of other bundles globally"""
421 src_bundle: Optional[BundleDetails]
422 """If this is a backup bundle, this points to the original bundle
423 that it's backing up. None otherwise."""
425 is_cancelled: threading.Event
426 """An event that can be signaled to indicate this bundle is cancelled.
427 This is set when another copy (backup or original) of this work has
428 completed successfully elsewhere."""
431 """True if this bundle was cancelled, False if it finished normally"""
433 backup_bundles: Optional[List[BundleDetails]]
434 """If we've created backups of this bundle, this is the list of them"""
437 """How many times has this bundle failed already?"""
441 if uuid[-9:-2] == '_backup':
443 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
447 # We colorize the uuid based on some bits from it to make them
448 # stand out in the logging and help a reader correlate log messages
449 # related to the same bundle.
456 fg('marigold yellow'),
459 fg('cornflower blue'),
460 fg('turquoise blue'),
462 fg('lavender purple'),
465 c = colorz[int(uuid[-2:], 16) % len(colorz)]
467 self.function_name if self.function_name is not None else 'nofname'
469 machine = self.machine if self.machine is not None else 'nomachine'
470 return f'{c}{suffix}/{function_name}/{machine}{reset()}'
473 class RemoteExecutorStatus:
474 """A status 'scoreboard' for a remote executor tracking various
475 metrics and able to render a periodic dump of global state.
478 def __init__(self, total_worker_count: int) -> None:
481 total_worker_count: number of workers in the pool
483 self.worker_count: int = total_worker_count
484 self.known_workers: Set[RemoteWorkerRecord] = set()
485 self.start_time: float = time.time()
486 self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
487 self.end_per_bundle: Dict[str, float] = defaultdict(float)
488 self.finished_bundle_timings_per_worker: Dict[
489 RemoteWorkerRecord, math_utils.NumericPopulation
491 self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
492 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
493 self.finished_bundle_timings: math_utils.NumericPopulation = (
494 math_utils.NumericPopulation()
496 self.last_periodic_dump: Optional[float] = None
497 self.total_bundles_submitted: int = 0
499 # Protects reads and modifications of self. Also used
500 # as a memory fence for modifications to bundles.
501 self.lock: threading.Lock = threading.Lock()
503 def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
504 """Record that bundle with uuid is assigned to a particular worker.
507 worker: the record of the worker to which uuid is assigned
508 uuid: the uuid of a bundle that has been assigned to a worker
511 self.record_acquire_worker_already_locked(worker, uuid)
513 def record_acquire_worker_already_locked(
514 self, worker: RemoteWorkerRecord, uuid: str
516 """Same as above but an entry point that doesn't acquire the lock
517 for codepaths where it's already held."""
518 assert self.lock.locked()
519 self.known_workers.add(worker)
520 self.start_per_bundle[uuid] = None
521 x = self.in_flight_bundles_by_worker.get(worker, set())
523 self.in_flight_bundles_by_worker[worker] = x
525 def record_bundle_details(self, details: BundleDetails) -> None:
526 """Register the details about a bundle of work."""
528 self.record_bundle_details_already_locked(details)
530 def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
531 """Same as above but for codepaths that already hold the lock."""
532 assert self.lock.locked()
533 self.bundle_details_by_uuid[details.uuid] = details
535 def record_release_worker(
537 worker: RemoteWorkerRecord,
541 """Record that a bundle has released a worker."""
543 self.record_release_worker_already_locked(worker, uuid, was_cancelled)
545 def record_release_worker_already_locked(
547 worker: RemoteWorkerRecord,
551 """Same as above but for codepaths that already hold the lock."""
552 assert self.lock.locked()
554 self.end_per_bundle[uuid] = ts
555 self.in_flight_bundles_by_worker[worker].remove(uuid)
556 if not was_cancelled:
557 start = self.start_per_bundle[uuid]
558 assert start is not None
559 bundle_latency = ts - start
560 x = self.finished_bundle_timings_per_worker.get(
561 worker, math_utils.NumericPopulation()
563 x.add_number(bundle_latency)
564 self.finished_bundle_timings_per_worker[worker] = x
565 self.finished_bundle_timings.add_number(bundle_latency)
567 def record_processing_began(self, uuid: str):
568 """Record when work on a bundle begins."""
570 self.start_per_bundle[uuid] = time.time()
572 def total_in_flight(self) -> int:
573 """How many bundles are in flight currently?"""
574 assert self.lock.locked()
576 for worker in self.known_workers:
577 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
578 return total_in_flight
580 def total_idle(self) -> int:
581 """How many idle workers are there currently?"""
582 assert self.lock.locked()
583 return self.worker_count - self.total_in_flight()
586 assert self.lock.locked()
588 total_finished = len(self.finished_bundle_timings)
589 total_in_flight = self.total_in_flight()
590 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
592 if len(self.finished_bundle_timings) > 1:
593 qall_median = self.finished_bundle_timings.get_median()
594 qall_p95 = self.finished_bundle_timings.get_percentile(95)
596 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
597 f'✅={total_finished}/{self.total_bundles_submitted}, '
598 f'💻n={total_in_flight}/{self.worker_count}\n'
602 f'⏱={ts-self.start_time:.1f}s, '
603 f'✅={total_finished}/{self.total_bundles_submitted}, '
604 f'💻n={total_in_flight}/{self.worker_count}\n'
607 for worker in self.known_workers:
608 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
609 timings = self.finished_bundle_timings_per_worker.get(
610 worker, math_utils.NumericPopulation()
613 qworker_median = None
616 qworker_median = timings.get_median()
617 qworker_p95 = timings.get_percentile(95)
618 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
622 ret += f' ...finished {count} total bundle(s) so far\n'
623 in_flight = len(self.in_flight_bundles_by_worker[worker])
625 ret += f' ...{in_flight} bundles currently in flight:\n'
626 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
627 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
628 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
629 if self.start_per_bundle[bundle_uuid] is not None:
630 sec = ts - self.start_per_bundle[bundle_uuid]
631 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
633 ret += f' {details} setting up / copying data...'
636 if qworker_p95 is not None:
637 if sec > qworker_p95:
638 ret += f'{bg("red")}>💻p95{reset()} '
639 if details is not None:
640 details.slower_than_local_p95 = True
642 if details is not None:
643 details.slower_than_local_p95 = False
647 ret += f'{bg("red")}>∀p95{reset()} '
648 if details is not None:
649 details.slower_than_global_p95 = True
651 details.slower_than_global_p95 = False
655 def periodic_dump(self, total_bundles_submitted: int) -> None:
656 assert self.lock.locked()
657 self.total_bundles_submitted = total_bundles_submitted
659 if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
661 self.last_periodic_dump = ts
664 class RemoteWorkerSelectionPolicy(ABC):
665 """An interface definition of a policy for selecting a remote worker."""
668 self.workers: Optional[List[RemoteWorkerRecord]] = None
670 def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
671 self.workers = workers
674 def is_worker_available(self) -> bool:
678 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
682 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
683 """A remote worker selector that uses weighted RNG."""
686 def is_worker_available(self) -> bool:
688 for worker in self.workers:
694 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
697 for worker in self.workers:
698 if worker.machine != machine_to_avoid:
700 for _ in range(worker.count * worker.weight):
701 grabbag.append(worker)
703 if len(grabbag) == 0:
705 'There are no available workers that avoid %s', machine_to_avoid
708 for worker in self.workers:
710 for _ in range(worker.count * worker.weight):
711 grabbag.append(worker)
713 if len(grabbag) == 0:
714 logger.warning('There are no available workers?!')
717 worker = random.sample(grabbag, 1)[0]
718 assert worker.count > 0
720 logger.debug('Selected worker %s', worker)
724 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
725 """A remote worker selector that just round robins."""
727 def __init__(self) -> None:
732 def is_worker_available(self) -> bool:
734 for worker in self.workers:
741 self, machine_to_avoid: Optional[str] = None
742 ) -> Optional[RemoteWorkerRecord]:
746 worker = self.workers[x]
750 if x >= len(self.workers):
753 logger.debug('Selected worker %s', worker)
756 if x >= len(self.workers):
759 logger.warning('Unexpectedly could not find a worker, retrying...')
764 class RemoteExecutor(BaseExecutor):
765 """An executor that uses processes on remote machines to do work.
766 To do so, it requires that a pool of remote workers be properly
767 configured. See instructions in
768 :mod:`pyutils.parallelize.parallelize`.
770 Each machine in a worker pool has a *weight* and a *count*. A
771 *weight* captures the relative speed of a processor on that worker
772 and a *count* captures the number of synchronous tasks the worker
773 can accept (i.e. the number of cpus on the machine).
775 To dispatch work to a remote machine, this class pickles the code
776 to be executed remotely using `cloudpickle`. For that to work,
777 the remote machine should be running the same version of Python as
778 this machine, ideally in a virtual environment with the same
779 import libraries installed. Differences in operating system
780 and/or processor architecture don't seem to matter for most code,
785 Mismatches in Python version or in the version numbers of
786 third-party libraries between machines can cause problems
787 when trying to unpickle and run code remotely.
789 Work to be dispatched is represented in this code by creating a
790 "bundle". Each bundle is assigned to a remote worker based on
791 heuristics captured in a :class:`RemoteWorkerSelectionPolicy`. In
792 general, it attempts to load all workers in the pool and maximize
793 throughput. Once assigned to a remote worker, pickled code is
794 copied to that worker via `scp` and a remote command is issued via
795 `ssh` to execute a :file:`remote_worker.py` process on the remote
796 machine. This process unpickles the code, runs it, and produces a
797 result which is then copied back to the local machine (again via
798 `scp`) where it can be processed by local code.
800 You can and probably must override the path of
801 :file:`remote_worker.py` on your pool machines using the
802 `--remote_worker_helper_path` commandline argument (or by just
803 changing the default in code, see above in this file's code).
805 During remote work execution, this local machine acts as a
806 controller dispatching all work to the network, copying pickled
807 tasks out, and copying results back in. It may also be a worker
808 in the pool but do not underestimate the cost of being a
809 controller -- it takes some cpu and a lot of network bandwidth.
810 The work dispatcher logic attempts to detect when a controller is
811 also a worker and reduce its load.
813 Some redundancy and safety provisions are made when scheduling
814 tasks to the worker pool; e.g. slower than expected tasks have
815 redundant backup tasks created, especially if there are otherwise
816 idle workers. If a task fails repeatedly, the dispatcher considers
817 it poisoned and gives up on it.
821 This executor probably only makes sense to use with
822 computationally expensive tasks such as jobs that will execute
823 for ~30 seconds or longer.
825 The network overhead and latency of copying work from the
826 controller (local) machine to the remote workers and copying
827 results back again is relatively high. Especially at startup,
828 the network can become a bottleneck. Future versions of this
829 code may attempt to split the responsibility of being a
830 controller (distributing work to pool machines).
832 Instructions for how to set this up are provided in
833 :mod:`pyutils.parallelize.parallelize`.
835 See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
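Normally :class:`DefaultExecutors` (below) builds this executor for you
from the worker records file, but a hand-rolled sketch looks roughly
like this (hostnames invented, ``some_function`` and ``arg`` hypothetical)::

    workers = [
        RemoteWorkerRecord(username='scott', machine='machine1', weight=2, count=4),
        RemoteWorkerRecord(username='scott', machine='machine2', weight=1, count=2),
    ]
    policy = WeightedRandomRemoteWorkerSelectionPolicy()
    executor = RemoteExecutor(workers, policy)
    future = executor.submit(some_function, arg)
    print(future.result())
    executor.shutdown()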
841 workers: List[RemoteWorkerRecord],
842 policy: RemoteWorkerSelectionPolicy,
846 workers: A list of remote workers we can call on to do tasks.
847 policy: A policy for selecting remote workers for tasks.
851 self.workers = workers
853 self.worker_count = 0
854 for worker in self.workers:
855 self.worker_count += worker.count
856 if self.worker_count <= 0:
857 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
859 raise RemoteExecutorException(msg)
860 self.policy.register_worker_pool(self.workers)
861 self.cv = threading.Condition()
863 'Creating %d local threads, one per remote worker.', self.worker_count
865 self._helper_executor = fut.ThreadPoolExecutor(
866 thread_name_prefix="remote_executor_helper",
867 max_workers=self.worker_count,
869 self.status = RemoteExecutorStatus(self.worker_count)
870 self.total_bundles_submitted = 0
871 self.backup_lock = threading.Lock()
872 self.last_backup = None
874 self.heartbeat_thread,
875 self.heartbeat_stop_event,
876 ) = self._run_periodic_heartbeat()
877 self.already_shutdown = False
880 def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
882 We create a background thread to invoke :meth:`_heartbeat` regularly
883 while we are scheduling work. It does some accounting such as
884 looking for slow bundles to tag for backup creation, checking for
885 unexpected failures, and printing a fancy message on stdout.
887 while not stop_event.is_set():
889 logger.debug('Running periodic heartbeat code...')
891 logger.debug('Periodic heartbeat thread shutting down.')
893 def _heartbeat(self) -> None:
894 # Note: this is invoked on a background thread, not an
895 # executor thread. Be careful what you do with it b/c it
896 # needs to get back and dump status again periodically.
897 with self.status.lock:
898 self.status.periodic_dump(self.total_bundles_submitted)
900 # Look for bundles to reschedule via executor.submit
901 if config.config['executors_schedule_remote_backups']:
902 self._maybe_schedule_backup_bundles()
904 def _maybe_schedule_backup_bundles(self):
905 """Maybe schedule backup bundles if we see a very slow bundle."""
907 assert self.status.lock.locked()
908 num_done = len(self.status.finished_bundle_timings)
909 num_idle_workers = self.worker_count - self.task_count
913 and num_idle_workers > 0
914 and (self.last_backup is None or (now - self.last_backup > 9.0))
915 and self.backup_lock.acquire(blocking=False)
918 assert self.backup_lock.locked()
920 bundle_to_backup = None
925 ) in self.status.in_flight_bundles_by_worker.items():
927 # Prefer to schedule backups of bundles running on
930 for record in self.workers:
931 if worker.machine == record.machine:
932 base_score = float(record.weight)
933 base_score = 1.0 / base_score
935 base_score = int(base_score)
938 for uuid in bundle_uuids:
939 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
942 and bundle.src_bundle is None
943 and bundle.backup_bundles is not None
947 # Schedule backups of bundles running
948 # longer; especially those that are
950 start_ts = self.status.start_per_bundle[uuid]
951 if start_ts is not None:
952 runtime = now - start_ts
955 'score[%s] => %.1f # latency boost', bundle, score
958 if bundle.slower_than_local_p95:
961 'score[%s] => %.1f # >worker p95',
966 if bundle.slower_than_global_p95:
969 'score[%s] => %.1f # >global p95',
974 # Prefer backups of bundles that don't
975 # have backups already.
976 backup_count = len(bundle.backup_bundles)
977 if backup_count == 0:
979 elif backup_count == 1:
981 elif backup_count == 2:
986 'score[%s] => %.1f # {backup_count} dup backup factor',
992 best_score is None or score > best_score
994 bundle_to_backup = bundle
995 assert bundle is not None
996 assert bundle.backup_bundles is not None
997 assert bundle.src_bundle is None
1000 # Note: this is all still happening on the heartbeat
1001 # runner thread. That's ok because
1002 # _schedule_backup_for_bundle uses the executor to
1003 # submit the bundle again which will cause it to be
1004 # picked up by a worker thread and allow this thread
1005 # to return to run future heartbeats.
1006 if bundle_to_backup is not None:
1007 self.last_backup = now
1009 '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1013 self._schedule_backup_for_bundle(bundle_to_backup)
1015 self.backup_lock.release()
1017 def _is_worker_available(self) -> bool:
1018 """Is there a worker available currently?"""
1019 return self.policy.is_worker_available()
1021 def _acquire_worker(
1022 self, machine_to_avoid: Optional[str] = None
1023 ) -> Optional[RemoteWorkerRecord]:
1024 """Try to acquire a worker."""
1025 return self.policy.acquire_worker(machine_to_avoid)
1027 def _find_available_worker_or_block(
1028 self, machine_to_avoid: Optional[str] = None
1029 ) -> RemoteWorkerRecord:
1030 """Find a worker or block until one becomes available."""
1032 while not self._is_worker_available():
1034 worker = self._acquire_worker(machine_to_avoid)
1035 if worker is not None:
1037 msg = "We should never reach this point in the code"
1038 logger.critical(msg)
1039 raise Exception(msg)
1041 def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1042 """Release a previously acquired worker."""
1043 worker = bundle.worker
1044 assert worker is not None
1045 logger.debug('Released worker %s', worker)
1046 self.status.record_release_worker(
1054 self.adjust_task_count(-1)
1056 def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1057 """See if a particular bundle is cancelled. Do not block."""
1058 with self.status.lock:
1059 if bundle.is_cancelled.wait(timeout=0.0):
1060 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1061 bundle.was_cancelled = True
1065 def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1066 """Find a worker for bundle or block until one is available."""
1068 self.adjust_task_count(+1)
1070 hostname = bundle.hostname
1071 avoid_machine = override_avoid_machine
1072 is_original = bundle.src_bundle is None
1074 # Try not to schedule a backup on the same host as the original.
1075 if avoid_machine is None and bundle.src_bundle is not None:
1076 avoid_machine = bundle.src_bundle.machine
1078 while worker is None:
1079 worker = self._find_available_worker_or_block(avoid_machine)
1080 assert worker is not None
1082 # Ok, found a worker.
1083 bundle.worker = worker
1084 machine = bundle.machine = worker.machine
1085 username = bundle.username = worker.username
1086 self.status.record_acquire_worker(worker, uuid)
1087 logger.debug('%s: Running bundle on %s...', bundle, worker)
1089 # Before we do any work, make sure the bundle is still viable.
1090 # It may have been some time between when it was submitted and
1091 # now due to lack of worker availability and someone else may
1092 # have already finished it.
1093 if self._check_if_cancelled(bundle):
1095 return self._process_work_result(bundle)
1096 except Exception as e:
1098 '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1100 self._release_worker(bundle)
1102 # Weird. We are the original owner of this
1103 # bundle. For it to have been cancelled, a backup
1104 # must have already started and completed before
1105 we even got started. Moreover, the backup says
1106 # it is done but we can't find the results it
1107 # should have copied over. Reschedule the whole
1111 '%s: We are the original owner thread and yet there are '
1112 'no results for this bundle. This is unexpected and bad.',
1115 return self._emergency_retry_nasty_bundle(bundle)
1117 # We're a backup and our bundle is cancelled
1118 # before we even got started. Do nothing and let
1119 # the original bundle's thread worry about either
1120 # finding the results or complaining about it.
1123 # Send input code / data to worker machine if it's not local.
1124 if hostname not in machine:
1127 f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1129 start_ts = time.time()
1130 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1132 xfer_latency = time.time() - start_ts
1134 "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1136 except Exception as e:
1137 self._release_worker(bundle)
1139 # Weird. We tried to copy the code to the worker
1140 # and it failed... And we're the original bundle.
1144 "%s: Failed to send instructions to the worker machine?! "
1145 "This is not expected; we\'re the original bundle so this shouldn\'t "
1146 "be a race condition. Attempting an emergency retry...",
1149 return self._emergency_retry_nasty_bundle(bundle)
1151 # This is actually expected; we're a backup.
1152 # There's a race condition where someone else
1153 # already finished the work and removed the source
1154 # code_file before we could copy it. Ignore.
1156 '%s: Failed to send instructions to the worker machine... '
1157 'We\'re a backup and this may be caused by the original (or '
1158 'some other backup) already finishing this work. Ignoring.',
1163 # Kick off the work. Note that if this fails we let
1164 # _wait_for_process deal with it.
1165 self.status.record_processing_began(uuid)
1166 helper_path = config.config['remote_worker_helper_path']
1168 f'{SSH} {bundle.username}@{bundle.machine} '
1169 f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1172 '%s: Executing %s in the background to kick off work...', bundle, cmd
1174 p = cmd_in_background(cmd, silent=True)
1177 '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1179 return self._wait_for_process(p, bundle, 0)
1181 def _wait_for_process(
1182 self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1184 """At this point we've copied the bundle's pickled code to the remote
1185 worker and started an ssh process that should be invoking the
1186 remote worker to have it execute the user's code. See how
1187 that's going and wait for it to complete or fail. Note that
1188 this code is recursive: there are codepaths where we decide to
1189 stop waiting for an ssh process (because another backup seems
1190 to have finished) but then fail to fetch or parse the results
1191 from that backup and thus call ourselves to continue waiting
1192 on an active ssh process. This is the purpose of the depth
1193 argument: to curtail potential infinite recursion by giving up
1197 p: the Popen record of the ssh job
1198 bundle: the bundle of work being executed remotely
1199 depth: how many retries we've made so far. Starts at zero.
1203 machine = bundle.machine
1204 assert p is not None
1205 pid = p.pid # pid of the ssh process
1208 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1212 self._release_worker(bundle)
1213 return self._emergency_retry_nasty_bundle(bundle)
1215 # Spin until either the ssh job we scheduled finishes the
1216 # bundle or some backup worker signals that they finished it
1220 p.wait(timeout=0.25)
1221 except subprocess.TimeoutExpired:
1222 if self._check_if_cancelled(bundle):
1224 '%s: looks like another worker finished bundle...', bundle
1228 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1232 # If we get here we believe the bundle is done; either the ssh
1233 # subprocess finished (hopefully successfully) or we noticed
1234 # that some other worker seems to have completed the bundle
1235 # before us and we're bailing out.
1237 ret = self._process_work_result(bundle)
1238 if ret is not None and p is not None:
1242 # Something went wrong; e.g. we could not copy the results
1243 # back, clean up after ourselves on the remote machine, or
1244 # unpickle the results we got from the remote machine. If we
1245 # still have an active ssh subprocess, keep waiting on it.
1246 # Otherwise, time for an emergency reschedule.
1247 except Exception as e:
1249 logger.error('%s: Something unexpected just happened...', bundle)
1252 "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1255 return self._wait_for_process(p, bundle, depth + 1)
1257 self._release_worker(bundle)
1258 return self._emergency_retry_nasty_bundle(bundle)
1260 def _process_work_result(self, bundle: BundleDetails) -> Any:
1261 """A bundle seems to be completed. Check on the results."""
1263 with self.status.lock:
1264 is_original = bundle.src_bundle is None
1265 was_cancelled = bundle.was_cancelled
1266 username = bundle.username
1267 machine = bundle.machine
1268 result_file = bundle.result_file
1269 code_file = bundle.code_file
1271 # Whether original or backup, if we finished first we must
1272 # fetch the results if the computation happened on a
1274 bundle.end_ts = time.time()
1275 if not was_cancelled:
1276 assert bundle.machine is not None
1277 if bundle.hostname not in bundle.machine:
1278 cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1280 "%s: Fetching results back from %s@%s via %s",
1287 # If either of these throw they are handled in
1288 # _wait_for_process.
1293 except Exception as e:
1300 # Cleanup remote /tmp files.
1302 f'{SSH} {username}@{machine}'
1303 f' "/bin/rm -f {code_file} {result_file}"'
1306 'Fetching results back took %.2fs', time.time() - bundle.end_ts
1308 dur = bundle.end_ts - bundle.start_ts
1309 self.histogram.add_item(dur)
1311 # Only the original worker should unpickle the file contents
1312 # though since it's the only one whose result matters. The
1313 # original is also the only job that may delete result_file
1314 # from disk. Note that the original may have been cancelled
1315 # if one of the backups finished first; it still must read the
1316 # result from disk. It still does that here with is_cancelled
1319 logger.debug("%s: Unpickling %s.", bundle, result_file)
1321 with open(result_file, 'rb') as rb:
1322 serialized = rb.read()
1323 result = cloudpickle.loads(serialized)
1324 except Exception as e:
1326 logger.error('Failed to load %s... this is bad news.', result_file)
1327 self._release_worker(bundle)
1329 # Re-raise the exception; the code in _wait_for_process may
1330 # decide to _emergency_retry_nasty_bundle here.
1332 logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1333 os.remove(result_file)
1334 os.remove(code_file)
1336 # Notify any backups that the original is done so they
1337 # should stop ASAP. Do this whether or not we
1338 # finished first since there could be more than one
1340 if bundle.backup_bundles is not None:
1341 for backup in bundle.backup_bundles:
1343 '%s: Notifying backup %s that it\'s cancelled',
1347 backup.is_cancelled.set()
1349 # This is a backup job and, by now, we have already fetched
1350 # the bundle results.
1352 # Backup results don't matter, they just need to leave the
1353 # result file in the right place for their originals to
1354 # read/unpickle later.
1357 # Tell the original to stop if we finished first.
1358 if not was_cancelled:
1359 orig_bundle = bundle.src_bundle
1360 assert orig_bundle is not None
1362 '%s: Notifying original %s we beat them to it.',
1366 orig_bundle.is_cancelled.set()
1367 self._release_worker(bundle, was_cancelled=was_cancelled)
1370 def _create_original_bundle(self, pickle, function_name: str):
1371 """Creates a bundle that is not a backup of any other bundle but
1372 rather represents a user task.
1375 uuid = string_utils.generate_uuid(omit_dashes=True)
1376 code_file = f'/tmp/{uuid}.code.bin'
1377 result_file = f'/tmp/{uuid}.result.bin'
1379 logger.debug('Writing pickled code to %s', code_file)
1380 with open(code_file, 'wb') as wb:
1383 bundle = BundleDetails(
1384 pickled_code=pickle,
1386 function_name=function_name,
1390 hostname=platform.node(),
1391 code_file=code_file,
1392 result_file=result_file,
1394 start_ts=time.time(),
1396 slower_than_local_p95=False,
1397 slower_than_global_p95=False,
1399 is_cancelled=threading.Event(),
1400 was_cancelled=False,
1404 self.status.record_bundle_details(bundle)
1405 logger.debug('%s: Created an original bundle', bundle)
1408 def _create_backup_bundle(self, src_bundle: BundleDetails):
1409 """Creates a bundle that is a backup of another bundle that is
1410 running too slowly."""
1412 assert self.status.lock.locked()
1413 assert src_bundle.backup_bundles is not None
1414 n = len(src_bundle.backup_bundles)
1415 uuid = src_bundle.uuid + f'_backup#{n}'
1417 backup_bundle = BundleDetails(
1418 pickled_code=src_bundle.pickled_code,
1420 function_name=src_bundle.function_name,
1424 hostname=src_bundle.hostname,
1425 code_file=src_bundle.code_file,
1426 result_file=src_bundle.result_file,
1428 start_ts=time.time(),
1430 slower_than_local_p95=False,
1431 slower_than_global_p95=False,
1432 src_bundle=src_bundle,
1433 is_cancelled=threading.Event(),
1434 was_cancelled=False,
1435 backup_bundles=None, # backup backups not allowed
1438 src_bundle.backup_bundles.append(backup_bundle)
1439 self.status.record_bundle_details_already_locked(backup_bundle)
1440 logger.debug('%s: Created a backup bundle', backup_bundle)
1441 return backup_bundle
1443 def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1444 """Schedule a backup of src_bundle."""
1446 assert self.status.lock.locked()
1447 assert src_bundle is not None
1448 backup_bundle = self._create_backup_bundle(src_bundle)
1450 '%s/%s: Scheduling backup for execution...',
1452 backup_bundle.function_name,
1454 self._helper_executor.submit(self._launch, backup_bundle)
1456 # Results from backups don't matter; if they finish first
1457 # they will move the result_file to this machine and let
1458 # the original pick them up and unpickle them (and return
1461 def _emergency_retry_nasty_bundle(
1462 self, bundle: BundleDetails
1463 ) -> Optional[fut.Future]:
1464 """Something unexpectedly failed with bundle. Either retry it
1465 from the beginning or throw in the towel and give up on it."""
1467 is_original = bundle.src_bundle is None
1468 bundle.worker = None
1469 avoid_last_machine = bundle.machine
1470 bundle.machine = None
1471 bundle.username = None
1472 bundle.failure_count += 1
1478 if bundle.failure_count > retry_limit:
1480 '%s: Tried this bundle too many times already (%dx); giving up.',
1485 raise RemoteExecutorException(
1486 f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1490 '%s: At least it\'s only a backup; better luck with the others.',
1495 msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1498 return self._launch(bundle, avoid_last_machine)
1501 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1502 """Submit work to be done. This is the user entry point of this
1504 if self.already_shutdown:
1505 raise Exception('Submitted work after shutdown.')
1506 pickle = _make_cloud_pickle(function, *args, **kwargs)
1507 bundle = self._create_original_bundle(pickle, function.__name__)
1508 self.total_bundles_submitted += 1
1509 return self._helper_executor.submit(self._launch, bundle)
1512 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1513 """Shutdown the executor."""
1514 if not self.already_shutdown:
1515 logging.debug('Shutting down RemoteExecutor %s', self.title)
1516 self.heartbeat_stop_event.set()
1517 self.heartbeat_thread.join()
1518 self._helper_executor.shutdown(wait)
1520 print(self.histogram.__repr__(label_formatter='%ds'))
1521 self.already_shutdown = True
1524 class RemoteWorkerPoolProvider:
1526 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1530 @persistent.persistent_autoloaded_singleton() # type: ignore
1531 class ConfigRemoteWorkerPoolProvider(
1532 RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1534 def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1535 self.remote_worker_pool = []
1536 for record in json_remote_worker_pool['remote_worker_records']:
1537 self.remote_worker_pool.append(
1538 dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
1540 assert len(self.remote_worker_pool) > 0
1543 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1544 return self.remote_worker_pool
1547 def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1548 return self.remote_worker_pool
1552 def get_filename() -> str:
1553 return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
1557 def should_we_load_data(filename: str) -> bool:
1562 def should_we_save_data(filename: str) -> bool:
1567 class DefaultExecutors(object):
1568 """A container for a default thread, process and remote executor.
1569 These are not created until needed and are cleaned up automatically
1570 before process exit for the caller's convenience.
1571 Instead of creating your own executor, consider using the one
1572 from this pool. e.g.::
1574 @par.parallelize(method=par.Method.PROCESS)
1576 solutions: List[Work],
1583 def start_do_work(all_work: List[Work]):
1585 logger.debug('Sharding work into groups of 10.')
1586 for subset in list_utils.shard(all_work, 10):
1587 shards.append([x for x in subset])
1589 logger.debug('Kicking off helper pool.')
1591 for n, shard in enumerate(shards):
1594 shard, n, shared_cache.get_name(), max_letter_pop_per_word
1597 smart_future.wait_all(results)
1599 # Note: if you forget to do this it will clean itself up
1600 # during program termination including tearing down any
1601 # active ssh connections.
1602 executors.DefaultExecutors().process_pool().shutdown()
1606 self.thread_executor: Optional[ThreadExecutor] = None
1607 self.process_executor: Optional[ProcessExecutor] = None
1608 self.remote_executor: Optional[RemoteExecutor] = None
1611 def _ping(host) -> bool:
1612 logger.debug('RUN> ping -c 1 %s', host)
1615 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1621 def thread_pool(self) -> ThreadExecutor:
1622 if self.thread_executor is None:
1623 self.thread_executor = ThreadExecutor()
1624 return self.thread_executor
1626 def process_pool(self) -> ProcessExecutor:
1627 if self.process_executor is None:
1628 self.process_executor = ProcessExecutor()
1629 return self.process_executor
1631 def remote_pool(self) -> RemoteExecutor:
1632 if self.remote_executor is None:
1633 logger.info('Looking for some helper machines...')
1634 provider = ConfigRemoteWorkerPoolProvider()
1635 all_machines = provider.get_remote_workers()
1638 # Make sure we can ping each machine.
1639 for record in all_machines:
1640 if self._ping(record.machine):
1641 logger.info('%s is alive / responding to pings', record.machine)
1644 # The controller machine has a lot to do; go easy on it.
1646 if record.machine == platform.node() and record.count > 1:
1647 logger.info('Reducing workload for %s.', record.machine)
1648 record.count = max(int(record.count / 2), 1)
1650 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1651 policy.register_worker_pool(pool)
1652 self.remote_executor = RemoteExecutor(pool, policy)
1653 return self.remote_executor
1655 def shutdown(self) -> None:
1656 if self.thread_executor is not None:
1657 self.thread_executor.shutdown(wait=True, quiet=True)
1658 self.thread_executor = None
1659 if self.process_executor is not None:
1660 self.process_executor.shutdown(wait=True, quiet=True)
1661 self.process_executor = None
1662 if self.remote_executor is not None:
1663 self.remote_executor.shutdown(wait=True, quiet=True)
1664 self.remote_executor = None