projects
/
python_utils.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Used isort to sort imports. Also added it to the git pre-commit hook.
[python_utils.git]
/
executors.py
diff --git
a/executors.py
b/executors.py
index e95ed716043b4962cd939b6d25885fd87826466a..34528a33c2d10236cd7527fe53ffd027fa8020ca 100644
(file)
--- a/
executors.py
+++ b/
executors.py
@@ -2,33 +2,32 @@
from __future__ import annotations
from __future__ import annotations
-from abc import ABC, abstractmethod
import concurrent.futures as fut
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
@@ -248,7 +247,7 @@ class BundleDetails:
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
-    src_bundle: BundleDetails
+    src_bundle: Optional[BundleDetails]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
@@ -288,7 +287,7 @@ class RemoteExecutorStatus:
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
-        self.start_per_bundle: Dict[str, float] = defaultdict(float)
+        self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
@@ -345,7 +344,9 @@ class RemoteExecutorStatus:
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
+ start = self.start_per_bundle[uuid]
+ assert start
+ bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
@@ -836,9 +837,10 @@ class RemoteExecutor(BaseExecutor):
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
-        self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+        self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
) -> Any:
machine = bundle.machine
+ assert p
pid = p.pid
if depth > 3:
logger.error(
pid = p.pid
if depth > 3:
logger.error(
@@ -981,10 +983,12 @@ class RemoteExecutor(BaseExecutor):
# Tell the original to stop if we finished first.
if not was_cancelled:
# Tell the original to stop if we finished first.
if not was_cancelled:
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle
logger.debug(
logger.debug(
-                    f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+                    f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
)
-                bundle.src_bundle.is_cancelled.set()
+                orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
@@ -1068,7 +1072,9 @@ class RemoteExecutor(BaseExecutor):
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+ def emergency_retry_nasty_bundle(
+ self, bundle: BundleDetails
+ ) -> Optional[fut.Future]:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine