# -*- coding: utf-8 -*-
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-nested-blocks

# © Copyright 2021-2023, Scott Gasch

"""
This module defines a :class:`BaseExecutor` interface and three
implementations:

- :class:`ThreadExecutor`
- :class:`ProcessExecutor`
- :class:`RemoteExecutor`

The :class:`ThreadExecutor` is used to dispatch work to background
threads in the same Python process for parallelized work.  Of course,
until the Global Interpreter Lock (GIL) bottleneck is resolved, this
is not terribly useful for compute-bound code.  But it's good for
work that is mostly I/O bound.
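
For example, something like this (a minimal sketch; `fetch_url` is a
hypothetical stand-in for your own I/O-bound callable)::

    executor = ThreadExecutor(max_workers=4)
    futures = [executor.submit(fetch_url, url) for url in urls]
    for future in futures:
        print(future.result())
    executor.shutdown()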

The :class:`ProcessExecutor` is used to dispatch work to other
processes on the same machine and is more useful for compute-bound
workloads.
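
For example (again a sketch; `expensive_simulation` is a hypothetical
compute-bound callable)::

    executor = ProcessExecutor(max_workers=8)
    future = executor.submit(expensive_simulation, 12345)
    print(future.result())
    executor.shutdown()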

The :class:`RemoteExecutor` is used in conjunction with `ssh`,
the `cloudpickle` dependency, and the `remote_worker.py <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=src/pyutils/remote_worker.py;hb=HEAD>`_ file
to dispatch work to a set of remote worker machines on your
network.  You can configure this pool via a JSON configuration file,
an example of which `can be found in examples <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
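
That JSON file contains a `remote_worker_records` list shaped roughly
like this hypothetical two-machine pool (see the linked example for
the authoritative format)::

    {
        "remote_worker_records": [
            {
                "username": "scott",
                "machine": "machine1",
                "weight": 2,
                "count": 4
            },
            {
                "username": "scott",
                "machine": "machine2",
                "weight": 1,
                "count": 2
            }
        ]
    }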

Finally, this file defines a :class:`DefaultExecutors` pool that
contains a pre-created and ready instance of each of the three
executors discussed.  It has the added benefit of being automatically
cleaned up at process termination time.
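
For example (a sketch; the pool's executors are created lazily on
first use)::

    from pyutils.parallelize import executors

    pool = executors.DefaultExecutors()
    future = pool.thread_pool().submit(sum, [1, 2, 3])
    print(future.result())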

See instructions in :mod:`pyutils.parallelize.parallelize` for
setting up and using the framework.
"""

from __future__ import annotations

import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
from overrides import overrides

import pyutils.typez.histogram as hist
from pyutils import (
    argparse_utils,
    config,
    dataclass_utils,
    math_utils,
    persistent,
    string_utils,
)
from pyutils.ansi import bg, fg, reset, underline
from pyutils.decorator_utils import singleton
from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
from pyutils.parallelize.thread_utils import background_thread
from pyutils.typez import type_utils

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    help='Number of processes in the default processpool, leave unset for default',
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    default=3,
    help='Maximum number of failures before giving up on a bundle',
)
parser.add_argument(
    '--remote_worker_records_file',
    type=str,
    metavar='FILENAME',
    help='Path of the remote worker records file (JSON)',
    default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
)
parser.add_argument(
    '--remote_worker_helper_path',
    type=str,
    metavar='PATH_TO_REMOTE_WORKER_PY',
    help='Path to remote_worker.py on remote machines',
    default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
)

SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'


def _make_cloud_pickle(fun, *args, **kwargs):
    """Internal helper to create cloud pickles."""
    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
    return cloudpickle.dumps((fun, args, kwargs))


class BaseExecutor(ABC):
    """The base executor interface definition.  The interface for
    :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
    :class:`ThreadExecutor`.
    """

    def __init__(self, *, title=''):
        """
        Args:
            title: the name of this executor.
        """
        self.title = title
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Submit work for the executor to do.

        Args:
            function: the Callable to be executed.
            *args: the arguments to function
            **kwargs: the keyword arguments to function

        Returns:
            A concurrent :class:`Future` representing the result of the
            work.
        """
        pass

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shutdown the executor.

        Args:
            wait: wait for the shutdown to complete before returning?
            quiet: keep it quiet, please.
        """
        pass

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shutdown the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks).  Return False
        otherwise.  Note: this should only be called by the launcher
        process / thread.

        Args:
            quiet: keep it quiet, please.

        Returns:
            True if the executor could be shut down because it has no
            pending work, False otherwise.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count.  Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread.

        Args:
            delta: the delta value by which to adjust task count.
        """
        self.task_count += delta
        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)

    def get_task_count(self) -> int:
        """Get the current task count.  Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread.

        Returns:
            The executor's current task count.
        """
        return self.task_count


class ThreadExecutor(BaseExecutor):
    """A threadpool executor.  This executor uses Python threads to
    schedule tasks.  Note that, at least as of python3.10, because of
    the global lock in the interpreter itself, these do not
    parallelize very well so this class is useful mostly for non-CPU
    intensive tasks.

    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
    """

    def __init__(self, max_workers: Optional[int] = None):
        """
        Args:
            max_workers: maximum number of threads to create in the pool.
        """
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        if workers is not None:
            logger.debug('Creating threadpool executor with %d workers', workers)
        else:
            logger.debug('Creating a default sized threadpool executor')
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False

    # This is run on a different thread; do not adjust task count here.
    @staticmethod
    def _run_local_bundle(fun, *args, **kwargs):
        logger.debug("Running local bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        start = time.time()
        result = self._thread_pool_executor.submit(
            ThreadExecutor._run_local_bundle, *newargs, **kwargs
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down threadpool executor %s', self.title)
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True


class ProcessExecutor(BaseExecutor):
    """An executor which runs tasks in child processes.

    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
    """

    def __init__(self, max_workers=None):
        """
        Args:
            max_workers: the max number of worker processes to create.
        """
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        if workers is not None:
            logger.debug('Creating processpool executor with %d workers.', workers)
        else:
            logger.debug('Creating a default sized processpool executor')
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False

    # This is run in another process; do not adjust task count here.
    @staticmethod
    def _run_cloud_pickle(pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug("Running pickled bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        self.adjust_task_count(+1)
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(
            ProcessExecutor._run_cloud_pickle, pickle
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down processpool executor %s', self.title)
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state


class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""


@dataclass
class RemoteWorkerRecord:
    """A record of info about a remote worker."""

    username: str
    """Username we can ssh into on this machine to run work."""

    machine: str
    """Machine address / name."""

    weight: int
    """Relative probability for the weighted policy to select this
    machine for scheduling work."""

    count: int
    """If this machine is selected, what is the maximum number of tasks
    that it can handle?"""

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'
375 """All info necessary to define some unit of work that needs to be
376 done, where it is being run, its state, whether it is an original
377 bundle of a backup bundle, how many times it has failed, etc...
381 """The code to run, cloud pickled"""
384 """A unique identifier"""
387 """The name of the function we pickled"""
389 worker: Optional[RemoteWorkerRecord]
390 """The remote worker running this bundle or None if none (yet)"""
392 username: Optional[str]
393 """The remote username running this bundle or None if none (yet)"""
395 machine: Optional[str]
396 """The remote machine running this bundle or None if none (yet)"""
399 """The controller machine"""
402 """A unique filename to hold the work to be done"""
405 """Where the results should be placed / read from"""
408 """The process id of the local subprocess watching the ssh connection
409 to the remote machine"""
417 slower_than_local_p95: bool
418 """Currently slower then 95% of other bundles on remote host"""
420 slower_than_global_p95: bool
421 """Currently slower than 95% of other bundles globally"""
423 src_bundle: Optional[BundleDetails]
424 """If this is a backup bundle, this points to the original bundle
425 that it's backing up. None otherwise."""
427 is_cancelled: threading.Event
428 """An event that can be signaled to indicate this bundle is cancelled.
429 This is set when another copy (backup or original) of this work has
430 completed successfully elsewhere."""
433 """True if this bundle was cancelled, False if it finished normally"""
435 backup_bundles: Optional[List[BundleDetails]]
436 """If we've created backups of this bundle, this is the list of them"""
439 """How many times has this bundle failed already?"""
443 if uuid[-9:-2] == '_backup':
445 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
449 # We colorize the uuid based on some bits from it to make them
450 # stand out in the logging and help a reader correlate log messages
451 # related to the same bundle.
458 fg('marigold yellow'),
461 fg('cornflower blue'),
462 fg('turquoise blue'),
464 fg('lavender purple'),
467 c = colorz[int(uuid[-2:], 16) % len(colorz)]
469 self.function_name if self.function_name is not None else 'nofname'
471 machine = self.machine if self.machine is not None else 'nomachine'
472 return f'{c}{suffix}/{function_name}/{machine}{reset()}'


class RemoteExecutorStatus:
    """A status 'scoreboard' for a remote executor tracking various
    metrics and able to render a periodic dump of global state.
    """

    def __init__(self, total_worker_count: int) -> None:
        """
        Args:
            total_worker_count: number of workers in the pool
        """
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[
            RemoteWorkerRecord, math_utils.NumericPopulation
        ] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: math_utils.NumericPopulation = (
            math_utils.NumericPopulation()
        )
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modification using self.  Also used
        # as a memory fence for modifications to bundle.
        self.lock: threading.Lock = threading.Lock()

    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        """Record that bundle with uuid is assigned to a particular worker.

        Args:
            worker: the record of the worker to which uuid is assigned
            uuid: the uuid of a bundle that has been assigned to a worker
        """
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(
        self, worker: RemoteWorkerRecord, uuid: str
    ) -> None:
        """Same as above but an entry point that doesn't acquire the lock
        for codepaths where it's already held."""
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        """Register the details about a bundle of work."""
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        """Same as above but for codepaths that already hold the lock."""
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Record that a bundle has released a worker."""
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Same as above but for codepaths that already hold the lock."""
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            x = self.finished_bundle_timings_per_worker.get(
                worker, math_utils.NumericPopulation()
            )
            x.add_number(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.add_number(bundle_latency)

    def record_processing_began(self, uuid: str):
        """Record when work on a bundle begins."""
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        """How many bundles are in flight currently?"""
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        """How many idle workers are there currently?"""
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall_median = None
        qall_p95 = None
        if len(self.finished_bundle_timings) > 1:
            qall_median = self.finished_bundle_timings.get_median()
            qall_p95 = self.finished_bundle_timings.get_percentile(95)
            ret += (
                f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(
                worker, math_utils.NumericPopulation()
            )
            count = len(timings)
            qworker_median = None
            qworker_p95 = None
            if count > 1:
                qworker_median = timings.get_median()
                qworker_p95 = timings.get_percentile(95)
                ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f' ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f' ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f' {details} setting up / copying data...'
                        sec = 0.0

                    if qworker_p95 is not None:
                        if sec > qworker_p95:
                            ret += f'{bg("red")}>💻p95{reset()} '
                            if details is not None:
                                details.slower_than_local_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_local_p95 = False

                    if qall_p95 is not None:
                        if sec > qall_p95:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                details.slower_than_global_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_global_p95 = False
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    """An interface definition of a policy for selecting a remote worker.

    def __init__(self) -> None:
        self.workers: Optional[List[RemoteWorkerRecord]] = None

    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that uses weighted RNG."""

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        if self.workers:
            for worker in self.workers:
                if worker.machine != machine_to_avoid:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.debug(
                'There are no available workers that avoid %s', machine_to_avoid
            )
            if self.workers:
                for worker in self.workers:
                    if worker.count > 0:
                        for _ in range(worker.count * worker.weight):
                            grabbag.append(worker)

        if len(grabbag) == 0:
            logger.warning('There are no available workers?!')
            return None

        worker = random.sample(grabbag, 1)[0]
        assert worker.count > 0
        worker.count -= 1
        logger.debug('Selected worker %s', worker)
        return worker


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that just round robins."""

    def __init__(self) -> None:
        super().__init__()
        self.index = 0

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(
        self, machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        if self.workers:
            x = self.index
            while True:
                worker = self.workers[x]
                if worker.count > 0:
                    worker.count -= 1
                    x += 1
                    if x >= len(self.workers):
                        x = 0
                    self.index = x
                    logger.debug('Selected worker %s', worker)
                    return worker
                x += 1
                if x >= len(self.workers):
                    x = 0
                if x == self.index:
                    logger.warning('Unexpectedly could not find a worker, retrying...')
                    return None
        return None


class RemoteExecutor(BaseExecutor):
    """An executor that uses processes on remote machines to do work.
    To do so, it requires that a pool of remote workers be properly
    configured.  See instructions in
    :class:`pyutils.parallelize.parallelize`.

    Each machine in a worker pool has a *weight* and a *count*.  A
    *weight* captures the relative speed of a processor on that worker
    and a *count* captures the number of synchronous tasks the worker
    can accept (i.e. the number of cpus on the machine).
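
    For example (an illustrative in-code setup; normally the pool is
    read from the JSON records file instead)::

        records = [
            RemoteWorkerRecord(username='me', machine='fastbox', weight=2, count=8),
            RemoteWorkerRecord(username='me', machine='slowbox', weight=1, count=4),
        ]
        policy = WeightedRandomRemoteWorkerSelectionPolicy()
        executor = RemoteExecutor(records, policy)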

    To dispatch work to a remote machine, this class pickles the code
    to be executed remotely using `cloudpickle`.  For that to work,
    the remote machine should be running the same version of Python as
    this machine, ideally in a virtual environment with the same
    import libraries installed.  Differences in operating system
    and/or processor architecture don't seem to matter for most code,
    though.

    .. warning::

        Mismatches in Python version or in the version numbers of
        third-party libraries between machines can cause problems
        when trying to unpickle and run code remotely.
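
    A sketch of what happens under the hood (`fn` stands in for the
    user's callable)::

        import cloudpickle

        serialized = cloudpickle.dumps((fn, args, kwargs))  # on the controller
        fn, args, kwargs = cloudpickle.loads(serialized)    # on the remote worker
        result = fn(*args, **kwargs)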

    Work to be dispatched is represented in this code by creating a
    "bundle".  Each bundle is assigned to a remote worker based on
    heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
    general, it attempts to load all workers in the pool and maximize
    throughput.  Once assigned to a remote worker, pickled code is
    copied to that worker via `scp` and a remote command is issued via
    `ssh` to execute a :file:`remote_worker.py` process on the remote
    machine.  This process unpickles the code, runs it, and produces a
    result which is then copied back to the local machine (again via
    `scp`) where it can be processed by local code.

    You can and probably must override the path of
    :file:`remote_worker.py` on your pool machines using the
    `--remote_worker_helper_path` commandline argument (or by just
    changing the default in code, see above in this file's code).

    During remote work execution, this local machine acts as a
    controller dispatching all work to the network, copying pickled
    tasks out, and copying results back in.  It may also be a worker
    in the pool but do not underestimate the cost of being a
    controller -- it takes some cpu and a lot of network bandwidth.
    The work dispatcher logic attempts to detect when a controller is
    also a worker and reduce its load.

    Some redundancy and safety provisions are made when scheduling
    tasks to the worker pool; e.g. slower than expected tasks have
    redundant backup tasks created, especially if there are otherwise
    idle workers.  If a task fails repeatedly, the dispatcher considers
    it poisoned and gives up on it.

    .. warning::

        This executor probably only makes sense to use with
        computationally expensive tasks such as jobs that will execute
        for ~30 seconds or longer.

        The network overhead and latency of copying work from the
        controller (local) machine to the remote workers and copying
        results back again is relatively high.  Especially at startup,
        the network can become a bottleneck.  Future versions of this
        code may attempt to split the responsibility of being a
        controller (distributing work to pool machines).

    Instructions for how to set this up are provided in
    :class:`pyutils.parallelize.parallelize`.

    See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
    """

    def __init__(
        self,
        workers: List[RemoteWorkerRecord],
        policy: RemoteWorkerSelectionPolicy,
    ) -> None:
        """
        Args:
            workers: A list of remote workers we can call on to do tasks.
            policy: A policy for selecting remote workers for tasks.
        """

        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug(
            'Creating %d local threads, one per remote worker.', self.worker_count
        )
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self._run_periodic_heartbeat()
        self.already_shutdown = False

    @background_thread
    def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
        """
        We create a background thread to invoke :meth:`_heartbeat` regularly
        while we are scheduling work.  It does some accounting such as
        looking for slow bundles to tag for backup creation, checking for
        unexpected failures, and printing a fancy message on stdout.
        """
        while not stop_event.is_set():
            time.sleep(5.0)
            logger.debug('Running periodic heartbeat code...')
            self._heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def _heartbeat(self) -> None:
        # Note: this is invoked on a background thread, not an
        # executor thread.  Be careful what you do with it b/c it
        # needs to get back and dump status again periodically.
        with self.status.lock:
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self._maybe_schedule_backup_bundles()

    def _maybe_schedule_backup_bundles(self):
        """Maybe schedule backup bundles if we see a very slow bundle."""

        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_done >= 2
            and num_idle_workers > 0
            and (self.last_backup is None or (now - self.last_backup > 9.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            try:
                assert self.backup_lock.locked()

                bundle_to_backup = None
                best_score = None
                for (
                    worker,
                    bundle_uuids,
                ) in self.status.in_flight_bundles_by_worker.items():

                    # Prefer to schedule backups of bundles running on
                    # slower machines.
                    base_score = 0
                    for record in self.workers:
                        if worker.machine == record.machine:
                            temp_score = float(record.weight)
                            temp_score = 1.0 / temp_score
                            temp_score *= 200.0
                            base_score = int(temp_score)
                            break

                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                            bundle is not None
                            and bundle.src_bundle is None
                            and bundle.backup_bundles is not None
                        ):
                            score = base_score

                            # Schedule backups of bundles running
                            # longer; especially those that are
                            # unexpectedly slow.
                            start_ts = self.status.start_per_bundle[uuid]
                            if start_ts is not None:
                                runtime = now - start_ts
                                score += runtime
                                logger.debug(
                                    'score[%s] => %.1f # latency boost', bundle, score
                                )

                                if bundle.slower_than_local_p95:
                                    score += runtime / 2
                                    logger.debug(
                                        'score[%s] => %.1f # >worker p95',
                                        bundle,
                                        score,
                                    )

                                if bundle.slower_than_global_p95:
                                    score += runtime / 4
                                    logger.debug(
                                        'score[%s] => %.1f # >global p95',
                                        bundle,
                                        score,
                                    )

                            # Prefer backups of bundles that don't
                            # have backups already.
                            backup_count = len(bundle.backup_bundles)
                            if backup_count == 0:
                                score *= 2
                            elif backup_count == 1:
                                score /= 2
                            elif backup_count == 2:
                                score /= 8
                            else:
                                score = 0
                            logger.debug(
                                'score[%s] => %.1f # %d dup backup factor',
                                bundle,
                                score,
                                backup_count,
                            )

                            if score != 0 and (
                                best_score is None or score > best_score
                            ):
                                bundle_to_backup = bundle
                                assert bundle is not None
                                assert bundle.backup_bundles is not None
                                assert bundle.src_bundle is None
                                best_score = score

                # Note: this is all still happening on the heartbeat
                # runner thread.  That's ok because
                # _schedule_backup_for_bundle uses the executor to
                # submit the bundle again which will cause it to be
                # picked up by a worker thread and allow this thread
                # to return to run future heartbeats.
                if bundle_to_backup is not None:
                    self.last_backup = now
                    logger.info(
                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
                        bundle_to_backup,
                        best_score,
                    )
                    self._schedule_backup_for_bundle(bundle_to_backup)
            finally:
                self.backup_lock.release()

    def _is_worker_available(self) -> bool:
        """Is there a worker available currently?"""
        return self.policy.is_worker_available()

    def _acquire_worker(
        self, machine_to_avoid: Optional[str] = None
    ) -> Optional[RemoteWorkerRecord]:
        """Try to acquire a worker."""
        return self.policy.acquire_worker(machine_to_avoid)

    def _find_available_worker_or_block(
        self, machine_to_avoid: Optional[str] = None
    ) -> RemoteWorkerRecord:
        """Find a worker or block until one becomes available."""
        with self.cv:
            while not self._is_worker_available():
                self.cv.wait()
            worker = self._acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        """Release a previously acquired worker."""
        worker = bundle.worker
        assert worker is not None
        logger.debug('Released worker %s', worker)
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        with self.cv:
            worker.count += 1
            self.cv.notify()
        self.adjust_task_count(-1)

    def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
        """See if a particular bundle is cancelled.  Do not block."""
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                bundle.was_cancelled = True
                return True
        return False

    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""

        self.adjust_task_count(+1)
        uuid = bundle.uuid
        controller = bundle.controller
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self._find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug('%s: Running bundle on %s...', bundle, worker)

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self._check_if_cancelled(bundle):
            try:
                return self._process_work_result(bundle)
            except Exception:
                logger.warning(
                    '%s: bundle says it\'s cancelled upfront but no results?!', bundle
                )
                self._release_worker(bundle)
                if is_original:
                    # Weird.  We are the original owner of this
                    # bundle.  For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started.  Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over.  Reschedule the whole
                    # thing.
                    logger.exception(
                        '%s: We are the original owner thread and yet there are '
                        'no results for this bundle.  This is unexpected and bad. '
                        'Attempting an emergency retry...',
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # We're a backup and our bundle is cancelled
                    # before we even got started.  Do nothing and let
                    # the original bundle's thread worry about either
                    # finding the results or complaining about it.
                    return None

        # Send input code / data to worker machine if it's not local.
        if controller not in machine:
            try:
                cmd = (
                    f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                )
                start_ts = time.time()
                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug(
                    "%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
                )
            except Exception:
                self._release_worker(bundle)
                if is_original:
                    # Weird.  We tried to copy the code to the worker
                    # and it failed...  And we're the original bundle.
                    # We have to retry.
                    logger.exception(
                        "%s: Failed to send instructions to the worker machine?! "
                        "This is not expected; we're the original bundle so this shouldn't "
                        "be a race condition.  Attempting an emergency retry...",
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code_file before we could copy it.  Ignore.
                    logger.warning(
                        '%s: Failed to send instructions to the worker machine... '
                        'We\'re a backup and this may be caused by the original (or '
                        'some other backup) already finishing this work.  Ignoring.',
                        bundle,
                    )
                    return None

        # Kick off the work.  Note that if this fails we let
        # _wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        helper_path = config.config['remote_worker_helper_path']
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"{helper_path} --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug(
            '%s: Executing %s in the background to kick off work...', bundle, cmd
        )
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug(
            '%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine
        )
        return self._wait_for_process(p, bundle, 0)

    def _wait_for_process(
        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
    ) -> Any:
        """At this point we've copied the bundle's pickled code to the remote
        worker and started an ssh process that should be invoking the
        remote worker to have it execute the user's code.  See how
        that's going and wait for it to complete or fail.  Note that
        this code is recursive: there are codepaths where we decide to
        stop waiting for an ssh process (because another backup seems
        to have finished) but then fail to fetch or parse the results
        from that backup and thus call ourselves to continue waiting
        on an active ssh process.  This is the purpose of the depth
        argument: to curtail potential infinite recursion by giving up
        eventually.

        Args:
            p: the Popen record of the ssh job
            bundle: the bundle of work being executed remotely
            depth: how many retries we've made so far.  Starts at zero.
        """

        machine = bundle.machine
        assert p is not None
        pid = p.pid  # pid of the ssh process
        if depth > 3:
            logger.error(
                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d",
                pid,
            )
            p.terminate()
            self._release_worker(bundle)
            return self._emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self._check_if_cancelled(bundle):
                    logger.info(
                        '%s: looks like another worker finished bundle...', bundle
                    )
                    break
            else:
                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # before us and we're bailing out.
        try:
            ret = self._process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, cleanup after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine.  If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception:
            logger.exception('%s: Something unexpected just happened...', bundle)
            if p is not None:
                logger.warning(
                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.",
                    bundle,
                )
                return self._wait_for_process(p, bundle, depth + 1)
            else:
                self._release_worker(bundle)
                return self._emergency_retry_nasty_bundle(bundle)

    def _process_work_result(self, bundle: BundleDetails) -> Any:
        """A bundle seems to be completed.  Check on the results."""

        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.controller not in bundle.machine:
                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        "%s: Fetching results back from %s@%s via %s",
                        bundle,
                        username,
                        machine,
                        cmd,
                    )

                    # If either of these throw they are handled in
                    # _wait_for_process.
                    attempts = 0
                    while True:
                        try:
                            run_silently(cmd)
                        except Exception as e:
                            attempts += 1
                            if attempts >= 3:
                                raise e
                        else:
                            break

                    # Cleanup remote /tmp files.
                    run_silently(
                        f'{SSH} {username}@{machine}'
                        f' "/bin/rm -f {code_file} {result_file}"'
                    )
                    logger.debug(
                        'Fetching results back took %.2fs', time.time() - bundle.end_ts
                    )
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters.  The
        # original is also the only job that may delete result_file
        # from disk.  Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk.  It still does that here with is_cancelled
        # set.
        if is_original:
            logger.debug("%s: Unpickling %s.", bundle, result_file)
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                logger.exception('Failed to load %s... this is bad news.', result_file)
                self._release_worker(bundle)

                # Re-raise the exception; the code in _wait_for_process may
                # decide to _emergency_retry_nasty_bundle here.
                raise e
            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
            os.remove(result_file)
            os.remove(code_file)

            # Notify any backups that the original is done so they
            # should stop ASAP.  Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        '%s: Notifying backup %s that it\'s cancelled',
                        bundle,
                        backup,
                    )
                    backup.is_cancelled.set()

        # This is a backup job and, by now, we have already fetched
        # the bundle results.
        else:
            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                orig_bundle = bundle.src_bundle
                assert orig_bundle is not None
                logger.debug(
                    '%s: Notifying original %s we beat them to it.',
                    bundle,
                    orig_bundle,
                )
                orig_bundle.is_cancelled.set()
        self._release_worker(bundle, was_cancelled=was_cancelled)
        return result

    def _create_original_bundle(self, pickle, function_name: str):
        """Creates a bundle that is not a backup of any other bundle but
        rather represents a user task.
        """

        uuid = string_utils.generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug('Writing pickled code to %s', code_file)
        with open(code_file, 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            function_name=function_name,
            worker=None,
            username=None,
            machine=None,
            controller=platform.node(),
            code_file=code_file,
            result_file=result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
            failure_count=0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug('%s: Created an original bundle', bundle)
        return bundle

    def _create_backup_bundle(self, src_bundle: BundleDetails):
        """Creates a bundle that is a backup of another bundle that is
        running too slowly."""

        assert self.status.lock.locked()
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            function_name=src_bundle.function_name,
            worker=None,
            username=None,
            machine=None,
            controller=src_bundle.controller,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
            failure_count=0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug('%s: Created a backup bundle', backup_bundle)
        return backup_bundle

    def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        """Schedule a backup of src_bundle."""

        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self._create_backup_bundle(src_bundle)
        logger.debug(
            '%s/%s: Scheduling backup for execution...',
            backup_bundle.uuid,
            backup_bundle.function_name,
        )
        self._helper_executor.submit(self._launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them (and return
        # the results).

    def _emergency_retry_nasty_bundle(
        self, bundle: BundleDetails
    ) -> Optional[fut.Future]:
        """Something unexpectedly failed with bundle.  Either retry it
        from the beginning or throw in the towel and give up on it."""

        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        if is_original:
            retry_limit = 3
        else:
            retry_limit = 2

        if bundle.failure_count > retry_limit:
            logger.error(
                '%s: Tried this bundle too many times already (%dx); giving up.',
                bundle,
                retry_limit,
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                )
            else:
                logger.error(
                    '%s: At least it\'s only a backup; better luck with the others.',
                    bundle,
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            warnings.warn(msg)
            return self._launch(bundle, avoid_last_machine)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Submit work to be done.  This is the user entry point of this
        class."""
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        bundle = self._create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self._launch, bundle)

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shutdown the executor."""
        if not self.already_shutdown:
            logger.debug('Shutting down RemoteExecutor %s', self.title)
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True


class RemoteWorkerPoolProvider:
    @abstractmethod
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        pass


@persistent.persistent_autoloaded_singleton()  # type: ignore
class ConfigRemoteWorkerPoolProvider(
    RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
):
    def __init__(self, json_remote_worker_pool: Dict[str, Any]):
        self.remote_worker_pool: List[RemoteWorkerRecord] = []
        for record in json_remote_worker_pool['remote_worker_records']:
            self.remote_worker_pool.append(
                dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
            )
        assert len(self.remote_worker_pool) > 0

    @overrides
    def get_remote_workers(self) -> List[RemoteWorkerRecord]:
        return self.remote_worker_pool

    @overrides
    def get_persistent_data(self) -> List[RemoteWorkerRecord]:
        return self.remote_worker_pool

    @staticmethod
    @overrides
    def get_filename() -> str:
        return type_utils.unwrap_optional(config.config['remote_worker_records_file'])

    @staticmethod
    @overrides
    def should_we_load_data(filename: str) -> bool:
        return True

    @staticmethod
    @overrides
    def should_we_save_data(filename: str) -> bool:
        return False


@singleton
class DefaultExecutors(object):
    """A container for a default thread, process and remote executor.
    These are not created until needed and we take care to clean up
    before process exit automatically for the caller's convenience.
    Instead of creating your own executor, consider using the one
    from this pool.  e.g.::

        @par.parallelize(method=par.Method.PROCESS)
        def do_work(
            solutions: List[Work],
            shard_num: int,
            ...
        ):
            ...

        def start_do_work(all_work: List[Work]):
            shards = []
            logger.debug('Sharding work into groups of 10.')
            for subset in list_utils.shard(all_work, 10):
                shards.append([x for x in subset])

            logger.debug('Kicking off helper pool.')
            results = []
            for n, shard in enumerate(shards):
                results.append(
                    do_work(
                        shard, n, shared_cache.get_name(), max_letter_pop_per_word
                    )
                )
            smart_future.wait_all(results)

            # Note: if you forget to do this it will clean itself up
            # during program termination including tearing down any
            # active ssh connections.
            executors.DefaultExecutors().process_pool().shutdown()
    """

    def __init__(self):
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def _ping(host) -> bool:
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            x = cmd_exitcode(
                f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
            )
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            provider = ConfigRemoteWorkerPoolProvider()
            all_machines = provider.get_remote_workers()
            pool: List[RemoteWorkerRecord] = []

            # Make sure we can ping each machine.
            for record in all_machines:
                if self._ping(record.machine):
                    logger.info('%s is alive / responding to pings', record.machine)
                    pool.append(record)

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info('Reducing workload for %s.', record.machine)
                    record.count = max(int(record.count / 2), 1)

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None