X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;ds=inline;f=executors.py;h=b4cb06b4c9ad80816f0bb13e05e23863846fa8c1;hb=53de665d1eb5a95333b2ef937a7045af8bfbe5e0;hp=990df03f19af253773c72e23eac201a9163e2931;hpb=a4bf4d05230474ad14243d67ac7f8c938f670e58;p=python_utils.git diff --git a/executors.py b/executors.py index 990df03..b4cb06b 100644 --- a/executors.py +++ b/executors.py @@ -2,33 +2,32 @@ from __future__ import annotations -from abc import ABC, abstractmethod import concurrent.futures as fut -from collections import defaultdict -from dataclasses import dataclass import logging -import numpy import os import platform import random import subprocess import threading import time -from typing import Any, Callable, Dict, List, Optional, Set import warnings +from abc import ABC, abstractmethod +from collections import defaultdict +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional, Set import cloudpickle # type: ignore +import numpy from overrides import overrides -from ansi import bg, fg, underline, reset import argparse_utils import config -from decorator_utils import singleton -from exec_utils import run_silently, cmd_in_background, cmd_with_timeout import histogram as hist +from ansi import bg, fg, reset, underline +from decorator_utils import singleton +from exec_utils import cmd_in_background, cmd_with_timeout, run_silently from thread_utils import background_thread - logger = logging.getLogger(__name__) parser = config.add_commandline_args( @@ -346,7 +345,7 @@ class RemoteExecutorStatus: self.in_flight_bundles_by_worker[worker].remove(uuid) if not was_cancelled: start = self.start_per_bundle[uuid] - assert start + assert start is not None bundle_latency = ts - start x = self.finished_bundle_timings_per_worker.get(worker, list()) x.append(bundle_latency) @@ -741,7 +740,7 @@ class RemoteExecutor(BaseExecutor): worker = None while worker is None: worker = self.find_available_worker_or_block(avoid_machine) - assert worker + assert worker is not None # Ok, found a worker. bundle.worker = worker @@ -841,7 +840,7 @@ class RemoteExecutor(BaseExecutor): self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int ) -> Any: machine = bundle.machine - assert p + assert p is not None pid = p.pid if depth > 3: logger.error( @@ -985,7 +984,7 @@ class RemoteExecutor(BaseExecutor): # Tell the original to stop if we finished first. if not was_cancelled: orig_bundle = bundle.src_bundle - assert orig_bundle + assert orig_bundle is not None logger.debug( f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.' )