# -*- coding: utf-8 -*-
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-nested-blocks

# © Copyright 2021-2023, Scott Gasch
"""
This module defines a :class:`BaseExecutor` interface and three
implementations:

- :class:`ThreadExecutor`
- :class:`ProcessExecutor`
- :class:`RemoteExecutor`

The :class:`ThreadExecutor` is used to dispatch work to background
threads in the same Python process for parallelized work. Of course,
until the Global Interpreter Lock (GIL) bottleneck is resolved, this
is not terribly useful for compute-bound code. But it's good for
work that is mostly I/O bound.

The :class:`ProcessExecutor` is used to dispatch work to other
processes on the same machine and is more useful for compute-bound
workloads.

The :class:`RemoteExecutor` is used in conjunction with `ssh`,
the `cloudpickle` dependency, and the `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
to dispatch work to a set of remote worker machines on your
network. You can configure this pool via a JSON configuration file,
an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
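
For reference, a minimal records file might look something like this
(hostnames, usernames, and numbers are hypothetical; the keys mirror
the fields of :class:`RemoteWorkerRecord`)::

    {
        "remote_worker_records": [
            {
                "username": "scott",
                "machine": "machine1",
                "weight": 2,
                "count": 4
            },
            {
                "username": "scott",
                "machine": "machine2",
                "weight": 1,
                "count": 2
            }
        ]
    }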

Finally, this file defines a :class:`DefaultExecutors` pool that
contains a lazily created but ready-to-use instance of each of the
three executors discussed. It has the added benefit of being
automatically cleaned up at process termination time.

See instructions in :mod:`pyutils.parallelize.parallelize` for
setting up and using the framework.
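
A quick sketch of the simplest entry point (the function name below is
hypothetical)::

    from pyutils.parallelize import executors

    pool = executors.DefaultExecutors().thread_pool()
    future = pool.submit(my_function, 'some_arg')
    print(future.result())
    executors.DefaultExecutors().shutdown()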
"""

from __future__ import annotations

import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
from overrides import overrides

import pyutils.typez.histogram as hist
from pyutils import argparse_utils, config, dataclass_utils, math_utils, string_utils
from pyutils.ansi import bg, fg, reset, underline
from pyutils.decorator_utils import singleton
from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
from pyutils.parallelize.thread_utils import background_thread
from pyutils.typez import persistent, type_utils

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    help='Number of processes in the default processpool, leave unset for default',
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    help='Maximum number of failures before giving up on a bundle',
)
parser.add_argument(
    '--remote_worker_records_file',
    type=str,
    help='Path of the remote worker records file (JSON)',
    default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
)
parser.add_argument(
    '--remote_worker_helper_path',
    type=str,
    metavar='PATH_TO_REMOTE_WORKER_PY',
    help='Path to remote_worker.py on remote machines',
    default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
)

SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'


def _make_cloud_pickle(fun, *args, **kwargs):
    """Internal helper to create cloud pickles."""
    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
    return cloudpickle.dumps((fun, args, kwargs))
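
# Note: the receiving side undoes _make_cloud_pickle with
# cloudpickle.loads(); see ProcessExecutor._run_cloud_pickle below and
# remote_worker.py for the unpacking half of this round trip.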


class BaseExecutor(ABC):
    """The base executor interface definition. The interface for
    :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
    :class:`ThreadExecutor`.
    """

    def __init__(self, *, title=''):
        """
        Args:
            title: the name of this executor.
        """
        self.title = title
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Submit work for the executor to do.

        Args:
            function: the Callable to be executed.
            *args: the arguments to function
            **kwargs: the keyword arguments to function

        Returns:
            A concurrent :class:`Future` representing the result of the
            work.
        """
        pass

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shutdown the executor.

        Args:
            wait: wait for the shutdown to complete before returning?
            quiet: keep it quiet, please.
        """
        pass

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shutdown the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks). Return False
        otherwise. Note: this should only be called by the launcher
        process.

        Args:
            quiet: keep it quiet, please.

        Returns:
            True if the executor could be shut down because it has no
            pending work, False otherwise.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count. Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread.

        Args:
            delta: the delta value by which to adjust task count.
        """
        self.task_count += delta
        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)

    def get_task_count(self) -> int:
        """Get the current task count. Note: do not call this method from
        a worker, it should only be called by the launcher process /
        thread.

        Returns:
            The executor's current task count.
        """
        return self.task_count


class ThreadExecutor(BaseExecutor):
    """A threadpool executor. This executor uses Python threads to
    schedule tasks. Note that, at least as of python3.10, because of
    the global lock in the interpreter itself, these do not
    parallelize very well so this class is useful mostly for non-CPU
    intensive tasks.

    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
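
    A hedged usage sketch (the URL and pool size are hypothetical;
    threads suit I/O bound work like this)::

        import urllib.request

        pool = ThreadExecutor(max_workers=4)
        f = pool.submit(urllib.request.urlopen, 'http://example.com/')
        print(f.result().status)
        pool.shutdown()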
    """

    def __init__(self, max_workers: Optional[int] = None):
        """
        Args:
            max_workers: maximum number of threads to create in the pool.
        """
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        if workers is not None:
            logger.debug('Creating threadpool executor with %d workers', workers)
        else:
            logger.debug('Creating a default sized threadpool executor')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False

    # This is run on a different thread; do not adjust task count here.
    @staticmethod
    def _run_local_bundle(fun, *args, **kwargs):
        logger.debug("Running local bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        start = time.time()
        result = self._thread_pool_executor.submit(
            ThreadExecutor._run_local_bundle, *newargs, **kwargs
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down threadpool executor %s', self.title)
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True


class ProcessExecutor(BaseExecutor):
    """An executor which runs tasks in child processes.

    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
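
    A hedged usage sketch (the worker function is hypothetical; child
    processes suit compute-bound work)::

        def expensive(n: int) -> int:
            return sum(i * i for i in range(n))

        pool = ProcessExecutor()
        f = pool.submit(expensive, 10_000_000)
        print(f.result())
        pool.shutdown()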
    """

    def __init__(self, max_workers=None):
        """
        Args:
            max_workers: the max number of worker processes to create.
        """
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        if workers is not None:
            logger.debug('Creating processpool executor with %d workers.', workers)
        else:
            logger.debug('Creating a default sized processpool executor')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False

    # This is run in another process; do not adjust task count here.
    @staticmethod
    def _run_cloud_pickle(pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug("Running pickled bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        self.adjust_task_count(+1)
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(
            ProcessExecutor._run_cloud_pickle, pickle
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down processpool executor %s', self.title)
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state


class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""


@dataclass
class RemoteWorkerRecord:
    """A record of info about a remote worker."""

    username: str
    """Username we can ssh into on this machine to run work."""

    machine: str
    """Machine address / name."""

    weight: int
    """Relative probability for the weighted policy to select this
    machine for scheduling work."""

    count: int
    """If this machine is selected, what is the maximum number of tasks
    that it can handle?"""

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'
368 """All info necessary to define some unit of work that needs to be
369 done, where it is being run, its state, whether it is an original
370 bundle of a backup bundle, how many times it has failed, etc...
374 """The code to run, cloud pickled"""
377 """A unique identifier"""
380 """The name of the function we pickled"""
382 worker: Optional[RemoteWorkerRecord]
383 """The remote worker running this bundle or None if none (yet)"""
385 username: Optional[str]
386 """The remote username running this bundle or None if none (yet)"""
388 machine: Optional[str]
389 """The remote machine running this bundle or None if none (yet)"""
392 """The controller machine"""
395 """A unique filename to hold the work to be done"""
398 """Where the results should be placed / read from"""
401 """The process id of the local subprocess watching the ssh connection
402 to the remote machine"""
410 slower_than_local_p95: bool
411 """Currently slower then 95% of other bundles on remote host"""
413 slower_than_global_p95: bool
414 """Currently slower than 95% of other bundles globally"""
416 src_bundle: Optional[BundleDetails]
417 """If this is a backup bundle, this points to the original bundle
418 that it's backing up. None otherwise."""
420 is_cancelled: threading.Event
421 """An event that can be signaled to indicate this bundle is cancelled.
422 This is set when another copy (backup or original) of this work has
423 completed successfully elsewhere."""
426 """True if this bundle was cancelled, False if it finished normally"""
428 backup_bundles: Optional[List[BundleDetails]]
429 """If we've created backups of this bundle, this is the list of them"""
432 """How many times has this bundle failed already?"""

    def __repr__(self):
        uuid = self.uuid
        if uuid[-9:-2] == '_backup':
            uuid = uuid[:-9]
            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
        else:
            suffix = uuid[-6:]

        # We colorize the uuid based on some bits from it to make them
        # stand out in the logging and help a reader correlate log messages
        # related to the same bundle.
        colorz = [
            fg('marigold yellow'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('lavender purple'),
        ]
        c = colorz[int(uuid[-2:], 16) % len(colorz)]
        function_name = (
            self.function_name if self.function_name is not None else 'nofname'
        )
        machine = self.machine if self.machine is not None else 'nomachine'
        return f'{c}{suffix}/{function_name}/{machine}{reset()}'


class RemoteExecutorStatus:
    """A status 'scoreboard' for a remote executor tracking various
    metrics and able to render a periodic dump of global state.
    """

    def __init__(self, total_worker_count: int) -> None:
        """
        Args:
            total_worker_count: number of workers in the pool
        """
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord, math_utils.NumericPopulation
        ] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: math_utils.NumericPopulation = (
            math_utils.NumericPopulation()
        )
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modification using self. Also used
        # as a memory fence for modifications to bundle.
        self.lock: threading.Lock = threading.Lock()

    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        """Record that bundle with uuid is assigned to a particular worker.

        Args:
            worker: the record of the worker to which uuid is assigned
            uuid: the uuid of a bundle that has been assigned to a worker
        """
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(
        self, worker: RemoteWorkerRecord, uuid: str
    ) -> None:
        """Same as above but an entry point that doesn't acquire the lock
        for codepaths where it's already held."""
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        """Register the details about a bundle of work."""
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        """Same as above but for codepaths that already hold the lock."""
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Record that a bundle has released a worker."""
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Same as above but for codepaths that already hold the lock."""
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            x = self.finished_bundle_timings_per_worker.get(
                worker, math_utils.NumericPopulation()
            )
            x.add_number(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.add_number(bundle_latency)

    def record_processing_began(self, uuid: str):
        """Record when work on a bundle begins."""
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        """How many bundles are in flight currently?"""
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        """How many idle workers are there currently?"""
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall_median = None
        qall_p95 = None
        if len(self.finished_bundle_timings) > 1:
            qall_median = self.finished_bundle_timings.get_median()
            qall_p95 = self.finished_bundle_timings.get_percentile(95)
            ret += (
                f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        for worker in self.known_workers:
            ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(
                worker, math_utils.NumericPopulation()
            )
            count = len(timings)
            qworker_median = None
            qworker_p95 = None
            if count > 1:
                qworker_median = timings.get_median()
                qworker_p95 = timings.get_percentile(95)
                ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f'    ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f'    ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f'       (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f'       {details} setting up / copying data...'
                        sec = 0.0

                    if qworker_p95 is not None:
                        if sec > qworker_p95:
                            ret += f'{bg("red")}>💻p95{reset()} '
                            if details is not None:
                                details.slower_than_local_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_local_p95 = False

                    if qall_p95 is not None:
                        if sec > qall_p95:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                details.slower_than_global_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_global_p95 = False
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    """An interface definition of a policy for selecting a remote worker."""

    def __init__(self):
        self.workers: Optional[List[RemoteWorkerRecord]] = None

    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(
        self, machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that uses weighted RNG."""

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(
        self, machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        if self.workers:
            for worker in self.workers:
                if worker.machine != machine_to_avoid:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.debug(
                'There are no available workers that avoid %s', machine_to_avoid
            )
            if self.workers:
                for worker in self.workers:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.warning('There are no available workers?!')
            return None

        worker = random.sample(grabbag, 1)[0]
        assert worker.count > 0
        worker.count -= 1
        logger.debug('Selected worker %s', worker)
        return worker


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that just round robins."""

    def __init__(self) -> None:
        super().__init__()
        self.index = 0

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    def _increment_index(self, index: int) -> None:
        if self.workers:
            index += 1
            if index >= len(self.workers):
                index = 0
            self.index = index

    @overrides
    def acquire_worker(
        self, machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        if self.workers:
            x = self.index
            while True:
                worker = self.workers[x]
                if worker.machine != machine_to_avoid and worker.count > 0:
                    worker.count -= 1
                    self._increment_index(x)
                    logger.debug('Selected worker %s', worker)
                    return worker
                x += 1
                if x >= len(self.workers):
                    x = 0
                if x == self.index:
                    logger.warning('Unexpectedly could not find a worker, retrying...')
                    return None
        return None


class RemoteExecutor(BaseExecutor):
    """An executor that uses processes on remote machines to do work.
    To do so, it requires a pool of remote workers to be properly
    configured. See instructions in
    :class:`pyutils.parallelize.parallelize`.

    Each machine in a worker pool has a *weight* and a *count*. A
    *weight* captures the relative speed of a processor on that worker
    and a *count* captures the number of synchronous tasks the worker
    can accept (i.e. the number of cpus on the machine).

    To dispatch work to a remote machine, this class pickles the code
    to be executed remotely using `cloudpickle`. For that to work,
    the remote machine should be running the same version of Python as
    this machine, ideally in a virtual environment with the same
    import libraries installed. Differences in operating system
    and/or processor architecture don't seem to matter for most code,
    though.

    .. note::

        Mismatches in Python version or in the version numbers of
        third-party libraries between machines can cause problems
        when trying to unpickle and run code remotely.

    Work to be dispatched is represented in this code by creating a
    "bundle". Each bundle is assigned to a remote worker based on
    heuristics captured in a :class:`RemoteWorkerSelectionPolicy`. In
    general, it attempts to load all workers in the pool and maximize
    throughput. Once assigned to a remote worker, pickled code is
    copied to that worker via `scp` and a remote command is issued via
    `ssh` to execute a :file:`remote_worker.py` process on the remote
    machine. This process unpickles the code, runs it, and produces a
    result which is then copied back to the local machine (again via
    `scp`) where it can be processed by local code.

    You can and probably must override the path of
    :file:`remote_worker.py` on your pool machines using the
    `--remote_worker_helper_path` commandline argument (or by
    changing the default in this file's code, above).

    During remote work execution, this local machine acts as a
    controller dispatching all work to the network, copying pickled
    tasks out, and copying results back in. It may also be a worker
    in the pool but do not underestimate the cost of being a
    controller -- it takes some cpu and a lot of network bandwidth.
    The work dispatcher logic attempts to detect when a controller is
    also a worker and reduce its load.

    Some redundancy and safety provisions are made when scheduling
    tasks to the worker pool; e.g. slower than expected tasks have
    redundant backup tasks created, especially if there are otherwise
    idle workers. If a task fails repeatedly, the dispatcher considers
    it poisoned and gives up on it.

    .. note::

        This executor probably only makes sense to use with
        computationally expensive tasks such as jobs that will execute
        for ~30 seconds or longer.

        The network overhead and latency of copying work from the
        controller (local) machine to the remote workers and copying
        results back again is relatively high. Especially at startup,
        the network can become a bottleneck. Future versions of this
        code may attempt to split the responsibility of being a
        controller (distributing work to pool machines).

    Instructions for how to set this up are provided in
    :class:`pyutils.parallelize.parallelize`.

    See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
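
    A hedged usage sketch (function and argument names are
    hypothetical; most callers should prefer
    :meth:`DefaultExecutors.remote_pool` below, which pings and
    registers the worker pool for you)::

        pool = DefaultExecutors().remote_pool()
        f = pool.submit(my_function, my_argument)
        print(f.result())
        pool.shutdown()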
    """

    def __init__(
        self,
        workers: List[RemoteWorkerRecord],
        policy: RemoteWorkerSelectionPolicy,
    ) -> None:
        """
        Args:
            workers: A list of remote workers we can call on to do tasks.
            policy: A policy for selecting remote workers for tasks.
        """

        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug(
            'Creating %d local threads, one per remote worker.', self.worker_count
        )
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self._run_periodic_heartbeat()
        self.already_shutdown = False

    @background_thread
    def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
        """
        We create a background thread to invoke :meth:`_heartbeat` regularly
        while we are scheduling work. It does some accounting such as
        looking for slow bundles to tag for backup creation, checking for
        unexpected failures, and printing a fancy message on stdout.
        """
        while not stop_event.is_set():
            time.sleep(5.0)
            logger.debug('Running periodic heartbeat code...')
            self._heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def _heartbeat(self) -> None:
        # Note: this is invoked on a background thread, not an
        # executor thread. Be careful what you do with it b/c it
        # needs to get back and dump status again periodically.
        with self.status.lock:
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self._maybe_schedule_backup_bundles()

    def _maybe_schedule_backup_bundles(self):
        """Maybe schedule backup bundles if we see a very slow bundle."""

        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_done >= 2
            and num_idle_workers > 0
            and (self.last_backup is None or (now - self.last_backup > 9.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            try:
                assert self.backup_lock.locked()

                bundle_to_backup = None
                best_score = None
                for (
                    worker,
                    bundle_uuids,
                ) in self.status.in_flight_bundles_by_worker.items():

                    # Prefer to schedule backups of bundles running on
                    # slower machines.
                    base_score = 0
                    for record in self.workers:
                        if worker.machine == record.machine:
                            temp_score = float(record.weight)
                            temp_score = 1.0 / temp_score
                            temp_score *= 200.0
                            base_score = int(temp_score)
                            break

                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                            bundle is not None
                            and bundle.src_bundle is None
                            and bundle.backup_bundles is not None
                        ):
                            score = base_score

                            # Schedule backups of bundles running
                            # longer; especially those that are
                            # unexpectedly slow.
                            start_ts = self.status.start_per_bundle[uuid]
                            if start_ts is not None:
                                runtime = now - start_ts
                                score += runtime
                                logger.debug(
                                    'score[%s] => %.1f # latency boost', bundle, score
                                )

                                if bundle.slower_than_local_p95:
                                    score += runtime / 2
                                    logger.debug(
                                        'score[%s] => %.1f # >worker p95',
                                        bundle,
                                        score,
                                    )

                                if bundle.slower_than_global_p95:
                                    score += runtime / 2
                                    logger.debug(
                                        'score[%s] => %.1f # >global p95',
                                        bundle,
                                        score,
                                    )

                            # Prefer backups of bundles that don't
                            # have backups already.
                            backup_count = len(bundle.backup_bundles)
                            if backup_count == 0:
                                score *= 2
                            elif backup_count == 1:
                                score /= 2
                            elif backup_count == 2:
                                score /= 8
                            else:
                                score = 0
                            logger.debug(
                                'score[%s] => %.1f # %d dup backup factor',
                                bundle,
                                score,
                                backup_count,
                            )

                            if score != 0 and (
                                best_score is None or score > best_score
                            ):
                                bundle_to_backup = bundle
                                assert bundle is not None
                                assert bundle.backup_bundles is not None
                                assert bundle.src_bundle is None
                                best_score = score

                # Note: this is all still happening on the heartbeat
                # runner thread. That's ok because
                # _schedule_backup_for_bundle uses the executor to
                # submit the bundle again which will cause it to be
                # picked up by a worker thread and allow this thread
                # to return to run future heartbeats.
                if bundle_to_backup is not None:
                    self.last_backup = now
                    logger.info(
                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
                        bundle_to_backup,
                        best_score,
                    )
                    self._schedule_backup_for_bundle(bundle_to_backup)
            finally:
                self.backup_lock.release()

    def _is_worker_available(self) -> bool:
        """Is there a worker available currently?"""
        return self.policy.is_worker_available()

    def _acquire_worker(
        self, machine_to_avoid: str = None
    ) -> Optional[RemoteWorkerRecord]:
        """Try to acquire a worker."""
        return self.policy.acquire_worker(machine_to_avoid)

    def _find_available_worker_or_block(
        self, machine_to_avoid: str = None
    ) -> RemoteWorkerRecord:
        """Find a worker or block until one becomes available."""
        with self.cv:
            while not self._is_worker_available():
                self.cv.wait()
            worker = self._acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        """Release a previously acquired worker."""
        worker = bundle.worker
        assert worker is not None
        logger.debug('Released worker %s', worker)
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        with self.cv:
            worker.count += 1
            self.cv.notify()
        self.adjust_task_count(-1)

    def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
        """See if a particular bundle is cancelled. Do not block."""
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                bundle.was_cancelled = True
                return True
        return False

    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""

        self.adjust_task_count(+1)
        uuid = bundle.uuid
        controller = bundle.controller
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self._find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug('%s: Running bundle on %s...', bundle, worker)

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self._check_if_cancelled(bundle):
            try:
                return self._process_work_result(bundle)
            except Exception:
                logger.warning(
                    '%s: bundle says it\'s cancelled upfront but no results?!', bundle
                )
                self._release_worker(bundle)
                if is_original:
                    # Weird. We are the original owner of this
                    # bundle. For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started. Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over. Reschedule the whole
                    # thing.
                    logger.exception(
                        '%s: We are the original owner thread and yet there are '
                        'no results for this bundle. This is unexpected and bad. '
                        'Attempting an emergency retry...',
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # We're a backup and our bundle is cancelled
                    # before we even got started. Do nothing and let
                    # the original bundle's thread worry about either
                    # finding the results or complaining about it.
                    return None

        # Send input code / data to worker machine if it's not local.
        if controller not in machine:
            try:
                cmd = (
                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                )
                start_ts = time.time()
                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug(
                    "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
                )
            except Exception:
                self._release_worker(bundle)
                if is_original:
                    # Weird. We tried to copy the code to the worker
                    # and it failed... And we're the original bundle.
                    logger.exception(
                        "%s: Failed to send instructions to the worker machine?! "
                        "This is not expected; we're the original bundle so this shouldn't "
                        "be a race condition. Attempting an emergency retry...",
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code_file before we could copy it. Ignore.
                    logger.warning(
                        '%s: Failed to send instructions to the worker machine... '
                        'We\'re a backup and this may be caused by the original (or '
                        'some other backup) already finishing this work. Ignoring.',
                        bundle,
                    )
                    return None

        # Kick off the work. Note that if this fails we let
        # _wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        helper_path = config.config['remote_worker_helper_path']
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug(
            '%s: Executing %s in the background to kick off work...', bundle, cmd
        )
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug(
            '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
        )
        return self._wait_for_process(p, bundle, 0)

    def _wait_for_process(
        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
    ) -> Any:
        """At this point we've copied the bundle's pickled code to the remote
        worker and started an ssh process that should be invoking the
        remote worker to have it execute the user's code. See how
        that's going and wait for it to complete or fail. Note that
        this code is recursive: there are codepaths where we decide to
        stop waiting for an ssh process (because another backup seems
        to have finished) but then fail to fetch or parse the results
        from that backup and thus call ourselves to continue waiting
        on an active ssh process. This is the purpose of the depth
        argument: to curtail potential infinite recursion by giving up
        eventually.

        Args:
            p: the Popen record of the ssh job
            bundle: the bundle of work being executed remotely
            depth: how many retries we've made so far. Starts at zero.
        """

        machine = bundle.machine
        assert p is not None
        pid = p.pid  # pid of the ssh process
        if depth > 3:
            logger.error(
                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
                pid,
            )
            p.terminate()
            self._release_worker(bundle)
            return self._emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self._check_if_cancelled(bundle):
                    logger.info(
                        '%s: looks like another worker finished bundle...', bundle
                    )
                    break
            else:
                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # before us and we're bailing out.
        try:
            ret = self._process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, cleanup after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine. If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception:
            logger.exception('%s: Something unexpected just happened...', bundle)
            if p is not None:
                logger.warning(
                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
                    bundle,
                )
                return self._wait_for_process(p, bundle, depth + 1)
            else:
                self._release_worker(bundle)
                return self._emergency_retry_nasty_bundle(bundle)

    def _process_work_result(self, bundle: BundleDetails) -> Any:
        """A bundle seems to be completed. Check on the results."""

        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

        # Whether original or backup, if we finished first we must
        # fetch the results if the computation happened on a
        # remote machine.
        bundle.end_ts = time.time()
        if not was_cancelled:
            assert bundle.machine is not None
            if bundle.controller not in bundle.machine:
                cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                logger.info(
                    "%s: Fetching results back from %s@%s via %s",
                    bundle,
                    username,
                    machine,
                    cmd,
                )

                # If either of these throw they are handled in
                # _wait_for_process.
                try:
                    run_silently(cmd)
                except Exception as e:
                    logger.exception(
                        '%s: Failed to fetch results back from %s@%s',
                        bundle,
                        username,
                        machine,
                    )
                    raise e

                # Cleanup remote /tmp files.
                run_silently(
                    f'{SSH} {username}@{machine}'
                    f' "/bin/rm -f {code_file} {result_file}"'
                )
                logger.debug(
                    'Fetching results back took %.2fs', time.time() - bundle.end_ts
                )
        dur = bundle.end_ts - bundle.start_ts
        self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters. The
        # original is also the only job that may delete result_file
        # from disk. Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk. It still does that here with is_cancelled
        # set.
        if is_original:
            logger.debug("%s: Unpickling %s.", bundle, result_file)
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                logger.exception('Failed to load %s... this is bad news.', result_file)
                self._release_worker(bundle)

                # Re-raise the exception; the code in _wait_for_process may
                # decide to _emergency_retry_nasty_bundle here.
                raise e
            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
            os.remove(result_file)
            os.remove(code_file)

            # Notify any backups that the original is done so they
            # should stop ASAP. Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        '%s: Notifying backup %s that it\'s cancelled',
                        bundle,
                        backup.uuid,
                    )
                    backup.is_cancelled.set()

        else:
            # This is a backup job and, by now, we have already fetched
            # the bundle results.

            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                orig_bundle = bundle.src_bundle
                assert orig_bundle is not None
                logger.debug(
                    '%s: Notifying original %s we beat them to it.',
                    bundle,
                    orig_bundle.uuid,
                )
                orig_bundle.is_cancelled.set()
        self._release_worker(bundle, was_cancelled=was_cancelled)
        return result

    def _create_original_bundle(self, pickle, function_name: str):
        """Creates a bundle that is not a backup of any other bundle but
        rather represents a user task.
        """

        uuid = string_utils.generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug('Writing pickled code to %s', code_file)
        with open(code_file, 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            function_name=function_name,
            worker=None,
            username=None,
            machine=None,
            controller=platform.node(),
            code_file=code_file,
            result_file=result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
            failure_count=0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug('%s: Created an original bundle', bundle)
        return bundle

    def _create_backup_bundle(self, src_bundle: BundleDetails):
        """Creates a bundle that is a backup of another bundle that is
        running too slowly."""

        assert self.status.lock.locked()
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            function_name=src_bundle.function_name,
            worker=None,
            username=None,
            machine=None,
            controller=src_bundle.controller,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
            failure_count=0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug('%s: Created a backup bundle', backup_bundle)
        return backup_bundle

    def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        """Schedule a backup of src_bundle."""

        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self._create_backup_bundle(src_bundle)
        logger.debug(
            '%s/%s: Scheduling backup for execution...',
            backup_bundle.uuid,
            backup_bundle.function_name,
        )
        self._helper_executor.submit(self._launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them (and return
        # a result).

    def _emergency_retry_nasty_bundle(
        self, bundle: BundleDetails
    ) -> Optional[fut.Future]:
        """Something unexpectedly failed with bundle. Either retry it
        from the beginning or throw in the towel and give up on it."""

        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        retry_limit = config.config['executors_max_bundle_failures']

        if bundle.failure_count > retry_limit:
            logger.error(
                '%s: Tried this bundle too many times already (%dx); giving up.',
                bundle,
                retry_limit,
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                )
            else:
                logger.error(
                    '%s: At least it\'s only a backup; better luck with the others.',
                    bundle,
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            return self._launch(bundle, avoid_last_machine)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Submit work to be done. This is the user entry point of this
        class."""
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        bundle = self._create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self._launch, bundle)

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shutdown the executor."""
        if not self.already_shutdown:
            logger.debug('Shutting down RemoteExecutor %s', self.title)
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True


class RemoteWorkerPoolProvider:
    @abstractmethod
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        pass


@persistent.persistent_autoloaded_singleton()  # type: ignore
class ConfigRemoteWorkerPoolProvider(
    RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
):
    def __init__(self, json_remote_worker_pool: Dict[str, Any]):
        self.remote_worker_pool: List[RemoteWorkerRecord] = []
        for record in json_remote_worker_pool['remote_worker_records']:
            self.remote_worker_pool.append(
                dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
            )
        assert len(self.remote_worker_pool) > 0

    @overrides
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        return self.remote_worker_pool

    @overrides
    def get_persistent_data(self) -> List[RemoteWorkerRecord]:
        return self.remote_worker_pool

    @staticmethod
    @overrides
    def get_filename() -> str:
        return type_utils.unwrap_optional(config.config['remote_worker_records_file'])

    @staticmethod
    @overrides
    def should_we_load_data(filename: str) -> bool:
        return True

    @staticmethod
    @overrides
    def should_we_save_data(filename: str) -> bool:
        return False


@singleton
class DefaultExecutors(object):
    """A container for a default thread, process, and remote executor.
    These are not created until needed and we take care to clean up
    before process exit automatically for the caller's convenience.
    Instead of creating your own executor, consider using the one
    from this pool. e.g.::

        @par.parallelize(method=par.Method.PROCESS)
        def do_work(
            solutions: List[Work],
            shard_num: int,
            ...
        ):
            ...

        def start_do_work(all_work: List[Work]):
            shards = []
            logger.debug('Sharding work into groups of 10.')
            for subset in list_utils.shard(all_work, 10):
                shards.append([x for x in subset])

            logger.debug('Kicking off helper pool.')
            results = []
            for n, shard in enumerate(shards):
                results.append(
                    do_work(
                        shard, n, shared_cache.get_name(), max_letter_pop_per_word
                    )
                )
            smart_future.wait_all(results)

        # Note: if you forget to do this it will clean itself up
        # during program termination including tearing down any
        # active ssh connections.
        executors.DefaultExecutors().process_pool().shutdown()
    """

    def __init__(self) -> None:
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def _ping(host) -> bool:
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            x = cmd_exitcode(
                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
            )
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            provider = ConfigRemoteWorkerPoolProvider()
            all_machines = provider.get_remote_workers()
            pool: List[RemoteWorkerRecord] = []

            # Make sure we can ping each machine.
            for record in all_machines:
                if self._ping(record.machine):
                    logger.info('%s is alive / responding to pings', record.machine)
                    pool.append(record)

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info('Reducing workload for %s.', record.machine)
                    record.count = max(int(record.count / 2), 1)

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None