# -*- coding: utf-8 -*-

# © Copyright 2021-2022, Scott Gasch

"""Defines three executors: a thread executor for doing work using a
threadpool, a process executor for doing work in other processes on
the same machine and a remote executor for farming out work to other
machines.

Also defines DefaultExecutors which is a container for references to
global executors / worker pools with automatic shutdown semantics."""
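
# A minimal usage sketch (illustrative only; assumes this module is
# importable as `executors` and that the default worker pool sizes are
# acceptable):
#
#     from executors import DefaultExecutors
#
#     def square(x: int) -> int:
#         return x * x
#
#     pool = DefaultExecutors().thread_pool()
#     future = pool.submit(square, 7)
#     assert future.result() == 49
#     DefaultExecutors().shutdown()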

from __future__ import annotations

import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set

import cloudpickle  # type: ignore
import numpy
from overrides import overrides

import argparse_utils
import config
import histogram as hist
import string_utils
from ansi import bg, fg, reset, underline
from decorator_utils import singleton
from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Executors ({__file__})", "Args related to processing executors."
)
parser.add_argument(
    '--executors_threadpool_size',
    type=int,
    help='Number of threads in the default threadpool, leave unset for default',
)
parser.add_argument(
    '--executors_processpool_size',
    type=int,
    help='Number of processes in the default processpool, leave unset for default',
)
parser.add_argument(
    '--executors_schedule_remote_backups',
    action=argparse_utils.ActionNoYes,
    help='Should we schedule duplicative backup work if a remote bundle is slow',
)
parser.add_argument(
    '--executors_max_bundle_failures',
    type=int,
    help='Maximum number of failures before giving up on a bundle',
)

SSH = '/usr/bin/ssh -oForwardX11=no'
SCP = '/usr/bin/scp -C'


def _make_cloud_pickle(fun, *args, **kwargs):
    """Internal helper to create cloud pickles."""
    logger.debug("Making cloudpickled bundle at %s", fun.__name__)
    return cloudpickle.dumps((fun, args, kwargs))
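
# For reference, the inverse operation (performed later by whichever
# process ends up executing the bundle) is simply:
#
#     fun, args, kwargs = cloudpickle.loads(serialized)
#     result = fun(*args, **kwargs)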


class BaseExecutor(ABC):
    """The base executor interface definition. The interface for
    :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
    :class:`ThreadExecutor`.
    """

    def __init__(self, *, title=''):
        self.title = title
        self.histogram = hist.SimpleHistogram(
            hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
        )
        self.task_count = 0

    @abstractmethod
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        pass

    @abstractmethod
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        pass

    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
        """Shut down the executor and return True if the executor is idle
        (i.e. there are no pending or active tasks). Return False
        otherwise. Note: this should only be called by the launcher
        process.
        """
        if self.task_count == 0:
            self.shutdown(wait=True, quiet=quiet)
            return True
        return False

    def adjust_task_count(self, delta: int) -> None:
        """Change the task count. Note: do not call this method from a
        worker, it should only be called by the launcher process /
        thread.
        """
        self.task_count += delta
        logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)

    def get_task_count(self) -> int:
        """Return the current task count. Note: do not call this method
        from a worker, it should only be called by the launcher process /
        thread.
        """
        return self.task_count
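
# A sketch of what a custom subclass might look like (hypothetical;
# shown only to illustrate the contract above):
#
#     class InlineExecutor(BaseExecutor):
#         """Runs submitted work synchronously in the caller's thread."""
#
#         @overrides
#         def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
#             self.adjust_task_count(+1)
#             future: fut.Future = fut.Future()
#             try:
#                 future.set_result(function(*args, **kwargs))
#             except Exception as e:
#                 future.set_exception(e)
#             self.adjust_task_count(-1)
#             return future
#
#         @overrides
#         def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
#             pass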


class ThreadExecutor(BaseExecutor):
    """A threadpool executor. This executor uses python threads to
    schedule tasks. Note that, at least as of python3.10, because of
    the global lock in the interpreter itself, these do not
    parallelize very well so this class is useful mostly for non-CPU
    intensive tasks.

    See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
    """

    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_threadpool_size' in config.config:
            workers = config.config['executors_threadpool_size']
        logger.debug('Creating threadpool executor with %d workers', workers)
        self._thread_pool_executor = fut.ThreadPoolExecutor(
            max_workers=workers, thread_name_prefix="thread_executor_helper"
        )
        self.already_shutdown = False

    # This is run on a different thread; do not adjust task count here.
    @staticmethod
    def run_local_bundle(fun, *args, **kwargs):
        logger.debug("Running local bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        self.adjust_task_count(+1)
        newargs = []
        newargs.append(function)
        for arg in args:
            newargs.append(arg)
        start = time.time()
        result = self._thread_pool_executor.submit(
            ThreadExecutor.run_local_bundle, *newargs, **kwargs
        )
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down threadpool executor %s', self.title)
            self._thread_pool_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True
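
# A minimal usage sketch (illustrative; I/O-bound work is the sweet spot
# given the GIL note above; fetch_url and urls are hypothetical):
#
#     executor = ThreadExecutor()
#     futures = [executor.submit(fetch_url, url) for url in urls]
#     for f in fut.as_completed(futures):
#         print(f.result())
#     executor.shutdown()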


class ProcessExecutor(BaseExecutor):
    """An executor which runs tasks in child processes.

    See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
    """

    def __init__(self, max_workers=None):
        super().__init__()
        workers = None
        if max_workers is not None:
            workers = max_workers
        elif 'executors_processpool_size' in config.config:
            workers = config.config['executors_processpool_size']
        logger.debug('Creating processpool executor with %d workers.', workers)
        self._process_executor = fut.ProcessPoolExecutor(
            max_workers=workers,
        )
        self.already_shutdown = False

    # This is run in another process; do not adjust task count here.
    @staticmethod
    def run_cloud_pickle(pickle):
        fun, args, kwargs = cloudpickle.loads(pickle)
        logger.debug("Running pickled bundle at %s", fun.__name__)
        result = fun(*args, **kwargs)
        return result

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        start = time.time()
        self.adjust_task_count(+1)
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
        result.add_done_callback(lambda _: self.adjust_task_count(-1))
        return result

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        if not self.already_shutdown:
            logger.debug('Shutting down processpool executor %s', self.title)
            self._process_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True

    def __getstate__(self):
        state = self.__dict__.copy()
        state['_process_executor'] = None
        return state
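
# A minimal usage sketch (illustrative). Arguments and results must
# survive a trip across the process boundary; cloudpickle relaxes the
# usual pickling rules for the submitted function itself, so even a
# lambda works:
#
#     executor = ProcessExecutor()
#     future = executor.submit(lambda x: x * x, 12)
#     assert future.result() == 144
#     executor.shutdown()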


class RemoteExecutorException(Exception):
    """Thrown when a bundle cannot be executed despite several retries."""

    pass


@dataclass
class RemoteWorkerRecord:
    """A record of info about a remote worker."""

    username: str
    """Username we can ssh into on this machine to run work."""

    machine: str
    """Machine address / name."""

    weight: int
    """Relative probability for the weighted policy to select this
    machine for scheduling work."""

    count: int
    """If this machine is selected, what is the maximum number of tasks
    that it can handle?"""

    def __hash__(self):
        return hash((self.username, self.machine))

    def __repr__(self):
        return f'{self.username}@{self.machine}'


@dataclass
class BundleDetails:
    """All info necessary to define some unit of work that needs to be
    done, where it is being run, its state, whether it is an original
    bundle or a backup bundle, how many times it has failed, etc...
    """

    pickled_code: bytes
    """The code to run, cloud pickled"""

    uuid: str
    """A unique identifier"""

    function_name: str
    """The name of the function we pickled"""

    worker: Optional[RemoteWorkerRecord]
    """The remote worker running this bundle or None if none (yet)"""

    username: Optional[str]
    """The remote username running this bundle or None if none (yet)"""

    machine: Optional[str]
    """The remote machine running this bundle or None if none (yet)"""

    hostname: str
    """The controller machine"""

    code_file: str
    """A unique filename to hold the work to be done"""

    result_file: str
    """Where the results should be placed / read from"""

    pid: int
    """The process id of the local subprocess watching the ssh connection
    to the remote machine"""

    start_ts: float
    end_ts: float

    slower_than_local_p95: bool
    """Currently slower than 95% of other bundles on remote host"""

    slower_than_global_p95: bool
    """Currently slower than 95% of other bundles globally"""

    src_bundle: Optional[BundleDetails]
    """If this is a backup bundle, this points to the original bundle
    that it's backing up. None otherwise."""

    is_cancelled: threading.Event
    """An event that can be signaled to indicate this bundle is cancelled.
    This is set when another copy (backup or original) of this work has
    completed successfully elsewhere."""

    was_cancelled: bool
    """True if this bundle was cancelled, False if it finished normally"""

    backup_bundles: Optional[List[BundleDetails]]
    """If we've created backups of this bundle, this is the list of them"""

    failure_count: int
    """How many times has this bundle failed already?"""

    def __repr__(self):
        uuid = self.uuid
        if uuid[-9:-2] == '_backup':
            uuid = uuid[:-9]
            suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
        else:
            suffix = uuid[-6:]

        # We colorize the uuid based on some bits from it to make them
        # stand out in the logging and help a reader correlate log messages
        # related to the same bundle.
        colorz = [
            fg('marigold yellow'),
            fg('cornflower blue'),
            fg('turquoise blue'),
            fg('lavender purple'),
        ]
        c = colorz[int(uuid[-2:], 16) % len(colorz)]
        function_name = self.function_name if self.function_name is not None else 'nofname'
        machine = self.machine if self.machine is not None else 'nomachine'
        return f'{c}{suffix}/{function_name}/{machine}{reset()}'


class RemoteExecutorStatus:
    """A status 'scoreboard' for a remote executor tracking various
    metrics and able to render a periodic dump of global state.
    """

    def __init__(self, total_worker_count: int) -> None:
        """
        Args:
            total_worker_count: number of workers in the pool
        """
        self.worker_count: int = total_worker_count
        self.known_workers: Set[RemoteWorkerRecord] = set()
        self.start_time: float = time.time()
        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
        self.end_per_bundle: Dict[str, float] = defaultdict(float)
        self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
        self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
        self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
        self.finished_bundle_timings: List[float] = []
        self.last_periodic_dump: Optional[float] = None
        self.total_bundles_submitted: int = 0

        # Protects reads and modification of the state in self. Also used
        # as a memory fence for modifications to bundle.
        self.lock: threading.Lock = threading.Lock()

    def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        """Record that bundle with uuid is assigned to a particular worker.

        Args:
            worker: the record of the worker to which uuid is assigned
            uuid: the uuid of a bundle that has been assigned to a worker
        """
        with self.lock:
            self.record_acquire_worker_already_locked(worker, uuid)

    def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
        """Same as above but an entry point that doesn't acquire the lock
        for codepaths where it's already held."""
        assert self.lock.locked()
        self.known_workers.add(worker)
        self.start_per_bundle[uuid] = None
        x = self.in_flight_bundles_by_worker.get(worker, set())
        x.add(uuid)
        self.in_flight_bundles_by_worker[worker] = x

    def record_bundle_details(self, details: BundleDetails) -> None:
        """Register the details about a bundle of work."""
        with self.lock:
            self.record_bundle_details_already_locked(details)

    def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
        """Same as above but for codepaths that already hold the lock."""
        assert self.lock.locked()
        self.bundle_details_by_uuid[details.uuid] = details

    def record_release_worker(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Record that a bundle has released a worker."""
        with self.lock:
            self.record_release_worker_already_locked(worker, uuid, was_cancelled)

    def record_release_worker_already_locked(
        self,
        worker: RemoteWorkerRecord,
        uuid: str,
        was_cancelled: bool,
    ) -> None:
        """Same as above but for codepaths that already hold the lock."""
        assert self.lock.locked()
        ts = time.time()
        self.end_per_bundle[uuid] = ts
        self.in_flight_bundles_by_worker[worker].remove(uuid)
        if not was_cancelled:
            start = self.start_per_bundle[uuid]
            assert start is not None
            bundle_latency = ts - start
            x = self.finished_bundle_timings_per_worker.get(worker, [])
            x.append(bundle_latency)
            self.finished_bundle_timings_per_worker[worker] = x
            self.finished_bundle_timings.append(bundle_latency)

    def record_processing_began(self, uuid: str):
        """Record when work on a bundle begins."""
        with self.lock:
            self.start_per_bundle[uuid] = time.time()

    def total_in_flight(self) -> int:
        """How many bundles are in flight currently?"""
        assert self.lock.locked()
        total_in_flight = 0
        for worker in self.known_workers:
            total_in_flight += len(self.in_flight_bundles_by_worker[worker])
        return total_in_flight

    def total_idle(self) -> int:
        """How many idle workers are there currently?"""
        assert self.lock.locked()
        return self.worker_count - self.total_in_flight()

    def __repr__(self):
        assert self.lock.locked()
        ts = time.time()
        total_finished = len(self.finished_bundle_timings)
        total_in_flight = self.total_in_flight()
        ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
        qall = None
        if len(self.finished_bundle_timings) > 1:
            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
            ret += (
                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )
        else:
            ret += (
                f'⏱={ts-self.start_time:.1f}s, '
                f'✅={total_finished}/{self.total_bundles_submitted}, '
                f'💻n={total_in_flight}/{self.worker_count}\n'
            )

        for worker in self.known_workers:
            ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
            timings = self.finished_bundle_timings_per_worker.get(worker, [])
            count = len(timings)
            qworker = None
            if count > 1:
                qworker = numpy.quantile(timings, [0.5, 0.95])
                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
            else:
                ret += '\n'
            if count > 0:
                ret += f' ...finished {count} total bundle(s) so far\n'
            in_flight = len(self.in_flight_bundles_by_worker[worker])
            if in_flight > 0:
                ret += f' ...{in_flight} bundles currently in flight:\n'
                for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
                    details = self.bundle_details_by_uuid.get(bundle_uuid, None)
                    pid = str(details.pid) if (details and details.pid != 0) else "TBD"
                    if self.start_per_bundle[bundle_uuid] is not None:
                        sec = ts - self.start_per_bundle[bundle_uuid]
                        ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
                    else:
                        ret += f' {details} setting up / copying data...'
                        sec = 0.0

                    if qworker is not None:
                        if sec > qworker[1]:
                            ret += f'{bg("red")}>💻p95{reset()} '
                            if details is not None:
                                details.slower_than_local_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_local_p95 = False

                    if qall is not None:
                        if sec > qall[1]:
                            ret += f'{bg("red")}>∀p95{reset()} '
                            if details is not None:
                                details.slower_than_global_p95 = True
                        else:
                            if details is not None:
                                details.slower_than_global_p95 = False
                    ret += '\n'
        return ret

    def periodic_dump(self, total_bundles_submitted: int) -> None:
        assert self.lock.locked()
        self.total_bundles_submitted = total_bundles_submitted
        ts = time.time()
        if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
            print(self)
            self.last_periodic_dump = ts


class RemoteWorkerSelectionPolicy(ABC):
    """Base class for policies that select a remote worker."""

    def __init__(self):
        self.workers: Optional[List[RemoteWorkerRecord]] = None

    def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
        self.workers = workers

    @abstractmethod
    def is_worker_available(self) -> bool:
        pass

    @abstractmethod
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        pass


class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that uses weighted RNG."""

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
        grabbag = []
        for worker in self.workers:
            if worker.machine != machine_to_avoid:
                if worker.count > 0:
                    for _ in range(worker.count * worker.weight):
                        grabbag.append(worker)

        if len(grabbag) == 0:
            logger.debug('There are no available workers that avoid %s', machine_to_avoid)
            for worker in self.workers:
                if worker.count > 0:
                    for _ in range(worker.count * worker.weight):
                        grabbag.append(worker)

        if len(grabbag) == 0:
            logger.warning('There are no available workers?!')
            return None

        worker = random.sample(grabbag, 1)[0]
        assert worker.count > 0
        worker.count -= 1
        logger.debug('Selected worker %s', worker)
        return worker
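
    # Note: the grab-bag above amounts to a weighted random draw. A
    # sketch of the equivalent one-liner (ignoring the machine_to_avoid
    # fallback logic):
    #
    #     candidates = [w for w in self.workers if w.count > 0]
    #     worker = random.choices(
    #         candidates, weights=[w.count * w.weight for w in candidates], k=1
    #     )[0]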


class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
    """A remote worker selector that just round robins."""

    def __init__(self) -> None:
        super().__init__()
        self.index = 0

    @overrides
    def is_worker_available(self) -> bool:
        if self.workers:
            for worker in self.workers:
                if worker.count > 0:
                    return True
        return False

    @overrides
    def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
        if self.workers:
            x = self.index
            while True:
                worker = self.workers[x]
                if worker.count > 0:
                    worker.count -= 1
                    x += 1
                    if x >= len(self.workers):
                        x = 0
                    self.index = x
                    logger.debug('Selected worker %s', worker)
                    return worker
                x += 1
                if x >= len(self.workers):
                    x = 0
                if x == self.index:
                    logger.warning('Unexpectedly could not find a worker, retrying...')
                    return None
        return None


class RemoteExecutor(BaseExecutor):
    """An executor that uses processes on remote machines to do work. This
    works by creating "bundles" of work with pickled code in each to be
    executed. Each bundle is assigned a remote worker based on some policy
    heuristics. Once assigned to a remote worker, a local subprocess is
    created. It copies the pickled code to the remote machine via ssh/scp
    and then starts up work on the remote machine again using ssh. When
    the work is complete it copies the results back to the local machine.

    So there is essentially one "controller" machine (which may also be
    in the remote executor pool and therefore do task work in addition to
    controlling) and N worker machines. This code runs on the controller
    whereas on the worker machines we invoke pickled user code via a
    shim in :file:`remote_worker.py`.

    Some redundancy and safety provisions are made; e.g. slower than
    expected tasks have redundant backups created and if a task fails
    repeatedly we consider it poisoned and give up on it.

    .. warning::

        The network overhead / latency of copying work from the
        controller machine to the remote workers is relatively high.
        This executor probably only makes sense to use with
        computationally expensive tasks such as jobs that will execute
        for ~30 seconds or longer.

    See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
    """

    def __init__(
        self,
        workers: List[RemoteWorkerRecord],
        policy: RemoteWorkerSelectionPolicy,
    ) -> None:
        """
        Args:
            workers: A list of remote workers we can call on to do tasks.
            policy: A policy for selecting remote workers for tasks.
        """

        super().__init__()
        self.workers = workers
        self.policy = policy
        self.worker_count = 0
        for worker in self.workers:
            self.worker_count += worker.count
        if self.worker_count <= 0:
            msg = f"We need somewhere to schedule work; count was {self.worker_count}"
            logger.critical(msg)
            raise RemoteExecutorException(msg)
        self.policy.register_worker_pool(self.workers)
        self.cv = threading.Condition()
        logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
        self._helper_executor = fut.ThreadPoolExecutor(
            thread_name_prefix="remote_executor_helper",
            max_workers=self.worker_count,
        )
        self.status = RemoteExecutorStatus(self.worker_count)
        self.total_bundles_submitted = 0
        self.backup_lock = threading.Lock()
        self.last_backup = None
        (
            self.heartbeat_thread,
            self.heartbeat_stop_event,
        ) = self._run_periodic_heartbeat()
        self.already_shutdown = False
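
    # A minimal construction sketch (hypothetical hosts and illustrative
    # weights/counts; most callers should prefer
    # DefaultExecutors().remote_pool() below):
    #
    #     workers = [
    #         RemoteWorkerRecord(username='scott', machine='cheetah.house', weight=30, count=6),
    #         RemoteWorkerRecord(username='scott', machine='puma.cabin', weight=30, count=6),
    #     ]
    #     executor = RemoteExecutor(workers, WeightedRandomRemoteWorkerSelectionPolicy())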

    @background_thread
    def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
        """We create a background thread to invoke :meth:`_heartbeat` regularly
        while we are scheduling work. It does some accounting such as
        looking for slow bundles to tag for backup creation, checking for
        unexpected failures, and printing a fancy message on stdout.
        """
        while not stop_event.is_set():
            time.sleep(5.0)
            logger.debug('Running periodic heartbeat code...')
            self._heartbeat()
        logger.debug('Periodic heartbeat thread shutting down.')

    def _heartbeat(self) -> None:
        # Note: this is invoked on a background thread, not an
        # executor thread. Be careful what you do with it b/c it
        # needs to get back and dump status again periodically.
        with self.status.lock:
            self.status.periodic_dump(self.total_bundles_submitted)

            # Look for bundles to reschedule via executor.submit
            if config.config['executors_schedule_remote_backups']:
                self._maybe_schedule_backup_bundles()

    def _maybe_schedule_backup_bundles(self):
        """Maybe schedule backup bundles if we see a very slow bundle."""

        assert self.status.lock.locked()
        num_done = len(self.status.finished_bundle_timings)
        num_idle_workers = self.worker_count - self.task_count
        now = time.time()
        if (
            num_done >= 2
            and num_idle_workers > 0
            and (self.last_backup is None or (now - self.last_backup > 9.0))
            and self.backup_lock.acquire(blocking=False)
        ):
            try:
                assert self.backup_lock.locked()

                bundle_to_backup = None
                best_score = None
                for (
                    worker,
                    bundle_uuids,
                ) in self.status.in_flight_bundles_by_worker.items():

                    # Prefer to schedule backups of bundles running on
                    # slower machines.
                    base_score = 0
                    for record in self.workers:
                        if worker.machine == record.machine:
                            base_score = float(record.weight)
                            base_score = 1.0 / base_score
                            base_score *= 200.0
                            base_score = int(base_score)
                            break

                    for uuid in bundle_uuids:
                        bundle = self.status.bundle_details_by_uuid.get(uuid, None)
                        if (
                            bundle is not None
                            and bundle.src_bundle is None
                            and bundle.backup_bundles is not None
                        ):
                            score = base_score

                            # Schedule backups of bundles running
                            # longer; especially those that are
                            # unexpectedly slow.
                            start_ts = self.status.start_per_bundle[uuid]
                            if start_ts is not None:
                                runtime = now - start_ts
                                score += runtime
                                logger.debug('score[%s] => %.1f # latency boost', bundle, score)

                                if bundle.slower_than_local_p95:
                                    score += runtime / 2
                                    logger.debug('score[%s] => %.1f # >worker p95', bundle, score)

                                if bundle.slower_than_global_p95:
                                    score += runtime / 2
                                    logger.debug('score[%s] => %.1f # >global p95', bundle, score)

                            # Prefer backups of bundles that don't
                            # have backups already.
                            backup_count = len(bundle.backup_bundles)
                            if backup_count == 0:
                                score *= 2
                            elif backup_count == 1:
                                score /= 2
                            elif backup_count == 2:
                                score /= 8
                            else:
                                score = 0
                            logger.debug(
                                'score[%s] => %.1f # %d dup backup factor',
                                bundle,
                                score,
                                backup_count,
                            )

                            if score != 0 and (best_score is None or score > best_score):
                                bundle_to_backup = bundle
                                assert bundle is not None
                                assert bundle.backup_bundles is not None
                                assert bundle.src_bundle is None
                                best_score = score

                # Note: this is all still happening on the heartbeat
                # runner thread. That's ok because
                # _schedule_backup_for_bundle uses the executor to
                # submit the bundle again which will cause it to be
                # picked up by a worker thread and allow this thread
                # to return to run future heartbeats.
                if bundle_to_backup is not None:
                    self.last_backup = now
                    logger.info(
                        '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
                        bundle_to_backup,
                        best_score,
                    )
                    self._schedule_backup_for_bundle(bundle_to_backup)
            finally:
                self.backup_lock.release()

    def _is_worker_available(self) -> bool:
        """Is there a worker available currently?"""
        return self.policy.is_worker_available()

    def _acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
        """Try to acquire a worker."""
        return self.policy.acquire_worker(machine_to_avoid)

    def _find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
        """Find a worker or block until one becomes available."""
        with self.cv:
            while not self._is_worker_available():
                self.cv.wait()
            worker = self._acquire_worker(machine_to_avoid)
            if worker is not None:
                return worker
        msg = "We should never reach this point in the code"
        logger.critical(msg)
        raise Exception(msg)

    def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
        """Release a previously acquired worker."""
        worker = bundle.worker
        assert worker is not None
        logger.debug('Released worker %s', worker)
        self.status.record_release_worker(
            worker,
            bundle.uuid,
            was_cancelled,
        )
        with self.cv:
            worker.count += 1
            self.cv.notify()
        self.adjust_task_count(-1)

    def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
        """See if a particular bundle is cancelled. Do not block."""
        with self.status.lock:
            if bundle.is_cancelled.wait(timeout=0.0):
                logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
                bundle.was_cancelled = True
                return True
        return False

    def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
        """Find a worker for bundle or block until one is available."""

        self.adjust_task_count(+1)
        uuid = bundle.uuid
        hostname = bundle.hostname
        avoid_machine = override_avoid_machine
        is_original = bundle.src_bundle is None

        # Try not to schedule a backup on the same host as the original.
        if avoid_machine is None and bundle.src_bundle is not None:
            avoid_machine = bundle.src_bundle.machine
        worker = None
        while worker is None:
            worker = self._find_available_worker_or_block(avoid_machine)
        assert worker is not None

        # Ok, found a worker.
        bundle.worker = worker
        machine = bundle.machine = worker.machine
        username = bundle.username = worker.username
        self.status.record_acquire_worker(worker, uuid)
        logger.debug('%s: Running bundle on %s...', bundle, worker)

        # Before we do any work, make sure the bundle is still viable.
        # It may have been some time between when it was submitted and
        # now due to lack of worker availability and someone else may
        # have already finished it.
        if self._check_if_cancelled(bundle):
            try:
                return self._process_work_result(bundle)
            except Exception as e:
                logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
                self._release_worker(bundle)
                if is_original:
                    # Weird. We are the original owner of this
                    # bundle. For it to have been cancelled, a backup
                    # must have already started and completed before
                    # we even got started. Moreover, the backup says
                    # it is done but we can't find the results it
                    # should have copied over. Reschedule the whole
                    # thing.
                    logger.exception(e)
                    logger.error(
                        '%s: We are the original owner thread and yet there are '
                        'no results for this bundle. This is unexpected and bad.',
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # We're a backup and our bundle is cancelled
                    # before we even got started. Do nothing and let
                    # the original bundle's thread worry about either
                    # finding the results or complaining about it.
                    return None

        # Send input code / data to worker machine if it's not local.
        if hostname not in machine:
            try:
                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                start_ts = time.time()
                logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
                run_silently(cmd)
                xfer_latency = time.time() - start_ts
                logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
            except Exception as e:
                self._release_worker(bundle)
                if is_original:
                    # Weird. We tried to copy the code to the worker
                    # and it failed... And we're the original bundle.
                    # Attempt to reschedule.
                    logger.exception(e)
                    logger.error(
                        "%s: Failed to send instructions to the worker machine?! "
                        "This is not expected; we're the original bundle so this shouldn't "
                        "be a race condition. Attempting an emergency retry...",
                        bundle,
                    )
                    return self._emergency_retry_nasty_bundle(bundle)
                else:
                    # This is actually expected; we're a backup.
                    # There's a race condition where someone else
                    # already finished the work and removed the source
                    # code_file before we could copy it. Ignore.
                    logger.warning(
                        '%s: Failed to send instructions to the worker machine... '
                        'We\'re a backup and this may be caused by the original (or '
                        'some other backup) already finishing this work. Ignoring.',
                        bundle,
                    )
                    return None

        # Kick off the work. Note that if this fails we let
        # _wait_for_process deal with it.
        self.status.record_processing_began(uuid)
        cmd = (
            f'{SSH} {bundle.username}@{bundle.machine} '
            f'"source py38-venv/bin/activate &&'
            f' /home/scott/lib/python_modules/remote_worker.py'
            f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
        )
        logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
        p = cmd_in_background(cmd, silent=True)
        bundle.pid = p.pid
        logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
        return self._wait_for_process(p, bundle, 0)

    def _wait_for_process(
        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
    ) -> Any:
        """At this point we've copied the bundle's pickled code to the remote
        worker and started an ssh process that should be invoking the
        remote worker to have it execute the user's code. See how
        that's going and wait for it to complete or fail. Note that
        this code is recursive: there are codepaths where we decide to
        stop waiting for an ssh process (because another backup seems
        to have finished) but then fail to fetch or parse the results
        from that backup and thus call ourselves to continue waiting
        on an active ssh process. This is the purpose of the depth
        argument: to curtail potential infinite recursion by giving up
        eventually.

        Args:
            p: the Popen record of the ssh job
            bundle: the bundle of work being executed remotely
            depth: how many retries we've made so far. Starts at zero.
        """

        machine = bundle.machine
        assert p is not None
        pid = p.pid  # pid of the ssh process
        if depth > 3:
            logger.error(
                "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
            )
            p.terminate()
            self._release_worker(bundle)
            return self._emergency_retry_nasty_bundle(bundle)

        # Spin until either the ssh job we scheduled finishes the
        # bundle or some backup worker signals that they finished it
        # before we could.
        while True:
            try:
                p.wait(timeout=0.25)
            except subprocess.TimeoutExpired:
                if self._check_if_cancelled(bundle):
                    logger.info('%s: looks like another worker finished bundle...', bundle)
                    break
            else:
                logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
                p = None
                break

        # If we get here we believe the bundle is done; either the ssh
        # subprocess finished (hopefully successfully) or we noticed
        # that some other worker seems to have completed the bundle
        # before us and we're bailing out.
        try:
            ret = self._process_work_result(bundle)
            if ret is not None and p is not None:
                p.terminate()
            return ret

        # Something went wrong; e.g. we could not copy the results
        # back, cleanup after ourselves on the remote machine, or
        # unpickle the results we got from the remote machine. If we
        # still have an active ssh subprocess, keep waiting on it.
        # Otherwise, time for an emergency reschedule.
        except Exception as e:
            logger.exception(e)
            logger.error('%s: Something unexpected just happened...', bundle)
            if p is not None:
                logger.warning(
                    "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
                )
                return self._wait_for_process(p, bundle, depth + 1)
            else:
                self._release_worker(bundle)
                return self._emergency_retry_nasty_bundle(bundle)

    def _process_work_result(self, bundle: BundleDetails) -> Any:
        """A bundle seems to be completed. Check on the results."""

        with self.status.lock:
            is_original = bundle.src_bundle is None
            was_cancelled = bundle.was_cancelled
            username = bundle.username
            machine = bundle.machine
            result_file = bundle.result_file
            code_file = bundle.code_file

            # Whether original or backup, if we finished first we must
            # fetch the results if the computation happened on a
            # remote machine.
            bundle.end_ts = time.time()
            if not was_cancelled:
                assert bundle.machine is not None
                if bundle.hostname not in bundle.machine:
                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                    logger.info(
                        "%s: Fetching results back from %s@%s via %s",
                        bundle,
                        username,
                        machine,
                        cmd,
                    )

                    # If either of these throw they are handled in
                    # _wait_for_process.
                    attempts = 0
                    while True:
                        try:
                            run_silently(cmd)
                        except Exception as e:
                            attempts += 1
                            if attempts >= 3:
                                raise e
                        else:
                            break

                    # Cleanup remote /tmp files.
                    run_silently(f'{SSH} {username}@{machine} "/bin/rm -f {code_file} {result_file}"')
                    logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
                dur = bundle.end_ts - bundle.start_ts
                self.histogram.add_item(dur)

        # Only the original worker should unpickle the file contents
        # though since it's the only one whose result matters. The
        # original is also the only job that may delete result_file
        # from disk. Note that the original may have been cancelled
        # if one of the backups finished first; it still must read the
        # result from disk. It still does that here with is_cancelled
        # set.
        if is_original:
            logger.debug("%s: Unpickling %s.", bundle, result_file)
            try:
                with open(result_file, 'rb') as rb:
                    serialized = rb.read()
                result = cloudpickle.loads(serialized)
            except Exception as e:
                logger.exception(e)
                logger.error('Failed to load %s... this is bad news.', result_file)
                self._release_worker(bundle)

                # Re-raise the exception; the code in _wait_for_process may
                # decide to _emergency_retry_nasty_bundle here.
                raise e
            logger.debug('Removing local (master) %s and %s.', code_file, result_file)
            os.remove(result_file)
            os.remove(code_file)

            # Notify any backups that the original is done so they
            # should stop ASAP. Do this whether or not we
            # finished first since there could be more than one
            # backup.
            if bundle.backup_bundles is not None:
                for backup in bundle.backup_bundles:
                    logger.debug(
                        '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
                    )
                    backup.is_cancelled.set()
        else:
            # This is a backup job and, by now, we have already fetched
            # the bundle results.
            assert bundle.src_bundle is not None

            # Backup results don't matter, they just need to leave the
            # result file in the right place for their originals to
            # read/unpickle later.
            result = None

            # Tell the original to stop if we finished first.
            if not was_cancelled:
                orig_bundle = bundle.src_bundle
                assert orig_bundle is not None
                logger.debug(
                    '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
                )
                orig_bundle.is_cancelled.set()
        self._release_worker(bundle, was_cancelled=was_cancelled)
        return result

    def _create_original_bundle(self, pickle, function_name: str):
        """Creates a bundle that is not a backup of any other bundle but
        rather represents a user task.
        """

        uuid = string_utils.generate_uuid(omit_dashes=True)
        code_file = f'/tmp/{uuid}.code.bin'
        result_file = f'/tmp/{uuid}.result.bin'

        logger.debug('Writing pickled code to %s', code_file)
        with open(code_file, 'wb') as wb:
            wb.write(pickle)

        bundle = BundleDetails(
            pickled_code=pickle,
            uuid=uuid,
            function_name=function_name,
            worker=None,
            username=None,
            machine=None,
            hostname=platform.node(),
            code_file=code_file,
            result_file=result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=None,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=[],
            failure_count=0,
        )
        self.status.record_bundle_details(bundle)
        logger.debug('%s: Created an original bundle', bundle)
        return bundle

    def _create_backup_bundle(self, src_bundle: BundleDetails):
        """Creates a bundle that is a backup of another bundle that is
        running too slowly."""

        assert self.status.lock.locked()
        assert src_bundle.backup_bundles is not None
        n = len(src_bundle.backup_bundles)
        uuid = src_bundle.uuid + f'_backup#{n}'

        backup_bundle = BundleDetails(
            pickled_code=src_bundle.pickled_code,
            uuid=uuid,
            function_name=src_bundle.function_name,
            worker=None,
            username=None,
            machine=None,
            hostname=src_bundle.hostname,
            code_file=src_bundle.code_file,
            result_file=src_bundle.result_file,
            pid=0,
            start_ts=time.time(),
            end_ts=0.0,
            slower_than_local_p95=False,
            slower_than_global_p95=False,
            src_bundle=src_bundle,
            is_cancelled=threading.Event(),
            was_cancelled=False,
            backup_bundles=None,  # backup backups not allowed
            failure_count=0,
        )
        src_bundle.backup_bundles.append(backup_bundle)
        self.status.record_bundle_details_already_locked(backup_bundle)
        logger.debug('%s: Created a backup bundle', backup_bundle)
        return backup_bundle

    def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
        """Schedule a backup of src_bundle."""

        assert self.status.lock.locked()
        assert src_bundle is not None
        backup_bundle = self._create_backup_bundle(src_bundle)
        logger.debug(
            '%s/%s: Scheduling backup for execution...',
            backup_bundle.uuid,
            backup_bundle.function_name,
        )
        self._helper_executor.submit(self._launch, backup_bundle)

        # Results from backups don't matter; if they finish first
        # they will move the result_file to this machine and let
        # the original pick them up and unpickle them (and return
        # a result).

    def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
        """Something unexpectedly failed with bundle. Either retry it
        from the beginning or throw in the towel and give up on it."""

        is_original = bundle.src_bundle is None
        bundle.worker = None
        avoid_last_machine = bundle.machine
        bundle.machine = None
        bundle.username = None
        bundle.failure_count += 1
        retry_limit = config.config['executors_max_bundle_failures']

        if bundle.failure_count > retry_limit:
            logger.error(
                '%s: Tried this bundle too many times already (%dx); giving up.',
                bundle,
                retry_limit,
            )
            if is_original:
                raise RemoteExecutorException(
                    f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                )
            else:
                logger.error(
                    '%s: At least it\'s only a backup; better luck with the others.', bundle
                )
            return None
        else:
            msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
            logger.warning(msg)
            return self._launch(bundle, avoid_last_machine)

    @overrides
    def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
        """Submit work to be done. This is the user entry point of this
        class."""
        if self.already_shutdown:
            raise Exception('Submitted work after shutdown.')
        pickle = _make_cloud_pickle(function, *args, **kwargs)
        bundle = self._create_original_bundle(pickle, function.__name__)
        self.total_bundles_submitted += 1
        return self._helper_executor.submit(self._launch, bundle)
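
    # A minimal usage sketch (illustrative; assumes a configured worker
    # pool, e.g. via DefaultExecutors().remote_pool(); count_primes is a
    # hypothetical expensive function worth shipping to a remote host):
    #
    #     future = remote_executor.submit(count_primes, 10**8)
    #     print(future.result())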

    @overrides
    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
        """Shutdown the executor."""
        if not self.already_shutdown:
            logger.debug('Shutting down RemoteExecutor %s', self.title)
            self.heartbeat_stop_event.set()
            self.heartbeat_thread.join()
            self._helper_executor.shutdown(wait)
            if not quiet:
                print(self.histogram.__repr__(label_formatter='%ds'))
            self.already_shutdown = True


@singleton
class DefaultExecutors(object):
    """A container for a default thread, process and remote executor.
    These are not created until needed and we take care to clean up
    before process exit automatically for the caller's convenience.
    Instead of creating your own executor, consider using the one
    from this pool. e.g.::

        @par.parallelize(method=par.Method.PROCESS)
        def do_work(
            solutions: List[Work],
            ...
        ):
            ...

        def start_do_work(all_work: List[Work]):
            shards = []
            logger.debug('Sharding work into groups of 10.')
            for subset in list_utils.shard(all_work, 10):
                shards.append([x for x in subset])

            logger.debug('Kicking off helper pool.')
            results = []
            for n, shard in enumerate(shards):
                results.append(
                    do_work(
                        shard, n, shared_cache.get_name(), max_letter_pop_per_word
                    )
                )
            smart_future.wait_all(results)

            # Note: if you forget to do this it will clean itself up
            # during program termination including tearing down any
            # active ssh connections.
            executors.DefaultExecutors().process_pool().shutdown()
    """

    def __init__(self) -> None:
        self.thread_executor: Optional[ThreadExecutor] = None
        self.process_executor: Optional[ProcessExecutor] = None
        self.remote_executor: Optional[RemoteExecutor] = None

    @staticmethod
    def _ping(host) -> bool:
        logger.debug('RUN> ping -c 1 %s', host)
        try:
            x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
            return x == 0
        except Exception:
            return False

    def thread_pool(self) -> ThreadExecutor:
        if self.thread_executor is None:
            self.thread_executor = ThreadExecutor()
        return self.thread_executor

    def process_pool(self) -> ProcessExecutor:
        if self.process_executor is None:
            self.process_executor = ProcessExecutor()
        return self.process_executor

    def remote_pool(self) -> RemoteExecutor:
        if self.remote_executor is None:
            logger.info('Looking for some helper machines...')
            pool: List[RemoteWorkerRecord] = []
            if self._ping('cheetah.house'):
                logger.info('Found cheetah.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='cheetah.house',
                    )
                )
            if self._ping('meerkat.cabin'):
                logger.info('Found meerkat.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='meerkat.cabin',
                    )
                )
            if self._ping('wannabe.house'):
                logger.info('Found wannabe.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='wannabe.house',
                    )
                )
            if self._ping('puma.cabin'):
                logger.info('Found puma.cabin')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='puma.cabin',
                    )
                )
            if self._ping('backup.house'):
                logger.info('Found backup.house')
                pool.append(
                    RemoteWorkerRecord(
                        username='scott',
                        machine='backup.house',
                    )
                )

            # The controller machine has a lot to do; go easy on it.
            for record in pool:
                if record.machine == platform.node() and record.count > 1:
                    logger.info('Reducing workload for %s.', record.machine)
                    record.count = max(int(record.count / 2), 1)

            policy = WeightedRandomRemoteWorkerSelectionPolicy()
            policy.register_worker_pool(pool)
            self.remote_executor = RemoteExecutor(pool, policy)
        return self.remote_executor

    def shutdown(self) -> None:
        if self.thread_executor is not None:
            self.thread_executor.shutdown(wait=True, quiet=True)
            self.thread_executor = None
        if self.process_executor is not None:
            self.process_executor.shutdown(wait=True, quiet=True)
            self.process_executor = None
        if self.remote_executor is not None:
            self.remote_executor.shutdown(wait=True, quiet=True)
            self.remote_executor = None