# -*- coding: utf-8 -*-
# © Copyright 2021-2022, Scott Gasch
"""Defines three executors: a thread executor for doing work using a
threadpool, a process executor for doing work in other processes on
the same machine, and a remote executor for farming out work to other
machines.

Also defines DefaultExecutors, which is a container for references to
global executors / worker pools with automatic shutdown semantics."""
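# Example (an illustrative sketch, not part of the original module): typical
# callers go through the DefaultExecutors container defined at the bottom of
# this file rather than constructing executors directly; this assumes the
# program has already parsed its commandline flags via the config module.
#
#   executors = DefaultExecutors()
#   f = executors.thread_pool().submit(lambda x: x + 1, 41)
#   print(f.result())  # 42
#   executors.shutdown()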
14 from __future__ import annotations
import concurrent.futures as fut
import logging
import os
import platform
import random
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass, fields
from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle  # type: ignore
import numpy
from overrides import overrides
import argparse_utils
import config
import histogram as hist
import persistent
import string_utils
from ansi import bg, fg, reset, underline
from decorator_utils import singleton
from exec_utils import cmd_exitcode, cmd_in_background, run_silently
from thread_utils import background_thread
43 logger = logging.getLogger(__name__)
45 parser = config.add_commandline_args(
46 f"Executors ({__file__})", "Args related to processing executors."
49 '--executors_threadpool_size',
52 help='Number of threads in the default threadpool, leave unset for default',
56 '--executors_processpool_size',
59 help='Number of processes in the default processpool, leave unset for default',
63 '--executors_schedule_remote_backups',
65 action=argparse_utils.ActionNoYes,
66 help='Should we schedule duplicative backup work if a remote bundle is slow',
69 '--executors_max_bundle_failures',
73 help='Maximum number of failures before giving up on a bundle',
76 '--remote_worker_records_file',
79 help='Path of the remote worker records file (JSON)',
80 default=f'{os.environ.get("HOME", ".")}/.remote_worker_records',
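# Example (illustrative; the hostnames and numbers below are made up): the
# records file named above is JSON that ConfigRemoteWorkerPoolProvider (near
# the bottom of this file) parses into RemoteWorkerRecord instances:
#
#   {
#       "remote_worker_records": [
#           {"username": "scott", "machine": "machine1", "weight": 2, "count": 4},
#           {"username": "scott", "machine": "machine2", "weight": 1, "count": 2}
#       ]
#   }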
84 SSH = '/usr/bin/ssh -oForwardX11=no'
85 SCP = '/usr/bin/scp -C'
88 def _make_cloud_pickle(fun, *args, **kwargs):
89 """Internal helper to create cloud pickles."""
90 logger.debug("Making cloudpickled bundle at %s", fun.__name__)
91 return cloudpickle.dumps((fun, args, kwargs))
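# For illustration (not part of the original module): the payload is just a
# cloudpickled (function, args, kwargs) tuple, so the receiving side can
# recover and invoke it symmetrically, e.g.:
#
#   payload = _make_cloud_pickle(divmod, 13, 4)
#   fun, args, kwargs = cloudpickle.loads(payload)
#   assert fun(*args, **kwargs) == (3, 1)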
94 class BaseExecutor(ABC):
95 """The base executor interface definition. The interface for
96 :class:`ProcessExecutor`, :class:`RemoteExecutor`, and
97 :class:`ThreadExecutor`.
100 def __init__(self, *, title=''):
102 self.histogram = hist.SimpleHistogram(
103 hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
108 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
112 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
115 def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
116 """Shutdown the executor and return True if the executor is idle
117 (i.e. there are no pending or active tasks). Return False
118 otherwise. Note: this should only be called by the launcher
122 if self.task_count == 0:
123 self.shutdown(wait=True, quiet=quiet)
127 def adjust_task_count(self, delta: int) -> None:
128 """Change the task count. Note: do not call this method from a
129 worker, it should only be called by the launcher process /
133 self.task_count += delta
134 logger.debug('Adjusted task count by %d to %d.', delta, self.task_count)
def get_task_count(self) -> int:
"""Return the current task count. Note: do not call this method
from a worker; it should only be called by the launcher process /
thread."""
142 return self.task_count
145 class ThreadExecutor(BaseExecutor):
146 """A threadpool executor. This executor uses python threads to
147 schedule tasks. Note that, at least as of python3.10, because of
148 the global lock in the interpreter itself, these do not
149 parallelize very well so this class is useful mostly for non-CPU
152 See also :class:`ProcessExecutor` and :class:`RemoteExecutor`.
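# Illustrative usage sketch (not from the original source); assumes the
# config module has already parsed commandline flags:
#
#   texecutor = ThreadExecutor(max_workers=4)
#   f = texecutor.submit(time.sleep, 1.0)  # blocking / I/O-bound work is a good fit
#   f.result()
#   texecutor.shutdown(wait=True)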
155 def __init__(self, max_workers: Optional[int] = None):
158 if max_workers is not None:
159 workers = max_workers
160 elif 'executors_threadpool_size' in config.config:
161 workers = config.config['executors_threadpool_size']
162 if workers is not None:
163 logger.debug('Creating threadpool executor with %d workers', workers)
165 logger.debug('Creating a default sized threadpool executor')
166 self._thread_pool_executor = fut.ThreadPoolExecutor(
167 max_workers=workers, thread_name_prefix="thread_executor_helper"
169 self.already_shutdown = False
171 # This is run on a different thread; do not adjust task count here.
173 def run_local_bundle(fun, *args, **kwargs):
174 logger.debug("Running local bundle at %s", fun.__name__)
175 result = fun(*args, **kwargs)
179 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
180 if self.already_shutdown:
181 raise Exception('Submitted work after shutdown.')
182 self.adjust_task_count(+1)
184 newargs.append(function)
188 result = self._thread_pool_executor.submit(
189 ThreadExecutor.run_local_bundle, *newargs, **kwargs
191 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
192 result.add_done_callback(lambda _: self.adjust_task_count(-1))
196 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
197 if not self.already_shutdown:
198 logger.debug('Shutting down threadpool executor %s', self.title)
199 self._thread_pool_executor.shutdown(wait)
201 print(self.histogram.__repr__(label_formatter='%ds'))
202 self.already_shutdown = True
205 class ProcessExecutor(BaseExecutor):
206 """An executor which runs tasks in child processes.
208 See also :class:`ThreadExecutor` and :class:`RemoteExecutor`.
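# Illustrative usage sketch (not from the original source): work submitted
# here is cloudpickled and shipped to a child process, so CPU-bound tasks
# sidestep the GIL:
#
#   def fib(n: int) -> int:
#       return n if n < 2 else fib(n - 1) + fib(n - 2)
#
#   pexecutor = ProcessExecutor(max_workers=2)
#   f = pexecutor.submit(fib, 30)
#   print(f.result())
#   pexecutor.shutdown(wait=True)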
211 def __init__(self, max_workers=None):
214 if max_workers is not None:
215 workers = max_workers
216 elif 'executors_processpool_size' in config.config:
217 workers = config.config['executors_processpool_size']
218 if workers is not None:
219 logger.debug('Creating processpool executor with %d workers.', workers)
221 logger.debug('Creating a default sized processpool executor')
222 self._process_executor = fut.ProcessPoolExecutor(
225 self.already_shutdown = False
227 # This is run in another process; do not adjust task count here.
229 def run_cloud_pickle(pickle):
230 fun, args, kwargs = cloudpickle.loads(pickle)
231 logger.debug("Running pickled bundle at %s", fun.__name__)
232 result = fun(*args, **kwargs)
236 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
237 if self.already_shutdown:
238 raise Exception('Submitted work after shutdown.')
240 self.adjust_task_count(+1)
241 pickle = _make_cloud_pickle(function, *args, **kwargs)
242 result = self._process_executor.submit(ProcessExecutor.run_cloud_pickle, pickle)
243 result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
244 result.add_done_callback(lambda _: self.adjust_task_count(-1))
248 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
249 if not self.already_shutdown:
250 logger.debug('Shutting down processpool executor %s', self.title)
251 self._process_executor.shutdown(wait)
253 print(self.histogram.__repr__(label_formatter='%ds'))
254 self.already_shutdown = True
256 def __getstate__(self):
257 state = self.__dict__.copy()
# The underlying ProcessPoolExecutor can't be pickled; drop it from the state.
state['_process_executor'] = None
return state
262 class RemoteExecutorException(Exception):
263 """Thrown when a bundle cannot be executed despite several retries."""
269 class RemoteWorkerRecord:
270 """A record of info about a remote worker."""
273 """Username we can ssh into on this machine to run work."""
276 """Machine address / name."""
279 """Relative probability for the weighted policy to select this
280 machine for scheduling work."""
283 """If this machine is selected, what is the maximum number of task
284 that it can handle?"""
287 return hash((self.username, self.machine))
290 return f'{self.username}@{self.machine}'
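# Illustrative example (the values are made up): one record per remote
# machine; weight biases the weighted selection policy below and count caps
# how many concurrent bundles the machine will be given.
#
#   record = RemoteWorkerRecord(username='scott', machine='machine1', weight=2, count=4)
#   print(record)  # -> scott@machine1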
295 """All info necessary to define some unit of work that needs to be
296 done, where it is being run, its state, whether it is an original
297 bundle of a backup bundle, how many times it has failed, etc...
301 """The code to run, cloud pickled"""
304 """A unique identifier"""
307 """The name of the function we pickled"""
309 worker: Optional[RemoteWorkerRecord]
310 """The remote worker running this bundle or None if none (yet)"""
312 username: Optional[str]
313 """The remote username running this bundle or None if none (yet)"""
315 machine: Optional[str]
316 """The remote machine running this bundle or None if none (yet)"""
319 """The controller machine"""
322 """A unique filename to hold the work to be done"""
325 """Where the results should be placed / read from"""
328 """The process id of the local subprocess watching the ssh connection
329 to the remote machine"""
337 slower_than_local_p95: bool
338 """Currently slower then 95% of other bundles on remote host"""
340 slower_than_global_p95: bool
341 """Currently slower than 95% of other bundles globally"""
343 src_bundle: Optional[BundleDetails]
344 """If this is a backup bundle, this points to the original bundle
345 that it's backing up. None otherwise."""
347 is_cancelled: threading.Event
348 """An event that can be signaled to indicate this bundle is cancelled.
349 This is set when another copy (backup or original) of this work has
350 completed successfully elsewhere."""
353 """True if this bundle was cancelled, False if it finished normally"""
355 backup_bundles: Optional[List[BundleDetails]]
356 """If we've created backups of this bundle, this is the list of them"""
359 """How many times has this bundle failed already?"""
363 if uuid[-9:-2] == '_backup':
365 suffix = f'{uuid[-6:]}_b{self.uuid[-1:]}'
369 # We colorize the uuid based on some bits from it to make them
370 # stand out in the logging and help a reader correlate log messages
371 # related to the same bundle.
378 fg('marigold yellow'),
381 fg('cornflower blue'),
382 fg('turquoise blue'),
384 fg('lavender purple'),
387 c = colorz[int(uuid[-2:], 16) % len(colorz)]
388 function_name = self.function_name if self.function_name is not None else 'nofname'
389 machine = self.machine if self.machine is not None else 'nomachine'
390 return f'{c}{suffix}/{function_name}/{machine}{reset()}'
393 class RemoteExecutorStatus:
394 """A status 'scoreboard' for a remote executor tracking various
395 metrics and able to render a periodic dump of global state.
398 def __init__(self, total_worker_count: int) -> None:
402 total_worker_count: number of workers in the pool
405 self.worker_count: int = total_worker_count
406 self.known_workers: Set[RemoteWorkerRecord] = set()
407 self.start_time: float = time.time()
408 self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
409 self.end_per_bundle: Dict[str, float] = defaultdict(float)
410 self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
411 self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
412 self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
413 self.finished_bundle_timings: List[float] = []
414 self.last_periodic_dump: Optional[float] = None
415 self.total_bundles_submitted: int = 0
# Protects reads and modifications of self. Also used as a
# memory fence for modifications to bundles.
419 self.lock: threading.Lock = threading.Lock()
421 def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
422 """Record that bundle with uuid is assigned to a particular worker.
425 worker: the record of the worker to which uuid is assigned
426 uuid: the uuid of a bundle that has been assigned to a worker
429 self.record_acquire_worker_already_locked(worker, uuid)
431 def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
432 """Same as above but an entry point that doesn't acquire the lock
433 for codepaths where it's already held."""
434 assert self.lock.locked()
435 self.known_workers.add(worker)
436 self.start_per_bundle[uuid] = None
437 x = self.in_flight_bundles_by_worker.get(worker, set())
439 self.in_flight_bundles_by_worker[worker] = x
441 def record_bundle_details(self, details: BundleDetails) -> None:
442 """Register the details about a bundle of work."""
444 self.record_bundle_details_already_locked(details)
446 def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
447 """Same as above but for codepaths that already hold the lock."""
448 assert self.lock.locked()
449 self.bundle_details_by_uuid[details.uuid] = details
451 def record_release_worker(
453 worker: RemoteWorkerRecord,
457 """Record that a bundle has released a worker."""
459 self.record_release_worker_already_locked(worker, uuid, was_cancelled)
461 def record_release_worker_already_locked(
463 worker: RemoteWorkerRecord,
467 """Same as above but for codepaths that already hold the lock."""
468 assert self.lock.locked()
470 self.end_per_bundle[uuid] = ts
471 self.in_flight_bundles_by_worker[worker].remove(uuid)
472 if not was_cancelled:
473 start = self.start_per_bundle[uuid]
474 assert start is not None
475 bundle_latency = ts - start
476 x = self.finished_bundle_timings_per_worker.get(worker, [])
477 x.append(bundle_latency)
478 self.finished_bundle_timings_per_worker[worker] = x
479 self.finished_bundle_timings.append(bundle_latency)
481 def record_processing_began(self, uuid: str):
482 """Record when work on a bundle begins."""
484 self.start_per_bundle[uuid] = time.time()
486 def total_in_flight(self) -> int:
487 """How many bundles are in flight currently?"""
488 assert self.lock.locked()
490 for worker in self.known_workers:
491 total_in_flight += len(self.in_flight_bundles_by_worker[worker])
492 return total_in_flight
494 def total_idle(self) -> int:
495 """How many idle workers are there currently?"""
496 assert self.lock.locked()
497 return self.worker_count - self.total_in_flight()
500 assert self.lock.locked()
502 total_finished = len(self.finished_bundle_timings)
503 total_in_flight = self.total_in_flight()
504 ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
506 if len(self.finished_bundle_timings) > 1:
507 qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
509 f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
510 f'✅={total_finished}/{self.total_bundles_submitted}, '
511 f'💻n={total_in_flight}/{self.worker_count}\n'
515 f'⏱={ts-self.start_time:.1f}s, '
516 f'✅={total_finished}/{self.total_bundles_submitted}, '
517 f'💻n={total_in_flight}/{self.worker_count}\n'
520 for worker in self.known_workers:
521 ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
522 timings = self.finished_bundle_timings_per_worker.get(worker, [])
526 qworker = numpy.quantile(timings, [0.5, 0.95])
527 ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
531 ret += f' ...finished {count} total bundle(s) so far\n'
532 in_flight = len(self.in_flight_bundles_by_worker[worker])
534 ret += f' ...{in_flight} bundles currently in flight:\n'
535 for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
536 details = self.bundle_details_by_uuid.get(bundle_uuid, None)
537 pid = str(details.pid) if (details and details.pid != 0) else "TBD"
538 if self.start_per_bundle[bundle_uuid] is not None:
539 sec = ts - self.start_per_bundle[bundle_uuid]
540 ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
542 ret += f' {details} setting up / copying data...'
545 if qworker is not None:
547 ret += f'{bg("red")}>💻p95{reset()} '
548 if details is not None:
549 details.slower_than_local_p95 = True
551 if details is not None:
552 details.slower_than_local_p95 = False
556 ret += f'{bg("red")}>∀p95{reset()} '
557 if details is not None:
558 details.slower_than_global_p95 = True
560 details.slower_than_global_p95 = False
564 def periodic_dump(self, total_bundles_submitted: int) -> None:
565 assert self.lock.locked()
566 self.total_bundles_submitted = total_bundles_submitted
568 if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
570 self.last_periodic_dump = ts
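# Illustrative lifecycle sketch (not from the original source): the
# RemoteExecutor below drives this scoreboard roughly as follows for each
# bundle it schedules:
#
#   status.record_bundle_details(details)
#   status.record_acquire_worker(worker, details.uuid)
#   status.record_processing_began(details.uuid)
#   # ...remote work happens...
#   status.record_release_worker(worker, details.uuid, was_cancelled=False)
#   with status.lock:
#       status.periodic_dump(total_bundles_submitted)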
573 class RemoteWorkerSelectionPolicy(ABC):
574 """A policy for selecting a remote worker base class."""
577 self.workers: Optional[List[RemoteWorkerRecord]] = None
579 def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
580 self.workers = workers
583 def is_worker_available(self) -> bool:
587 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
591 class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
592 """A remote worker selector that uses weighted RNG."""
595 def is_worker_available(self) -> bool:
597 for worker in self.workers:
603 def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
606 for worker in self.workers:
607 if worker.machine != machine_to_avoid:
609 for _ in range(worker.count * worker.weight):
610 grabbag.append(worker)
612 if len(grabbag) == 0:
613 logger.debug('There are no available workers that avoid %s', machine_to_avoid)
615 for worker in self.workers:
617 for _ in range(worker.count * worker.weight):
618 grabbag.append(worker)
620 if len(grabbag) == 0:
621 logger.warning('There are no available workers?!')
624 worker = random.sample(grabbag, 1)[0]
625 assert worker.count > 0
627 logger.debug('Selected worker %s', worker)
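# Worked example of the weighted draw above (hypothetical numbers): with two
# workers A (weight=2, count=4) and B (weight=1, count=2), both currently
# usable, the grabbag holds 2*4 = 8 copies of A and 1*2 = 2 copies of B, so a
# single acquire_worker() call picks A with probability 8/10 and B with 2/10.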
631 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
632 """A remote worker selector that just round robins."""
634 def __init__(self) -> None:
639 def is_worker_available(self) -> bool:
641 for worker in self.workers:
def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
651 worker = self.workers[x]
655 if x >= len(self.workers):
658 logger.debug('Selected worker %s', worker)
661 if x >= len(self.workers):
664 logger.warning('Unexpectedly could not find a worker, retrying...')
669 class RemoteExecutor(BaseExecutor):
670 """An executor that uses processes on remote machines to do work. This
671 works by creating "bundles" of work with pickled code in each to be
672 executed. Each bundle is assigned a remote worker based on some policy
673 heuristics. Once assigned to a remote worker, a local subprocess is
674 created. It copies the pickled code to the remote machine via ssh/scp
675 and then starts up work on the remote machine again using ssh. When
676 the work is complete it copies the results back to the local machine.
678 So there is essentially one "controller" machine (which may also be
679 in the remote executor pool and therefore do task work in addition to
680 controlling) and N worker machines. This code runs on the controller
681 whereas on the worker machines we invoke pickled user code via a
682 shim in :file:`remote_worker.py`.
684 Some redundancy and safety provisions are made; e.g. slower than
685 expected tasks have redundant backups created and if a task fails
686 repeatedly we consider it poisoned and give up on it.
690 The network overhead / latency of copying work from the
691 controller machine to the remote workers is relatively high.
692 This executor probably only makes sense to use with
693 computationally expensive tasks such as jobs that will execute
694 for ~30 seconds or longer.
696 See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
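# Illustrative usage sketch (not from the original source): normally you get
# this executor via DefaultExecutors().remote_pool() below; the worker record
# values and the submitted function here are made up.
#
#   workers = [RemoteWorkerRecord(username='scott', machine='machine1', weight=1, count=2)]
#   policy = WeightedRandomRemoteWorkerSelectionPolicy()
#   rexecutor = RemoteExecutor(workers, policy)
#   f = rexecutor.submit(expensive_function, big_arg)  # hypothetical names
#   print(f.result())
#   rexecutor.shutdown(wait=True)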
701 workers: List[RemoteWorkerRecord],
702 policy: RemoteWorkerSelectionPolicy,
707 workers: A list of remote workers we can call on to do tasks.
708 policy: A policy for selecting remote workers for tasks.
712 self.workers = workers
714 self.worker_count = 0
715 for worker in self.workers:
716 self.worker_count += worker.count
717 if self.worker_count <= 0:
718 msg = f"We need somewhere to schedule work; count was {self.worker_count}"
720 raise RemoteExecutorException(msg)
721 self.policy.register_worker_pool(self.workers)
722 self.cv = threading.Condition()
723 logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
724 self._helper_executor = fut.ThreadPoolExecutor(
725 thread_name_prefix="remote_executor_helper",
726 max_workers=self.worker_count,
728 self.status = RemoteExecutorStatus(self.worker_count)
729 self.total_bundles_submitted = 0
730 self.backup_lock = threading.Lock()
731 self.last_backup = None
733 self.heartbeat_thread,
734 self.heartbeat_stop_event,
735 ) = self._run_periodic_heartbeat()
736 self.already_shutdown = False
739 def _run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
741 We create a background thread to invoke :meth:`_heartbeat` regularly
742 while we are scheduling work. It does some accounting such as
743 looking for slow bundles to tag for backup creation, checking for
744 unexpected failures, and printing a fancy message on stdout.
746 while not stop_event.is_set():
748 logger.debug('Running periodic heartbeat code...')
750 logger.debug('Periodic heartbeat thread shutting down.')
752 def _heartbeat(self) -> None:
753 # Note: this is invoked on a background thread, not an
754 # executor thread. Be careful what you do with it b/c it
755 # needs to get back and dump status again periodically.
756 with self.status.lock:
757 self.status.periodic_dump(self.total_bundles_submitted)
759 # Look for bundles to reschedule via executor.submit
760 if config.config['executors_schedule_remote_backups']:
761 self._maybe_schedule_backup_bundles()
763 def _maybe_schedule_backup_bundles(self):
764 """Maybe schedule backup bundles if we see a very slow bundle."""
766 assert self.status.lock.locked()
767 num_done = len(self.status.finished_bundle_timings)
768 num_idle_workers = self.worker_count - self.task_count
772 and num_idle_workers > 0
773 and (self.last_backup is None or (now - self.last_backup > 9.0))
774 and self.backup_lock.acquire(blocking=False)
777 assert self.backup_lock.locked()
779 bundle_to_backup = None
784 ) in self.status.in_flight_bundles_by_worker.items():
786 # Prefer to schedule backups of bundles running on
789 for record in self.workers:
790 if worker.machine == record.machine:
791 base_score = float(record.weight)
792 base_score = 1.0 / base_score
794 base_score = int(base_score)
797 for uuid in bundle_uuids:
798 bundle = self.status.bundle_details_by_uuid.get(uuid, None)
801 and bundle.src_bundle is None
802 and bundle.backup_bundles is not None
806 # Schedule backups of bundles running
807 # longer; especially those that are
809 start_ts = self.status.start_per_bundle[uuid]
810 if start_ts is not None:
811 runtime = now - start_ts
813 logger.debug('score[%s] => %.1f # latency boost', bundle, score)
815 if bundle.slower_than_local_p95:
817 logger.debug('score[%s] => %.1f # >worker p95', bundle, score)
819 if bundle.slower_than_global_p95:
821 logger.debug('score[%s] => %.1f # >global p95', bundle, score)
823 # Prefer backups of bundles that don't
824 # have backups already.
825 backup_count = len(bundle.backup_bundles)
826 if backup_count == 0:
828 elif backup_count == 1:
830 elif backup_count == 2:
f'score[%s] => %.1f # {backup_count} dup backup factor',
840 if score != 0 and (best_score is None or score > best_score):
841 bundle_to_backup = bundle
842 assert bundle is not None
843 assert bundle.backup_bundles is not None
844 assert bundle.src_bundle is None
847 # Note: this is all still happening on the heartbeat
848 # runner thread. That's ok because
849 # _schedule_backup_for_bundle uses the executor to
850 # submit the bundle again which will cause it to be
851 # picked up by a worker thread and allow this thread
852 # to return to run future heartbeats.
853 if bundle_to_backup is not None:
854 self.last_backup = now
856 '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
860 self._schedule_backup_for_bundle(bundle_to_backup)
862 self.backup_lock.release()
864 def _is_worker_available(self) -> bool:
865 """Is there a worker available currently?"""
866 return self.policy.is_worker_available()
def _acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
869 """Try to acquire a worker."""
870 return self.policy.acquire_worker(machine_to_avoid)
def _find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
873 """Find a worker or block until one becomes available."""
875 while not self._is_worker_available():
877 worker = self._acquire_worker(machine_to_avoid)
878 if worker is not None:
880 msg = "We should never reach this point in the code"
884 def _release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
885 """Release a previously acquired worker."""
886 worker = bundle.worker
887 assert worker is not None
888 logger.debug('Released worker %s', worker)
889 self.status.record_release_worker(
897 self.adjust_task_count(-1)
899 def _check_if_cancelled(self, bundle: BundleDetails) -> bool:
900 """See if a particular bundle is cancelled. Do not block."""
901 with self.status.lock:
902 if bundle.is_cancelled.wait(timeout=0.0):
903 logger.debug('Bundle %s is cancelled, bail out.', bundle.uuid)
904 bundle.was_cancelled = True
908 def _launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
909 """Find a worker for bundle or block until one is available."""
911 self.adjust_task_count(+1)
913 hostname = bundle.hostname
914 avoid_machine = override_avoid_machine
915 is_original = bundle.src_bundle is None
917 # Try not to schedule a backup on the same host as the original.
918 if avoid_machine is None and bundle.src_bundle is not None:
919 avoid_machine = bundle.src_bundle.machine
921 while worker is None:
922 worker = self._find_available_worker_or_block(avoid_machine)
923 assert worker is not None
925 # Ok, found a worker.
926 bundle.worker = worker
927 machine = bundle.machine = worker.machine
928 username = bundle.username = worker.username
929 self.status.record_acquire_worker(worker, uuid)
930 logger.debug('%s: Running bundle on %s...', bundle, worker)
932 # Before we do any work, make sure the bundle is still viable.
933 # It may have been some time between when it was submitted and
934 # now due to lack of worker availability and someone else may
935 # have already finished it.
936 if self._check_if_cancelled(bundle):
938 return self._process_work_result(bundle)
939 except Exception as e:
940 logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
941 self._release_worker(bundle)
# Weird. We are the original owner of this
# bundle. For it to have been cancelled, a backup
# must have already started and completed before
# we even got started. Moreover, the backup says
# it is done but we can't find the results it
# should have copied over. Reschedule the whole
# thing.
952 '%s: We are the original owner thread and yet there are '
953 'no results for this bundle. This is unexpected and bad.',
956 return self._emergency_retry_nasty_bundle(bundle)
958 # We're a backup and our bundle is cancelled
959 # before we even got started. Do nothing and let
960 # the original bundle's thread worry about either
961 # finding the results or complaining about it.
964 # Send input code / data to worker machine if it's not local.
965 if hostname not in machine:
967 cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
968 start_ts = time.time()
969 logger.info("%s: Copying work to %s via %s.", bundle, worker, cmd)
971 xfer_latency = time.time() - start_ts
972 logger.debug("%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency)
973 except Exception as e:
974 self._release_worker(bundle)
976 # Weird. We tried to copy the code to the worker
977 # and it failed... And we're the original bundle.
981 "%s: Failed to send instructions to the worker machine?! "
982 "This is not expected; we\'re the original bundle so this shouldn\'t "
983 "be a race condition. Attempting an emergency retry...",
986 return self._emergency_retry_nasty_bundle(bundle)
988 # This is actually expected; we're a backup.
989 # There's a race condition where someone else
990 # already finished the work and removed the source
991 # code_file before we could copy it. Ignore.
993 '%s: Failed to send instructions to the worker machine... '
994 'We\'re a backup and this may be caused by the original (or '
995 'some other backup) already finishing this work. Ignoring.',
1000 # Kick off the work. Note that if this fails we let
1001 # _wait_for_process deal with it.
1002 self.status.record_processing_began(uuid)
1004 f'{SSH} {bundle.username}@{bundle.machine} '
1005 f'"source py39-venv/bin/activate &&'
1006 f' /home/scott/lib/python_modules/remote_worker.py'
1007 f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
1009 logger.debug('%s: Executing %s in the background to kick off work...', bundle, cmd)
1010 p = cmd_in_background(cmd, silent=True)
1012 logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
1013 return self._wait_for_process(p, bundle, 0)
1015 def _wait_for_process(
1016 self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
1018 """At this point we've copied the bundle's pickled code to the remote
1019 worker and started an ssh process that should be invoking the
1020 remote worker to have it execute the user's code. See how
1021 that's going and wait for it to complete or fail. Note that
1022 this code is recursive: there are codepaths where we decide to
1023 stop waiting for an ssh process (because another backup seems
1024 to have finished) but then fail to fetch or parse the results
1025 from that backup and thus call ourselves to continue waiting
1026 on an active ssh process. This is the purpose of the depth
1027 argument: to curtail potential infinite recursion by giving up
1031 p: the Popen record of the ssh job
1032 bundle: the bundle of work being executed remotely
1033 depth: how many retries we've made so far. Starts at zero.
1037 machine = bundle.machine
1038 assert p is not None
1039 pid = p.pid # pid of the ssh process
1042 "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
1045 self._release_worker(bundle)
1046 return self._emergency_retry_nasty_bundle(bundle)
1048 # Spin until either the ssh job we scheduled finishes the
1049 # bundle or some backup worker signals that they finished it
1053 p.wait(timeout=0.25)
1054 except subprocess.TimeoutExpired:
1055 if self._check_if_cancelled(bundle):
1056 logger.info('%s: looks like another worker finished bundle...', bundle)
1059 logger.info("%s: pid %d (%s) is finished!", bundle, pid, machine)
1063 # If we get here we believe the bundle is done; either the ssh
1064 # subprocess finished (hopefully successfully) or we noticed
1065 # that some other worker seems to have completed the bundle
1066 # before us and we're bailing out.
1068 ret = self._process_work_result(bundle)
1069 if ret is not None and p is not None:
# Something went wrong; e.g. we could not copy the results
# back, clean up after ourselves on the remote machine, or
# unpickle the results we got from the remote machine. If we
# still have an active ssh subprocess, keep waiting on it.
# Otherwise, time for an emergency reschedule.
1078 except Exception as e:
1080 logger.error('%s: Something unexpected just happened...', bundle)
1083 "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
1085 return self._wait_for_process(p, bundle, depth + 1)
1087 self._release_worker(bundle)
1088 return self._emergency_retry_nasty_bundle(bundle)
1090 def _process_work_result(self, bundle: BundleDetails) -> Any:
1091 """A bundle seems to be completed. Check on the results."""
1093 with self.status.lock:
1094 is_original = bundle.src_bundle is None
1095 was_cancelled = bundle.was_cancelled
1096 username = bundle.username
1097 machine = bundle.machine
1098 result_file = bundle.result_file
1099 code_file = bundle.code_file
# Whether original or backup, if we finished first we must
# fetch the results if the computation happened on a
# remote machine.
1104 bundle.end_ts = time.time()
1105 if not was_cancelled:
1106 assert bundle.machine is not None
1107 if bundle.hostname not in bundle.machine:
1108 cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
1110 "%s: Fetching results back from %s@%s via %s",
1117 # If either of these throw they are handled in
1118 # _wait_for_process.
1123 except Exception as e:
1130 # Cleanup remote /tmp files.
1132 f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
1134 logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
1135 dur = bundle.end_ts - bundle.start_ts
1136 self.histogram.add_item(dur)
1138 # Only the original worker should unpickle the file contents
1139 # though since it's the only one whose result matters. The
1140 # original is also the only job that may delete result_file
1141 # from disk. Note that the original may have been cancelled
1142 # if one of the backups finished first; it still must read the
1143 # result from disk. It still does that here with is_cancelled
1146 logger.debug("%s: Unpickling %s.", bundle, result_file)
1148 with open(result_file, 'rb') as rb:
1149 serialized = rb.read()
1150 result = cloudpickle.loads(serialized)
1151 except Exception as e:
1153 logger.error('Failed to load %s... this is bad news.', result_file)
1154 self._release_worker(bundle)
1156 # Re-raise the exception; the code in _wait_for_process may
1157 # decide to _emergency_retry_nasty_bundle here.
1159 logger.debug('Removing local (master) %s and %s.', code_file, result_file)
1160 os.remove(result_file)
1161 os.remove(code_file)
1163 # Notify any backups that the original is done so they
1164 # should stop ASAP. Do this whether or not we
1165 # finished first since there could be more than one
1167 if bundle.backup_bundles is not None:
1168 for backup in bundle.backup_bundles:
1170 '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
1172 backup.is_cancelled.set()
1174 # This is a backup job and, by now, we have already fetched
1175 # the bundle results.
1177 # Backup results don't matter, they just need to leave the
1178 # result file in the right place for their originals to
1179 # read/unpickle later.
1182 # Tell the original to stop if we finished first.
1183 if not was_cancelled:
1184 orig_bundle = bundle.src_bundle
1185 assert orig_bundle is not None
1187 '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
1189 orig_bundle.is_cancelled.set()
1190 self._release_worker(bundle, was_cancelled=was_cancelled)
1193 def _create_original_bundle(self, pickle, function_name: str):
1194 """Creates a bundle that is not a backup of any other bundle but
1195 rather represents a user task.
1198 uuid = string_utils.generate_uuid(omit_dashes=True)
1199 code_file = f'/tmp/{uuid}.code.bin'
1200 result_file = f'/tmp/{uuid}.result.bin'
1202 logger.debug('Writing pickled code to %s', code_file)
1203 with open(code_file, 'wb') as wb:
1206 bundle = BundleDetails(
1207 pickled_code=pickle,
1209 function_name=function_name,
1213 hostname=platform.node(),
1214 code_file=code_file,
1215 result_file=result_file,
1217 start_ts=time.time(),
1219 slower_than_local_p95=False,
1220 slower_than_global_p95=False,
1222 is_cancelled=threading.Event(),
1223 was_cancelled=False,
1227 self.status.record_bundle_details(bundle)
1228 logger.debug('%s: Created an original bundle', bundle)
1231 def _create_backup_bundle(self, src_bundle: BundleDetails):
1232 """Creates a bundle that is a backup of another bundle that is
1233 running too slowly."""
1235 assert self.status.lock.locked()
1236 assert src_bundle.backup_bundles is not None
1237 n = len(src_bundle.backup_bundles)
1238 uuid = src_bundle.uuid + f'_backup#{n}'
1240 backup_bundle = BundleDetails(
1241 pickled_code=src_bundle.pickled_code,
1243 function_name=src_bundle.function_name,
1247 hostname=src_bundle.hostname,
1248 code_file=src_bundle.code_file,
1249 result_file=src_bundle.result_file,
1251 start_ts=time.time(),
1253 slower_than_local_p95=False,
1254 slower_than_global_p95=False,
1255 src_bundle=src_bundle,
1256 is_cancelled=threading.Event(),
1257 was_cancelled=False,
1258 backup_bundles=None, # backup backups not allowed
1261 src_bundle.backup_bundles.append(backup_bundle)
1262 self.status.record_bundle_details_already_locked(backup_bundle)
1263 logger.debug('%s: Created a backup bundle', backup_bundle)
1264 return backup_bundle
1266 def _schedule_backup_for_bundle(self, src_bundle: BundleDetails):
1267 """Schedule a backup of src_bundle."""
1269 assert self.status.lock.locked()
1270 assert src_bundle is not None
1271 backup_bundle = self._create_backup_bundle(src_bundle)
1273 '%s/%s: Scheduling backup for execution...',
1275 backup_bundle.function_name,
1277 self._helper_executor.submit(self._launch, backup_bundle)
1279 # Results from backups don't matter; if they finish first
1280 # they will move the result_file to this machine and let
1281 # the original pick them up and unpickle them (and return
1284 def _emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
1285 """Something unexpectedly failed with bundle. Either retry it
1286 from the beginning or throw in the towel and give up on it."""
1288 is_original = bundle.src_bundle is None
1289 bundle.worker = None
1290 avoid_last_machine = bundle.machine
1291 bundle.machine = None
1292 bundle.username = None
1293 bundle.failure_count += 1
1299 if bundle.failure_count > retry_limit:
1301 '%s: Tried this bundle too many times already (%dx); giving up.',
1306 raise RemoteExecutorException(
1307 f'{bundle}: This bundle can\'t be completed despite several backups and retries',
1311 '%s: At least it\'s only a backup; better luck with the others.', bundle
msg = f'>>> Emergency rescheduling {bundle} because of unexpected errors (wtf?!) <<<'
1318 return self._launch(bundle, avoid_last_machine)
1321 def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
1322 """Submit work to be done. This is the user entry point of this
1324 if self.already_shutdown:
1325 raise Exception('Submitted work after shutdown.')
1326 pickle = _make_cloud_pickle(function, *args, **kwargs)
1327 bundle = self._create_original_bundle(pickle, function.__name__)
1328 self.total_bundles_submitted += 1
1329 return self._helper_executor.submit(self._launch, bundle)
1332 def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
1333 """Shutdown the executor."""
1334 if not self.already_shutdown:
logger.debug('Shutting down RemoteExecutor %s', self.title)
1336 self.heartbeat_stop_event.set()
1337 self.heartbeat_thread.join()
1338 self._helper_executor.shutdown(wait)
1340 print(self.histogram.__repr__(label_formatter='%ds'))
1341 self.already_shutdown = True
1344 class RemoteWorkerPoolProvider:
1346 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1350 @persistent.persistent_autoloaded_singleton() # type: ignore
1351 class ConfigRemoteWorkerPoolProvider(RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent):
1352 def __init__(self, json_remote_worker_pool: Dict[str, Any]):
1353 self.remote_worker_pool = []
1354 for record in json_remote_worker_pool['remote_worker_records']:
1355 self.remote_worker_pool.append(self.dataclassFromDict(RemoteWorkerRecord, record))
1356 assert len(self.remote_worker_pool) > 0
1359 def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
1360 fieldSet = {f.name for f in fields(clsName) if f.init}
1361 filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
1362 return clsName(**filteredArgDict)
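# Illustrative example (not from the original source): dataclassFromDict
# drops unknown keys before construction, so extra JSON fields are tolerated.
# Assumes the method is a staticmethod (its decorator is not shown above).
#
#   d = {'username': 'scott', 'machine': 'machine1', 'weight': 1, 'count': 2,
#        'comment': 'ignored'}
#   record = ConfigRemoteWorkerPoolProvider.dataclassFromDict(RemoteWorkerRecord, d)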
1365 def get_remote_workers(self) -> List[RemoteWorkerRecord]:
1366 return self.remote_worker_pool
1369 def get_persistent_data(self) -> List[RemoteWorkerRecord]:
1370 return self.remote_worker_pool
1374 def get_filename() -> str:
1375 return config.config['remote_worker_records_file']
1379 def should_we_load_data(filename: str) -> bool:
1384 def should_we_save_data(filename: str) -> bool:
1389 class DefaultExecutors(object):
1390 """A container for a default thread, process and remote executor.
1391 These are not created until needed and we take care to clean up
1392 before process exit automatically for the caller's convenience.
1393 Instead of creating your own executor, consider using the one
1394 from this pool. e.g.::
1396 @par.parallelize(method=par.Method.PROCESS)
1398 solutions: List[Work],
1405 def start_do_work(all_work: List[Work]):
1407 logger.debug('Sharding work into groups of 10.')
1408 for subset in list_utils.shard(all_work, 10):
1409 shards.append([x for x in subset])
1411 logger.debug('Kicking off helper pool.')
1413 for n, shard in enumerate(shards):
1416 shard, n, shared_cache.get_name(), max_letter_pop_per_word
1419 smart_future.wait_all(results)
1421 # Note: if you forget to do this it will clean itself up
1422 # during program termination including tearing down any
1423 # active ssh connections.
1424 executors.DefaultExecutors().process_pool().shutdown()
1428 self.thread_executor: Optional[ThreadExecutor] = None
1429 self.process_executor: Optional[ProcessExecutor] = None
1430 self.remote_executor: Optional[RemoteExecutor] = None
1433 def _ping(host) -> bool:
1434 logger.debug('RUN> ping -c 1 %s', host)
1436 x = cmd_exitcode(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
1441 def thread_pool(self) -> ThreadExecutor:
1442 if self.thread_executor is None:
1443 self.thread_executor = ThreadExecutor()
1444 return self.thread_executor
1446 def process_pool(self) -> ProcessExecutor:
1447 if self.process_executor is None:
1448 self.process_executor = ProcessExecutor()
1449 return self.process_executor
1451 def remote_pool(self) -> RemoteExecutor:
1452 if self.remote_executor is None:
1453 logger.info('Looking for some helper machines...')
1454 provider = ConfigRemoteWorkerPoolProvider()
1455 all_machines = provider.get_remote_workers()
1458 # Make sure we can ping each machine.
1459 for record in all_machines:
1460 if self._ping(record.machine):
1461 logger.info('%s is alive / responding to pings', record.machine)
1464 # The controller machine has a lot to do; go easy on it.
1466 if record.machine == platform.node() and record.count > 1:
1467 logger.info('Reducing workload for %s.', record.machine)
1468 record.count = max(int(record.count / 2), 1)
1470 policy = WeightedRandomRemoteWorkerSelectionPolicy()
1471 policy.register_worker_pool(pool)
1472 self.remote_executor = RemoteExecutor(pool, policy)
1473 return self.remote_executor
1475 def shutdown(self) -> None:
1476 if self.thread_executor is not None:
1477 self.thread_executor.shutdown(wait=True, quiet=True)
1478 self.thread_executor = None
1479 if self.process_executor is not None:
1480 self.process_executor.shutdown(wait=True, quiet=True)
1481 self.process_executor = None
1482 if self.remote_executor is not None:
1483 self.remote_executor.shutdown(wait=True, quiet=True)
1484 self.remote_executor = None