)
-@persistent.persistent_autoloaded_singleton()
+@persistent.persistent_autoloaded_singleton() # type: ignore
class Arper(persistent.Persistent):
def __init__(self, cached_state: Optional[BiDict] = None) -> None:
self.state = BiDict()
from collections import defaultdict
import logging
import re
-from typing import Dict, List, Set
+from typing import Dict, List, Optional, Set
import warnings
# Note: this module is fairly early loaded. Be aware of dependencies.
] = defaultdict(dict)
self.names_by_mac: Dict[str, str] = {}
self.dark_locations: Set[Location] = set()
- self.last_update = None
+ self.last_update: Optional[datetime.datetime] = None
def maybe_update(self) -> None:
if self.last_update is None:
import functools
import logging
import os
+import importlib
from inspect import stack
+from typing import List
import sys
# This module is commonly used by others in here and should avoid
original_hook(exc_type, exc_value, exc_tb)
-class ImportInterceptor(object):
+class ImportInterceptor(importlib.abc.MetaPathFinder):
def __init__(self):
import collect.trie
def should_ignore_filename(self, filename: str) -> bool:
return 'importlib' in filename or 'six.py' in filename
+ def find_module(self, fullname, path):
+ raise Exception(
+ "This method has been deprecated since Python 3.4, please upgrade."
+ )
+
def find_spec(self, loaded_module, path=None, target=None):
s = stack()
for x in range(3, len(s)):
logger.debug(msg)
print(msg)
+ def invalidate_caches(self):
+ pass
+
def find_importer(self, module: str):
if module in self.tree_node_by_module:
node = self.tree_node_by_module[module]
for arg in sys.argv:
if arg == '--audit_import_events':
import_interceptor = ImportInterceptor()
- sys.meta_path = [import_interceptor] + sys.meta_path
+ sys.meta_path.insert(0, import_interceptor)
def dump_all_objects() -> None:
logger.info(msg)
return func(*args, **kwargs)
- wrapper_debug_count_calls.num_calls = 0
+ wrapper_debug_count_calls.num_calls = 0 # type: ignore
return wrapper_debug_count_calls
logger.debug(f"Returning memoized value for {func.__name__}")
return wrapper_memoized.cache[cache_key]
- wrapper_memoized.cache = dict()
+ wrapper_memoized.cache = dict() # type: ignore
return wrapper_memoized
def coalesce(
inputs: Iterator[Dict[Any, Any]],
*,
- aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list,
+ aggregation_function: Callable[[Any, Any, Any], Any] = coalesce_by_creating_list,
) -> Dict[Any, Any]:
"""Merge N dicts into one dict containing the union of all keys /
values in the input dicts. When keys collide, apply the
['scott', '555-1212', '123 main st.', '12345']
"""
- r = ([], [])
+ r: Tuple[List[Any], List[Any]] = ([], [])
for (k, v) in d.items():
r[0].append(k)
r[1].append(v)
end_ts: float
slower_than_local_p95: bool
slower_than_global_p95: bool
- src_bundle: BundleDetails
+ src_bundle: Optional[BundleDetails]
is_cancelled: threading.Event
was_cancelled: bool
backup_bundles: Optional[List[BundleDetails]]
self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
self.start_time: float = time.time()
- self.start_per_bundle: Dict[str, float] = defaultdict(float)
+ self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
self.end_per_bundle[uuid] = ts
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
- bundle_latency = ts - self.start_per_bundle[uuid]
+ start = self.start_per_bundle[uuid]
+ assert start
+ bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
- self, p: subprocess.Popen, bundle: BundleDetails, depth: int
+ self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
+ assert p
pid = p.pid
if depth > 3:
logger.error(
# Tell the original to stop if we finished first.
if not was_cancelled:
+ orig_bundle = bundle.src_bundle
+ assert orig_bundle
logger.debug(
- f'{bundle}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+ f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
- bundle.src_bundle.is_cancelled.set()
+ orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> fut.Future:
+ def emergency_retry_nasty_bundle(
+ self, bundle: BundleDetails
+ ) -> Optional[fut.Future]:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
from typing import Optional
import glob
from os.path import isfile, join, exists
-from typing import List
+from typing import List, TextIO
from uuid import uuid4
def set_file_raw_atime(filename: str, atime: float):
mtime = get_file_raw_mtime(filename)
+ assert mtime
os.utime(filename, (atime, mtime))
def set_file_raw_mtime(filename: str, mtime: float):
atime = get_file_raw_atime(filename)
+ assert atime
os.utime(filename, (atime, mtime))
return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
-def touch_file(filename: str, *, mode: Optional[int] = 0o666) -> bool:
- return pathlib.Path(filename, mode=mode).touch()
+def touch_file(filename: str, *, mode: Optional[int] = 0o666):
+ pathlib.Path(filename, mode=mode).touch()
def expand_globs(in_filename: str):
self.filename = filename
uuid = uuid4()
self.tempfile = f'{filename}-{uuid}.tmp'
- self.handle = None
+ self.handle: Optional[TextIO] = None
- def __enter__(self) -> io.TextIOWrapper:
+ def __enter__(self) -> TextIO:
assert not does_path_exist(self.tempfile)
self.handle = open(self.tempfile, mode="w")
return self.handle
- def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
+ def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
if self.handle is not None:
self.handle.close()
cmd = f'/bin/mv -f {self.tempfile} {self.filename}'
return txt
max_label_width: Optional[int] = None
- lowest_start: int = None
- highest_end: int = None
+ lowest_start: Optional[int] = None
+ highest_end: Optional[int] = None
for bucket in sorted(self.buckets, key=lambda x: x[0]):
start = bucket[0]
if lowest_start is None:
return False
mtime = file_utils.get_file_mtime_as_datetime(filename)
+ assert mtime
now = datetime.datetime.now()
return mtime.month == now.month and mtime.day == now.day and mtime.year == now.year
return False
mtime = file_utils.get_file_mtime_as_datetime(filename)
+ assert mtime
now = datetime.datetime.now()
return (now - mtime).total_seconds() <= limit_seconds
self.instance = None
def __call__(self, cls: Persistent):
- @functools.wraps(cls)
def _load(*args, **kwargs):
# If class has already been loaded, act like a singleton
import concurrent.futures as fut
import logging
import traceback
-from typing import Callable, List, TypeVar
+from typing import Callable, List, Set, TypeVar
from overrides import overrides
):
real_futures = []
smart_future_by_real_future = {}
- completed_futures = set()
- for f in futures:
- assert type(f) == SmartFuture
- real_futures.append(f.wrapped_future)
- smart_future_by_real_future[f.wrapped_future] = f
+ completed_futures: Set[fut.Future] = set()
+ for x in futures:
+ assert type(x) == SmartFuture
+ real_futures.append(x.wrapped_future)
+ smart_future_by_real_future[x.wrapped_future] = x
while len(completed_futures) != len(real_futures):
newly_completed_futures = concurrent.futures.as_completed(real_futures)
log_exceptions: bool = True,
) -> None:
real_futures = []
- for f in futures:
- assert type(f) == SmartFuture
- real_futures.append(f.wrapped_future)
+ for x in futures:
+ assert type(x) == SmartFuture
+ real_futures.append(x.wrapped_future)
(done, not_done) = concurrent.futures.wait(
real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
def background_thread(
_funct: Optional[Callable],
-) -> Tuple[threading.Thread, threading.Event]:
+) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
"""A function decorator to create a background thread.
*** Please note: the decorated function must take an shutdown ***