from __future__ import annotations
-from abc import ABC, abstractmethod
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
start = self.start_per_bundle[uuid]
- assert start
+ assert start is not None
bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ assert worker is not None
# Ok, found a worker.
bundle.worker = worker
self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
- assert p
+ assert p is not None
pid = p.pid
if depth > 3:
logger.error(
# Tell the original to stop if we finished first.
if not was_cancelled:
orig_bundle = bundle.src_bundle
- assert orig_bundle
+ assert orig_bundle is not None
logger.debug(
f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)