2 # -*- coding: utf-8 -*-
4 # © Copyright 2021-2022, Scott Gasch
7 This module defines a :class:`BaseExecutor` interface and three
10 - :class:`ThreadExecutor`
11 - :class:`ProcessExecutor`
12 - :class:`RemoteExecutor`
14 The :class:`ThreadExecutor` is used to dispatch work to background
15 threads in the same Python process for parallelized work. Of course,
16 until the Global Interpreter Lock (GIL) bottleneck is resolved, this
17 is not terribly useful for compute-bound code. But it's good for
18 work that is mostly I/O bound.
20 The :class:`ProcessExecutor` is used to dispatch work to other
21 processes on the same machine and is more useful for compute-bound
24 The :class:`RemoteExecutor` is used in conjunction with `ssh`,
25 the `cloudpickle` dependency, and `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
26 to dispatch work to a set of remote worker machines on your
27 network. You can configure this pool via a JSON configuration file,
28 an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
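A rough, illustrative sketch of that file's shape (the usernames,
hostnames, weights, and counts below are made up); each record maps
onto the fields of :class:`RemoteWorkerRecord` defined later in this
module::

    {
        "remote_worker_records": [
            { "username": "your_ssh_user", "machine": "worker1.example.com", "weight": 24, "count": 8 },
            { "username": "your_ssh_user", "machine": "worker2.example.com", "weight": 10, "count": 4 }
        ]
    }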
30 Finally, this file defines a :class:`DefaultExecutors` pool that
31 contains a pre-created and ready instance of each of the three
32 executors discussed. It has the added benefit of being automatically
33 cleaned up at process termination time.
35 See instructions in :mod:`pyutils.parallelize.parallelize` for
36 setting up and using the framework.
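As a quick, minimal usage sketch (it assumes this module is importable
as :mod:`pyutils.parallelize.executors`; see the parallelize docs for
the full, supported workflow)::

    from pyutils.parallelize import executors

    def square(n: int) -> int:
        return n * n

    pool = executors.DefaultExecutors().thread_pool()
    future = pool.submit(square, 7)
    assert future.result() == 49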
39 from __future__ import annotations
41 import concurrent.futures as fut
50 from abc import ABC, abstractmethod
51 from collections import defaultdict
52 from dataclasses import dataclass, fields
53 from typing import Any, Callable, Dict, List, Optional, Set
55 import cloudpickle # type: ignore
56 from overrides import overrides
58 import pyutils.typez.histogram as hist
59 from pyutils import argparse_utils, config, math_utils, persistent, string_utils
60 from pyutils.ansi import bg, fg, reset, underline
61 from pyutils.decorator_utils import singleton
62 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
63 from pyutils.parallelize.thread_utils import background_thread
65 logger = logging.getLogger(__name__)
67 parser = config.add_commandline_args(
68 f"Executors ({__file__})", "Args related to processing executors."
71 '--executors_threadpool_size',
74 help='Number of threads in the default threadpool, leave unset for default',
78 '--executors_processpool_size',
81 help='Number of processes in the default processpool, leave unset for default',
85 '--executors_schedule_remote_backups',
87 action=argparse_utils.ActionNoYes,
88 help='Should we schedule duplicative backup work if a remote bundle is slow',
91 '--executors_max_bundle_failures',
95 help='Maximum number of failures before giving up on a bundle',
98 '--remote_worker_records_file',
101 help='Path of the remote worker records file (JSON)',
102 default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
105 '--remote_worker_helper_path',
107 metavar='PATH_TO_REMOTE_WORKER_PY',
108 help='Path to remote_worker.py on remote machines',
109 default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
113 SSH = '/usr/bin/ssh -oForwardX11=no'
114 SCP = '/usr/bin/scp -C'
117 def _make_cloud_pickle(fun, *args, **kwargs):
118 """Internal helper to create cloud pickles."""
119 logger.debug("Making cloudpickled bundle at %s", fun.__name__)
120 return cloudpickle.dumps((fun, args, kwargs))
123 class BaseExecutor(ABC):
124 """The base executor interface definition. The interface for
125 :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
126 :class:`ThreadExecutor`.
129 def __init__(self, *, title=''):
132 title: the name of this executor.
135 self.histogram = hist.SimpleHistogram(
136 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
141 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
142 """Submit work for the executor to do.
145 function: the Callable to be executed.
146 *args: the arguments to function
147 **kwargs: the arguments to function
150 A concurrent :class:`Future` representing the result of the
156 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
157 """Shutdown the executor.
160 wait: wait for the shutdown to complete before returning?
161 quiet: keep it quiet, please.
165 def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
166 """Shutdown the executor and return True if the executor is idle
167 (i.e. there are no pending or active tasks). Return False
168 otherwise. Note: this should only be called by the launcher
172 quiet: keep it quiet, please.
175 True if the executor could be shut down because it has no
176 pending work, False otherwise.
178 if self.task_count == 0:
179 self.shutdown(wait=True, quiet=quiet)
183 def adjust_task_count(self, delta: int) -> None:
184 """Change the task count. Note: do not call this method from a
185 worker, it should only be called by the launcher process /
189 delta: the delta value by which to adjust task count.
191 self.task_count += delta
192 logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
194 def get_task_count(self) -> int:
195 """Change the task count. Note: do not call this method from a
196 worker, it should only be called by the launcher process /
200 The executor's current task count.
202 return self.task_count
205 class ThreadExecutor(BaseExecutor):
206 """A threadpool executor. This executor uses Python threads to
207 schedule tasks. Note that, at least as of Python 3.10, because of
208 the Global Interpreter Lock (GIL), these do not
209 parallelize very well, so this class is useful mostly for non-CPU
212 See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
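A minimal, illustrative sketch (`fetch_url` and `urls` are hypothetical
stand-ins for whatever I/O-bound work you have)::

    executor = ThreadExecutor(max_workers=4)
    futures = [executor.submit(fetch_url, url) for url in urls]
    for future in futures:
        print(future.result())
    executor.shutdown(wait=True)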
215 def __init__(self, max_workers: Optional[int] = None):
218 max_workers: maximum number of threads to create in the pool.
222 if max_workers is not None:
223 workers = max_workers
224 elif 'executors_threadpool_size' in config.config:
225 workers = config.config['executors_threadpool_size']
226 if workers is not None:
227 logger.debug('Creating threadpool executor with %d workers', workers)
229 logger.debug('Creating a default sized threadpool executor')
230 self._thread_pool_executor = fut.ThreadPoolExecutor(
231 max_workers=workers, thread_name_prefix="thread_executor_helper"
233 self.already_shutdown = False
235 # This is run on a different thread; do not adjust task count here.
237 def _run_local_bundle(fun, *args, **kwargs):
238 logger.debug("Running local bundle at %s", fun.__name__)
239 result = fun(*args, **kwargs)
243 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
244 if self.already_shutdown:
245 raise Exception('Submitted work after shutdown.')
246 self.adjust_task_count(+1)
248 newargs.append(function)
252 result = self._thread_pool_executor.submit(
253 ThreadExecutor._run_local_bundle, *newargs, **kwargs
255 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
256 result.add_done_callback(lambda _: self.adjust_task_count(-1))
260 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
261 if not self.already_shutdown:
262 logger.debug('Shutting down threadpool executor %s', self.title)
263 self._thread_pool_executor.shutdown(wait)
265 print(self.histogram.__repr__(label_formatter='%ds'))
266 self.already_shutdown = True
269 class ProcessExecutor(BaseExecutor):
270 """An executor which runs tasks in child processes.
272 See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
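A minimal, illustrative sketch; the callable and its arguments must be
serializable by `cloudpickle` because they are shipped to a child
process (`crunch_numbers` and `dataset` are hypothetical)::

    executor = ProcessExecutor(max_workers=8)
    future = executor.submit(crunch_numbers, dataset)
    answer = future.result()
    executor.shutdown(wait=True)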
275 def __init__(self, max_workers=None):
278 max_workers: the max number of worker processes to create.
282 if max_workers is not None:
283 workers = max_workers
284 elif 'executors_processpool_size' in config.config:
285 workers = config.config['executors_processpool_size']
286 if workers is not None:
287 logger.debug('Creating processpool executor with %d workers.', workers)
289 logger.debug('Creating a default sized processpool executor')
290 self._process_executor = fut.ProcessPoolExecutor(
293 self.already_shutdown = False
295 # This is run in another process; do not adjust task count here.
297 def _run_cloud_pickle(pickle):
298 fun, args, kwargs = cloudpickle.loads(pickle)
299 logger.debug("Running pickled bundle at %s", fun.__name__)
300 result = fun(*args, **kwargs)
304 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
305 if self.already_shutdown:
306 raise Exception('Submitted work after shutdown.')
308 self.adjust_task_count(+1)
309 pickle = _make_cloud_pickle(function, *args, **kwargs)
310 result = self._process_executor.submit(
311 ProcessExecutor._run_cloud_pickle, pickle
313 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
314 result.add_done_callback(lambda _: self.adjust_task_count(-1))
318 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
319 if not self.already_shutdown:
320 logger.debug('Shutting down processpool executor %s', self.title)
321 self._process_executor.shutdown(wait)
323 print(self.histogram.__repr__(label_formatter='%ds'))
324 self.already_shutdown = True
326 def __getstate__(self):
327 state = self.__dict__.copy()
328 state['_process_executor'] = None
332 class RemoteExecutorException(Exception):
333 """Thrown when a bundle cannot be executed despite several retries."""
339 class RemoteWorkerRecord:
340 """A record of info about a remote worker."""
343 """Username we can ssh into on this machine to run work."""
346 """Machine address / name."""
349 """Relative probability for the weighted policy to select this
350 machine for scheduling work."""
353 """If this machine is selected, what is the maximum number of task
354 that it can handle?"""
357 return hash((self.username, self.machine))
360 return f'{self.username}@{self.machine}'
365 """All info necessary to define some unit of work that needs to be
366 done, where it is being run, its state, whether it is an original
367 bundle or a backup bundle, how many times it has failed, etc...
371 """The code to run, cloud pickled"""
374 """A unique identifier"""
377 """The name of the function we pickled"""
379 worker: Optional[RemoteWorkerRecord]
380 """The remote worker running this bundle or None if none (yet)"""
382 username: Optional[str]
383 """The remote username running this bundle or None if none (yet)"""
385 machine: Optional[str]
386 """The remote machine running this bundle or None if none (yet)"""
389 """The controller machine"""
392 """A unique filename to hold the work to be done"""
395 """Where the results should be placed / read from"""
398 """The process id of the local subprocess watching the ssh connection
399 to the remote machine"""
407 slower_than_local_p95: bool
408 """Currently slower then 95% of other bundles on remote host"""
410 slower_than_global_p95: bool
411 """Currently slower than 95% of other bundles globally"""
413 src_bundle: Optional[BundleDetails]
414 """If this is a backup bundle, this points to the original bundle
415 that it's backing up. None otherwise."""
417 is_cancelled: threading.Event
418 """An event that can be signaled to indicate this bundle is cancelled.
419 This is set when another copy (backup or original) of this work has
420 completed successfully elsewhere."""
423 """True if this bundle was cancelled, False if it finished normally"""
425 backup_bundles: Optional[List[BundleDetails]]
426 """If we've created backups of this bundle, this is the list of them"""
429 """How many times has this bundle failed already?"""
433 if uuid[-9:-2] == '_backup':
435 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
439 # We colorize the uuid based on some bits from it to make them
440 # stand out in the logging and help a reader correlate log messages
441 # related to the same bundle.
448 fg('marigold yellow'),
451 fg('cornflower blue'),
452 fg('turquoise blue'),
454 fg('lavender purple'),
457 c = colorz[int(uuid[-2:], 16) % len(colorz)]
459 self.function_name if self.function_name is not None else 'nofname'
461 machine = self.machine if self.machine is not None else 'nomachine'
462 return f'{c}{suffix}/{function_name}/{machine}{reset()}'
465 class RemoteExecutorStatus:
466 """A status 'scoreboard' for a remote executor tracking various
467 metrics and able to render a periodic dump of global state.
470 def __init__(self, total_worker_count: int) -> None:
473 total_worker_count: number of workers in the pool
475 self.worker_count: int = total_worker_count
476 self.known_workers: Set[RemoteWorkerRecord] = set()
477 self.start_time: float = time.time()
478 self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
479 self.end_per_bundle: Dict[str, float] = defaultdict(float)
480 self.finished_bundle_timings_per_worker: Dict[
481 RemoteWorkerRecord, math_utils.NumericPopulation
483 self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
484 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
485 self.finished_bundle_timings: math_utils.NumericPopulation = (
486 math_utils.NumericPopulation()
488 self.last_periodic_dump: Optional[float] = None
489 self.total_bundles_submitted: int = 0
491 # Protects reads and modifications of self. Also used
492 # as a memory fence for modifications to bundles.
493 self.lock: threading.Lock = threading.Lock()
495 def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
496 """Record that bundle with uuid is assigned to a particular worker.
499 worker: the record of the worker to which uuid is assigned
500 uuid: the uuid of a bundle that has been assigned to a worker
503 self.record_acquire_worker_already_locked(worker, uuid)
505 def record_acquire_worker_already_locked(
506 self, worker: RemoteWorkerRecord, uuid: str
508 """Same as above but an entry point that doesn't acquire the lock
509 for codepaths where it's already held."""
510 assert self.lock.locked()
511 self.known_workers.add(worker)
512 self.start_per_bundle[uuid] = None
513 x = self.in_flight_bundles_by_worker.get(worker, set())
515 self.in_flight_bundles_by_worker[worker] = x
517 def record_bundle_details(self, details: BundleDetails) -> None:
518 """Register the details about a bundle of work."""
520 self.record_bundle_details_already_locked(details)
522 def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
523 """Same as above but for codepaths that already hold the lock."""
524 assert self.lock.locked()
525 self.bundle_details_by_uuid[details.uuid] = details
527 def record_release_worker(
529 worker: RemoteWorkerRecord,
533 """Record that a bundle has released a worker."""
535 self.record_release_worker_already_locked(worker, uuid, was_cancelled)
537 def record_release_worker_already_locked(
539 worker: RemoteWorkerRecord,
543 """Same as above but for codepaths that already hold the lock."""
544 assert self.lock.locked()
546 self.end_per_bundle[uuid] = ts
547 self.in_flight_bundles_by_worker[worker].remove(uuid)
548 if not was_cancelled:
549 start = self.start_per_bundle[uuid]
550 assert start is not None
551 bundle_latency = ts - start
552 x = self.finished_bundle_timings_per_worker.get(
553 worker, math_utils.NumericPopulation()
555 x.add_number(bundle_latency)
556 self.finished_bundle_timings_per_worker[worker] = x
557 self.finished_bundle_timings.add_number(bundle_latency)
559 def record_processing_began(self, uuid: str):
560 """Record when work on a bundle begins."""
562 self.start_per_bundle[uuid] = time.time()
564 def total_in_flight(self) -> int:
565 """How many bundles are in flight currently?"""
566 assert self.lock.locked()
568 for worker in self.known_workers:
569 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
570 return total_in_flight
572 def total_idle(self) -> int:
573 """How many idle workers are there currently?"""
574 assert self.lock.locked()
575 return self.worker_count - self.total_in_flight()
578 assert self.lock.locked()
580 total_finished = len(self.finished_bundle_timings)
581 total_in_flight = self.total_in_flight()
582 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
584 if len(self.finished_bundle_timings) > 1:
585 qall_median = self.finished_bundle_timings.get_median()
586 qall_p95 = self.finished_bundle_timings.get_percentile(95)
588 f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
589 f'✅={total_finished}/{self.total_bundles_submitted}, '
590 f'💻n={total_in_flight}/{self.worker_count}\n'
594 f'⏱={ts-self.start_time:.1f}s, '
595 f'✅={total_finished}/{self.total_bundles_submitted}, '
596 f'💻n={total_in_flight}/{self.worker_count}\n'
599 for worker in self.known_workers:
600 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
601 timings = self.finished_bundle_timings_per_worker.get(
602 worker, math_utils.NumericPopulation()
605 qworker_median = None
608 qworker_median = timings.get_median()
609 qworker_p95 = timings.get_percentile(95)
610 ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
614 ret += f' ...finished {count} total bundle(s) so far\n'
615 in_flight = len(self.in_flight_bundles_by_worker[worker])
617 ret += f' ...{in_flight} bundles currently in flight:\n'
618 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
619 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
620 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
621 if self.start_per_bundle[bundle_uuid] is not None:
622 sec = ts - self.start_per_bundle[bundle_uuid]
623 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
625 ret += f' {details} setting up / copying data...'
628 if qworker_p95 is not None:
629 if sec > qworker_p95:
630 ret += f'{bg("red")}>💻p95{reset()} '
631 if details is not None:
632 details.slower_than_local_p95 = True
634 if details is not None:
635 details.slower_than_local_p95 = False
639 ret += f'{bg("red")}>∀p95{reset()} '
640 if details is not None:
641 details.slower_than_global_p95 = True
643 details.slower_than_global_p95 = False
647 def periodic_dump(self, total_bundles_submitted: int) -> None:
648 assert self.lock.locked()
649 self.total_bundles_submitted = total_bundles_submitted
651 if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
653 self.last_periodic_dump = ts
656 class RemoteWorkerSelectionPolicy(ABC):
657 """An interface definition of a policy for selecting a remote worker."""
660 self.workers: Optional[List[RemoteWorkerRecord]] = None
662 def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
663 self.workers = workers
666 def is_worker_available(self) -> bool:
670 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
674 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
675 """A remote worker selector that uses weighted RNG."""
678 def is_worker_available(self) -> bool:
680 for worker in self.workers:
686 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
689 for worker in self.workers:
690 if worker.machine != machine_to_avoid:
692 for _ in range(worker.count * worker.weight):
693 grabbag.append(worker)
695 if len(grabbag) == 0:
697 'There are no available workers that avoid %s', machine_to_avoid
700 for worker in self.workers:
702 for _ in range(worker.count * worker.weight):
703 grabbag.append(worker)
705 if len(grabbag) == 0:
706 logger.warning('There are no available workers?!')
709 worker = random.sample(grabbag, 1)[0]
710 assert worker.count > 0
712 logger.debug('Selected worker %s', worker)
716 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
717 """A remote worker selector that just round robins."""
719 def __init__(self) -> None:
724 def is_worker_available(self) -> bool:
726 for worker in self.workers:
733 self, machine_to_avoid: Optional[str] = None
734 ) -> Optional[RemoteWorkerRecord]:
738 worker = self.workers[x]
742 if x >= len(self.workers):
745 logger.debug('Selected worker %s', worker)
748 if x >= len(self.workers):
751 logger.warning('Unexpectedly could not find a worker, retrying...')
756 class RemoteExecutor(BaseExecutor):
757 """An executor that uses processes on remote machines to do work.
758 To do so, it requires that a pool of remote workers be properly
759 configured. See instructions in
760 :mod:`pyutils.parallelize.parallelize`.
762 Each machine in a worker pool has a *weight* and a *count*. A
763 *weight* captures the relative speed of a processor on that worker
764 and a *count* captures the number of simultaneous tasks the worker
765 can accept (i.e. the number of CPUs on the machine).
767 To dispatch work to a remote machine, this class pickles the code
768 to be executed remotely using `cloudpickle`. For that to work,
769 the remote machine should be running the same version of Python as
770 this machine, ideally in a virtual environment with the same
771 import libraries installed. Differences in operating system
772 and/or processor architecture don't seem to matter for most code,
777 Mismatches in Python version or in the version numbers of
778 third-party libraries between machines can cause problems
779 when trying to unpickle and run code remotely.
781 Work to be dispatched is represented in this code by creating a
782 "bundle". Each bundle is assigned to a remote worker based on
783 heuristics captured in a :class:`RemoteWorkerSelectionPolicy`. In
784 general, it attempts to load all workers in the pool and maximize
785 throughput. Once assigned to a remote worker, pickled code is
786 copied to that worker via `scp` and a remote command is issued via
787 `ssh` to execute a :file:`remote_worker.py` process on the remote
788 machine. This process unpickles the code, runs it, and produces a
789 result which is then copied back to the local machine (again via
790 `scp`) where it can be processed by local code.
792 You can and probably must override the path of
793 :file:`remote_worker.py` on your pool machines using the
794 `--remote_worker_helper_path` commandline argument (or by just
795 changing the default in code, see above in this file's code).
797 During remote work execution, this local machine acts as a
798 controller dispatching all work to the network, copying pickled
799 tasks out, and copying results back in. It may also be a worker
800 in the pool but do not underestimate the cost of being a
801 controller -- it takes some cpu and a lot of network bandwidth.
802 The work dispatcher logic attempts to detect when a controller is
803 also a worker and reduce its load.
805 Some redundancy and safety provisions are made when scheduling
806 tasks to the worker pool; e.g. slower-than-expected tasks have
807 redundant backup tasks created, especially if there are otherwise
808 idle workers. If a task fails repeatedly, the dispatcher considers
809 it poisoned and gives up on it.
813 This executor probably only makes sense to use with
814 computationally expensive tasks such as jobs that will execute
815 for ~30 seconds or longer.
817 The network overhead and latency of copying work from the
818 controller (local) machine to the remote workers and copying
819 results back again is relatively high. Especially at startup,
820 the network can become a bottleneck. Future versions of this
821 code may attempt to split the responsibility of being a
822 controller (distributing work to pool machines).
824 Instructions for how to set this up are provided in
825 :mod:`pyutils.parallelize.parallelize`.
827 See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
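A minimal usage sketch, assuming a valid worker records file and
reachable, properly provisioned workers; most callers should prefer
getting an instance from :class:`DefaultExecutors` rather than
constructing one directly (`train_model` and `params` are
hypothetical)::

    from pyutils.parallelize import executors

    pool = executors.DefaultExecutors().remote_pool()
    future = pool.submit(train_model, params)
    result = future.result()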
833 workers: List[RemoteWorkerRecord],
834 policy: RemoteWorkerSelectionPolicy,
838 workers: A list of remote workers we can call on to do tasks.
839 policy: A policy for selecting remote workers for tasks.
843 self.workers = workers
845 self.worker_count = 0
846 for worker in self.workers:
847 self.worker_count += worker.count
848 if self.worker_count <= 0:
849 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
851 raise RemoteExecutorException(msg)
852 self.policy.register_worker_pool(self.workers)
853 self.cv = threading.Condition()
855 'Creating %d local threads, one per remote worker.', self.worker_count
857 self._helper_executor = fut.ThreadPoolExecutor(
858 thread_name_prefix="remote_executor_helper",
859 max_workers=self.worker_count,
861 self.status = RemoteExecutorStatus(self.worker_count)
862 self.total_bundles_submitted = 0
863 self.backup_lock = threading.Lock()
864 self.last_backup = None
866 self.heartbeat_thread,
867 self.heartbeat_stop_event,
868 ) = self._run_periodic_heartbeat()
869 self.already_shutdown = False
872 def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
874 We create a background thread to invoke :meth:`_heartbeat` regularly
875 while we are scheduling work. It does some accounting such as
876 looking for slow bundles to tag for backup creation, checking for
877 unexpected failures, and printing a fancy message on stdout.
879 while not stop_event.is_set():
881 logger.debug('Running periodic heartbeat code...')
883 logger.debug('Periodic heartbeat thread shutting down.')
885 def _heartbeat(self) -> None:
886 # Note: this is invoked on a background thread, not an
887 # executor thread. Be careful what you do with it b/c it
888 # needs to get back and dump status again periodically.
889 with self.status.lock:
890 self.status.periodic_dump(self.total_bundles_submitted)
892 # Look for bundles to reschedule via executor.submit
893 if config.config['executors_schedule_remote_backups']:
894 self._maybe_schedule_backup_bundles()
896 def _maybe_schedule_backup_bundles(self):
897 """Maybe schedule backup bundles if we see a very slow bundle."""
899 assert self.status.lock.locked()
900 num_done = len(self.status.finished_bundle_timings)
901 num_idle_workers = self.worker_count - self.task_count
905 and num_idle_workers > 0
906 and (self.last_backup is None or (now - self.last_backup > 9.0))
907 and self.backup_lock.acquire(blocking=False)
910 assert self.backup_lock.locked()
912 bundle_to_backup = None
917 ) in self.status.in_flight_bundles_by_worker.items():
919 # Prefer to schedule backups of bundles running on
922 for record in self.workers:
923 if worker.machine == record.machine:
924 base_score = float(record.weight)
925 base_score = 1.0 / base_score
927 base_score = int(base_score)
930 for uuid in bundle_uuids:
931 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
934 and bundle.src_bundle is None
935 and bundle.backup_bundles is not None
939 # Schedule backups of bundles running
940 # longer; especially those that are
942 start_ts = self.status.start_per_bundle[uuid]
943 if start_ts is not None:
944 runtime = now - start_ts
947 'score[%s] => %.1f # latency boost', bundle, score
950 if bundle.slower_than_local_p95:
953 'score[%s] => %.1f # >worker p95',
958 if bundle.slower_than_global_p95:
961 'score[%s] => %.1f # >global p95',
966 # Prefer backups of bundles that don't
967 # have backups already.
968 backup_count = len(bundle.backup_bundles)
969 if backup_count == 0:
971 elif backup_count == 1:
973 elif backup_count == 2:
978 'score[%s] => %.1f # {backup_count} dup backup factor',
984 best_score is None or score > best_score
986 bundle_to_backup = bundle
987 assert bundle is not None
988 assert bundle.backup_bundles is not None
989 assert bundle.src_bundle is None
992 # Note: this is all still happening on the heartbeat
993 # runner thread. That's ok because
994 # _schedule_backup_for_bundle uses the executor to
995 # submit the bundle again which will cause it to be
996 # picked up by a worker thread and allow this thread
997 # to return to run future heartbeats.
998 if bundle_to_backup is not None:
999 self.last_backup = now
1001 '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
1005 self._schedule_backup_for_bundle(bundle_to_backup)
1007 self.backup_lock.release()
1009 def _is_worker_available(self) -> bool:
1010 """Is there a worker available currently?"""
1011 return self.policy.is_worker_available()
1013 def _acquire_worker(
1014 self, machine_to_avoid: Optional[str] = None
1015 ) -> Optional[RemoteWorkerRecord]:
1016 """Try to acquire a worker."""
1017 return self.policy.acquire_worker(machine_to_avoid)
1019 def _find_available_worker_or_block(
1020 self, machine_to_avoid: Optional[str] = None
1021 ) -> RemoteWorkerRecord:
1022 """Find a worker or block until one becomes available."""
1024 while not self._is_worker_available():
1026 worker = self._acquire_worker(machine_to_avoid)
1027 if worker is not None:
1029 msg = "We should never reach this point in the code"
1030 logger.critical(msg)
1031 raise Exception(msg)
1033 def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
1034 """Release a previously acquired worker."""
1035 worker = bundle.worker
1036 assert worker is not None
1037 logger.debug('Released worker %s', worker)
1038 self.status.record_release_worker(
1046 self.adjust_task_count(-1)
1048 def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
1049 """See if a particular bundle is cancelled. Do not block."""
1050 with self.status.lock:
1051 if bundle.is_cancelled.wait(timeout=0.0):
1052 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
1053 bundle.was_cancelled = True
1057 def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
1058 """Find a worker for bundle or block until one is available."""
1060 self.adjust_task_count(+1)
1062 hostname = bundle.hostname
1063 avoid_machine = override_avoid_machine
1064 is_original = bundle.src_bundle is None
1066 # Try not to schedule a backup on the same host as the original.
1067 if avoid_machine is None and bundle.src_bundle is not None:
1068 avoid_machine = bundle.src_bundle.machine
1070 while worker is None:
1071 worker = self._find_available_worker_or_block(avoid_machine)
1072 assert worker is not None
1074 # Ok, found a worker.
1075 bundle.worker = worker
1076 machine = bundle.machine = worker.machine
1077 username = bundle.username = worker.username
1078 self.status.record_acquire_worker(worker, uuid)
1079 logger.debug('%s: Running bundle on %s...', bundle, worker)
1081 # Before we do any work, make sure the bundle is still viable.
1082 # It may have been some time between when it was submitted and
1083 # now due to lack of worker availability and someone else may
1084 # have already finished it.
1085 if self._check_if_cancelled(bundle):
1087 return self._process_work_result(bundle)
1088 except Exception as e:
1090 '%s: bundle says it\'s cancelled upfront but no results?!', bundle
1092 self._release_worker(bundle)
1094 # Weird. We are the original owner of this
1095 # bundle. For it to have been cancelled, a backup
1096 # must have already started and completed before
1097 # we even got started. Moreover, the backup says
1098 # it is done but we can't find the results it
1099 # should have copied over. Reschedule the whole
1103 '%s: We are the original owner thread and yet there are '
1104 'no results for this bundle. This is unexpected and bad.',
1107 return self._emergency_retry_nasty_bundle(bundle)
1109 # We're a backup and our bundle is cancelled
1110 # before we even got started. Do nothing and let
1111 # the original bundle's thread worry about either
1112 # finding the results or complaining about it.
1115 # Send input code / data to worker machine if it's not local.
1116 if hostname not in machine:
1119 f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
1121 start_ts = time.time()
1122 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
1124 xfer_latency = time.time() - start_ts
1126 "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
1128 except Exception as e:
1129 self._release_worker(bundle)
1131 # Weird. We tried to copy the code to the worker
1132 # and it failed... And we're the original bundle.
1136 "%s: Failed to send instructions to the worker machine?! "
1137 "This is not expected; we\'re the original bundle so this shouldn\'t "
1138 "be a race condition. Attempting an emergency retry...",
1141 return self._emergency_retry_nasty_bundle(bundle)
1143 # This is actually expected; we're a backup.
1144 # There's a race condition where someone else
1145 # already finished the work and removed the source
1146 # code_file before we could copy it. Ignore.
1148 '%s: Failed to send instructions to the worker machine... '
1149 'We\'re a backup and this may be caused by the original (or '
1150 'some other backup) already finishing this work. Ignoring.',
1155 # Kick off the work. Note that if this fails we let
1156 # _wait_for_process deal with it.
1157 self.status.record_processing_began(uuid)
1158 helper_path = config.config['remote_worker_helper_path']
1160 f'{SSH} {bundle.username}@{bundle.machine} '
1161 f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1164 '%s: Executing %s in the background to kick off work...', bundle, cmd
1166 p = cmd_in_background(cmd, silent=True)
1169 '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
1171 return self._wait_for_process(p, bundle, 0)
1173 def _wait_for_process(
1174 self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1176 """At this point we've copied the bundle's pickled code to the remote
1177 worker and started an ssh process that should be invoking the
1178 remote worker to have it execute the user's code. See how
1179 that's going and wait for it to complete or fail. Note that
1180 this code is recursive: there are codepaths where we decide to
1181 stop waiting for an ssh process (because another backup seems
1182 to have finished) but then fail to fetch or parse the results
1183 from that backup and thus call ourselves to continue waiting
1184 on an active ssh process. This is the purpose of the depth
1185 argument: to curtail potential infinite recursion by giving up
1189 p: the Popen record of the ssh job
1190 bundle: the bundle of work being executed remotely
1191 depth: how many retries we've made so far. Starts at zero.
1195 machine = bundle.machine
1196 assert p is not None
1197 pid = p.pid # pid of the ssh process
1200 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
1204 self._release_worker(bundle)
1205 return self._emergency_retry_nasty_bundle(bundle)
1207 # Spin until either the ssh job we scheduled finishes the
1208 # bundle or some backup worker signals that they finished it
1212 p.wait(timeout=0.25)
1213 except subprocess.TimeoutExpired:
1214 if self._check_if_cancelled(bundle):
1216 '%s: looks like another worker finished bundle...', bundle
1220 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1224 # If we get here we believe the bundle is done; either the ssh
1225 # subprocess finished (hopefully successfully) or we noticed
1226 # that some other worker seems to have completed the bundle
1227 # before us and we're bailing out.
1229 ret = self._process_work_result(bundle)
1230 if ret is not None and p is not None:
1234 # Something went wrong; e.g. we could not copy the results
1235 # back, clean up after ourselves on the remote machine, or
1236 # unpickle the results we got from the remote machine. If we
1237 # still have an active ssh subprocess, keep waiting on it.
1238 # Otherwise, time for an emergency reschedule.
1239 except Exception as e:
1241 logger.error('%s: Something unexpected just happened...', bundle)
1244 "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
1247 return self._wait_for_process(p, bundle, depth + 1)
1249 self._release_worker(bundle)
1250 return self._emergency_retry_nasty_bundle(bundle)
1252 def _process_work_result(self, bundle: BundleDetails) -> Any:
1253 """A bundle seems to be completed. Check on the results."""
1255 with self.status.lock:
1256 is_original = bundle.src_bundle is None
1257 was_cancelled = bundle.was_cancelled
1258 username = bundle.username
1259 machine = bundle.machine
1260 result_file = bundle.result_file
1261 code_file = bundle.code_file
1263 # Whether original or backup, if we finished first we must
1264 # fetch the results if the computation happened on a
1266 bundle.end_ts = time.time()
1267 if not was_cancelled:
1268 assert bundle.machine is not None
1269 if bundle.hostname not in bundle.machine:
1270 cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1272 "%s: Fetching results back from %s@%s via %s",
1279 # If either of these throw they are handled in
1280 # _wait_for_process.
1285 except Exception as e:
1292 # Cleanup remote /tmp files.
1294 f'{SSH} {username}@{machine}'
1295 f' "/bin/rm -f {code_file} {result_file}"'
1298 'Fetching results back took %.2fs', time.time() - bundle.end_ts
1300 dur = bundle.end_ts - bundle.start_ts
1301 self.histogram.add_item(dur)
1303 # Only the original worker should unpickle the file contents
1304 # though since it's the only one whose result matters. The
1305 # original is also the only job that may delete result_file
1306 # from disk. Note that the original may have been cancelled
1307 # if one of the backups finished first; it still must read the
1308 # result from disk. It still does that here with is_cancelled
1311 logger.debug("%s: Unpickling %s.", bundle, result_file)
1313 with open(result_file, 'rb') as rb:
1314 serialized = rb.read()
1315 result = cloudpickle.loads(serialized)
1316 except Exception as e:
1318 logger.error('Failed to load %s... this is bad news.', result_file)
1319 self._release_worker(bundle)
1321 # Re-raise the exception; the code in _wait_for_process may
1322 # decide to _emergency_retry_nasty_bundle here.
1324 logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1325 os.remove(result_file)
1326 os.remove(code_file)
1328 # Notify any backups that the original is done so they
1329 # should stop ASAP. Do this whether or not we
1330 # finished first since there could be more than one
1332 if bundle.backup_bundles is not None:
1333 for backup in bundle.backup_bundles:
1335 '%s: Notifying backup %s that it\'s cancelled',
1339 backup.is_cancelled.set()
1341 # This is a backup job and, by now, we have already fetched
1342 # the bundle results.
1344 # Backup results don't matter, they just need to leave the
1345 # result file in the right place for their originals to
1346 # read/unpickle later.
1349 # Tell the original to stop if we finished first.
1350 if not was_cancelled:
1351 orig_bundle = bundle.src_bundle
1352 assert orig_bundle is not None
1354 '%s: Notifying original %s we beat them to it.',
1358 orig_bundle.is_cancelled.set()
1359 self._release_worker(bundle, was_cancelled=was_cancelled)
1362 def _create_original_bundle(self, pickle, function_name: str):
1363 """Creates a bundle that is not a backup of any other bundle but
1364 rather represents a user task.
1367 uuid = string_utils.generate_uuid(omit_dashes=True)
1368 code_file = f'/tmp/{uuid}.code.bin'
1369 result_file = f'/tmp/{uuid}.result.bin'
1371 logger.debug('Writing pickled code to %s', code_file)
1372 with open(code_file, 'wb') as wb:
1375 bundle = BundleDetails(
1376 pickled_code=pickle,
1378 function_name=function_name,
1382 hostname=platform.node(),
1383 code_file=code_file,
1384 result_file=result_file,
1386 start_ts=time.time(),
1388 slower_than_local_p95=False,
1389 slower_than_global_p95=False,
1391 is_cancelled=threading.Event(),
1392 was_cancelled=False,
1396 self.status.record_bundle_details(bundle)
1397 logger.debug('%s: Created an original bundle', bundle)
1400 def _create_backup_bundle(self, src_bundle: BundleDetails):
1401 """Creates a bundle that is a backup of another bundle that is
1402 running too slowly."""
1404 assert self.status.lock.locked()
1405 assert src_bundle.backup_bundles is not None
1406 n = len(src_bundle.backup_bundles)
1407 uuid = src_bundle.uuid + f'_backup#{n}'
1409 backup_bundle = BundleDetails(
1410 pickled_code=src_bundle.pickled_code,
1412 function_name=src_bundle.function_name,
1416 hostname=src_bundle.hostname,
1417 code_file=src_bundle.code_file,
1418 result_file=src_bundle.result_file,
1420 start_ts=time.time(),
1422 slower_than_local_p95=False,
1423 slower_than_global_p95=False,
1424 src_bundle=src_bundle,
1425 is_cancelled=threading.Event(),
1426 was_cancelled=False,
1427 backup_bundles=None, # backup backups not allowed
1430 src_bundle.backup_bundles.append(backup_bundle)
1431 self.status.record_bundle_details_already_locked(backup_bundle)
1432 logger.debug('%s: Created a backup bundle', backup_bundle)
1433 return backup_bundle
1435 def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1436 """Schedule a backup of src_bundle."""
1438 assert self.status.lock.locked()
1439 assert src_bundle is not None
1440 backup_bundle = self._create_backup_bundle(src_bundle)
1442 '%s/%s: Scheduling backup for execution...',
1444 backup_bundle.function_name,
1446 self._helper_executor.submit(self._launch, backup_bundle)
1448 # Results from backups don't matter; if they finish first
1449 # they will move the result_file to this machine and let
1450 # the original pick them up and unpickle them (and return
1453 def _emergency_retry_nasty_bundle(
1454 self, bundle: BundleDetails
1455 ) -> Optional[fut.Future]:
1456 """Something unexpectedly failed with bundle. Either retry it
1457 from the beginning or throw in the towel and give up on it."""
1459 is_original = bundle.src_bundle is None
1460 bundle.worker = None
1461 avoid_last_machine = bundle.machine
1462 bundle.machine = None
1463 bundle.username = None
1464 bundle.failure_count += 1
1470 if bundle.failure_count > retry_limit:
1472 '%s: Tried this bundle too many times already (%dx); giving up.',
1477 raise RemoteExecutorException(
1478 f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1482 '%s: At least it\'s only a backup; better luck with the others.',
1487 msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1490 return self._launch(bundle, avoid_last_machine)
1493 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1494 """Submit work to be done. This is the user entry point of this
1496 if self.already_shutdown:
1497 raise Exception('Submitted work after shutdown.')
1498 pickle = _make_cloud_pickle(function, *args, **kwargs)
1499 bundle = self._create_original_bundle(pickle, function.__name__)
1500 self.total_bundles_submitted += 1
1501 return self._helper_executor.submit(self._launch, bundle)
1504 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1505 """Shutdown the executor."""
1506 if not self.already_shutdown:
1507 logger.debug('Shutting down RemoteExecutor %s', self.title)
1508 self.heartbeat_stop_event.set()
1509 self.heartbeat_thread.join()
1510 self._helper_executor.shutdown(wait)
1512 print(self.histogram.__repr__(label_formatter='%ds'))
1513 self.already_shutdown = True
1516 class RemoteWorkerPoolProvider:
1518 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1522 @persistent.persistent_autoloaded_singleton() # type: ignore
1523 class ConfigRemoteWorkerPoolProvider(
1524 RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
1526 def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1527 self.remote_worker_pool = []
1528 for record in json_remote_worker_pool['remote_worker_records']:
1529 self.remote_worker_pool.append(
1530 self.dataclassFromDict(RemoteWorkerRecord, record)
1532 assert len(self.remote_worker_pool) > 0
1535 def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
1536 fieldSet = {f.name for f in fields(clsName) if f.init}
1537 filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
1538 return clsName(**filteredArgDict)
1541 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1542 return self.remote_worker_pool
1545 def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1546 return self.remote_worker_pool
1550 def get_filename() -> str:
1551 return config.config['remote_worker_records_file']
1555 def should_we_load_data(filename: str) -> bool:
1560 def should_we_save_data(filename: str) -> bool:
1565 class DefaultExecutors(object):
1566 """A container for a default thread, process and remote executor.
1567 These are not created until needed and are automatically cleaned up
1568 before process exit for the caller's convenience.
1569 Instead of creating your own executor, consider using the one
1570 from this pool. e.g.::
1572 @par.parallelize(method=par.Method.PROCESS)
1574 solutions: List[Work],
1581 def start_do_work(all_work: List[Work]):
1583 logger.debug('Sharding work into groups of 10.')
1584 for subset in list_utils.shard(all_work, 10):
1585 shards.append([x for x in subset])
1587 logger.debug('Kicking off helper pool.')
1589 for n, shard in enumerate(shards):
1592 shard, n, shared_cache.get_name(), max_letter_pop_per_word
1595 smart_future.wait_all(results)
1597 # Note: if you forget to do this it will clean itself up
1598 # during program termination including tearing down any
1599 # active ssh connections.
1600 executors.DefaultExecutors().process_pool().shutdown()
1604 self.thread_executor: Optional[ThreadExecutor] = None
1605 self.process_executor: Optional[ProcessExecutor] = None
1606 self.remote_executor: Optional[RemoteExecutor] = None
1609 def _ping(host) -> bool:
1610 logger.debug('RUN> ping -c 1 %s', host)
1613 f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
1619 def thread_pool(self) -> ThreadExecutor:
1620 if self.thread_executor is None:
1621 self.thread_executor = ThreadExecutor()
1622 return self.thread_executor
1624 def process_pool(self) -> ProcessExecutor:
1625 if self.process_executor is None:
1626 self.process_executor = ProcessExecutor()
1627 return self.process_executor
1629 def remote_pool(self) -> RemoteExecutor:
1630 if self.remote_executor is None:
1631 logger.info('Looking for some helper machines...')
1632 provider = ConfigRemoteWorkerPoolProvider()
1633 all_machines = provider.get_remote_workers()
1636 # Make sure we can ping each machine.
1637 for record in all_machines:
1638 if self._ping(record.machine):
1639 logger.info('%s is alive / responding to pings', record.machine)
1642 # The controller machine has a lot to do; go easy on it.
1644 if record.machine == platform.node() and record.count > 1:
1645 logger.info('Reducing workload for %s.', record.machine)
1646 record.count = max(int(record.count / 2), 1)
1648 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1649 policy.register_worker_pool(pool)
1650 self.remote_executor = RemoteExecutor(pool, policy)
1651 return self.remote_executor
1653 def shutdown(self) -> None:
1654 if self.thread_executor is not None:
1655 self.thread_executor.shutdown(wait=True, quiet=True)
1656 self.thread_executor = None
1657 if self.process_executor is not None:
1658 self.process_executor.shutdown(wait=True, quiet=True)
1659 self.process_executor = None
1660 if self.remote_executor is not None:
1661 self.remote_executor.shutdown(wait=True, quiet=True)
1662 self.remote_executor = None