from __future__ import annotations
-from abc import ABC, abstractmethod
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
)
SSH = '/usr/bin/ssh -oForwardX11=no'
-SCP = '/usr/bin/scp'
+SCP = '/usr/bin/scp -C'
def make_cloud_pickle(fun, *args, **kwargs):
class BaseExecutor(ABC):
def __init__(self, *, title=''):
self.title = title
- self.task_count = 0
self.histogram = hist.SimpleHistogram(
hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
)
+ self.task_count = 0
@abstractmethod
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
pass
@abstractmethod
- def shutdown(self, wait: bool = True) -> None:
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
pass
+ def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
+ """Shutdown the executor and return True if the executor is idle
+ (i.e. there are no pending or active tasks). Return False
+ otherwise. Note: this should only be called by the launcher
+ process.
+
+ """
+ if self.task_count == 0:
+ self.shutdown(wait=True, quiet=quiet)
+ return True
+ return False
+
def adjust_task_count(self, delta: int) -> None:
+ """Change the task count. Note: do not call this method from a
+ worker, it should only be called by the launcher process /
+ thread / machine.
+
+ """
self.task_count += delta
- logger.debug(f'Executor current task count is {self.task_count}')
+ logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
+
+ def get_task_count(self) -> int:
+ """Change the task count. Note: do not call this method from a
+ worker, it should only be called by the launcher process /
+ thread / machine.
+
+ """
+ return self.task_count
class ThreadExecutor(BaseExecutor):
self._thread_pool_executor = fut.ThreadPoolExecutor(
max_workers=workers, thread_name_prefix="thread_executor_helper"
)
+ self.already_shutdown = False
+ # This is run on a different thread; do not adjust task count here.
def run_local_bundle(self, fun, *args, **kwargs):
logger.debug(f"Running local bundle at {fun.__name__}")
- start = time.time()
result = fun(*args, **kwargs)
- end = time.time()
- self.adjust_task_count(-1)
- duration = end - start
- logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
- self.histogram.add_item(duration)
return result
@overrides
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+ if self.already_shutdown:
+ raise Exception('Submitted work after shutdown.')
self.adjust_task_count(+1)
newargs = []
newargs.append(function)
for arg in args:
newargs.append(arg)
- return self._thread_pool_executor.submit(
+ start = time.time()
+ result = self._thread_pool_executor.submit(
self.run_local_bundle, *newargs, **kwargs
)
+ result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+ result.add_done_callback(lambda _: self.adjust_task_count(-1))
+ return result
@overrides
- def shutdown(self, wait=True) -> None:
- logger.debug(f'Shutting down threadpool executor {self.title}')
- print(self.histogram)
- self._thread_pool_executor.shutdown(wait)
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+ if not self.already_shutdown:
+ logger.debug(f'Shutting down threadpool executor {self.title}')
+ self._thread_pool_executor.shutdown(wait)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
+ self.already_shutdown = True
class ProcessExecutor(BaseExecutor):
self._process_executor = fut.ProcessPoolExecutor(
max_workers=workers,
)
+ self.already_shutdown = False
+ # This is run in another process; do not adjust task count here.
def run_cloud_pickle(self, pickle):
fun, args, kwargs = cloudpickle.loads(pickle)
logger.debug(f"Running pickled bundle at {fun.__name__}")
result = fun(*args, **kwargs)
- self.adjust_task_count(-1)
return result
@overrides
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+ if self.already_shutdown:
+ raise Exception('Submitted work after shutdown.')
start = time.time()
self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
result = self._process_executor.submit(self.run_cloud_pickle, pickle)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+ result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
- def shutdown(self, wait=True) -> None:
- logger.debug(f'Shutting down processpool executor {self.title}')
- self._process_executor.shutdown(wait)
- print(self.histogram)
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+ if not self.already_shutdown:
+ logger.debug(f'Shutting down processpool executor {self.title}')
+ self._process_executor.shutdown(wait)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
+ self.already_shutdown = True
def __getstate__(self):
state = self.__dict__.copy()
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
- src_bundle: BundleDetails
+ src_bundle: Optional[BundleDetails]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
- self.start_per_bundle: Dict[str, float] = defaultdict(float)
+ self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
+ start = self.start_per_bundle[uuid]
+ assert start
+ bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
grabbag = []
for worker in self.workers:
- for x in range(0, worker.count):
- for y in range(0, worker.weight):
- grabbag.append(worker)
-
- for _ in range(0, 5):
- random.shuffle(grabbag)
- worker = grabbag[0]
- if worker.machine != machine_to_avoid or _ > 2:
+ if worker.machine != machine_to_avoid:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
+
+ if len(grabbag) == 0:
+ logger.debug(
+ f'There are no available workers that avoid {machine_to_avoid}...'
+ )
+ for worker in self.workers:
if worker.count > 0:
- worker.count -= 1
- logger.debug(f'Selected worker {worker}')
- return worker
- msg = 'Unexpectedly could not find a worker, retrying...'
- logger.warning(msg)
- return None
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
+
+ if len(grabbag) == 0:
+ logger.warning('There are no available workers?!')
+ return None
+
+ worker = random.sample(grabbag, 1)[0]
+ assert worker.count > 0
+ worker.count -= 1
+ logger.debug(f'Chose worker {worker}')
+ return worker
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
class RemoteExecutor(BaseExecutor):
def __init__(
- self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy
+ self,
+ workers: List[RemoteWorkerRecord],
+ policy: RemoteWorkerSelectionPolicy,
) -> None:
super().__init__()
self.workers = workers
self.heartbeat_thread,
self.heartbeat_stop_event,
) = self.run_periodic_heartbeat()
+ self.already_shutdown = False
@background_thread
- def run_periodic_heartbeat(self, stop_event) -> None:
+ def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
while not stop_event.is_set():
time.sleep(5.0)
logger.debug('Running periodic heartbeat code...')
logger.debug('Periodic heartbeat thread shutting down.')
def heartbeat(self) -> None:
+ # Note: this is invoked on a background thread, not an
+ # executor thread. Be careful what you do with it b/c it
+ # needs to get back and dump status again periodically.
with self.status.lock:
- # Dump regular progress report
self.status.periodic_dump(self.total_bundles_submitted)
# Look for bundles to reschedule via executor.submit
if (
num_done > 2
and num_idle_workers > 1
- and (self.last_backup is None or (now - self.last_backup > 6.0))
+ and (self.last_backup is None or (now - self.last_backup > 9.0))
and self.backup_lock.acquire(blocking=False)
):
try:
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
- self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+ self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
+ assert p
pid = p.pid
if depth > 3:
logger.error(
# Tell the original to stop if we finished first.
if not was_cancelled:
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle
logger.debug(
- f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+ f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
- bundle.src_bundle.is_cancelled.set()
+ orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+ def emergency_retry_nasty_bundle(
+ self, bundle: BundleDetails
+ ) -> Optional[fut.Future]:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
@overrides
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+ if self.already_shutdown:
+ raise Exception('Submitted work after shutdown.')
pickle = make_cloud_pickle(function, *args, **kwargs)
bundle = self.create_original_bundle(pickle, function.__name__)
self.total_bundles_submitted += 1
return self._helper_executor.submit(self.launch, bundle)
@overrides
- def shutdown(self, wait=True) -> None:
- logging.debug(f'Shutting down RemoteExecutor {self.title}')
- self.heartbeat_stop_event.set()
- self.heartbeat_thread.join()
- self._helper_executor.shutdown(wait)
- print(self.histogram)
+ def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+ if not self.already_shutdown:
+ logging.debug(f'Shutting down RemoteExecutor {self.title}')
+ self.heartbeat_stop_event.set()
+ self.heartbeat_thread.join()
+ self._helper_executor.shutdown(wait)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
+ self.already_shutdown = True
@singleton
RemoteWorkerRecord(
username='scott',
machine='cheetah.house',
- weight=25,
+ weight=30,
count=6,
),
)
RemoteWorkerRecord(
username='scott',
machine='wannabe.house',
- weight=30,
+ weight=25,
count=10,
),
)
RemoteWorkerRecord(
username='scott',
machine='puma.cabin',
- weight=25,
+ weight=30,
count=6,
),
)
RemoteWorkerRecord(
username='scott',
machine='backup.house',
- weight=7,
+ weight=8,
count=2,
),
)
def shutdown(self) -> None:
if self.thread_executor is not None:
- self.thread_executor.shutdown()
+ self.thread_executor.shutdown(wait=True, quiet=True)
self.thread_executor = None
if self.process_executor is not None:
- self.process_executor.shutdown()
+ self.process_executor.shutdown(wait=True, quiet=True)
self.process_executor = None
if self.remote_executor is not None:
- self.remote_executor.shutdown()
+ self.remote_executor.shutdown(wait=True, quiet=True)
self.remote_executor = None