projects
/
python_utils.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Ignore integration test results in code coverage report.
[python_utils.git]
/
executors.py
diff --git
a/executors.py
b/executors.py
index e95ed716043b4962cd939b6d25885fd87826466a..69330129c9b86ec8c9710b2586b24d7c2dfee8f6 100644
(file)
--- a/
executors.py
+++ b/
executors.py
@@
-1,34
+1,34
@@
#!/usr/bin/env python3
#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
from __future__ import annotations
from __future__ import annotations
-from abc import ABC, abstractmethod
import concurrent.futures as fut
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
@@
-147,9
+147,7
@@
class ThreadExecutor(BaseExecutor):
for arg in args:
newargs.append(arg)
start = time.time()
for arg in args:
newargs.append(arg)
start = time.time()
- result = self._thread_pool_executor.submit(
- self.run_local_bundle, *newargs, **kwargs
- )
+ result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@@
-248,7
+246,7
@@
class BundleDetails:
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
- src_bundle:
BundleDetails
+ src_bundle:
Optional[BundleDetails]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
@@
-288,11
+286,9
@@
class RemoteExecutorStatus:
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
- self.start_per_bundle: Dict[str,
float
] = defaultdict(float)
+ self.start_per_bundle: Dict[str,
Optional[float]
] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
- self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
- ] = {}
+ self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
@@
-307,9
+303,7
@@
class RemoteExecutorStatus:
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
- def record_acquire_worker_already_locked(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
@@
-345,7
+339,9
@@
class RemoteExecutorStatus:
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
+ start = self.start_per_bundle[uuid]
+ assert start is not None
+ bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
@@
-471,9
+467,7
@@
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
grabbag.append(worker)
if len(grabbag) == 0:
grabbag.append(worker)
if len(grabbag) == 0:
- logger.debug(
- f'There are no available workers that avoid {machine_to_avoid}...'
- )
+ logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
for worker in self.workers:
if worker.count > 0:
for _ in range(worker.count * worker.weight):
for worker in self.workers:
if worker.count > 0:
for _ in range(worker.count * worker.weight):
@@
-502,9
+496,7
@@
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
return False
@overrides
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
x = self.index
while True:
worker = self.workers[x]
x = self.index
while True:
worker = self.workers[x]
@@
-543,9
+535,7
@@
class RemoteExecutor(BaseExecutor):
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
- logger.debug(
- f'Creating {self.worker_count} local threads, one per remote worker.'
- )
+ logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
@@
-627,21
+617,15
@@
class RemoteExecutor(BaseExecutor):
if start_ts is not None:
runtime = now - start_ts
score += runtime
if start_ts is not None:
runtime = now - start_ts
score += runtime
- logger.debug(
- f'score[{bundle}] => {score} # latency boost'
- )
+ logger.debug(f'score[{bundle}] => {score} # latency boost')
if bundle.slower_than_local_p95:
score += runtime / 2
if bundle.slower_than_local_p95:
score += runtime / 2
- logger.debug(
- f'score[{bundle}] => {score} # >worker p95'
- )
+ logger.debug(f'score[{bundle}] => {score} # >worker p95')
if bundle.slower_than_global_p95:
score += runtime / 4
if bundle.slower_than_global_p95:
score += runtime / 4
- logger.debug(
- f'score[{bundle}] => {score} # >global p95'
- )
+ logger.debug(f'score[{bundle}] => {score} # >global p95')
# Prefer backups of bundles that don't
# have backups already.
# Prefer backups of bundles that don't
# have backups already.
@@
-658,9
+642,7
@@
class RemoteExecutor(BaseExecutor):
f'score[{bundle}] => {score} # {backup_count} dup backup factor'
)
f'score[{bundle}] => {score} # {backup_count} dup backup factor'
)
- if score != 0 and (
- best_score is None or score > best_score
- ):
+ if score != 0 and (best_score is None or score > best_score):
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
@@
-685,14
+667,10
@@
class RemoteExecutor(BaseExecutor):
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
return self.policy.acquire_worker(machine_to_avoid)
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(
- self, machine_to_avoid: str = None
- ) -> RemoteWorkerRecord:
+ def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
with self.cv:
while not self.is_worker_available():
self.cv.wait()
with self.cv:
while not self.is_worker_available():
self.cv.wait()
@@
-739,7
+717,7
@@
class RemoteExecutor(BaseExecutor):
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ assert worker
is not None
# Ok, found a worker.
bundle.worker = worker
# Ok, found a worker.
bundle.worker = worker
@@
-756,9
+734,7
@@
class RemoteExecutor(BaseExecutor):
try:
return self.process_work_result(bundle)
except Exception as e:
try:
return self.process_work_result(bundle)
except Exception as e:
- logger.warning(
- f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
- )
+ logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
@@
-787,9
+763,7
@@
class RemoteExecutor(BaseExecutor):
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = (
- f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
- )
+ cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
@@
-830,15
+804,14
@@
class RemoteExecutor(BaseExecutor):
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- logger.debug(
- f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
- )
+ logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
- self, p:
subprocess.Popen
, bundle: BundleDetails, depth: int
+ self, p:
Optional[subprocess.Popen]
, bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
) -> Any:
machine = bundle.machine
+ assert p is not None
pid = p.pid
if depth > 3:
logger.error(
pid = p.pid
if depth > 3:
logger.error(
@@
-856,9
+829,7
@@
class RemoteExecutor(BaseExecutor):
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
- logger.info(
- f'{bundle}: looks like another worker finished bundle...'
- )
+ logger.info(f'{bundle}: looks like another worker finished bundle...')
break
else:
logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
break
else:
logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
@@
-926,12
+897,9
@@
class RemoteExecutor(BaseExecutor):
break
run_silently(
break
run_silently(
- f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"'
- )
- logger.debug(
- f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+ f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
)
)
+ logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
@@
-966,9
+934,7
@@
class RemoteExecutor(BaseExecutor):
# backup.
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
# backup.
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
- logger.debug(
- f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
- )
+ logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
@@
-981,10
+947,10
@@
class RemoteExecutor(BaseExecutor):
# Tell the original to stop if we finished first.
if not was_cancelled:
# Tell the original to stop if we finished first.
if not was_cancelled:
- logger.debug(
- f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
- )
-
bundle.src
_bundle.is_cancelled.set()
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle is not None
+
logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
+
orig
_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
@@
-1068,7
+1034,7
@@
class RemoteExecutor(BaseExecutor):
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) ->
fut.Future
:
+ def emergency_retry_nasty_bundle(self, bundle: BundleDetails) ->
Optional[fut.Future]
:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
@@
-1130,9
+1096,7
@@
class DefaultExecutors(object):
def ping(self, host) -> bool:
logger.debug(f'RUN> ping -c 1 {host}')
try:
def ping(self, host) -> bool:
logger.debug(f'RUN> ping -c 1 {host}')
try:
- x = cmd_with_timeout(
- f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
- )
+ x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
return x == 0
except Exception:
return False
return x == 0
except Exception:
return False