projects
/
python_utils.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Stop spamming the log from bg.
[python_utils.git]
/
executors.py
diff --git
a/executors.py
b/executors.py
index 990df03f19af253773c72e23eac201a9163e2931..b4cb06b4c9ad80816f0bb13e05e23863846fa8c1 100644
(file)
--- a/
executors.py
+++ b/
executors.py
@@
-2,33
+2,32
@@
from __future__ import annotations
from __future__ import annotations
-from abc import ABC, abstractmethod
import concurrent.futures as fut
import concurrent.futures as fut
-from collections import defaultdict
-from dataclasses import dataclass
import logging
import logging
-import numpy
import os
import platform
import random
import subprocess
import threading
import time
import os
import platform
import random
import subprocess
import threading
import time
-from typing import Any, Callable, Dict, List, Optional, Set
import warnings
import warnings
+from abc import ABC, abstractmethod
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
import cloudpickle # type: ignore
+import numpy
from overrides import overrides
from overrides import overrides
-from ansi import bg, fg, underline, reset
import argparse_utils
import config
import argparse_utils
import config
-from decorator_utils import singleton
-from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
import histogram as hist
import histogram as hist
+from ansi import bg, fg, reset, underline
+from decorator_utils import singleton
+from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
from thread_utils import background_thread
from thread_utils import background_thread
-
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
@@
-346,7
+345,7
@@
class RemoteExecutorStatus:
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
start = self.start_per_bundle[uuid]
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
start = self.start_per_bundle[uuid]
- assert start
+ assert start
is not None
bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
@@
-741,7
+740,7
@@
class RemoteExecutor(BaseExecutor):
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ assert worker
is not None
# Ok, found a worker.
bundle.worker = worker
# Ok, found a worker.
bundle.worker = worker
@@
-841,7
+840,7
@@
class RemoteExecutor(BaseExecutor):
self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
- assert p
+ assert p
is not None
pid = p.pid
if depth > 3:
logger.error(
pid = p.pid
if depth > 3:
logger.error(
@@
-985,7
+984,7
@@
class RemoteExecutor(BaseExecutor):
# Tell the original to stop if we finished first.
if not was_cancelled:
orig_bundle = bundle.src_bundle
# Tell the original to stop if we finished first.
if not was_cancelled:
orig_bundle = bundle.src_bundle
- assert orig_bundle
+ assert orig_bundle
is not None
logger.debug(
f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
logger.debug(
f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)