class SimpleACL(ABC):
"""A simple Access Control List interface."""
- def __init__(
- self, *, order_to_check_allow_deny: Order, default_answer: bool
- ):
+ def __init__(self, *, order_to_check_allow_deny: Order, default_answer: bool):
if order_to_check_allow_deny not in (
Order.ALLOW_DENY,
Order.DENY_ALLOW,
green = 0
if blue is None:
blue = 0
- if (
- is_16color(red) and is_16color(green) and is_16color(blue)
- ) or force_16color:
+ if (is_16color(red) and is_16color(green) and is_16color(blue)) or force_16color:
logger.debug("Using 16-color strategy")
return fg_16color(red, green, blue)
if (
green = 0
if blue is None:
blue = 0
- if (
- is_16color(red) and is_16color(green) and is_16color(blue)
- ) or force_16color:
+ if (is_16color(red) and is_16color(green) and is_16color(blue)) or force_16color:
logger.debug("Using 16-color strategy")
return bg_16color(red, green, blue)
if (
class ActionNoYes(argparse.Action):
- def __init__(
- self, option_strings, dest, default=None, required=False, help=None
- ):
+ def __init__(self, option_strings, dest, default=None, required=False, help=None):
if default is None:
msg = 'You must provide a default with Yes/No action'
logger.critical(msg)
@overrides
def __call__(self, parser, namespace, values, option_strings=None):
- if option_strings.startswith('--no-') or option_strings.startswith(
- '--no_'
- ):
+ if option_strings.startswith('--no-') or option_strings.startswith('--no_'):
setattr(namespace, self.dest, False)
else:
setattr(namespace, self.dest, True)
mac = mac.lower()
ip = ip.strip()
cached_state[mac] = ip
- if (
- len(cached_state)
- > config.config['arper_min_entries_to_be_valid']
- ):
+ if len(cached_state) > config.config['arper_min_entries_to_be_valid']:
return cls(cached_state)
else:
msg = f'{cache_file} is invalid: only {len(cached_state)} entries. Deleting it.'
@overrides
def save(self) -> bool:
if len(self.state) > config.config['arper_min_entries_to_be_valid']:
- logger.debug(
- f'Persisting state to {config.config["arper_cache_location"]}'
- )
- with file_utils.FileWriter(
- config.config['arper_cache_location']
- ) as wf:
+ logger.debug(f'Persisting state to {config.config["arper_cache_location"]}')
+ with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
for (mac, ip) in self.state.items():
mac = mac.lower()
print(f'{mac}, {ip}', file=wf)
delta = now - self.last_update
if (
delta.total_seconds()
- > config.config[
- 'presence_tolerable_staleness_seconds'
- ].total_seconds()
+ > config.config['presence_tolerable_staleness_seconds'].total_seconds()
):
logger.debug(
f"It's been {delta.total_seconds()}s since last update; refreshing now."
warnings.warn(msg, stacklevel=2)
self.dark_locations.add(Location.HOUSE)
- def read_persisted_macs_file(
- self, filename: str, location: Location
- ) -> None:
+ def read_persisted_macs_file(self, filename: str, location: Location) -> None:
if location is Location.UNKNOWN:
return
with open(filename, "r") as rf:
logger.exception(e)
continue
mac = mac.strip()
- (self.location_ts_by_mac[location])[
- mac
- ] = datetime.datetime.fromtimestamp(int(ts.strip()))
+ (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
+ int(ts.strip())
+ )
ip_name = ip_name.strip()
match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
if match is not None:
def is_anyone_in_location_now(self, location: Location) -> bool:
self.maybe_update()
if location in self.dark_locations:
- raise Exception(
- f"Can't see {location} right now; answer undefined."
- )
+ raise Exception(f"Can't see {location} right now; answer undefined.")
for person in Person:
if person is not None:
loc = self.where_is_person_now(person)
def where_is_person_now(self, name: Person) -> Location:
self.maybe_update()
if len(self.dark_locations) > 0:
- msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
+ msg = (
+ f"Can't see {self.dark_locations} right now; answer confidence impacted"
+ )
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
logger.debug(f'Looking for {name}...')
if mac not in self.names_by_mac:
continue
mac_name = self.names_by_mac[mac]
- logger.debug(
- f'Looking for {name}... check for mac {mac} ({mac_name})'
- )
+ logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
for location in self.location_ts_by_mac:
if mac in self.location_ts_by_mac[location]:
ts = (self.location_ts_by_mac[location])[mac]
- logger.debug(
- f'Seen {mac} ({mac_name}) at {location} since {ts}'
- )
+ logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
tiebreaks[location] = ts
(
v = votes.get(most_recent_location, 0)
votes[most_recent_location] = v + bonus
logger.debug(f'{name}: {location} gets {bonus} votes.')
- credit = int(
- credit * 0.2
- ) # Note: list most important devices first
+ credit = int(credit * 0.2) # Note: list most important devices first
if credit <= 0:
credit = 1
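# Per the construction above: each subsequent device carries 20% of the
# previous device's vote weight, floored at 1, which is why the inline note
# says to list the most important devices first.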
if len(votes) > 0:
loading_module = self.module_by_filename_cache[filename]
else:
self.repopulate_modules_by_filename()
- loading_module = self.module_by_filename_cache.get(
- filename, 'unknown'
- )
+ loading_module = self.module_by_filename_cache.get(filename, 'unknown')
path = self.tree_node_by_module.get(loading_module, [])
path.extend([loaded_module])
with stopwatch.Timer() as t:
ret = entry_point(*args, **kwargs)
- logger.debug(
- f'{entry_point.__name__} (program entry point) returned {ret}.'
- )
+ logger.debug(f'{entry_point.__name__} (program entry point) returned {ret}.')
if config.config['dump_all_objects']:
dump_all_objects()
response = requests.get(url, stream=False, timeout=10.0)
if response.ok:
raw = response.content
- logger.debug(
- f'Read {len(response.content)} byte image from HTTP server'
- )
+ logger.debug(f'Read {len(response.content)} byte image from HTTP server')
tmp = np.frombuffer(raw, dtype="uint8")
logger.debug(
f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.'
camera_name, width=width, quality=quality
)
if raw is None:
- logger.debug(
- "Reading from video server failed; trying direct RTSP stream"
- )
+ logger.debug("Reading from video server failed; trying direct RTSP stream")
raw = fetch_camera_image_from_rtsp_stream(camera_name, width=width)
if raw is not None and len(raw) > 0:
tmp = np.frombuffer(raw, dtype="uint8")
jpg=jpg,
hsv=hsv,
)
- msg = (
- "Failed to retieve image from both video server and direct RTSP stream"
- )
+ msg = "Failed to retieve image from both video server and direct RTSP stream"
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
return RawJpgHsv(None, None, None)
if loadfile is not None:
if saw_other_args:
- msg = (
- f'Augmenting commandline arguments with those from {loadfile}.'
- )
+ msg = f'Augmenting commandline arguments with those from {loadfile}.'
print(msg, file=sys.stderr)
saved_messages.append(msg)
if not os.path.exists(loadfile):
sys.exit(-1)
with open(loadfile, 'r') as rf:
newargs = rf.readlines()
- newargs = [
- arg.strip('\n') for arg in newargs if 'config_savefile' not in arg
- ]
+ newargs = [arg.strip('\n') for arg in newargs if 'config_savefile' not in arg]
sys.argv += newargs
# Parse (possibly augmented, possibly completely overwritten)
lambda c: c * 1.8 + 32.0,
"°F",
),
- "Celsius": Converter(
- "Celsius", "temperature", lambda c: c, lambda c: c, "°C"
- ),
+ "Celsius": Converter("Celsius", "temperature", lambda c: c, lambda c: c, "°C"),
"Kelvin": Converter(
"Kelvin",
"temperature",
return _convert(magnitude, src, dst)
-def _convert(
- magnitude: Number, from_unit: Converter, to_unit: Converter
-) -> float:
+def _convert(magnitude: Number, from_unit: Converter, to_unit: Converter) -> float:
canonical = from_unit.to_canonical(magnitude)
converted = to_unit.from_canonical(canonical)
return float(converted)
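# A round-trip sketch, assuming Celsius is the canonical temperature unit (as
# its identity lambdas above suggest): converting 100 from Celsius to
# Fahrenheit first applies Celsius.to_canonical(100) == 100.0, then
# Fahrenheit.from_canonical(100.0) == 100.0 * 1.8 + 32.0 == 212.0.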
return not is_timezone_aware(dt)
-def replace_timezone(
- dt: datetime.datetime, tz: datetime.tzinfo
-) -> datetime.datetime:
+def replace_timezone(dt: datetime.datetime, tz: datetime.tzinfo) -> datetime.datetime:
"""
Replaces the timezone on a datetime object directly (leaving
the year, month, day, hour, minute, second, micro, etc... alone).
)
-def replace_time_timezone(
- t: datetime.time, tz: datetime.tzinfo
-) -> datetime.time:
+def replace_time_timezone(t: datetime.time, tz: datetime.tzinfo) -> datetime.time:
"""
Replaces the timezone on a datetime.time directly without performing
any translation.
return t.replace(tzinfo=tz)
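# Sketch: replace_time_timezone(datetime.time(12, 0), datetime.timezone.utc)
# yields 12:00 tagged UTC; the wall-clock fields are left untouched, exactly
# as t.replace(tzinfo=tz) promises.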
-def translate_timezone(
- dt: datetime.datetime, tz: datetime.tzinfo
-) -> datetime.datetime:
+def translate_timezone(dt: datetime.datetime, tz: datetime.tzinfo) -> datetime.datetime:
"""
Translates dt into a different timezone by adjusting the year, month,
day, hour, minute, second, micro, etc... appropriately. The returned
return wrapper_invocation_logged
-def rate_limited(
- n_calls: int, *, per_period_in_seconds: float = 1.0
-) -> Callable:
+def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
"""Limit invocation of a wrapped function to n calls per period.
Thread safe. In testing this was relatively fair with multiple
threads using it, though that hasn't been measured.
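# Usage sketch (hypothetical function; the decorator presumably blocks callers
# once the quota for the current period is consumed):
#
#     @rate_limited(10, per_period_in_seconds=1.0)
#     def poll_server() -> None:
#         ...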
@functools.wraps(func)
def wrapper_debug_count_calls(*args, **kwargs):
wrapper_debug_count_calls.num_calls += 1
- msg = (
- f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
- )
+ msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
print(msg)
logger.info(msg)
return func(*args, **kwargs)
@functools.wraps(func)
def wrapper_delay(*args, **kwargs):
if when & DelayWhen.BEFORE_CALL:
- logger.debug(
- f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
- )
+ logger.debug(f"@delay for {seconds}s BEFORE_CALL to {func.__name__}")
time.sleep(seconds)
retval = func(*args, **kwargs)
if when & DelayWhen.AFTER_CALL:
- logger.debug(
- f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
- )
+ logger.debug(f"@delay for {seconds}s AFTER_CALL to {func.__name__}")
time.sleep(seconds)
return retval
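# Usage sketch, assuming the enclosing decorator exposes seconds/when as
# arguments (only the closure variables are visible here); the `when &` tests
# above imply DelayWhen values are flag bits, so they can be combined with |:
#
#     @delay(seconds=0.5, when=DelayWhen.BEFORE_CALL | DelayWhen.AFTER_CALL)
#     def chatty() -> None:
#         ...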
cache_key = args + tuple(kwargs.items())
if cache_key not in wrapper_memoized.cache:
value = func(*args, **kwargs)
- logger.debug(
- f"Memoizing {cache_key} => {value} for {func.__name__}"
- )
+ logger.debug(f"Memoizing {cache_key} => {value} for {func.__name__}")
wrapper_memoized.cache[cache_key] = value
else:
logger.debug(f"Returning memoized value for {func.__name__}")
if random.uniform(0, 1) < sample_rate:
return f(*args, **kwargs)
else:
- logger.debug(
- f"@call_with_sample_rate skipping a call to {f.__name__}"
- )
+ logger.debug(f"@call_with_sample_rate skipping a call to {f.__name__}")
return _call_with_sample_rate
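# Sketch: a function decorated with @call_with_sample_rate(0.01) executes on
# roughly 1% of invocations and logs a debug line for the rest; note that
# skipped calls implicitly return None.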
return DeferredOperand.resolve(self) is DeferredOperand.resolve(other)
def is_not(self, other):
- return DeferredOperand.resolve(self) is not DeferredOperand.resolve(
- other
- )
+ return DeferredOperand.resolve(self) is not DeferredOperand.resolve(other)
def __abs__(self):
return abs(DeferredOperand.resolve(self))
def __getattr__(self, method_name):
def method(*args, **kwargs):
- return getattr(DeferredOperand.resolve(self), method_name)(
- *args, **kwargs
- )
+ return getattr(DeferredOperand.resolve(self), method_name)(*args, **kwargs)
return method
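# The __getattr__ proxy above makes arbitrary method calls resolve lazily: a
# DeferredOperand wrapping "hello" would answer .upper() with "HELLO" at call
# time (sketch; the resolution trigger lives in DeferredOperand.resolve).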
return min(d.keys())
-def parallel_lists_to_dict(
- keys: List[Any], values: List[Any]
-) -> Dict[Any, Any]:
+def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
"""Given two parallel lists (keys and values), create and return
a dict.
"""
if len(keys) != len(values):
- raise Exception(
- "Parallel keys and values lists must have the same length"
- )
+ raise Exception("Parallel keys and values lists must have the same length")
return dict(zip(keys, values))
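# Example: parallel_lists_to_dict(['a', 'b'], [1, 2]) == {'a': 1, 'b': 2};
# mismatched lengths raise the Exception above before zip() can silently
# truncate the longer list.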
mtime = file_utils.get_file_raw_mtime(filename)
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
- logger.debug(
- f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})'
- )
+ logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
self.mtime_by_filename[filename] = mtime
self.md5_by_filename[filename] = md5
subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.1 seconds
"""
- return subprocess.check_call(
- ["/bin/bash", "-c", command], timeout=timeout_seconds
- )
+ return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
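# The doctest above shows the failure mode: the command runs under
# /bin/bash -c and subprocess.TimeoutExpired propagates to the caller when
# timeout_seconds elapses; on success check_call returns 0.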
def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
)
-def cmd_in_background(
- command: str, *, silent: bool = False
-) -> subprocess.Popen:
+def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
args = shlex.split(command)
if silent:
subproc = subprocess.Popen(
def kill_subproc() -> None:
try:
if subproc.poll() is None:
- logger.info(
- "At exit handler: killing {}: {}".format(subproc, command)
- )
+ logger.info("At exit handler: killing {}: {}".format(subproc, command))
subproc.terminate()
subproc.wait(timeout=10.0)
except BaseException as be:
self.adjust_task_count(+1)
pickle = make_cloud_pickle(function, *args, **kwargs)
result = self._process_executor.submit(self.run_cloud_pickle, pickle)
- result.add_done_callback(
- lambda _: self.histogram.add_item(time.time() - start)
- )
+ result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
return result
@overrides
self.finished_bundle_timings_per_worker: Dict[
RemoteWorkerRecord, List[float]
] = {}
- self.in_flight_bundles_by_worker: Dict[
- RemoteWorkerRecord, Set[str]
- ] = {}
+ self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.last_periodic_dump: Optional[float] = None
# as a memory fence for modifications to bundle.
self.lock: threading.Lock = threading.Lock()
- def record_acquire_worker(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
with self.lock:
self.record_bundle_details_already_locked(details)
- def record_bundle_details_already_locked(
- self, details: BundleDetails
- ) -> None:
+ def record_bundle_details_already_locked(self, details: BundleDetails) -> None:
assert self.lock.locked()
self.bundle_details_by_uuid[details.uuid] = details
was_cancelled: bool,
) -> None:
with self.lock:
- self.record_release_worker_already_locked(
- worker, uuid, was_cancelled
- )
+ self.record_release_worker_already_locked(worker, uuid, was_cancelled)
def record_release_worker_already_locked(
self,
ret += f' ...{in_flight} bundles currently in flight:\n'
for bundle_uuid in self.in_flight_bundles_by_worker[worker]:
details = self.bundle_details_by_uuid.get(bundle_uuid, None)
- pid = (
- str(details.pid)
- if (details and details.pid != 0)
- else "TBD"
- )
+ pid = str(details.pid) if (details and details.pid != 0) else "TBD"
if self.start_per_bundle[bundle_uuid] is not None:
sec = ts - self.start_per_bundle[bundle_uuid]
ret += f' (pid={pid}): {details} for {sec:.1f}s so far '
assert self.lock.locked()
self.total_bundles_submitted = total_bundles_submitted
ts = time.time()
- if (
- self.last_periodic_dump is None
- or ts - self.last_periodic_dump > 5.0
- ):
+ if self.last_periodic_dump is None or ts - self.last_periodic_dump > 5.0:
print(self)
self.last_periodic_dump = ts
pass
@abstractmethod
- def acquire_worker(
- self, machine_to_avoid=None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
pass
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid=None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
grabbag = []
for worker in self.workers:
for x in range(0, worker.count):
break
for uuid in bundle_uuids:
- bundle = self.status.bundle_details_by_uuid.get(
- uuid, None
- )
+ bundle = self.status.bundle_details_by_uuid.get(uuid, None)
if (
bundle is not None
and bundle.src_bundle is None
logger.critical(msg)
raise Exception(msg)
- def release_worker(
- self, bundle: BundleDetails, *, was_cancelled=True
- ) -> None:
+ def release_worker(self, bundle: BundleDetails, *, was_cancelled=True) -> None:
worker = bundle.worker
assert worker is not None
logger.debug(f'Released worker {worker}')
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
+ cmd = (
+ f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
+ )
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
xfer_latency = time.time() - start_ts
- logger.debug(
- f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s."
- )
+ logger.debug(f"{bundle}: Copying to {worker} took {xfer_latency:.1f}s.")
except Exception as e:
self.release_worker(bundle)
if is_original:
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"'
)
- logger.debug(
- f'{bundle}: Executing {cmd} in the background to kick off work...'
- )
+ logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
logger.debug(
# Re-raise the exception; the code in wait_for_process may
# decide to emergency_retry_nasty_bundle here.
raise Exception(e)
- logger.debug(
- f'Removing local (master) {code_file} and {result_file}.'
- )
+ logger.debug(f'Removing local (master) {code_file} and {result_file}.')
os.remove(f'{result_file}')
os.remove(f'{code_file}')
return get_file_timestamp_timedelta(filename, lambda x: x.st_mtime)
-def describe_file_timestamp(
- filename: str, extractor, *, brief=False
-) -> Optional[str]:
+def describe_file_timestamp(filename: str, extractor, *, brief=False) -> Optional[str]:
from datetime_utils import describe_duration, describe_duration_briefly
age = get_file_timestamp_age_seconds(filename, extractor)
audio_transcription=audio_transcription,
)
else:
- message = (
- f'HTTP request to {url} with {payload} failed; code {r.status_code}'
- )
+ message = f'HTTP request to {url} with {payload} failed; code {r.status_code}'
logger.error(message)
return GoogleResponse(
success=False,
right_end="",
)
label = f'{label_formatter}..{label_formatter}' % (start, end)
- txt += (
- f'{label:20}: '
- + bar
- + f"({pop/self.count*100.0:5.2f}% n={pop})\n"
- )
+ txt += f'{label:20}: ' + bar + f"({pop/self.count*100.0:5.2f}% n={pop})\n"
if start == last_bucket_start:
break
return txt
import decorator_utils
-cfg = config.add_commandline_args(
- f'Lockfile ({__file__})', 'Args related to lockfiles'
-)
+cfg = config.add_commandline_args(f'Lockfile ({__file__})', 'Args related to lockfiles')
cfg.add_argument(
'--lockfile_held_duration_warning_threshold_sec',
type=float,
duration
>= config.config['lockfile_held_duration_warning_threshold_sec']
):
- str_duration = datetime_utils.describe_duration_briefly(
- duration
- )
+ str_duration = datetime_utils.describe_duration_briefly(duration)
msg = f'Held {self.lockfile} for {str_duration}'
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
import argparse_utils
import config
-cfg = config.add_commandline_args(
- f'Logging ({__file__})', 'Args related to logging'
-)
+cfg = config.add_commandline_args(f'Logging ({__file__})', 'Args related to logging')
cfg.add_argument(
'--logging_config_file',
type=argparse_utils.valid_filename,
if id1 not in squelched_logging_counts:
return True
threshold = squelched_logging_counts[id1]
- logsite = (
- f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
- )
+ logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
count = self.counters[logsite]
self.counters[logsite] += 1
return count < threshold
if config.config['logging_syslog']:
if sys.platform not in ('win32', 'cygwin'):
if config.config['logging_syslog_facility']:
- facility_name = (
- 'LOG_' + config.config['logging_syslog_facility']
- )
- facility = SysLogHandler.__dict__.get(
- facility_name, SysLogHandler.LOG_USER
- )
+ facility_name = 'LOG_' + config.config['logging_syslog_facility']
+ facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER)
handler = SysLogHandler(facility=facility, address='/dev/log')
handler.setFormatter(
MillisecondAwareFormatter(
level_name = logging._levelToName.get(
default_logging_level, str(default_logging_level)
)
- logger.debug(
- f'Initialized global logging; default logging level is {level_name}.'
- )
+ logger.debug(f'Initialized global logging; default logging level is {level_name}.')
if (
config.config['logging_clear_preexisting_handlers']
and preexisting_handlers_count > 0
self.logger = logger
if filenames is not None:
- self.f = [
- open(filename, 'wb', buffering=0) for filename in filenames
- ]
+ self.f = [open(filename, 'wb', buffering=0) for filename in filenames]
else:
if destination_bitv & OutputMultiplexer.FILENAMES:
- raise ValueError(
- "Filenames argument is required if bitv & FILENAMES"
- )
+ raise ValueError("Filenames argument is required if bitv & FILENAMES")
self.f = None
if handles is not None:
self.h = [handle for handle in handles]
else:
if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES:
- raise ValueError(
- "Handle argument is required if bitv & FILEHANDLES"
- )
+ raise ValueError("Handle argument is required if bitv & FILEHANDLES")
self.h = None
self.set_destination_bitv(destination_bitv)
def set_destination_bitv(self, destination_bitv: int):
if destination_bitv & self.Destination.FILENAMES and self.f is None:
- raise ValueError(
- "Filename argument is required if bitv & FILENAMES"
- )
+ raise ValueError("Filename argument is required if bitv & FILENAMES")
if destination_bitv & self.Destination.FILEHANDLES and self.h is None:
- raise ValueError(
- "Handle argument is required if bitv & FILEHANDLES"
- )
+ raise ValueError("Handle argument is required if bitv & FILEHANDLES")
self.destination_bitv = destination_bitv
def print(self, *args, **kwargs):
end = "\n"
if end == '\n':
buf += '\n'
- if (
- self.destination_bitv & self.Destination.FILENAMES
- and self.f is not None
- ):
+ if self.destination_bitv & self.Destination.FILENAMES and self.f is not None:
for _ in self.f:
_.write(buf.encode('utf-8'))
_.flush()
- if (
- self.destination_bitv & self.Destination.FILEHANDLES
- and self.h is not None
- ):
+ if self.destination_bitv & self.Destination.FILEHANDLES and self.h is not None:
for _ in self.h:
_.write(buf)
_.flush()
docid: str # a unique identifier for the document
tags: Set[str] # an optional set of tags
- properties: List[
- Tuple[str, str]
- ] # an optional set of key->value properties
+ properties: List[Tuple[str, str]] # an optional set of key->value properties
reference: Any # an optional reference to something else
def __init__(self) -> None:
self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set)
- self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(
- set
- )
+ self.docids_by_property: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
self.docids_with_property: Dict[str, Set[str]] = defaultdict(set)
self.documents_by_docid: Dict[str, Document] = {}
"""Invert a set of docids."""
return set(
- [
- docid
- for docid in self.documents_by_docid.keys()
- if docid not in original
- ]
+ [docid for docid in self.documents_by_docid.keys() if docid not in original]
)
def get_doc(self, docid: str) -> Optional[Document]:
ok = True
break
if not ok:
- raise ParseError(
- "Unbalanced parenthesis in query expression"
- )
+ raise ParseError("Unbalanced parenthesis in query expression")
# and, or, not
else:
raise ParseError(f"Unexpected query {tag}")
elif self.op is Operation.DISJUNCTION:
if len(evaled_operands) != 2:
- raise ParseError(
- "Operation.DISJUNCTION (or) expects two operands."
- )
+ raise ParseError("Operation.DISJUNCTION (or) expects two operands.")
retval.update(evaled_operands[0])
retval.update(evaled_operands[1])
elif self.op is Operation.CONJUNCTION:
if len(evaled_operands) != 2:
- raise ParseError(
- "Operation.CONJUNCTION (and) expects two operands."
- )
+ raise ParseError("Operation.CONJUNCTION (and) expects two operands.")
retval.update(evaled_operands[0])
retval = retval.intersection(evaled_operands[1])
elif self.op is Operation.INVERSION:
if len(evaled_operands) != 1:
- raise ParseError(
- "Operation.INVERSION (not) expects one operand."
- )
+ raise ParseError("Operation.INVERSION (not) expects one operand.")
_ = evaled_operands[0]
if isinstance(_, set):
retval.update(self.corpus.invert_docid_set(_))
def parallelize(
- _funct: typing.Optional[typing.Callable] = None,
- *,
- method: Method = Method.THREAD
+ _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
) -> typing.Callable:
"""Usage:
mtime = file_utils.get_file_mtime_as_datetime(filename)
now = datetime.datetime.now()
- return (
- mtime.month == now.month
- and mtime.day == now.day
- and mtime.year == now.year
- )
+ return mtime.month == now.month and mtime.day == now.day and mtime.year == now.year
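# Note: comparing month/day/year tests the calendar date, not "within the
# last 24 hours"; was_file_written_within_n_seconds (below) covers the
# rolling-window case.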
def was_file_written_within_n_seconds(
# Otherwise, try to load it from persisted state.
was_loaded = False
- logger.debug(
- f'Attempting to load {cls.__name__} from persisted state.'
- )
+ logger.debug(f'Attempting to load {cls.__name__} from persisted state.')
self.instance = cls.load()
if not self.instance:
msg = 'Loading from cache failed.'
logger.warning(msg)
- logger.debug(
- f'Attempting to instantiate {cls.__name__} directly.'
- )
+ logger.debug(f'Attempting to instantiate {cls.__name__} directly.')
self.instance = cls(*args, **kwargs)
else:
logger.debug(
result = result.replace('3', 'e')
for x in string.punctuation:
result = result.replace(x, "")
- chunks = [
- self.stemmer.stem(word) for word in nltk.word_tokenize(result)
- ]
+ chunks = [self.stemmer.stem(word) for word in nltk.word_tokenize(result)]
return ' '.join(chunks)
def tokenize(self, text: str):
network_netmask='255.255.255.0',
network_router_ip='10.0.0.1',
presence_location=Location.HOUSE,
- is_anyone_present=lambda x=Location.HOUSE: is_anyone_present_wrapper(
- x
- ),
+ is_anyone_present=lambda x=Location.HOUSE: is_anyone_present_wrapper(x),
arper_minimum_device_count=50,
)
elif location == 'CABIN':
network_netmask='255.255.255.0',
network_router_ip='192.168.0.1',
presence_location=Location.CABIN,
- is_anyone_present=lambda x=Location.CABIN: is_anyone_present_wrapper(
- x
- ),
+ is_anyone_present=lambda x=Location.CABIN: is_anyone_present_wrapper(x),
arper_minimum_device_count=15,
)
else:
for update_id in sorted(self.last_reminder_ts.keys()):
if force_all_updates_to_run:
logger.debug('Forcing all updates to run')
- self.update(
- update_id, self.now, self.last_reminder_ts[update_id]
- )
+ self.update(update_id, self.now, self.last_reminder_ts[update_id])
self.last_reminder_ts[update_id] = self.now
return
refresh_secs = self.update_ids_to_update_secs[update_id]
last_run = self.last_reminder_ts[update_id]
if last_run is None: # Never run before
- logger.debug(
- f'id {update_id} has never been run; running it now'
- )
- self.update(
- update_id, self.now, self.last_reminder_ts[update_id]
- )
+ logger.debug(f'id {update_id} has never been run; running it now')
+ self.update(update_id, self.now, self.last_reminder_ts[update_id])
self.last_reminder_ts[update_id] = self.now
else:
delta = self.now - last_run
This may block for as long as self.sleep_delay.
"""
- logger.debug(
- 'Setting shutdown event and waiting for background thread.'
- )
+ logger.debug('Setting shutdown event and waiting for background thread.')
self.should_terminate.set()
self.updater_thread.join()
logger.debug('Background thread terminated.')
return retval
-def justify_string_by_chunk(
- string: str, width: int = 80, padding: str = " "
-) -> str:
+def justify_string_by_chunk(string: str, width: int = 80, padding: str = " ") -> str:
"""
Justifies a string.
def wrapper(funct: Callable):
@functools.wraps(funct)
- def inner_wrapper(
- *a, **kwa
- ) -> Tuple[threading.Thread, threading.Event]:
+ def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
should_terminate = threading.Event()
should_terminate.clear()
newargs = (*a, should_terminate)
should_terminate = threading.Event()
should_terminate.clear()
newargs = (should_terminate, *args)
- thread = threading.Thread(
- target=helper_thread, args=newargs, kwargs=kwargs
- )
+ thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
thread.start()
logger.debug(f'Started thread {thread.name} tid={thread.ident}')
return (thread, should_terminate)
pass
@abstractmethod
- def save_performance_data(
- self, method_id: str, data: Dict[str, List[float]]
- ):
+ def save_performance_data(self, method_id: str, data: Dict[str, List[float]]):
pass
@abstractmethod
with open(self.filename, 'rb') as f:
return pickle.load(f)
- def save_performance_data(
- self, method_id: str, data: Dict[str, List[float]]
- ):
+ def save_performance_data(self, method_id: str, data: Dict[str, List[float]]):
for trace in self.traces_to_delete:
if trace in data:
data[trace] = []
results.close()
return ret
- def save_performance_data(
- self, method_id: str, data: Dict[str, List[float]]
- ):
+ def save_performance_data(self, method_id: str, data: Dict[str, List[float]]):
self.delete_performance_data(method_id)
for (method_id, perf_data) in data.items():
sql = 'INSERT INTO runtimes_by_function (function, runtime) VALUES '
)
helper = DatabasePerfRegressionDataPersister(dbspec)
else:
- raise Exception(
- 'Unknown/unexpected --unittests_persistance_strategy value'
- )
+ raise Exception('Unknown/unexpected --unittests_persistance_strategy value')
func_id = function_utils.function_identifier(func)
func_name = func.__name__
stdev = statistics.stdev(hist)
logger.debug(f'For {func_name}, performance stdev={stdev}')
slowest = hist[-1]
- logger.debug(
- f'For {func_name}, slowest perf on record is {slowest:f}s'
- )
+ logger.debug(f'For {func_name}, slowest perf on record is {slowest:f}s')
limit = slowest + stdev * 4
- logger.debug(
- f'For {func_name}, max acceptable runtime is {limit:f}s'
- )
- logger.debug(
- f'For {func_name}, actual observed runtime was {run_time:f}s'
- )
+ logger.debug(f'For {func_name}, max acceptable runtime is {limit:f}s')
+ logger.debug(f'For {func_name}, actual observed runtime was {run_time:f}s')
if run_time > limit and not config.config['unittests_ignore_perf']:
msg = f'''{func_id} performance has regressed unacceptably.
{slowest:f}s is the slowest runtime on record in {len(hist)} perf samples.
# 52 bits
@staticmethod
- def _compute_word_fingerprint(
- word: str, population: Mapping[str, int]
- ) -> int:
+ def _compute_word_fingerprint(word: str, population: Mapping[str, int]) -> int:
fp = 0
- for pair in sorted(
- population.items(), key=lambda x: x[1], reverse=True
- ):
+ for pair in sorted(population.items(), key=lambda x: x[1], reverse=True):
letter = pair[0]
if letter in fprint_feature_bit:
count = pair[1]
population: Mapping[str, int],
) -> int:
sig = 0
- for pair in sorted(
- population.items(), key=lambda x: x[1], reverse=True
- ):
+ for pair in sorted(population.items(), key=lambda x: x[1], reverse=True):
letter = pair[0]
if letter not in letter_sigs:
continue
"""
population = list_utils.population_counts(word)
fprint = Unscrambler._compute_word_fingerprint(word, population)
- letter_sig = Unscrambler._compute_word_letter_sig(
- letter_sigs, word, population
- )
+ letter_sig = Unscrambler._compute_word_letter_sig(letter_sigs, word, population)
assert fprint & letter_sig == 0
sig = fprint | letter_sig
return sig
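# The assert above guarantees the fingerprint and the letter signature occupy
# disjoint bit ranges, so OR-ing them packs both into a single integer
# signature suitable for the lookups in lookup_by_sig.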
"""
sig = Unscrambler.compute_word_sig(word)
- return self.lookup_by_sig(
- sig, include_fuzzy_matches=include_fuzzy_matches
- )
+ return self.lookup_by_sig(sig, include_fuzzy_matches=include_fuzzy_matches)
def lookup_by_sig(
self, sig: int, *, include_fuzzy_matches: bool = False
logger = logging.getLogger(__name__)
-class WaitablePresenceDetectorWithMemory(
- state_tracker.WaitableAutomaticStateTracker
-):
+class WaitablePresenceDetectorWithMemory(state_tracker.WaitableAutomaticStateTracker):
"""
This is a waitable class that keeps a PresenceDetector internally
and periodically polls it to detect changes in presence in a
def check_detector(self) -> None:
if len(self.detector.dark_locations) > 0:
- logger.debug(
- 'PresenceDetector is incomplete; trying to reinitialize...'
- )
+ logger.debug('PresenceDetector is incomplete; trying to reinitialize...')
self.detector = base_presence.PresenceDetection()
def is_someone_home(self) -> Tuple[bool, datetime.datetime]: