2 # -*- coding: utf-8 -*-
4 # © Copyright 2021-2022, Scott Gasch
6 """Defines three executors: a thread executor for doing work using a
7 threadpool, a process executor for doing work in other processes on
8 the same machine, and a remote executor for farming out work to other machines.
11 Also defines DefaultExecutors, which is a container for references to
12 global executors / worker pools with automatic shutdown semantics."""
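# A minimal usage sketch (hypothetical caller code; assumes this module is
# importable as `executors` and that DefaultExecutors, defined at the bottom
# of this file, behaves as the singleton container its docstring describes):
#
#     import executors
#
#     def expensive(n: int) -> int:
#         return sum(i * i for i in range(n))
#
#     pool = executors.DefaultExecutors().process_pool()
#     future = pool.submit(expensive, 100_000)
#     print(future.result())
#     executors.DefaultExecutors().shutdown()  # optional; cleanup also happens at exit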
14 from __future__ import annotations
15 import concurrent.futures as fut
24 from abc import ABC, abstractmethod
25 from collections import defaultdict
26 from dataclasses import dataclass
27 from typing import Any, Callable, Dict, List, Optional, Set
29 import cloudpickle # type: ignore
31 from overrides import overrides
35 import histogram as hist
37 from ansi import bg, fg, reset, underline
38 from decorator_utils import singleton
39 from exec_utils import cmd_exitcode, cmd_in_background, run_silently
40 from thread_utils import background_thread
42 logger = logging.getLogger(__name__)
44 parser = config.add_commandline_args(
45 f"Executors ({__file__})", "Args related to processing executors."
48 '--executors_threadpool_size',
51 help='Number of threads in the default threadpool, leave unset for default',
55 '--executors_processpool_size',
58 help='Number of processes in the default processpool, leave unset for default',
62 '--executors_schedule_remote_backups',
64 action=argparse_utils.ActionNoYes,
65 help='Should we schedule duplicative backup work if a remote bundle is slow',
68 '--executors_max_bundle_failures',
72 help='Maximum number of failures before giving up on a bundle',
75 SSH = '/usr/bin/ssh -oForwardX11=no'
76 SCP = '/usr/bin/scp -C'
79 def _make_cloud_pickle(fun, *args, **kwargs):
80 """Internal helper to create cloud pickles."""
81 logger.debug("Making cloudpickled bundle at %s", fun.__name__)
82 return cloudpickle.dumps((fun, args, kwargs))
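# For reference, a bundle created above round-trips like this; this mirrors
# what ProcessExecutor.run_cloud_pickle (below) does on the receiving side:
#
#     blob = _make_cloud_pickle(sum, [1, 2, 3])
#     fun, args, kwargs = cloudpickle.loads(blob)
#     assert fun(*args, **kwargs) == 6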
85 class BaseExecutor(ABC):
86 """The base executor interface definition. The interface for
87 :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
88 :class:`ThreadExecutor`.
91 def __init__(self, *, title=''):
93 self.histogram = hist.SimpleHistogram(
94 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
99 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
103 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
106 def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
107 """Shutdown the executor and return True if the executor is idle
108 (i.e. there are no pending or active tasks). Return False
109 otherwise. Note: this should only be called by the launcher
113 if self.task_count == 0:
114 self.shutdown(wait=True, quiet=quiet)
118 def adjust_task_count(self, delta: int) -> None:
119 """Change the task count. Note: do not call this method from a
120 worker; it should only be called by the launcher process /
124 self.task_count += delta
125 logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
127 def get_task_count(self) -> int:
128 """Change the task count. Note: do not call this method from a
129 worker, it should only be called by the launcher process /
133 return self.task_count
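# Sketch of the lifecycle a launcher drives through this interface
# (hypothetical caller code; any BaseExecutor subclass works the same way):
#
#     executor = ThreadExecutor()
#     future = executor.submit(print, 'hello')
#     future.result()
#     if not executor.shutdown_if_idle(quiet=True):
#         executor.shutdown(wait=True, quiet=True)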
136 class ThreadExecutor(BaseExecutor):
137 """A threadpool executor. This executor uses python threads to
138 schedule tasks. Note that, at least as of python3.10, because of
139 the global lock in the interpreter itself, these do not
140 parallelize very well so this class is useful mostly for non-CPU
143 See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
146 def __init__(self, max_workers: Optional[int] = None):
149 if max_workers is not None:
150 workers = max_workers
151 elif 'executors_threadpool_size' in config.config:
152 workers = config.config['executors_threadpool_size']
153 if workers is not None:
154 logger.debug('Creating threadpool executor with %d workers', workers)
156 logger.debug('Creating a default sized threadpool executor')
157 self._thread_pool_executor = fut.ThreadPoolExecutor(
158 max_workers=workers, thread_name_prefix="thread_executor_helper"
160 self.already_shutdown = False
162 # This is run on a different thread; do not adjust task count here.
164 def run_local_bundle(fun, *args, **kwargs):
165 logger.debug("Running local bundle at %s", fun.__name__)
166 result = fun(*args, **kwargs)
170 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
171 if self.already_shutdown:
172 raise Exception('Submitted work after shutdown.')
173 self.adjust_task_count(+1)
175 newargs.append(function)
179 result = self._thread_pool_executor.submit(
180 ThreadExecutor.run_local_bundle, *newargs, **kwargs
182 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
183 result.add_done_callback(lambda _: self.adjust_task_count(-1))
187 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
188 if not self.already_shutdown:
189 logger.debug('Shutting down threadpool executor %s', self.title)
190 self._thread_pool_executor.shutdown(wait)
192 print(self.histogram.__repr__(label_formatter='%ds'))
193 self.already_shutdown = True
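# Example: this executor is a good fit for I/O-bound work where threads spend
# their time blocked on the network rather than contending for the GIL
# (a sketch; the URLs are made up):
#
#     import urllib.request
#
#     def fetch_length(url: str) -> int:
#         with urllib.request.urlopen(url) as response:
#             return len(response.read())
#
#     executor = ThreadExecutor(max_workers=8)
#     futures = [executor.submit(fetch_length, url)
#                for url in ('https://example.com/', 'https://example.org/')]
#     sizes = [f.result() for f in futures]
#     executor.shutdown(wait=True)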
196 class ProcessExecutor(BaseExecutor):
197 """An executor which runs tasks in child processes.
199 See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
202 def __init__(self, max_workers=None):
205 if max_workers is not None:
206 workers = max_workers
207 elif 'executors_processpool_size' in config.config:
208 workers = config.config['executors_processpool_size']
209 if workers is not None:
210 logger.debug('Creating processpool executor with %d workers.', workers)
212 logger.debug('Creating a default sized processpool executor')
213 self._process_executor = fut.ProcessPoolExecutor(
216 self.already_shutdown = False
218 # This is run in another process; do not adjust task count here.
220 def run_cloud_pickle(pickle):
221 fun, args, kwargs = cloudpickle.loads(pickle)
222 logger.debug("Running pickled bundle at %s", fun.__name__)
223 result = fun(*args, **kwargs)
227 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
228 if self.already_shutdown:
229 raise Exception('Submitted work after shutdown.')
231 self.adjust_task_count(+1)
232 pickle = _make_cloud_pickle(function, *args, **kwargs)
233 result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
234 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
235 result.add_done_callback(lambda _: self.adjust_task_count(-1))
239 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
240 if not self.already_shutdown:
241 logger.debug('Shutting down processpool executor %s', self.title)
242 self._process_executor.shutdown(wait)
244 print(self.histogram.__repr__(label_formatter='%ds'))
245 self.already_shutdown = True
247 def __getstate__(self):
248 state = self.__dict__.copy()
249 state['_process_executor'] = None
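# Example: CPU-bound work parallelizes across processes here, and because the
# arguments are cloudpickled (not stdlib-pickled) we can even submit lambdas
# and closures (a sketch):
#
#     executor = ProcessExecutor(max_workers=4)
#     futures = [executor.submit(lambda n: sum(i * i for i in range(n)), 50_000)
#                for _ in range(8)]
#     totals = [f.result() for f in futures]
#     executor.shutdown(wait=True)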
253 class RemoteExecutorException(Exception):
254 """Thrown when a bundle cannot be executed despite several retries."""
260 class RemoteWorkerRecord:
261 """A record of info about a remote worker."""
264 """Username we can ssh into on this machine to run work."""
267 """Machine address / name."""
270 """Relative probability for the weighted policy to select this
271 machine for scheduling work."""
274 """If this machine is selected, what is the maximum number of task
275 that it can handle?"""
278 return hash((self.username, self.machine))
281 return f'{self.username}@{self.machine}'
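# Example: a hypothetical record describing one remote worker (the field
# values here are made up; see DefaultExecutors.remote_pool at the bottom of
# this file for real ones):
#
#     record = RemoteWorkerRecord(
#         username='scott',
#         machine='cheetah.house',
#         weight=24,
#         count=6,
#     )
#     print(record)   # prints something like: scott@cheetah.house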
286 """All info necessary to define some unit of work that needs to be
287 done, where it is being run, its state, whether it is an original
288 bundle or a backup bundle, how many times it has failed, etc.
292 """The code to run, cloud pickled"""
295 """A unique identifier"""
298 """The name of the function we pickled"""
300 worker: Optional[RemoteWorkerRecord]
301 """The remote worker running this bundle or None if none (yet)"""
303 username: Optional[str]
304 """The remote username running this bundle or None if none (yet)"""
306 machine: Optional[str]
307 """The remote machine running this bundle or None if none (yet)"""
310 """The controller machine"""
313 """A unique filename to hold the work to be done"""
316 """Where the results should be placed / read from"""
319 """The process id of the local subprocess watching the ssh connection
320 to the remote machine"""
328 slower_than_local_p95: bool
329 """Currently slower then 95% of other bundles on remote host"""
331 slower_than_global_p95: bool
332 """Currently slower than 95% of other bundles globally"""
334 src_bundle: Optional[BundleDetails]
335 """If this is a backup bundle, this points to the original bundle
336 that it's backing up. None otherwise."""
338 is_cancelled: threading.Event
339 """An event that can be signaled to indicate this bundle is cancelled.
340 This is set when another copy (backup or original) of this work has
341 completed successfully elsewhere."""
344 """True if this bundle was cancelled, False if it finished normally"""
346 backup_bundles: Optional[List[BundleDetails]]
347 """If we've created backups of this bundle, this is the list of them"""
350 """How many times has this bundle failed already?"""
354 if uuid[-9:-2] == '_backup':
356 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
360 # We colorize the uuid based on some bits from it to make them
361 # stand out in the logging and help a reader correlate log messages
362 # related to the same bundle.
369 fg('marigold yellow'),
372 fg('cornflower blue'),
373 fg('turquoise blue'),
375 fg('lavender purple'),
378 c = colorz[int(uuid[-2:], 16) % len(colorz)]
379 function_name = self.function_name if self.function_name is not None else 'nofname'
380 machine = self.machine if self.machine is not None else 'nomachine'
381 return f'{c}{suffix}/{function_name}/{machine}{reset()}'
384 class RemoteExecutorStatus:
385 """A status 'scoreboard' for a remote executor tracking various
386 metrics and able to render a periodic dump of global state.
389 def __init__(self, total_worker_count: int) -> None:
393 total_worker_count: number of workers in the pool
396 self.worker_count: int = total_worker_count
397 self.known_workers: Set[RemoteWorkerRecord] = set()
398 self.start_time: float = time.time()
399 self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
400 self.end_per_bundle: Dict[str, float] = defaultdict(float)
401 self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
402 self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
403 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
404 self.finished_bundle_timings: List[float] = []
405 self.last_periodic_dump: Optional[float] = None
406 self.total_bundles_submitted: int = 0
408 # Protects reads and modification using self. Also used
409 # as a memory fence for modifications to bundle.
410 self.lock: threading.Lock = threading.Lock()
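# Expected usage pattern: take self.lock, then call the *_already_locked entry
# points and periodic_dump while holding it (a sketch; RemoteExecutor._heartbeat
# and _create_backup_bundle below do this for real):
#
#     with status.lock:
#         status.record_bundle_details_already_locked(details)
#         status.periodic_dump(total_bundles_submitted)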
412 def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
413 """Record that bundle with uuid is assigned to a particular worker.
416 worker: the record of the worker to which uuid is assigned
417 uuid: the uuid of a bundle that has been assigned to a worker
420 self.record_acquire_worker_already_locked(worker, uuid)
422 def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
423 """Same as above but an entry point that doesn't acquire the lock
424 for codepaths where it's already held."""
425 assert self.lock.locked()
426 self.known_workers.add(worker)
427 self.start_per_bundle[uuid] = None
428 x = self.in_flight_bundles_by_worker.get(worker, set())
430 self.in_flight_bundles_by_worker[worker] = x
432 def record_bundle_details(self, details: BundleDetails) -> None:
433 """Register the details about a bundle of work."""
435 self.record_bundle_details_already_locked(details)
437 def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
438 """Same as above but for codepaths that already hold the lock."""
439 assert self.lock.locked()
440 self.bundle_details_by_uuid[details.uuid] = details
442 def record_release_worker(
444 worker: RemoteWorkerRecord,
448 """Record that a bundle has released a worker."""
450 self.record_release_worker_already_locked(worker, uuid, was_cancelled)
452 def record_release_worker_already_locked(
454 worker: RemoteWorkerRecord,
458 """Same as above but for codepaths that already hold the lock."""
459 assert self.lock.locked()
461 self.end_per_bundle[uuid] = ts
462 self.in_flight_bundles_by_worker[worker].remove(uuid)
463 if not was_cancelled:
464 start = self.start_per_bundle[uuid]
465 assert start is not None
466 bundle_latency = ts - start
467 x = self.finished_bundle_timings_per_worker.get(worker, [])
468 x.append(bundle_latency)
469 self.finished_bundle_timings_per_worker[worker] = x
470 self.finished_bundle_timings.append(bundle_latency)
472 def record_processing_began(self, uuid: str):
473 """Record when work on a bundle begins."""
475 self.start_per_bundle[uuid] = time.time()
477 def total_in_flight(self) -> int:
478 """How many bundles are in flight currently?"""
479 assert self.lock.locked()
481 for worker in self.known_workers:
482 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
483 return total_in_flight
485 def total_idle(self) -> int:
486 """How many idle workers are there currently?"""
487 assert self.lock.locked()
488 return self.worker_count - self.total_in_flight()
491 assert self.lock.locked()
493 total_finished = len(self.finished_bundle_timings)
494 total_in_flight = self.total_in_flight()
495 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
497 if len(self.finished_bundle_timings) > 1:
498 qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
500 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
501 f'✅={total_finished}/{self.total_bundles_submitted}, '
502 f'💻n={total_in_flight}/{self.worker_count}\n'
506 f'⏱={ts-self.start_time:.1f}s, '
507 f'✅={total_finished}/{self.total_bundles_submitted}, '
508 f'💻n={total_in_flight}/{self.worker_count}\n'
511 for worker in self.known_workers:
512 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
513 timings = self.finished_bundle_timings_per_worker.get(worker, [])
517 qworker = numpy.quantile(timings, [0.5, 0.95])
518 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
522 ret += f' ...finished {count} total bundle(s) so far\n'
523 in_flight = len(self.in_flight_bundles_by_worker[worker])
525 ret += f' ...{in_flight} bundles currently in flight:\n'
526 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
527 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
528 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
529 if self.start_per_bundle[bundle_uuid] is not None:
530 sec = ts - self.start_per_bundle[bundle_uuid]
531 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
533 ret += f' {details} setting up / copying data...'
536 if qworker is not None:
538 ret += f'{bg("red")}>💻p95{reset()} '
539 if details is not None:
540 details.slower_than_local_p95 = True
542 if details is not None:
543 details.slower_than_local_p95 = False
547 ret += f'{bg("red")}>∀p95{reset()} '
548 if details is not None:
549 details.slower_than_global_p95 = True
551 details.slower_than_global_p95 = False
555 def periodic_dump(self, total_bundles_submitted: int) -> None:
556 assert self.lock.locked()
557 self.total_bundles_submitted = total_bundles_submitted
559 if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
561 self.last_periodic_dump = ts
564 class RemoteWorkerSelectionPolicy(ABC):
565 """A policy for selecting a remote worker base class."""
568 self.workers: Optional[List[RemoteWorkerRecord]] = None
570 def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
571 self.workers = workers
574 def is_worker_available(self) -> bool:
578 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
582 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
583 """A remote worker selector that uses weighted RNG."""
586 def is_worker_available(self) -> bool:
588 for worker in self.workers:
594 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
597 for worker in self.workers:
598 if worker.machine != machine_to_avoid:
600 for _ in range(worker.count * worker.weight):
601 grabbag.append(worker)
603 if len(grabbag) == 0:
604 logger.debug('There are no available workers that avoid %s', machine_to_avoid)
606 for worker in self.workers:
608 for _ in range(worker.count * worker.weight):
609 grabbag.append(worker)
611 if len(grabbag) == 0:
612 logger.warning('There are no available workers?!')
615 worker = random.sample(grabbag, 1)[0]
616 assert worker.count > 0
618 logger.debug('Selected worker %s', worker)
622 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
623 """A remote worker selector that just round robins."""
625 def __init__(self) -> None:
630 def is_worker_available(self) -> bool:
632 for worker in self.workers:
638 def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
642 worker = self.workers[x]
646 if x >= len(self.workers):
649 logger.debug('Selected worker %s', worker)
652 if x >= len(self.workers):
655 logger.warning('Unexpectedly could not find a worker, retrying...')
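# Example: wiring a worker pool to a selection policy (a sketch; `record` is
# the hypothetical RemoteWorkerRecord from the example further up):
#
#     policy = WeightedRandomRemoteWorkerSelectionPolicy()
#     policy.register_worker_pool([record])
#     if policy.is_worker_available():
#         worker = policy.acquire_worker(machine_to_avoid='meerkat.cabin')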
660 class RemoteExecutor(BaseExecutor):
661 """An executor that uses processes on remote machines to do work. This
662 works by creating "bundles" of work with pickled code in each to be
663 executed. Each bundle is assigned a remote worker based on some policy
664 heuristics. Once assigned to a remote worker, a local subprocess is
665 created. It copies the pickled code to the remote machine via ssh/scp
666 and then starts the work on the remote machine, again via ssh. When
667 the work is complete, it copies the results back to the local machine.
669 So there is essentially one "controller" machine (which may also be
670 in the remote executor pool and therefore do task work in addition to
671 controlling) and N worker machines. This code runs on the controller
672 whereas on the worker machines we invoke pickled user code via a
673 shim in :file:`remote_worker.py`.
675 Some redundancy and safety provisions are made; e.g. slower-than-expected
676 tasks have redundant backups created, and if a task fails
677 repeatedly we consider it poisoned and give up on it.
681 The network overhead / latency of copying work from the
682 controller machine to the remote workers is relatively high.
683 This executor probably only makes sense to use with
684 computationally expensive tasks such as jobs that will execute
685 for ~30 seconds or longer.
687 See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
692 workers: List[RemoteWorkerRecord],
693 policy: RemoteWorkerSelectionPolicy,
698 workers: A list of remote workers we can call on to do tasks.
699 policy: A policy for selecting remote workers for tasks.
703 self.workers = workers
705 self.worker_count = 0
706 for worker in self.workers:
707 self.worker_count += worker.count
708 if self.worker_count <= 0:
709 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
711 raise RemoteExecutorException(msg)
712 self.policy.register_worker_pool(self.workers)
713 self.cv = threading.Condition()
714 logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
715 self._helper_executor = fut.ThreadPoolExecutor(
716 thread_name_prefix="remote_executor_helper",
717 max_workers=self.worker_count,
719 self.status = RemoteExecutorStatus(self.worker_count)
720 self.total_bundles_submitted = 0
721 self.backup_lock = threading.Lock()
722 self.last_backup = None
724 self.heartbeat_thread,
725 self.heartbeat_stop_event,
726 ) = self._run_periodic_heartbeat()
727 self.already_shutdown = False
730 def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
732 We create a background thread to invoke :meth:`_heartbeat` regularly
733 while we are scheduling work. It does some accounting such as
734 looking for slow bundles to tag for backup creation, checking for
735 unexpected failures, and printing a fancy message on stdout.
737 while not stop_event.is_set():
739 logger.debug('Running periodic heartbeat code...')
741 logger.debug('Periodic heartbeat thread shutting down.')
743 def _heartbeat(self) -> None:
744 # Note: this is invoked on a background thread, not an
745 # executor thread. Be careful what you do with it b/c it
746 # needs to get back and dump status again periodically.
747 with self.status.lock:
748 self.status.periodic_dump(self.total_bundles_submitted)
750 # Look for bundles to reschedule via executor.submit
751 if config.config['executors_schedule_remote_backups']:
752 self._maybe_schedule_backup_bundles()
754 def _maybe_schedule_backup_bundles(self):
755 """Maybe schedule backup bundles if we see a very slow bundle."""
757 assert self.status.lock.locked()
758 num_done = len(self.status.finished_bundle_timings)
759 num_idle_workers = self.worker_count - self.task_count
763 and num_idle_workers > 0
764 and (self.last_backup is None or (now - self.last_backup > 9.0))
765 and self.backup_lock.acquire(blocking=False)
768 assert self.backup_lock.locked()
770 bundle_to_backup = None
775 ) in self.status.in_flight_bundles_by_worker.items():
777 # Prefer to schedule backups of bundles running on
780 for record in self.workers:
781 if worker.machine == record.machine:
782 base_score = float(record.weight)
783 base_score = 1.0 / base_score
785 base_score = int(base_score)
788 for uuid in bundle_uuids:
789 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
792 and bundle.src_bundle is None
793 and bundle.backup_bundles is not None
797 # Schedule backups of bundles running
798 # longer; especially those that are
800 start_ts = self.status.start_per_bundle[uuid]
801 if start_ts is not None:
802 runtime = now - start_ts
804 logger.debug('score[%s] => %.1f # latency boost', bundle, score)
806 if bundle.slower_than_local_p95:
808 logger.debug('score[%s] => %.1f # >worker p95', bundle, score)
810 if bundle.slower_than_global_p95:
812 logger.debug('score[%s] => %.1f # >global p95', bundle, score)
814 # Prefer backups of bundles that don't
815 # have backups already.
816 backup_count = len(bundle.backup_bundles)
817 if backup_count == 0:
819 elif backup_count == 1:
821 elif backup_count == 2:
826 'score[%s] => %.1f # {backup_count} dup backup factor',
831 if score != 0 and (best_score is None or score > best_score):
832 bundle_to_backup = bundle
833 assert bundle is not None
834 assert bundle.backup_bundles is not None
835 assert bundle.src_bundle is None
838 # Note: this is all still happening on the heartbeat
839 # runner thread. That's ok because
840 # _schedule_backup_for_bundle uses the executor to
841 # submit the bundle again which will cause it to be
842 # picked up by a worker thread and allow this thread
843 # to return to run future heartbeats.
844 if bundle_to_backup is not None:
845 self.last_backup = now
847 '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
851 self._schedule_backup_for_bundle(bundle_to_backup)
853 self.backup_lock.release()
855 def _is_worker_available(self) -> bool:
856 """Is there a worker available currently?"""
857 return self.policy.is_worker_available()
859 def _acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
860 """Try to acquire a worker."""
861 return self.policy.acquire_worker(machine_to_avoid)
863 def _find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
864 """Find a worker or block until one becomes available."""
866 while not self._is_worker_available():
868 worker = self._acquire_worker(machine_to_avoid)
869 if worker is not None:
871 msg = "We should never reach this point in the code"
875 def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
876 """Release a previously acquired worker."""
877 worker = bundle.worker
878 assert worker is not None
879 logger.debug('Released worker %s', worker)
880 self.status.record_release_worker(
888 self.adjust_task_count(-1)
890 def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
891 """See if a particular bundle is cancelled. Do not block."""
892 with self.status.lock:
893 if bundle.is_cancelled.wait(timeout=0.0):
894 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
895 bundle.was_cancelled = True
899 def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
900 """Find a worker for bundle or block until one is available."""
902 self.adjust_task_count(+1)
904 hostname = bundle.hostname
905 avoid_machine = override_avoid_machine
906 is_original = bundle.src_bundle is None
908 # Try not to schedule a backup on the same host as the original.
909 if avoid_machine is None and bundle.src_bundle is not None:
910 avoid_machine = bundle.src_bundle.machine
912 while worker is None:
913 worker = self._find_available_worker_or_block(avoid_machine)
914 assert worker is not None
916 # Ok, found a worker.
917 bundle.worker = worker
918 machine = bundle.machine = worker.machine
919 username = bundle.username = worker.username
920 self.status.record_acquire_worker(worker, uuid)
921 logger.debug('%s: Running bundle on %s...', bundle, worker)
923 # Before we do any work, make sure the bundle is still viable.
924 # It may have been some time between when it was submitted and
925 # now due to lack of worker availability and someone else may
926 # have already finished it.
927 if self._check_if_cancelled(bundle):
929 return self._process_work_result(bundle)
930 except Exception as e:
931 logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
932 self._release_worker(bundle)
934 # Weird. We are the original owner of this
935 # bundle. For it to have been cancelled, a backup
936 # must have already started and completed before
937 # we even got started. Moreover, the backup says
938 # it is done but we can't find the results it
939 # should have copied over. Reschedule the whole
943 '%s: We are the original owner thread and yet there are '
944 'no results for this bundle. This is unexpected and bad.',
947 return self._emergency_retry_nasty_bundle(bundle)
949 # We're a backup and our bundle is cancelled
950 # before we even got started. Do nothing and let
951 # the original bundle's thread worry about either
952 # finding the results or complaining about it.
955 # Send input code / data to worker machine if it's not local.
956 if hostname not in machine:
958 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
959 start_ts = time.time()
960 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
962 xfer_latency = time.time() - start_ts
963 logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
964 except Exception as e:
965 self._release_worker(bundle)
967 # Weird. We tried to copy the code to the worker
968 # and it failed... And we're the original bundle.
972 "%s: Failed to send instructions to the worker machine?! "
973 "This is not expected; we\'re the original bundle so this shouldn\'t "
974 "be a race condition. Attempting an emergency retry...",
977 return self._emergency_retry_nasty_bundle(bundle)
979 # This is actually expected; we're a backup.
980 # There's a race condition where someone else
981 # already finished the work and removed the source
982 # code_file before we could copy it. Ignore.
984 '%s: Failed to send instructions to the worker machine... '
985 'We\'re a backup and this may be caused by the original (or '
986 'some other backup) already finishing this work. Ignoring.',
991 # Kick off the work. Note that if this fails we let
992 # _wait_for_process deal with it.
993 self.status.record_processing_began(uuid)
995 f'{SSH} {bundle.username}@{bundle.machine} '
996 f'"source py39-venv/bin/activate &&'
997 f' /home/scott/lib/python_modules/remote_worker.py'
998 f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1000 logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
1001 p = cmd_in_background(cmd, silent=True)
1003 logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
1004 return self._wait_for_process(p, bundle, 0)
1006 def _wait_for_process(
1007 self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1009 """At this point we've copied the bundle's pickled code to the remote
1010 worker and started an ssh process that should be invoking the
1011 remote worker to have it execute the user's code. See how
1012 that's going and wait for it to complete or fail. Note that
1013 this code is recursive: there are codepaths where we decide to
1014 stop waiting for an ssh process (because another backup seems
1015 to have finished) but then fail to fetch or parse the results
1016 from that backup and thus call ourselves to continue waiting
1017 on an active ssh process. This is the purpose of the depth
1018 argument: to curtail potential infinite recursion by giving up
1022 p: the Popen record of the ssh job
1023 bundle: the bundle of work being executed remotely
1024 depth: how many retries we've made so far. Starts at zero.
1028 machine = bundle.machine
1029 assert p is not None
1030 pid = p.pid # pid of the ssh process
1033 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
1036 self._release_worker(bundle)
1037 return self._emergency_retry_nasty_bundle(bundle)
1039 # Spin until either the ssh job we scheduled finishes the
1040 # bundle or some backup worker signals that they finished it
1044 p.wait(timeout=0.25)
1045 except subprocess.TimeoutExpired:
1046 if self._check_if_cancelled(bundle):
1047 logger.info('%s: looks like another worker finished bundle...', bundle)
1050 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1054 # If we get here we believe the bundle is done; either the ssh
1055 # subprocess finished (hopefully successfully) or we noticed
1056 # that some other worker seems to have completed the bundle
1057 # before us and we're bailing out.
1059 ret = self._process_work_result(bundle)
1060 if ret is not None and p is not None:
1064 # Something went wrong; e.g. we could not copy the results
1065 # back, clean up after ourselves on the remote machine, or
1066 # unpickle the results we got from the remote machine. If we
1067 # still have an active ssh subprocess, keep waiting on it.
1068 # Otherwise, time for an emergency reschedule.
1069 except Exception as e:
1071 logger.error('%s: Something unexpected just happened...', bundle)
1074 "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
1076 return self._wait_for_process(p, bundle, depth + 1)
1078 self._release_worker(bundle)
1079 return self._emergency_retry_nasty_bundle(bundle)
1081 def _process_work_result(self, bundle: BundleDetails) -> Any:
1082 """A bundle seems to be completed. Check on the results."""
1084 with self.status.lock:
1085 is_original = bundle.src_bundle is None
1086 was_cancelled = bundle.was_cancelled
1087 username = bundle.username
1088 machine = bundle.machine
1089 result_file = bundle.result_file
1090 code_file = bundle.code_file
1092 # Whether original or backup, if we finished first we must
1093 # fetch the results if the computation happened on a
1095 bundle.end_ts = time.time()
1096 if not was_cancelled:
1097 assert bundle.machine is not None
1098 if bundle.hostname not in bundle.machine:
1099 cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1101 "%s: Fetching results back from %s@%s via %s",
1108 # If either of these throw they are handled in
1109 # _wait_for_process.
1114 except Exception as e:
1121 # Cleanup remote /tmp files.
1123 f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
1125 logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
1126 dur = bundle.end_ts - bundle.start_ts
1127 self.histogram.add_item(dur)
1129 # Only the original worker should unpickle the file contents
1130 # though since it's the only one whose result matters. The
1131 # original is also the only job that may delete result_file
1132 # from disk. Note that the original may have been cancelled
1133 # if one of the backups finished first; it still must read the
1134 # result from disk. It still does that here with is_cancelled
1137 logger.debug("%s: Unpickling %s.", bundle, result_file)
1139 with open(result_file, 'rb') as rb:
1140 serialized = rb.read()
1141 result = cloudpickle.loads(serialized)
1142 except Exception as e:
1144 logger.error('Failed to load %s... this is bad news.', result_file)
1145 self._release_worker(bundle)
1147 # Re-raise the exception; the code in _wait_for_process may
1148 # decide to _emergency_retry_nasty_bundle here.
1150 logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1151 os.remove(result_file)
1152 os.remove(code_file)
1154 # Notify any backups that the original is done so they
1155 # should stop ASAP. Do this whether or not we
1156 # finished first since there could be more than one
1158 if bundle.backup_bundles is not None:
1159 for backup in bundle.backup_bundles:
1161 '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
1163 backup.is_cancelled.set()
1165 # This is a backup job and, by now, we have already fetched
1166 # the bundle results.
1168 # Backup results don't matter, they just need to leave the
1169 # result file in the right place for their originals to
1170 # read/unpickle later.
1173 # Tell the original to stop if we finished first.
1174 if not was_cancelled:
1175 orig_bundle = bundle.src_bundle
1176 assert orig_bundle is not None
1178 '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
1180 orig_bundle.is_cancelled.set()
1181 self._release_worker(bundle, was_cancelled=was_cancelled)
1184 def _create_original_bundle(self, pickle, function_name: str):
1185 """Creates a bundle that is not a backup of any other bundle but
1186 rather represents a user task.
1189 uuid = string_utils.generate_uuid(omit_dashes=True)
1190 code_file = f'/tmp/{uuid}.code.bin'
1191 result_file = f'/tmp/{uuid}.result.bin'
1193 logger.debug('Writing pickled code to %s', code_file)
1194 with open(code_file, 'wb') as wb:
1197 bundle = BundleDetails(
1198 pickled_code=pickle,
1200 function_name=function_name,
1204 hostname=platform.node(),
1205 code_file=code_file,
1206 result_file=result_file,
1208 start_ts=time.time(),
1210 slower_than_local_p95=False,
1211 slower_than_global_p95=False,
1213 is_cancelled=threading.Event(),
1214 was_cancelled=False,
1218 self.status.record_bundle_details(bundle)
1219 logger.debug('%s: Created an original bundle', bundle)
1222 def _create_backup_bundle(self, src_bundle: BundleDetails):
1223 """Creates a bundle that is a backup of another bundle that is
1224 running too slowly."""
1226 assert self.status.lock.locked()
1227 assert src_bundle.backup_bundles is not None
1228 n = len(src_bundle.backup_bundles)
1229 uuid = src_bundle.uuid + f'_backup#{n}'
1231 backup_bundle = BundleDetails(
1232 pickled_code=src_bundle.pickled_code,
1234 function_name=src_bundle.function_name,
1238 hostname=src_bundle.hostname,
1239 code_file=src_bundle.code_file,
1240 result_file=src_bundle.result_file,
1242 start_ts=time.time(),
1244 slower_than_local_p95=False,
1245 slower_than_global_p95=False,
1246 src_bundle=src_bundle,
1247 is_cancelled=threading.Event(),
1248 was_cancelled=False,
1249 backup_bundles=None, # backup backups not allowed
1252 src_bundle.backup_bundles.append(backup_bundle)
1253 self.status.record_bundle_details_already_locked(backup_bundle)
1254 logger.debug('%s: Created a backup bundle', backup_bundle)
1255 return backup_bundle
1257 def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1258 """Schedule a backup of src_bundle."""
1260 assert self.status.lock.locked()
1261 assert src_bundle is not None
1262 backup_bundle = self._create_backup_bundle(src_bundle)
1264 '%s/%s: Scheduling backup for execution...',
1266 backup_bundle.function_name,
1268 self._helper_executor.submit(self._launch, backup_bundle)
1270 # Results from backups don't matter; if they finish first
1271 # they will move the result_file to this machine and let
1272 # the original pick them up and unpickle them (and return
1275 def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
1276 """Something unexpectedly failed with bundle. Either retry it
1277 from the beginning or throw in the towel and give up on it."""
1279 is_original = bundle.src_bundle is None
1280 bundle.worker = None
1281 avoid_last_machine = bundle.machine
1282 bundle.machine = None
1283 bundle.username = None
1284 bundle.failure_count += 1
1290 if bundle.failure_count > retry_limit:
1292 '%s: Tried this bundle too many times already (%dx); giving up.',
1297 raise RemoteExecutorException(
1298 f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1302 '%s: At least it\'s only a backup; better luck with the others.', bundle
1306 msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1309 return self._launch(bundle, avoid_last_machine)
1312 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1313 """Submit work to be done. This is the user entry point of this
1315 if self.already_shutdown:
1316 raise Exception('Submitted work after shutdown.')
1317 pickle = _make_cloud_pickle(function, *args, **kwargs)
1318 bundle = self._create_original_bundle(pickle, function.__name__)
1319 self.total_bundles_submitted += 1
1320 return self._helper_executor.submit(self._launch, bundle)
1323 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1324 """Shutdown the executor."""
1325 if not self.already_shutdown:
1326 logger.debug('Shutting down RemoteExecutor %s', self.title)
1327 self.heartbeat_stop_event.set()
1328 self.heartbeat_thread.join()
1329 self._helper_executor.shutdown(wait)
1331 print(self.histogram.__repr__(label_formatter='%ds'))
1332 self.already_shutdown = True
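# Example: constructing a RemoteExecutor by hand (a sketch; the worker values
# are made up and you also need passwordless ssh plus remote_worker.py set up
# on the worker -- normally you would just use DefaultExecutors().remote_pool()
# below, which probes for reachable workers automatically):
#
#     workers = [
#         RemoteWorkerRecord(username='scott', machine='cheetah.house',
#                            weight=24, count=6),
#     ]
#     policy = WeightedRandomRemoteWorkerSelectionPolicy()
#     remote = RemoteExecutor(workers, policy)   # registers the pool with the policy
#     future = remote.submit(lambda x: x + 1, 41)
#     print(future.result())   # => 42, once the remote run completes
#     remote.shutdown(wait=True)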
1336 class DefaultExecutors(object):
1337 """A container for a default thread, process and remote executor.
1338 These are not created until needed and we take care to clean up
1339 before process exit automatically for the caller's convenience.
1340 Instead of creating your own executor, consider using the one
1341 from this pool. e.g.::
1343 @par.parallelize(method=par.Method.PROCESS)
1345 solutions: List[Work],
1352 def start_do_work(all_work: List[Work]):
1354 logger.debug('Sharding work into groups of 10.')
1355 for subset in list_utils.shard(all_work, 10):
1356 shards.append([x for x in subset])
1358 logger.debug('Kicking off helper pool.')
1360 for n, shard in enumerate(shards):
1363 shard, n, shared_cache.get_name(), max_letter_pop_per_word
1366 smart_future.wait_all(results)
1368 # Note: if you forget to do this it will clean itself up
1369 # during program termination including tearing down any
1370 # active ssh connections.
1371 executors.DefaultExecutors().process_pool().shutdown()
1375 self.thread_executor: Optional[ThreadExecutor] = None
1376 self.process_executor: Optional[ProcessExecutor] = None
1377 self.remote_executor: Optional[RemoteExecutor] = None
1380 def _ping(host) -> bool:
1381 logger.debug('RUN> ping -c 1 %s', host)
1383 x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
1388 def thread_pool(self) -> ThreadExecutor:
1389 if self.thread_executor is None:
1390 self.thread_executor = ThreadExecutor()
1391 return self.thread_executor
1393 def process_pool(self) -> ProcessExecutor:
1394 if self.process_executor is None:
1395 self.process_executor = ProcessExecutor()
1396 return self.process_executor
1398 def remote_pool(self) -> RemoteExecutor:
1399 if self.remote_executor is None:
1400 logger.info('Looking for some helper machines...')
1401 pool: List[RemoteWorkerRecord] = []
1402 if self._ping('cheetah.house'):
1403 logger.info('Found cheetah.house')
1407 machine='cheetah.house',
1412 if self._ping('meerkat.cabin'):
1413 logger.info('Found meerkat.cabin')
1417 machine='meerkat.cabin',
1422 if self._ping('wannabe.house'):
1423 logger.info('Found wannabe.house')
1427 machine='wannabe.house',
1432 if self._ping('puma.cabin'):
1433 logger.info('Found puma.cabin')
1437 machine='puma.cabin',
1442 if self._ping('backup.house'):
1443 logger.info('Found backup.house')
1447 machine='backup.house',
1453 # The controller machine has a lot to do; go easy on it.
1455 if record.machine == platform.node() and record.count > 1:
1456 logger.info('Reducing workload for %s.', record.machine)
1457 record.count = max(int(record.count / 2), 1)
1459 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1460 policy.register_worker_pool(pool)
1461 self.remote_executor = RemoteExecutor(pool, policy)
1462 return self.remote_executor
1464 def shutdown(self) -> None:
1465 if self.thread_executor is not None:
1466 self.thread_executor.shutdown(wait=True, quiet=True)
1467 self.thread_executor = None
1468 if self.process_executor is not None:
1469 self.process_executor.shutdown(wait=True, quiet=True)
1470 self.process_executor = None
1471 if self.remote_executor is not None:
1472 self.remote_executor.shutdown(wait=True, quiet=True)
1473 self.remote_executor = None