from __future__ import annotations
-from abc import ABC, abstractmethod
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
- src_bundle: BundleDetails
+ src_bundle: Optional[BundleDetails]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
- self.start_per_bundle: Dict[str, float] = defaultdict(float)
+ self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
+ start = self.start_per_bundle[uuid]
+ assert start is not None
+ bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ assert worker is not None
# Ok, found a worker.
bundle.worker = worker
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
- self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+ self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
+ assert p is not None
pid = p.pid
if depth > 3:
logger.error(
# Tell the original to stop if we finished first.
if not was_cancelled:
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle is not None
logger.debug(
- f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+ f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
- bundle.src_bundle.is_cancelled.set()
+ orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+ def emergency_retry_nasty_bundle(
+ self, bundle: BundleDetails
+ ) -> Optional[fut.Future]:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine