projects
/
python_utils.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Fix a bug, add some testcases.
[python_utils.git]
/
executors.py
diff --git
a/executors.py
b/executors.py
index 28507b0e8a547e9d1edc615daa5de7945edb457d..60bd166bb23ba63b227179c404d281681b35cad9 100644
(file)
--- a/
executors.py
+++ b/
executors.py
@@
-1,34
+1,42
@@
#!/usr/bin/env python3
#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+"""Defines three executors: a thread executor for doing work using a
+threadpool, a process executor for doing work in other processes on
+the same machine and a remote executor for farming out work to other
+machines.
+
+Also defines DefaultExecutors which is a container for references to
+global executors / worker pools with automatic shutdown semantics."""
from __future__ import annotations
from __future__ import annotations
-from abc import ABC, abstractmethod
import concurrent.futures as fut
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
@@
-67,11
+75,13
@@
SCP = '/usr/bin/scp -C'
def make_cloud_pickle(fun, *args, **kwargs):
def make_cloud_pickle(fun, *args, **kwargs):
- logger.debug(
f"Making cloudpickled bundle at {fun.__name__}"
)
+ logger.debug(
"Making cloudpickled bundle at %s", fun.__name__
)
return cloudpickle.dumps((fun, args, kwargs))
class BaseExecutor(ABC):
return cloudpickle.dumps((fun, args, kwargs))
class BaseExecutor(ABC):
+ """The base executor interface definition."""
+
def __init__(self, *, title=''):
self.title = title
self.histogram = hist.SimpleHistogram(
def __init__(self, *, title=''):
self.title = title
self.histogram = hist.SimpleHistogram(
@@
-84,10
+94,10
@@
class BaseExecutor(ABC):
pass
@abstractmethod
pass
@abstractmethod
- def shutdown(self,
wait: bool = Tru
e) -> None:
+ def shutdown(self,
*, wait: bool = True, quiet: bool = Fals
e) -> None:
pass
pass
- def shutdown_if_idle(self) -> bool:
+ def shutdown_if_idle(self
, *, quiet: bool = False
) -> bool:
"""Shutdown the executor and return True if the executor is idle
(i.e. there are no pending or active tasks). Return False
otherwise. Note: this should only be called by the launcher
"""Shutdown the executor and return True if the executor is idle
(i.e. there are no pending or active tasks). Return False
otherwise. Note: this should only be called by the launcher
@@
-95,7
+105,7
@@
class BaseExecutor(ABC):
"""
if self.task_count == 0:
"""
if self.task_count == 0:
- self.shutdown()
+ self.shutdown(
wait=True, quiet=quiet
)
return True
return False
return True
return False
@@
-106,7
+116,7
@@
class BaseExecutor(ABC):
"""
self.task_count += delta
"""
self.task_count += delta
- logger.debug(
f'Adjusted task count by {delta} to {self.task_count}'
)
+ logger.debug(
'Adjusted task count by %d to %d.', delta, self.task_count
)
def get_task_count(self) -> int:
"""Change the task count. Note: do not call this method from a
def get_task_count(self) -> int:
"""Change the task count. Note: do not call this method from a
@@
-118,6
+128,8
@@
class BaseExecutor(ABC):
class ThreadExecutor(BaseExecutor):
class ThreadExecutor(BaseExecutor):
+ """A threadpool executor instance."""
+
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
workers = None
def __init__(self, max_workers: Optional[int] = None):
super().__init__()
workers = None
@@
-125,15
+137,16
@@
class ThreadExecutor(BaseExecutor):
workers = max_workers
elif 'executors_threadpool_size' in config.config:
workers = config.config['executors_threadpool_size']
workers = max_workers
elif 'executors_threadpool_size' in config.config:
workers = config.config['executors_threadpool_size']
- logger.debug(
f'Creating threadpool executor with {workers} workers'
)
+ logger.debug(
'Creating threadpool executor with %d workers', workers
)
self._thread_pool_executor = fut.ThreadPoolExecutor(
max_workers=workers, thread_name_prefix="thread_executor_helper"
)
self.already_shutdown = False
# This is run on a different thread; do not adjust task count here.
self._thread_pool_executor = fut.ThreadPoolExecutor(
max_workers=workers, thread_name_prefix="thread_executor_helper"
)
self.already_shutdown = False
# This is run on a different thread; do not adjust task count here.
- def run_local_bundle(self, fun, *args, **kwargs):
- logger.debug(f"Running local bundle at {fun.__name__}")
+ @staticmethod
+ def run_local_bundle(fun, *args, **kwargs):
+ logger.debug("Running local bundle at %s", fun.__name__)
result = fun(*args, **kwargs)
return result
result = fun(*args, **kwargs)
return result
@@
-148,22
+161,25
@@
class ThreadExecutor(BaseExecutor):
newargs.append(arg)
start = time.time()
result = self._thread_pool_executor.submit(
newargs.append(arg)
start = time.time()
result = self._thread_pool_executor.submit(
-
self
.run_local_bundle, *newargs, **kwargs
+
ThreadExecutor
.run_local_bundle, *newargs, **kwargs
)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
- def shutdown(self,
wait=Tru
e) -> None:
+ def shutdown(self,
*, wait: bool = True, quiet: bool = Fals
e) -> None:
if not self.already_shutdown:
if not self.already_shutdown:
- logger.debug(f'Shutting down threadpool executor {self.title}')
- print(self.histogram.__repr__(label_formatter='%d'))
+ logger.debug('Shutting down threadpool executor %s', self.title)
self._thread_pool_executor.shutdown(wait)
self._thread_pool_executor.shutdown(wait)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
class ProcessExecutor(BaseExecutor):
self.already_shutdown = True
class ProcessExecutor(BaseExecutor):
+ """A processpool executor."""
+
def __init__(self, max_workers=None):
super().__init__()
workers = None
def __init__(self, max_workers=None):
super().__init__()
workers = None
@@
-171,16
+187,17
@@
class ProcessExecutor(BaseExecutor):
workers = max_workers
elif 'executors_processpool_size' in config.config:
workers = config.config['executors_processpool_size']
workers = max_workers
elif 'executors_processpool_size' in config.config:
workers = config.config['executors_processpool_size']
- logger.debug(
f'Creating processpool executor with {workers} workers.'
)
+ logger.debug(
'Creating processpool executor with %d workers.', workers
)
self._process_executor = fut.ProcessPoolExecutor(
max_workers=workers,
)
self.already_shutdown = False
# This is run in another process; do not adjust task count here.
self._process_executor = fut.ProcessPoolExecutor(
max_workers=workers,
)
self.already_shutdown = False
# This is run in another process; do not adjust task count here.
- def run_cloud_pickle(self, pickle):
+ @staticmethod
+ def run_cloud_pickle(pickle):
fun, args, kwargs = cloudpickle.loads(pickle)
fun, args, kwargs = cloudpickle.loads(pickle)
- logger.debug(
f"Running pickled bundle at {fun.__name__}"
)
+ logger.debug(
"Running pickled bundle at %s", fun.__name__
)
result = fun(*args, **kwargs)
return result
result = fun(*args, **kwargs)
return result
@@
-191,17
+208,18
@@
class ProcessExecutor(BaseExecutor):
start = time.time()
self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
start = time.time()
self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
- result = self._process_executor.submit(
self
.run_cloud_pickle, pickle)
+ result = self._process_executor.submit(
ProcessExecutor
.run_cloud_pickle, pickle)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
- def shutdown(self,
wait=Tru
e) -> None:
+ def shutdown(self,
*, wait: bool = True, quiet: bool = Fals
e) -> None:
if not self.already_shutdown:
if not self.already_shutdown:
- logger.debug(
f'Shutting down processpool executor {self.title}'
)
+ logger.debug(
'Shutting down processpool executor %s', self.title
)
self._process_executor.shutdown(wait)
self._process_executor.shutdown(wait)
- print(self.histogram.__repr__(label_formatter='%d'))
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
def __getstate__(self):
self.already_shutdown = True
def __getstate__(self):
@@
-218,6
+236,8
@@
class RemoteExecutorException(Exception):
@dataclass
class RemoteWorkerRecord:
@dataclass
class RemoteWorkerRecord:
+ """A record of info about a remote worker."""
+
username: str
machine: str
weight: int
username: str
machine: str
weight: int
@@
-232,6
+252,12
@@
class RemoteWorkerRecord:
@dataclass
class BundleDetails:
@dataclass
class BundleDetails:
+ """All info necessary to define some unit of work that needs to be
+ done, where it is being run, its state, whether it is an original
+ bundle of a backup bundle, how many times it has failed, etc...
+
+ """
+
pickled_code: bytes
uuid: str
fname: str
pickled_code: bytes
uuid: str
fname: str
@@
-246,7
+272,7
@@
class BundleDetails:
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
- src_bundle:
BundleDetails
+ src_bundle:
Optional[BundleDetails]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
@@
-282,15
+308,15
@@
class BundleDetails:
class RemoteExecutorStatus:
class RemoteExecutorStatus:
+ """A status 'scoreboard' for a remote executor."""
+
def __init__(self, total_worker_count: int) -> None:
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
def __init__(self, total_worker_count: int) -> None:
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
- self.start_per_bundle: Dict[str,
float
] = defaultdict(float)
+ self.start_per_bundle: Dict[str,
Optional[float]
] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
- self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
- ] = {}
+ self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
@@
-305,9
+331,7
@@
class RemoteExecutorStatus:
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
- def record_acquire_worker_already_locked(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
@@
-343,8
+367,10
@@
class RemoteExecutorStatus:
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
- x = self.finished_bundle_timings_per_worker.get(worker, list())
+ start = self.start_per_bundle[uuid]
+ assert start is not None
+ bundle_latency = ts - start
+ x = self.finished_bundle_timings_per_worker.get(worker, [])
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
self.finished_bundle_timings.append(bundle_latency)
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
self.finished_bundle_timings.append(bundle_latency)
@@
-439,7
+465,12
@@
class RemoteExecutorStatus:
class RemoteWorkerSelectionPolicy(ABC):
class RemoteWorkerSelectionPolicy(ABC):
- def register_worker_pool(self, workers):
+ """A policy for selecting a remote worker base class."""
+
+ def __init__(self):
+ self.workers: Optional[List[RemoteWorkerRecord]] = None
+
+ def register_worker_pool(self, workers: List[RemoteWorkerRecord]):
self.workers = workers
@abstractmethod
self.workers = workers
@abstractmethod
@@
-452,30
+483,33
@@
class RemoteWorkerSelectionPolicy(ABC):
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+ """A remote worker selector that uses weighted RNG."""
+
@overrides
def is_worker_available(self) -> bool:
@overrides
def is_worker_available(self) -> bool:
- for worker in self.workers:
- if worker.count > 0:
- return True
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ return True
return False
@overrides
def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
grabbag = []
return False
@overrides
def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
grabbag = []
- for worker in self.workers:
- if worker.machine != machine_to_avoid:
- if worker.count > 0:
- for _ in range(worker.count * worker.weight):
- grabbag.append(worker)
+ if self.workers:
+ for worker in self.workers:
+ if worker.machine != machine_to_avoid:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
if len(grabbag) == 0:
if len(grabbag) == 0:
- logger.debug(
- f'There are no available workers that avoid {machine_to_avoid}...'
- )
- for worker in self.workers:
- if worker.count > 0:
- for _ in range(worker.count * worker.weight):
- grabbag.append(worker)
+ logger.debug('There are no available workers that avoid %s', machine_to_avoid)
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
if len(grabbag) == 0:
logger.warning('There are no available workers?!')
if len(grabbag) == 0:
logger.warning('There are no available workers?!')
@@
-484,46
+518,51
@@
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
worker = random.sample(grabbag, 1)[0]
assert worker.count > 0
worker.count -= 1
worker = random.sample(grabbag, 1)[0]
assert worker.count > 0
worker.count -= 1
- logger.debug(
f'Chose worker {worker}'
)
+ logger.debug(
'Selected worker %s', worker
)
return worker
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
return worker
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
+ """A remote worker selector that just round robins."""
+
def __init__(self) -> None:
def __init__(self) -> None:
+ super().__init__()
self.index = 0
@overrides
def is_worker_available(self) -> bool:
self.index = 0
@overrides
def is_worker_available(self) -> bool:
- for worker in self.workers:
- if worker.count > 0:
- return True
+ if self.workers:
+ for worker in self.workers:
+ if worker.count > 0:
+ return True
return False
@overrides
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
- x = self.index
- while True:
- worker = self.workers[x]
- if worker.count > 0:
- worker.count -= 1
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
+ if self.workers:
+ x = self.index
+ while True:
+ worker = self.workers[x]
+ if worker.count > 0:
+ worker.count -= 1
+ x += 1
+ if x >= len(self.workers):
+ x = 0
+ self.index = x
+ logger.debug('Selected worker %s', worker)
+ return worker
x += 1
if x >= len(self.workers):
x = 0
x += 1
if x >= len(self.workers):
x = 0
- self.index = x
- logger.debug(f'Selected worker {worker}')
- return worker
- x += 1
- if x >= len(self.workers):
- x = 0
- if x == self.index:
- msg = 'Unexpectedly could not find a worker, retrying...'
- logger.warning(msg)
- return None
+ if x == self.index:
+ logger.warning('Unexpectedly could not find a worker, retrying...')
+ return None
+ return None
class RemoteExecutor(BaseExecutor):
class RemoteExecutor(BaseExecutor):
+ """A remote work executor."""
+
def __init__(
self,
workers: List[RemoteWorkerRecord],
def __init__(
self,
workers: List[RemoteWorkerRecord],
@@
-541,9
+580,7
@@
class RemoteExecutor(BaseExecutor):
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
- logger.debug(
- f'Creating {self.worker_count} local threads, one per remote worker.'
- )
+ logger.debug('Creating %d local threads, one per remote worker.', self.worker_count)
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
@@
-625,21
+662,15
@@
class RemoteExecutor(BaseExecutor):
if start_ts is not None:
runtime = now - start_ts
score += runtime
if start_ts is not None:
runtime = now - start_ts
score += runtime
- logger.debug(
- f'score[{bundle}] => {score} # latency boost'
- )
+ logger.debug('score[%s] => %.1f # latency boost', bundle, score)
if bundle.slower_than_local_p95:
score += runtime / 2
if bundle.slower_than_local_p95:
score += runtime / 2
- logger.debug(
- f'score[{bundle}] => {score} # >worker p95'
- )
+ logger.debug('score[%s] => %.1f # >worker p95', bundle, score)
if bundle.slower_than_global_p95:
score += runtime / 4
if bundle.slower_than_global_p95:
score += runtime / 4
- logger.debug(
- f'score[{bundle}] => {score} # >global p95'
- )
+ logger.debug('score[%s] => %.1f # >global p95', bundle, score)
# Prefer backups of bundles that don't
# have backups already.
# Prefer backups of bundles that don't
# have backups already.
@@
-653,12
+684,12
@@
class RemoteExecutor(BaseExecutor):
else:
score = 0
logger.debug(
else:
score = 0
logger.debug(
- f'score[{bundle}] => {score} # {backup_count} dup backup factor'
+ 'score[%s] => %.1f # {backup_count} dup backup factor',
+ bundle,
+ score,
)
)
- if score != 0 and (
- best_score is None or score > best_score
- ):
+ if score != 0 and (best_score is None or score > best_score):
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
@@
-674,7
+705,9
@@
class RemoteExecutor(BaseExecutor):
if bundle_to_backup is not None:
self.last_backup = now
logger.info(
if bundle_to_backup is not None:
self.last_backup = now
logger.info(
- f'=====> SCHEDULING BACKUP {bundle_to_backup} (score={best_score:.1f}) <====='
+ '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+ bundle_to_backup,
+ best_score,
)
self.schedule_backup_for_bundle(bundle_to_backup)
finally:
)
self.schedule_backup_for_bundle(bundle_to_backup)
finally:
@@
-683,14
+716,10
@@
class RemoteExecutor(BaseExecutor):
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
return self.policy.acquire_worker(machine_to_avoid)
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(
- self, machine_to_avoid: str = None
- ) -> RemoteWorkerRecord:
+ def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
with self.cv:
while not self.is_worker_available():
self.cv.wait()
with self.cv:
while not self.is_worker_available():
self.cv.wait()
@@
-704,7
+733,7
@@
class RemoteExecutor(BaseExecutor):
def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
worker = bundle.worker
assert worker is not None
def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
worker = bundle.worker
assert worker is not None
- logger.debug(
f'Released worker {worker}'
)
+ logger.debug(
'Released worker %s', worker
)
self.status.record_release_worker(
worker,
bundle.uuid,
self.status.record_release_worker(
worker,
bundle.uuid,
@@
-718,7
+747,7
@@
class RemoteExecutor(BaseExecutor):
def check_if_cancelled(self, bundle: BundleDetails) -> bool:
with self.status.lock:
if bundle.is_cancelled.wait(timeout=0.0):
def check_if_cancelled(self, bundle: BundleDetails) -> bool:
with self.status.lock:
if bundle.is_cancelled.wait(timeout=0.0):
- logger.debug(
f'Bundle {bundle.uuid} is cancelled, bail out.'
)
+ logger.debug(
'Bundle %s is cancelled, bail out.', bundle.uuid
)
bundle.was_cancelled = True
return True
return False
bundle.was_cancelled = True
return True
return False
@@
-737,14
+766,14
@@
class RemoteExecutor(BaseExecutor):
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ assert worker
is not None
# Ok, found a worker.
bundle.worker = worker
machine = bundle.machine = worker.machine
username = bundle.username = worker.username
self.status.record_acquire_worker(worker, uuid)
# Ok, found a worker.
bundle.worker = worker
machine = bundle.machine = worker.machine
username = bundle.username = worker.username
self.status.record_acquire_worker(worker, uuid)
- logger.debug(
f'{bundle}: Running bundle on {worker}...'
)
+ logger.debug(
'%s: Running bundle on %s...', bundle, worker
)
# Before we do any work, make sure the bundle is still viable.
# It may have been some time between when it was submitted and
# Before we do any work, make sure the bundle is still viable.
# It may have been some time between when it was submitted and
@@
-754,9
+783,7
@@
class RemoteExecutor(BaseExecutor):
try:
return self.process_work_result(bundle)
except Exception as e:
try:
return self.process_work_result(bundle)
except Exception as e:
- logger.warning(
- f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
- )
+ logger.warning('%s: bundle says it\'s cancelled upfront but no results?!', bundle)
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
@@
-768,8
+795,9
@@
class RemoteExecutor(BaseExecutor):
# thing.
logger.exception(e)
logger.error(
# thing.
logger.exception(e)
logger.error(
- f'{bundle}: We are the original owner thread and yet there are '
- + 'no results for this bundle. This is unexpected and bad.'
+ '%s: We are the original owner thread and yet there are '
+ 'no results for this bundle. This is unexpected and bad.',
+ bundle,
)
return self.emergency_retry_nasty_bundle(bundle)
else:
)
return self.emergency_retry_nasty_bundle(bundle)
else:
@@
-785,14
+813,12
@@
class RemoteExecutor(BaseExecutor):
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = (
- f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
- )
+ cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
start_ts = time.time()
- logger.info(
f"{bundle}: Copying work to {worker} via {cmd}."
)
+ logger.info(
"%s: Copying work to %s via %s.", bundle, worker, cmd
)
run_silently(cmd)
xfer_latency = time.time() - start_ts
run_silently(cmd)
xfer_latency = time.time() - start_ts
- logger.debug(
f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s."
)
+ logger.debug(
"%s: Copying to %s took %.1fs.", bundle, worker, xfer_latency
)
except Exception as e:
self.release_worker(bundle)
if is_original:
except Exception as e:
self.release_worker(bundle)
if is_original:
@@
-800,9
+826,10
@@
class RemoteExecutor(BaseExecutor):
# And we're the original bundle. We have to retry.
logger.exception(e)
logger.error(
# And we're the original bundle. We have to retry.
logger.exception(e)
logger.error(
- f"{bundle}: Failed to send instructions to the worker machine?! "
- + "This is not expected; we\'re the original bundle so this shouldn\'t "
- + "be a race condition. Attempting an emergency retry..."
+ "%s: Failed to send instructions to the worker machine?! "
+ "This is not expected; we\'re the original bundle so this shouldn\'t "
+ "be a race condition. Attempting an emergency retry...",
+ bundle,
)
return self.emergency_retry_nasty_bundle(bundle)
else:
)
return self.emergency_retry_nasty_bundle(bundle)
else:
@@
-810,10
+837,12
@@
class RemoteExecutor(BaseExecutor):
# There's a race condition where someone else
# already finished the work and removed the source
# code file before we could copy it. No biggie.
# There's a race condition where someone else
# already finished the work and removed the source
# code file before we could copy it. No biggie.
- msg = f'{bundle}: Failed to send instructions to the worker machine... '
- msg += 'We\'re a backup and this may be caused by the original (or some '
- msg += 'other backup) already finishing this work. Ignoring this.'
- logger.warning(msg)
+ logger.warning(
+ '%s: Failed to send instructions to the worker machine... '
+ 'We\'re a backup and this may be caused by the original (or '
+ 'some other backup) already finishing this work. Ignoring.',
+ bundle,
+ )
return None
# Kick off the work. Note that if this fails we let
return None
# Kick off the work. Note that if this fails we let
@@
-825,22
+854,21
@@
class RemoteExecutor(BaseExecutor):
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
- logger.debug(
f'{bundle}: Executing {cmd} in the background to kick off work...'
)
+ logger.debug(
'%s: Executing %s in the background to kick off work...', bundle, cmd
)
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- logger.debug(
- f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
- )
+ logger.debug('%s: Local ssh process pid=%d; remote worker is %s.', bundle, p.pid, machine)
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
- self, p:
subprocess.Popen
, bundle: BundleDetails, depth: int
+ self, p:
Optional[subprocess.Popen]
, bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
) -> Any:
machine = bundle.machine
+ assert p is not None
pid = p.pid
if depth > 3:
logger.error(
pid = p.pid
if depth > 3:
logger.error(
- f"I've gotten repeated errors waiting on this bundle; giving up on pid={pid}."
+ "I've gotten repeated errors waiting on this bundle; giving up on pid=%d", pid
)
p.terminate()
self.release_worker(bundle)
)
p.terminate()
self.release_worker(bundle)
@@
-854,12
+882,10
@@
class RemoteExecutor(BaseExecutor):
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
- logger.info(
- f'{bundle}: looks like another worker finished bundle...'
- )
+ logger.info('%s: looks like another worker finished bundle...', bundle)
break
else:
break
else:
- logger.info(
f"{bundle}: pid {pid} ({machine}) is finished!"
)
+ logger.info(
"%s: pid %d (%s) is finished!", bundle, pid, machine
)
p = None
break
p = None
break
@@
-880,10
+906,11
@@
class RemoteExecutor(BaseExecutor):
# Otherwise, time for an emergency reschedule.
except Exception as e:
logger.exception(e)
# Otherwise, time for an emergency reschedule.
except Exception as e:
logger.exception(e)
- logger.error(
f'{bundle}: Something unexpected just happened...'
)
+ logger.error(
'%s: Something unexpected just happened...', bundle
)
if p is not None:
if p is not None:
- msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
- logger.warning(msg)
+ logger.warning(
+ "%s: Failed to wrap up \"done\" bundle, re-waiting on active ssh.", bundle
+ )
return self.wait_for_process(p, bundle, depth + 1)
else:
self.release_worker(bundle)
return self.wait_for_process(p, bundle, depth + 1)
else:
self.release_worker(bundle)
@@
-907,7
+934,11
@@
class RemoteExecutor(BaseExecutor):
if bundle.hostname not in bundle.machine:
cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
logger.info(
if bundle.hostname not in bundle.machine:
cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
logger.info(
- f"{bundle}: Fetching results back from {username}@{machine} via {cmd}"
+ "%s: Fetching results back from %s@%s via %s",
+ bundle,
+ username,
+ machine,
+ cmd,
)
# If either of these throw they are handled in
)
# If either of these throw they are handled in
@@
-919,17
+950,14
@@
class RemoteExecutor(BaseExecutor):
except Exception as e:
attempts += 1
if attempts >= 3:
except Exception as e:
attempts += 1
if attempts >= 3:
- raise
(e)
+ raise
e
else:
break
run_silently(
else:
break
run_silently(
- f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"'
- )
- logger.debug(
- f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+ f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
)
)
+ logger.debug('Fetching results back took %.2fs', time.time() - bundle.end_ts)
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
@@
-940,23
+968,22
@@
class RemoteExecutor(BaseExecutor):
# if one of the backups finished first; it still must read the
# result from disk.
if is_original:
# if one of the backups finished first; it still must read the
# result from disk.
if is_original:
- logger.debug(
f"{bundle}: Unpickling {result_file}."
)
+ logger.debug(
"%s: Unpickling %s.", bundle, result_file
)
try:
with open(result_file, 'rb') as rb:
serialized = rb.read()
result = cloudpickle.loads(serialized)
except Exception as e:
logger.exception(e)
try:
with open(result_file, 'rb') as rb:
serialized = rb.read()
result = cloudpickle.loads(serialized)
except Exception as e:
logger.exception(e)
- msg = f'Failed to load {result_file}... this is bad news.'
- logger.critical(msg)
+ logger.error('Failed to load %s... this is bad news.', result_file)
self.release_worker(bundle)
# Re-raise the exception; the code in wait_for_process may
# decide to emergency_retry_nasty_bundle here.
self.release_worker(bundle)
# Re-raise the exception; the code in wait_for_process may
# decide to emergency_retry_nasty_bundle here.
- raise
Exception(e)
- logger.debug(
f'Removing local (master) {code_file} and {result_file}.'
)
- os.remove(
f'{result_file}'
)
- os.remove(
f'{code_file}'
)
+ raise
e
+ logger.debug(
'Removing local (master) %s and %s.', code_file, result_file
)
+ os.remove(
result_file
)
+ os.remove(
code_file
)
# Notify any backups that the original is done so they
# should stop ASAP. Do this whether or not we
# Notify any backups that the original is done so they
# should stop ASAP. Do this whether or not we
@@
-965,7
+992,7
@@
class RemoteExecutor(BaseExecutor):
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
logger.debug(
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
logger.debug(
- f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
+ '%s: Notifying backup %s that it\'s cancelled', bundle, backup.uuid
)
backup.is_cancelled.set()
)
backup.is_cancelled.set()
@@
-979,10
+1006,12
@@
class RemoteExecutor(BaseExecutor):
# Tell the original to stop if we finished first.
if not was_cancelled:
# Tell the original to stop if we finished first.
if not was_cancelled:
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle is not None
logger.debug(
logger.debug(
- f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+ '%s: Notifying original %s we beat them to it.', bundle, orig_bundle.uuid
)
)
-
bundle.src
_bundle.is_cancelled.set()
+
orig
_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
@@
-993,8
+1022,8
@@
class RemoteExecutor(BaseExecutor):
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
- logger.debug(
f'Writing pickled code to {code_file}'
)
- with open(
f'{code_file}'
, 'wb') as wb:
+ logger.debug(
'Writing pickled code to %s', code_file
)
+ with open(
code_file
, 'wb') as wb:
wb.write(pickle)
bundle = BundleDetails(
wb.write(pickle)
bundle = BundleDetails(
@@
-1019,7
+1048,7
@@
class RemoteExecutor(BaseExecutor):
failure_count=0,
)
self.status.record_bundle_details(bundle)
failure_count=0,
)
self.status.record_bundle_details(bundle)
- logger.debug(
f'{bundle}: Created an original bundle'
)
+ logger.debug(
'%s: Created an original bundle', bundle
)
return bundle
def create_backup_bundle(self, src_bundle: BundleDetails):
return bundle
def create_backup_bundle(self, src_bundle: BundleDetails):
@@
-1050,7
+1079,7
@@
class RemoteExecutor(BaseExecutor):
)
src_bundle.backup_bundles.append(backup_bundle)
self.status.record_bundle_details_already_locked(backup_bundle)
)
src_bundle.backup_bundles.append(backup_bundle)
self.status.record_bundle_details_already_locked(backup_bundle)
- logger.debug(
f'{backup_bundle}: Created a backup bundle'
)
+ logger.debug(
'%s: Created a backup bundle', backup_bundle
)
return backup_bundle
def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
return backup_bundle
def schedule_backup_for_bundle(self, src_bundle: BundleDetails):
@@
-1058,7
+1087,7
@@
class RemoteExecutor(BaseExecutor):
assert src_bundle is not None
backup_bundle = self.create_backup_bundle(src_bundle)
logger.debug(
assert src_bundle is not None
backup_bundle = self.create_backup_bundle(src_bundle)
logger.debug(
- f'{backup_bundle.uuid}/{backup_bundle.fname}: Scheduling backup for execution...'
+ '%s/%s: Scheduling backup for execution...', backup_bundle.uuid, backup_bundle.fname
)
self._helper_executor.submit(self.launch, backup_bundle)
)
self._helper_executor.submit(self.launch, backup_bundle)
@@
-1066,7
+1095,7
@@
class RemoteExecutor(BaseExecutor):
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) ->
fut.Future
:
+ def emergency_retry_nasty_bundle(self, bundle: BundleDetails) ->
Optional[fut.Future]
:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
@@
-1080,15
+1109,17
@@
class RemoteExecutor(BaseExecutor):
if bundle.failure_count > retry_limit:
logger.error(
if bundle.failure_count > retry_limit:
logger.error(
- f'{bundle}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+ '%s: Tried this bundle too many times already (%dx); giving up.',
+ bundle,
+ retry_limit,
)
if is_original:
raise RemoteExecutorException(
)
if is_original:
raise RemoteExecutorException(
- f'{bundle}: This bundle can\'t be completed despite several backups and retries'
+ f'{bundle}: This bundle can\'t be completed despite several backups and retries'
,
)
else:
logger.error(
)
else:
logger.error(
- f'{bundle}: At least it\'s only a backup; better luck with the others.'
+ '%s: At least it\'s only a backup; better luck with the others.', bundle
)
return None
else:
)
return None
else:
@@
-1107,29
+1138,35
@@
class RemoteExecutor(BaseExecutor):
return self._helper_executor.submit(self.launch, bundle)
@overrides
return self._helper_executor.submit(self.launch, bundle)
@overrides
- def shutdown(self,
wait=Tru
e) -> None:
+ def shutdown(self,
*, wait: bool = True, quiet: bool = Fals
e) -> None:
if not self.already_shutdown:
if not self.already_shutdown:
- logging.debug(
f'Shutting down RemoteExecutor {self.title}'
)
+ logging.debug(
'Shutting down RemoteExecutor %s', self.title
)
self.heartbeat_stop_event.set()
self.heartbeat_thread.join()
self._helper_executor.shutdown(wait)
self.heartbeat_stop_event.set()
self.heartbeat_thread.join()
self._helper_executor.shutdown(wait)
- print(self.histogram.__repr__(label_formatter='%d'))
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
@singleton
class DefaultExecutors(object):
self.already_shutdown = True
@singleton
class DefaultExecutors(object):
+ """A container for a default thread, process and remote executor.
+ These are not created until needed and we take care to clean up
+ before process exit.
+
+ """
+
def __init__(self):
self.thread_executor: Optional[ThreadExecutor] = None
self.process_executor: Optional[ProcessExecutor] = None
self.remote_executor: Optional[RemoteExecutor] = None
def __init__(self):
self.thread_executor: Optional[ThreadExecutor] = None
self.process_executor: Optional[ProcessExecutor] = None
self.remote_executor: Optional[RemoteExecutor] = None
- def ping(self, host) -> bool:
- logger.debug(f'RUN> ping -c 1 {host}')
+ @staticmethod
+ def ping(host) -> bool:
+ logger.debug('RUN> ping -c 1 %s', host)
try:
try:
- x = cmd_with_timeout(
- f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
- )
+ x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
return x == 0
except Exception:
return False
return x == 0
except Exception:
return False
@@
-1154,7
+1191,7
@@
class DefaultExecutors(object):
RemoteWorkerRecord(
username='scott',
machine='cheetah.house',
RemoteWorkerRecord(
username='scott',
machine='cheetah.house',
- weight=
30
,
+ weight=
24
,
count=6,
),
)
count=6,
),
)
@@
-1174,8
+1211,8
@@
class DefaultExecutors(object):
RemoteWorkerRecord(
username='scott',
machine='wannabe.house',
RemoteWorkerRecord(
username='scott',
machine='wannabe.house',
- weight=
25
,
- count=
10
,
+ weight=
14
,
+ count=
8
,
),
)
if self.ping('puma.cabin'):
),
)
if self.ping('puma.cabin'):
@@
-1184,7
+1221,7
@@
class DefaultExecutors(object):
RemoteWorkerRecord(
username='scott',
machine='puma.cabin',
RemoteWorkerRecord(
username='scott',
machine='puma.cabin',
- weight=
30
,
+ weight=
24
,
count=6,
),
)
count=6,
),
)
@@
-1194,7
+1231,7
@@
class DefaultExecutors(object):
RemoteWorkerRecord(
username='scott',
machine='backup.house',
RemoteWorkerRecord(
username='scott',
machine='backup.house',
- weight=
8
,
+ weight=
9
,
count=2,
),
)
count=2,
),
)
@@
-1202,7
+1239,7
@@
class DefaultExecutors(object):
# The controller machine has a lot to do; go easy on it.
for record in pool:
if record.machine == platform.node() and record.count > 1:
# The controller machine has a lot to do; go easy on it.
for record in pool:
if record.machine == platform.node() and record.count > 1:
- logger.info(
f'Reducing workload for {record.machine}.'
)
+ logger.info(
'Reducing workload for %s.', record.machine
)
record.count = 1
policy = WeightedRandomRemoteWorkerSelectionPolicy()
record.count = 1
policy = WeightedRandomRemoteWorkerSelectionPolicy()
@@
-1212,11
+1249,11
@@
class DefaultExecutors(object):
def shutdown(self) -> None:
if self.thread_executor is not None:
def shutdown(self) -> None:
if self.thread_executor is not None:
- self.thread_executor.shutdown()
+ self.thread_executor.shutdown(
wait=True, quiet=True
)
self.thread_executor = None
if self.process_executor is not None:
self.thread_executor = None
if self.process_executor is not None:
- self.process_executor.shutdown()
+ self.process_executor.shutdown(
wait=True, quiet=True
)
self.process_executor = None
if self.remote_executor is not None:
self.process_executor = None
if self.remote_executor is not None:
- self.remote_executor.shutdown()
+ self.remote_executor.shutdown(
wait=True, quiet=True
)
self.remote_executor = None
self.remote_executor = None