projects
/
python_utils.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Add another profanity.
[python_utils.git]
/
executors.py
diff --git
a/executors.py
b/executors.py
index 633b11b3b616cc2536ea514d274cbf2874a99282..69330129c9b86ec8c9710b2586b24d7c2dfee8f6 100644
(file)
--- a/
executors.py
+++ b/
executors.py
@@
-1,34
+1,34
@@
#!/usr/bin/env python3
#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
from __future__ import annotations
from __future__ import annotations
-from abc import ABC, abstractmethod
import concurrent.futures as fut
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
@@
-84,10
+84,10
@@
class BaseExecutor(ABC):
pass
@abstractmethod
pass
@abstractmethod
- def shutdown(self,
wait: bool = Tru
e) -> None:
+ def shutdown(self,
*, wait: bool = True, quiet: bool = Fals
e) -> None:
pass
pass
- def shutdown_if_idle(self) -> bool:
+ def shutdown_if_idle(self
, *, quiet: bool = False
) -> bool:
"""Shutdown the executor and return True if the executor is idle
(i.e. there are no pending or active tasks). Return False
otherwise. Note: this should only be called by the launcher
"""Shutdown the executor and return True if the executor is idle
(i.e. there are no pending or active tasks). Return False
otherwise. Note: this should only be called by the launcher
@@
-95,7
+95,7
@@
class BaseExecutor(ABC):
"""
if self.task_count == 0:
"""
if self.task_count == 0:
- self.shutdown()
+ self.shutdown(
wait=True, quiet=quiet
)
return True
return False
return True
return False
@@
-147,19
+147,18
@@
class ThreadExecutor(BaseExecutor):
for arg in args:
newargs.append(arg)
start = time.time()
for arg in args:
newargs.append(arg)
start = time.time()
- result = self._thread_pool_executor.submit(
- self.run_local_bundle, *newargs, **kwargs
- )
+ result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides
- def shutdown(self,
wait=Tru
e) -> None:
+ def shutdown(self,
*, wait: bool = True, quiet: bool = Fals
e) -> None:
if not self.already_shutdown:
logger.debug(f'Shutting down threadpool executor {self.title}')
if not self.already_shutdown:
logger.debug(f'Shutting down threadpool executor {self.title}')
- print(self.histogram.__repr__(label_formatter='%ds'))
self._thread_pool_executor.shutdown(wait)
self._thread_pool_executor.shutdown(wait)
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
self.already_shutdown = True
@@
-197,11
+196,12
@@
class ProcessExecutor(BaseExecutor):
return result
@overrides
return result
@overrides
- def shutdown(self,
wait=Tru
e) -> None:
+ def shutdown(self,
*, wait: bool = True, quiet: bool = Fals
e) -> None:
if not self.already_shutdown:
logger.debug(f'Shutting down processpool executor {self.title}')
self._process_executor.shutdown(wait)
if not self.already_shutdown:
logger.debug(f'Shutting down processpool executor {self.title}')
self._process_executor.shutdown(wait)
- print(self.histogram.__repr__(label_formatter='%ds'))
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
def __getstate__(self):
self.already_shutdown = True
def __getstate__(self):
@@
-246,7
+246,7
@@
class BundleDetails:
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
- src_bundle:
BundleDetails
+ src_bundle:
Optional[BundleDetails]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
@@
-286,11
+286,9
@@
class RemoteExecutorStatus:
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
- self.start_per_bundle: Dict[str,
float
] = defaultdict(float)
+ self.start_per_bundle: Dict[str,
Optional[float]
] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
- self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
- ] = {}
+ self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
@@
-305,9
+303,7
@@
class RemoteExecutorStatus:
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
- def record_acquire_worker_already_locked(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
@@
-343,7
+339,9
@@
class RemoteExecutorStatus:
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
+ start = self.start_per_bundle[uuid]
+ assert start is not None
+ bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
@@
-469,9
+467,7
@@
class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
grabbag.append(worker)
if len(grabbag) == 0:
grabbag.append(worker)
if len(grabbag) == 0:
- logger.debug(
- f'There are no available workers that avoid {machine_to_avoid}...'
- )
+ logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
for worker in self.workers:
if worker.count > 0:
for _ in range(worker.count * worker.weight):
for worker in self.workers:
if worker.count > 0:
for _ in range(worker.count * worker.weight):
@@
-500,9
+496,7
@@
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
return False
@overrides
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
x = self.index
while True:
worker = self.workers[x]
x = self.index
while True:
worker = self.workers[x]
@@
-541,9
+535,7
@@
class RemoteExecutor(BaseExecutor):
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
- logger.debug(
- f'Creating {self.worker_count} local threads, one per remote worker.'
- )
+ logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
@@
-625,21
+617,15
@@
class RemoteExecutor(BaseExecutor):
if start_ts is not None:
runtime = now - start_ts
score += runtime
if start_ts is not None:
runtime = now - start_ts
score += runtime
- logger.debug(
- f'score[{bundle}] => {score} # latency boost'
- )
+ logger.debug(f'score[{bundle}] => {score} # latency boost')
if bundle.slower_than_local_p95:
score += runtime / 2
if bundle.slower_than_local_p95:
score += runtime / 2
- logger.debug(
- f'score[{bundle}] => {score} # >worker p95'
- )
+ logger.debug(f'score[{bundle}] => {score} # >worker p95')
if bundle.slower_than_global_p95:
score += runtime / 4
if bundle.slower_than_global_p95:
score += runtime / 4
- logger.debug(
- f'score[{bundle}] => {score} # >global p95'
- )
+ logger.debug(f'score[{bundle}] => {score} # >global p95')
# Prefer backups of bundles that don't
# have backups already.
# Prefer backups of bundles that don't
# have backups already.
@@
-656,9
+642,7
@@
class RemoteExecutor(BaseExecutor):
f'score[{bundle}] => {score} # {backup_count} dup backup factor'
)
f'score[{bundle}] => {score} # {backup_count} dup backup factor'
)
- if score != 0 and (
- best_score is None or score > best_score
- ):
+ if score != 0 and (best_score is None or score > best_score):
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
@@
-683,14
+667,10
@@
class RemoteExecutor(BaseExecutor):
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
return self.policy.acquire_worker(machine_to_avoid)
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(
- self, machine_to_avoid: str = None
- ) -> RemoteWorkerRecord:
+ def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
with self.cv:
while not self.is_worker_available():
self.cv.wait()
with self.cv:
while not self.is_worker_available():
self.cv.wait()
@@
-737,7
+717,7
@@
class RemoteExecutor(BaseExecutor):
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ assert worker
is not None
# Ok, found a worker.
bundle.worker = worker
# Ok, found a worker.
bundle.worker = worker
@@
-754,9
+734,7
@@
class RemoteExecutor(BaseExecutor):
try:
return self.process_work_result(bundle)
except Exception as e:
try:
return self.process_work_result(bundle)
except Exception as e:
- logger.warning(
- f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
- )
+ logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
@@
-785,9
+763,7
@@
class RemoteExecutor(BaseExecutor):
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = (
- f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
- )
+ cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
@@
-828,15
+804,14
@@
class RemoteExecutor(BaseExecutor):
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- logger.debug(
- f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
- )
+ logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
- self, p:
subprocess.Popen
, bundle: BundleDetails, depth: int
+ self, p:
Optional[subprocess.Popen]
, bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
) -> Any:
machine = bundle.machine
+ assert p is not None
pid = p.pid
if depth > 3:
logger.error(
pid = p.pid
if depth > 3:
logger.error(
@@
-854,9
+829,7
@@
class RemoteExecutor(BaseExecutor):
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
- logger.info(
- f'{bundle}: looks like another worker finished bundle...'
- )
+ logger.info(f'{bundle}: looks like another worker finished bundle...')
break
else:
logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
break
else:
logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
@@
-924,12
+897,9
@@
class RemoteExecutor(BaseExecutor):
break
run_silently(
break
run_silently(
- f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"'
- )
- logger.debug(
- f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+ f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
)
)
+ logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
@@
-964,9
+934,7
@@
class RemoteExecutor(BaseExecutor):
# backup.
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
# backup.
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
- logger.debug(
- f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
- )
+ logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
@@
-979,10
+947,10
@@
class RemoteExecutor(BaseExecutor):
# Tell the original to stop if we finished first.
if not was_cancelled:
# Tell the original to stop if we finished first.
if not was_cancelled:
- logger.debug(
- f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
- )
-
bundle.src
_bundle.is_cancelled.set()
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle is not None
+
logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
+
orig
_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
@@
-1066,7
+1034,7
@@
class RemoteExecutor(BaseExecutor):
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) ->
fut.Future
:
+ def emergency_retry_nasty_bundle(self, bundle: BundleDetails) ->
Optional[fut.Future]
:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
@@
-1107,13
+1075,14
@@
class RemoteExecutor(BaseExecutor):
return self._helper_executor.submit(self.launch, bundle)
@overrides
return self._helper_executor.submit(self.launch, bundle)
@overrides
- def shutdown(self,
wait=Tru
e) -> None:
+ def shutdown(self,
*, wait: bool = True, quiet: bool = Fals
e) -> None:
if not self.already_shutdown:
logging.debug(f'Shutting down RemoteExecutor {self.title}')
self.heartbeat_stop_event.set()
self.heartbeat_thread.join()
self._helper_executor.shutdown(wait)
if not self.already_shutdown:
logging.debug(f'Shutting down RemoteExecutor {self.title}')
self.heartbeat_stop_event.set()
self.heartbeat_thread.join()
self._helper_executor.shutdown(wait)
- print(self.histogram.__repr__(label_formatter='%ds'))
+ if not quiet:
+ print(self.histogram.__repr__(label_formatter='%ds'))
self.already_shutdown = True
self.already_shutdown = True
@@
-1127,9
+1096,7
@@
class DefaultExecutors(object):
def ping(self, host) -> bool:
logger.debug(f'RUN> ping -c 1 {host}')
try:
def ping(self, host) -> bool:
logger.debug(f'RUN> ping -c 1 {host}')
try:
- x = cmd_with_timeout(
- f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
- )
+ x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
return x == 0
except Exception:
return False
return x == 0
except Exception:
return False
@@
-1212,11
+1179,11
@@
class DefaultExecutors(object):
def shutdown(self) -> None:
if self.thread_executor is not None:
def shutdown(self) -> None:
if self.thread_executor is not None:
- self.thread_executor.shutdown()
+ self.thread_executor.shutdown(
wait=True, quiet=True
)
self.thread_executor = None
if self.process_executor is not None:
self.thread_executor = None
if self.process_executor is not None:
- self.process_executor.shutdown()
+ self.process_executor.shutdown(
wait=True, quiet=True
)
self.process_executor = None
if self.remote_executor is not None:
self.process_executor = None
if self.remote_executor is not None:
- self.remote_executor.shutdown()
+ self.remote_executor.shutdown(
wait=True, quiet=True
)
self.remote_executor = None
self.remote_executor = None