From 5c212d7639f62fcb936f9d7a0bbe704a9f7b213d Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Tue, 8 Feb 2022 13:08:52 -0800 Subject: [PATCH] Hook in pylint to the pre-commit hook and start to fix some of its warnings. It seems pickier than flake8 which is probably a good thing. --- acl.py | 12 ++++--- ansi.py | 27 +++++++++------ argparse_utils.py | 29 ++++++++++++---- arper.py | 42 ++++++++++++----------- base_presence.py | 30 ++++++++++------ bootstrap.py | 81 ++++++++++++++++++++++---------------------- camera_utils.py | 17 ++++++---- unscrambler.py | 34 ++++++++----------- waitable_presence.py | 2 +- 9 files changed, 154 insertions(+), 120 deletions(-) diff --git a/acl.py b/acl.py index a1ff405..cf59d6a 100644 --- a/acl.py +++ b/acl.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +"""This module defines various flavors of Access Control Lists.""" + import enum import fnmatch import logging @@ -41,23 +43,23 @@ class SimpleACL(ABC): def __call__(self, x: Any) -> bool: """Returns True if x is allowed, False otherwise.""" - logger.debug(f'SimpleACL checking {x}') + logger.debug('SimpleACL checking %s', x) if self.order_to_check_allow_deny == Order.ALLOW_DENY: logger.debug('Checking allowed first...') if self.check_allowed(x): - logger.debug(f'{x} was allowed explicitly.') + logger.debug('%s was allowed explicitly.', x) return True logger.debug('Checking denied next...') if self.check_denied(x): - logger.debug(f'{x} was denied explicitly.') + logger.debug('%s was denied explicitly.', x) return False elif self.order_to_check_allow_deny == Order.DENY_ALLOW: logger.debug('Checking denied first...') if self.check_denied(x): - logger.debug(f'{x} was denied explicitly.') + logger.debug('%s was denied explicitly.', x) return False if self.check_allowed(x): - logger.debug(f'{x} was allowed explicitly.') + logger.debug('%s was allowed explicitly.', x) return True logger.debug( diff --git a/ansi.py b/ansi.py index 1633fdd..03f8fd2 100755 --- a/ansi.py +++ b/ansi.py @@ -1,5 +1,9 @@ #!/usr/bin/env python3 +"""A bunch of color names mapped into RGB tuples and some methods for +setting the text color, background, etc... using ANSI escape +sequences.""" + import difflib import io import logging @@ -11,6 +15,8 @@ from typing import Any, Callable, Dict, Iterable, Optional, Tuple from overrides import overrides import logging_utils +import string_utils + logger = logging.getLogger(__name__) @@ -1641,7 +1647,7 @@ def strike_through() -> str: def is_16color(num: int) -> bool: - return num == 255 or num == 128 + return num in (255, 128) def is_216color(num: int) -> bool: @@ -1751,8 +1757,6 @@ def fg( b'G1szODs1OzIxbQ==\\n' """ - import string_utils - if name is not None and name == 'reset': return '\033[39m' @@ -1810,8 +1814,6 @@ def pick_contrasting_color( (0, 0, 0) """ - import string_utils - if name is not None and string_utils.is_full_string(name): rgb = _find_color_by_name(name) else: @@ -1832,7 +1834,7 @@ def guess_name(name: str) -> str: max_ratio = r best_guess = possibility assert best_guess is not None - logger.debug(f"Best guess at color name is {best_guess}") + logger.debug("Best guess at color name is %s", best_guess) return best_guess @@ -1854,8 +1856,6 @@ def bg( b'G1s0ODs1OzE5Nm0=\\n' """ - import string_utils - if name is not None and name == 'reset': return '\033[49m' @@ -1886,7 +1886,11 @@ def bg( class StdoutInterceptor(io.TextIOBase): + """An interceptor for data written to stdout. Use as a context. + + """ def __init__(self): + super().__init__() self.saved_stdout: io.TextIO = None self.buf = '' @@ -1906,12 +1910,16 @@ class StdoutInterceptor(io.TextIOBase): class ProgrammableColorizer(StdoutInterceptor): + """A colorizing interceptor; pass it re.Patterns -> methods that do + something (usually add color to) the match. + + """ def __init__( self, patterns: Iterable[Tuple[re.Pattern, Callable[[Any, re.Pattern], str]]], ): super().__init__() - self.patterns = [_ for _ in patterns] + self.patterns = list(patterns) @overrides def write(self, s: str): @@ -1921,7 +1929,6 @@ class ProgrammableColorizer(StdoutInterceptor): if __name__ == '__main__': - def main() -> None: import doctest diff --git a/argparse_utils.py b/argparse_utils.py index 5a270f6..045d882 100644 --- a/argparse_utils.py +++ b/argparse_utils.py @@ -1,5 +1,7 @@ #!/usr/bin/python3 +"""Helpers for commandline argument parsing.""" + import argparse import datetime import logging @@ -15,13 +17,28 @@ logger = logging.getLogger(__name__) class ActionNoYes(argparse.Action): + """An argparse Action that allows for commandline arguments like this: + + cfg.add_argument( + '--enable_the_thing', + action=ActionNoYes, + default=False, + help='Should we enable the thing?' + ) + + This creates cmdline arguments: + + --enable_the_thing + --no_enable_the_thing + + """ def __init__(self, option_strings, dest, default=None, required=False, help=None): if default is None: msg = 'You must provide a default with Yes/No action' logger.critical(msg) raise ValueError(msg) if len(option_strings) != 1: - msg = 'Only single argument is allowed with YesNo action' + msg = 'Only single argument is allowed with NoYes action' logger.critical(msg) raise ValueError(msg) opt = option_strings[0] @@ -81,8 +98,8 @@ def valid_bool(v: Any) -> bool: try: return to_bool(v) - except Exception: - raise argparse.ArgumentTypeError(v) + except Exception as e: + raise argparse.ArgumentTypeError(v) from e def valid_ip(ip: str) -> str: @@ -247,10 +264,10 @@ def valid_duration(txt: str) -> datetime.timedelta: try: secs = parse_duration(txt) - except Exception as e: - raise argparse.ArgumentTypeError(e) - finally: return datetime.timedelta(seconds=secs) + except Exception as e: + logger.exception(e) + raise argparse.ArgumentTypeError(e) from e if __name__ == '__main__': diff --git a/arper.py b/arper.py index 3b308ca..a665137 100644 --- a/arper.py +++ b/arper.py @@ -71,11 +71,11 @@ class Arper(persistent.Persistent): if len(self.state) < config.config['arper_min_entries_to_be_valid']: raise Exception(f'Arper didn\'t find enough entries; only got {len(self.state)}.') if cached_supplimental_state is not None: - logger.debug(f'Also added {len(cached_supplimental_state)} supplimental entries.') + logger.debug('Also added %d supplimental entries.', len(cached_supplimental_state)) for mac, ip in cached_supplimental_state.items(): self.state[mac] = ip for mac, ip in self.state.items(): - logger.debug(f'{mac} <-> {ip}') + logger.debug('%s <-> %s', mac, ip) def update_from_arp_scan(self): network_spec = site_config.get_config().network @@ -92,7 +92,7 @@ class Arper(persistent.Persistent): mac = string_utils.extract_mac_address(line) if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': mac = mac.lower() - logger.debug(f'ARPER: {mac} => {ip}') + logger.debug('ARPER: %s => %s', mac, ip) self.state[mac] = ip def update_from_arp(self): @@ -106,7 +106,7 @@ class Arper(persistent.Persistent): mac = string_utils.extract_mac_address(line) if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': mac = mac.lower() - logger.debug(f'ARPER: {mac} => {ip}') + logger.debug('ARPER: %s => %s', mac, ip) self.state[mac] = ip def get_ip_by_mac(self, mac: str) -> Optional[str]: @@ -117,21 +117,26 @@ class Arper(persistent.Persistent): return self.state.inverse.get(ip, None) @classmethod - def load_state(cls, cache_file: str, freshness_threshold_sec: int, state: BiDict): + def load_state( + cls, + cache_file: str, + freshness_threshold_sec: int, + state: BiDict, + ): if not file_utils.file_is_readable(cache_file): - logger.debug(f'Can\'t read {cache_file}') - return None + logger.debug('Can\'t read %s', cache_file) + return if persistent.was_file_written_within_n_seconds( cache_file, freshness_threshold_sec, ): - logger.debug(f'Loading state from {cache_file}') + logger.debug('Loading state from %s', cache_file) count = 0 with open(cache_file, 'r') as rf: contents = rf.readlines() for line in contents: line = line[:-1] - logger.debug(f'ARPER:{cache_file}> {line}') + logger.debug('ARPER:%s> %s', cache_file, line) (mac, ip) = line.split(',') mac = mac.strip() mac = mac.lower() @@ -139,15 +144,15 @@ class Arper(persistent.Persistent): state[mac] = ip count += 1 else: - logger.debug(f'{cache_file} is too stale.') + logger.debug('%s is too stale.', cache_file) @classmethod @overrides def load(cls) -> Any: - local_state = BiDict() + local_state: BiDict = BiDict() cache_file = config.config['arper_cache_location'] max_staleness = config.config['arper_cache_max_staleness'].total_seconds() - logger.debug(f'Trying to load main arper cache from {cache_file}...') + logger.debug('Trying to load main arper cache from %s...', cache_file) cls.load_state(cache_file, max_staleness, local_state) if len(local_state) <= config.config['arper_min_entries_to_be_valid']: msg = f'{cache_file} is invalid: only {len(local_state)} entries. Deleting it.' @@ -158,14 +163,11 @@ class Arper(persistent.Persistent): except Exception: pass - supplimental_state = BiDict() + supplimental_state: BiDict = BiDict() cache_file = config.config['arper_supplimental_cache_location'] max_staleness = config.config['arper_cache_max_staleness'].total_seconds() - logger.debug(f'Trying to suppliment arper state from {cache_file}...') + logger.debug('Trying to suppliment arper state from %s', cache_file) cls.load_state(cache_file, max_staleness, supplimental_state) - if len(supplimental_state) == 0: - supplimental_state = None - if len(local_state) > 0: return cls(local_state, supplimental_state) return None @@ -173,7 +175,7 @@ class Arper(persistent.Persistent): @overrides def save(self) -> bool: if len(self.state) > config.config['arper_min_entries_to_be_valid']: - logger.debug(f'Persisting state to {config.config["arper_cache_location"]}') + logger.debug('Persisting state to %s', config.config["arper_cache_location"]) with file_utils.FileWriter(config.config['arper_cache_location']) as wf: for (mac, ip) in self.state.items(): mac = mac.lower() @@ -181,6 +183,8 @@ class Arper(persistent.Persistent): return True else: logger.warning( - f'Only saw {len(self.state)} entries; needed at least {config.config["arper_min_entries_to_be_valid"]} to bother persisting.' + 'Only saw %d entries; needed at least %d to bother persisting.', + len(self.state), + config.config["arper_min_entries_to_be_valid"], ) return False diff --git a/base_presence.py b/base_presence.py index fa035fd..f18b870 100755 --- a/base_presence.py +++ b/base_presence.py @@ -67,7 +67,7 @@ class PresenceDetection(object): ], } self.run_location = site_config.get_location() - logger.debug(f"run_location is {self.run_location}") + logger.debug("base_presence run_location is %s", self.run_location) self.weird_mac_at_cabin = False self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict) self.names_by_mac: Dict[str, str] = {} @@ -85,7 +85,7 @@ class PresenceDetection(object): > config.config['presence_tolerable_staleness_seconds'].total_seconds() ): logger.debug( - f"It's been {delta.total_seconds()}s since last update; refreshing now." + "It's been %ss since last update; refreshing now.", delta.total_seconds() ) self.update() @@ -157,17 +157,17 @@ class PresenceDetection(object): line = line.strip() if len(line) == 0: continue - logger.debug(f'{location}> {line}') + logger.debug('%s> %s', location, line) if "cabin_" in line: continue if location == Location.CABIN: - logger.debug(f'Cabin count: {cabin_count}') + logger.debug('Cabin count: %d', cabin_count) cabin_count += 1 try: - (mac, count, ip_name, mfg, ts) = line.split(",") + (mac, _, ip_name, _, ts) = line.split(",") # type: ignore except Exception as e: - logger.error(f'SKIPPED BAD LINE> {line}') logger.exception(e) + logger.error('SKIPPED BAD LINE> %s', line) continue mac = mac.strip() (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp( @@ -201,7 +201,7 @@ class PresenceDetection(object): msg = f"Can't see {self.dark_locations} right now; answer confidence impacted" logger.warning(msg) warnings.warn(msg, stacklevel=2) - logger.debug(f'Looking for {name}...') + logger.debug('Looking for %s...', name) if name is Person.UNKNOWN: if self.weird_mac_at_cabin: @@ -214,31 +214,39 @@ class PresenceDetection(object): votes: Dict[Location, int] = {} tiebreaks: Dict[Location, datetime.datetime] = {} credit = 10000 + location = None for mac in self.devices_by_person[name]: if mac not in self.names_by_mac: continue mac_name = self.names_by_mac[mac] - logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})') + logger.debug( + 'Looking for %s... check for mac %s (%s)', + name, mac, mac_name + ) for location in self.location_ts_by_mac: if mac in self.location_ts_by_mac[location]: ts = (self.location_ts_by_mac[location])[mac] - logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}') + logger.debug( + 'Seen %s (%s) at %s since %s', + mac, mac_name, location, ts + ) tiebreaks[location] = ts ( most_recent_location, - first_seen_ts, + _, ) = dict_utils.item_with_max_value(tiebreaks) bonus = credit v = votes.get(most_recent_location, 0) votes[most_recent_location] = v + bonus - logger.debug(f'{name}: {location} gets {bonus} votes.') + logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus) credit = int(credit * 0.2) # Note: list most important devices first if credit <= 0: credit = 1 if len(votes) > 0: (location, value) = dict_utils.item_with_max_value(votes) if value > 2001: + assert location return location return Location.UNKNOWN diff --git a/bootstrap.py b/bootstrap.py index 2df9588..3574786 100644 --- a/bootstrap.py +++ b/bootstrap.py @@ -6,7 +6,6 @@ import logging import os import sys from inspect import stack -from typing import List import config import logging_utils @@ -18,23 +17,23 @@ from argparse_utils import ActionNoYes logger = logging.getLogger(__name__) -args = config.add_commandline_args( +cfg = config.add_commandline_args( f'Bootstrap ({__file__})', 'Args related to python program bootstrapper and Swiss army knife', ) -args.add_argument( +cfg.add_argument( '--debug_unhandled_exceptions', action=ActionNoYes, default=False, help='Break into pdb on top level unhandled exceptions.', ) -args.add_argument( +cfg.add_argument( '--show_random_seed', action=ActionNoYes, default=False, help='Should we display (and log.debug) the global random seed?', ) -args.add_argument( +cfg.add_argument( '--set_random_seed', type=int, nargs=1, @@ -42,32 +41,32 @@ args.add_argument( metavar='SEED_INT', help='Override the global random seed with a particular number.', ) -args.add_argument( +cfg.add_argument( '--dump_all_objects', action=ActionNoYes, default=False, help='Should we dump the Python import tree before main?', ) -args.add_argument( +cfg.add_argument( '--audit_import_events', action=ActionNoYes, default=False, help='Should we audit all import events?', ) -args.add_argument( +cfg.add_argument( '--run_profiler', action=ActionNoYes, default=False, help='Should we run cProfile on this code?', ) -args.add_argument( +cfg.add_argument( '--trace_memory', action=ActionNoYes, default=False, help='Should we record/report on memory utilization?', ) -original_hook = sys.excepthook +ORIGINAL_EXCEPTION_HOOK = sys.excepthook def handle_uncaught_exception(exc_type, exc_value, exc_tb): @@ -77,7 +76,6 @@ def handle_uncaught_exception(exc_type, exc_value, exc_tb): maybe attaches a debugger. """ - global original_hook msg = f'Unhandled top level exception {exc_type}' logger.exception(msg) print(msg, file=sys.stderr) @@ -87,7 +85,7 @@ def handle_uncaught_exception(exc_type, exc_value, exc_tb): else: if not sys.stderr.isatty() or not sys.stdin.isatty(): # stdin or stderr is redirected, just do the normal thing - original_hook(exc_type, exc_value, exc_tb) + ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb) else: # a terminal is attached and stderr is not redirected, maybe debug. import traceback @@ -99,7 +97,7 @@ def handle_uncaught_exception(exc_type, exc_value, exc_tb): logger.info("Invoking the debugger...") pdb.pm() else: - original_hook(exc_type, exc_value, exc_tb) + ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb) class ImportInterceptor(importlib.abc.MetaPathFinder): @@ -120,17 +118,18 @@ class ImportInterceptor(importlib.abc.MetaPathFinder): fname = 'unknown' self.module_by_filename_cache[fname] = mod - def should_ignore_filename(self, filename: str) -> bool: + @staticmethod + def should_ignore_filename(filename: str) -> bool: return 'importlib' in filename or 'six.py' in filename def find_module(self, fullname, path): raise Exception("This method has been deprecated since Python 3.4, please upgrade.") - def find_spec(self, loaded_module, path=None, target=None): + def find_spec(self, loaded_module, path=None, _=None): s = stack() for x in range(3, len(s)): filename = s[x].filename - if self.should_ignore_filename(filename): + if ImportInterceptor.should_ignore_filename(filename): continue loading_function = s[x].function @@ -171,15 +170,14 @@ class ImportInterceptor(importlib.abc.MetaPathFinder): # # Also note: move bootstrap up in the global import list to catch # more import events and have a more complete record. -import_interceptor = None +IMPORT_INTERCEPTOR = None for arg in sys.argv: if arg == '--audit_import_events': - import_interceptor = ImportInterceptor() - sys.meta_path.insert(0, import_interceptor) + IMPORT_INTERCEPTOR = ImportInterceptor() + sys.meta_path.insert(0, IMPORT_INTERCEPTOR) def dump_all_objects() -> None: - global import_interceptor messages = {} all_modules = sys.modules for obj in object.__subclasses__(): @@ -199,8 +197,8 @@ def dump_all_objects() -> None: mod_file = mod.__file__ else: mod_file = 'unknown' - if import_interceptor is not None: - import_path = import_interceptor.find_importer(mod_name) + if IMPORT_INTERCEPTOR is not None: + import_path = IMPORT_INTERCEPTOR.find_importer(mod_name) else: import_path = 'unknown' msg = f'{class_mod_name}::{klass} ({mod_file})' @@ -245,17 +243,18 @@ def initialize(entry_point): # Maybe log some info about the python interpreter itself. logger.debug( - f'Platform: {sys.platform}, maxint=0x{sys.maxsize:x}, byteorder={sys.byteorder}' + 'Platform: %s, maxint=0x%x, byteorder=%s', + sys.platform, sys.maxsize, sys.byteorder ) - logger.debug(f'Python interpreter version: {sys.version}') - logger.debug(f'Python implementation: {sys.implementation}') - logger.debug(f'Python C API version: {sys.api_version}') - logger.debug(f'Python path: {sys.path}') + logger.debug('Python interpreter version: %s', sys.version) + logger.debug('Python implementation: %s', sys.implementation) + logger.debug('Python C API version: %s', sys.api_version) + logger.debug('Python path: %s', sys.path) # Log something about the site_config, many things use it. import site_config - logger.debug(f'Global site_config: {site_config.get_config()}') + logger.debug('Global site_config: %s', site_config.get_config()) # Allow programs that don't bother to override the random seed # to be replayed via the commandline. @@ -274,7 +273,7 @@ def initialize(entry_point): random.seed(random_seed) # Do it, invoke the user's code. Pay attention to how long it takes. - logger.debug(f'Starting {entry_point.__name__} (program entry point)') + logger.debug('Starting %s (program entry point)', entry_point.__name__) ret = None import stopwatch @@ -294,7 +293,7 @@ def initialize(entry_point): with stopwatch.Timer() as t: ret = entry_point(*args, **kwargs) - logger.debug(f'{entry_point.__name__} (program entry point) returned {ret}.') + logger.debug('%s (program entry point) returned %s.', entry_point.__name__, ret) if config.config['trace_memory']: snapshot = tracemalloc.take_snapshot() @@ -308,27 +307,27 @@ def initialize(entry_point): dump_all_objects() if config.config['audit_import_events']: - global import_interceptor - if import_interceptor is not None: - print(import_interceptor.tree) + if IMPORT_INTERCEPTOR is not None: + print(IMPORT_INTERCEPTOR.tree) walltime = t() (utime, stime, cutime, cstime, elapsed_time) = os.times() logger.debug( '\n' - f'user: {utime}s\n' - f'system: {stime}s\n' - f'child user: {cutime}s\n' - f'child system: {cstime}s\n' - f'machine uptime: {elapsed_time}s\n' - f'walltime: {walltime}s' + 'user: %.4fs\n' + 'system: %.4fs\n' + 'child user: %.4fs\n' + 'child system: %.4fs\n' + 'machine uptime: %.4fs\n' + 'walltime: %.4fs', + utime, stime, cutime, cstime, elapsed_time, walltime ) # If it doesn't return cleanly, call attention to the return value. if ret is not None and ret != 0: - logger.error(f'Exit {ret}') + logger.error('Exit %s', ret) else: - logger.debug(f'Exit {ret}') + logger.debug('Exit %s', ret) sys.exit(ret) return initialize_wrapper diff --git a/camera_utils.py b/camera_utils.py index f5d295b..c789ed6 100644 --- a/camera_utils.py +++ b/camera_utils.py @@ -50,7 +50,7 @@ def sanity_check_image(hsv: np.ndarray) -> SanityCheckImageMetadata: weird_orange_count += 1 elif is_near(pixel[0], 0) and is_near(pixel[1], 0): hs_zero_count += 1 - logger.debug(f"hszero#={hs_zero_count}, weird_orange={weird_orange_count}") + logger.debug("hszero#=%d, weird_orange=%d", hs_zero_count, weird_orange_count) return SanityCheckImageMetadata( hs_zero_count > (num_pixels * 0.75), weird_orange_count > (num_pixels * 0.75), @@ -67,23 +67,26 @@ def fetch_camera_image_from_video_server( url = ( f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg" ) - logger.debug(f'Fetching image from {url}') + logger.debug('Fetching image from %s', url) try: response = requests.get(url, stream=False, timeout=10.0) if response.ok: raw = response.content - logger.debug(f'Read {len(response.content)} byte image from HTTP server') + logger.debug('Read %d byte image from HTTP server', len(response.content)) tmp = np.frombuffer(raw, dtype="uint8") logger.debug( - f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.' + 'Translated raw content into %s %s with element type %s', + tmp.shape, type(tmp), type(tmp[0]), ) jpg = cv2.imdecode(tmp, cv2.IMREAD_COLOR) logger.debug( - f'Decoded into {jpg.shape} jpeg {type(jpg)} with element type {type(jpg[0][0])}' + 'Decoded into %s jpeg %s with element type %s', + jpg.shape, type(jpg), type(jpg[0][0]) ) hsv = cv2.cvtColor(jpg, cv2.COLOR_BGR2HSV) logger.debug( - f'Converted JPG into HSV {hsv.shape} HSV {type(hsv)} with element type {type(hsv[0][0])}' + 'Converted JPG into %s HSV HSV %s with element type %s', + hsv.shape, type(hsv), type(hsv[0][0]) ) (_, is_bad_image) = sanity_check_image(hsv) if not is_bad_image: @@ -125,7 +128,7 @@ def fetch_camera_image_from_rtsp_stream(camera_name: str, *, width: int = 256) - """Fetch the raw webcam image straight from the webcam's RTSP stream.""" hostname = camera_name_to_hostname(camera_name) stream = f"rtsp://camera:IaLaIok@{hostname}:554/live" - logger.debug(f'Fetching image from RTSP stream {stream}') + logger.debug('Fetching image from RTSP stream %s', stream) try: cmd = [ "/usr/bin/timeout", diff --git a/unscrambler.py b/unscrambler.py index 9df82f6..c5bc9b5 100644 --- a/unscrambler.py +++ b/unscrambler.py @@ -128,14 +128,12 @@ class Unscrambler(object): # 52 bits @staticmethod - def _compute_word_fingerprint(word: str, population: Mapping[str, int]) -> int: + def _compute_word_fingerprint(population: Mapping[str, int]) -> int: fp = 0 for pair in sorted(population.items(), key=lambda x: x[1], reverse=True): letter = pair[0] if letter in fprint_feature_bit: - count = pair[1] - if count > 3: - count = 3 + count = min(pair[1], 3) shift = fprint_feature_bit[letter] s = count << shift fp |= s @@ -144,25 +142,23 @@ class Unscrambler(object): # 32 bits @staticmethod def _compute_word_letter_sig( - letter_sigs: Mapping[str, int], + lsigs: Mapping[str, int], word: str, population: Mapping[str, int], ) -> int: sig = 0 for pair in sorted(population.items(), key=lambda x: x[1], reverse=True): letter = pair[0] - if letter not in letter_sigs: + if letter not in lsigs: continue - s = letter_sigs[letter] + s = lsigs[letter] count = pair[1] if count > 1: s <<= count s |= count s &= letters_mask sig ^= s - length = len(word) - if length > 31: - length = 31 + length = min(len(word), 31) sig ^= length << 8 sig &= letters_mask return sig @@ -189,7 +185,7 @@ class Unscrambler(object): """ population = list_utils.population_counts(word) - fprint = Unscrambler._compute_word_fingerprint(word, population) + fprint = Unscrambler._compute_word_fingerprint(population) letter_sig = Unscrambler._compute_word_letter_sig(letter_sigs, word, population) assert fprint & letter_sig == 0 sig = fprint | letter_sig @@ -197,7 +193,7 @@ class Unscrambler(object): @staticmethod def repopulate( - letter_sigs: Dict[str, int], + lsigs: Dict[str, int], dictfile: str = '/usr/share/dict/words', indexfile: str = '/usr/share/dict/sparse_index', ) -> None: @@ -211,13 +207,13 @@ class Unscrambler(object): for word in f: word = word.replace('\n', '') word = word.lower() - sig = Unscrambler.compute_word_sig(letter_sigs, word) - logger.debug("%s => 0x%x" % (word, sig)) + sig = Unscrambler.compute_word_sig(word) + logger.debug("%s => 0x%x", word, sig) if word in seen: continue seen.add(word) if sig in words_by_sigs: - words_by_sigs[sig] += ",%s" % word + words_by_sigs[sig] += f",{word}" else: words_by_sigs[sig] = word with open(indexfile, 'w') as f: @@ -252,13 +248,11 @@ class Unscrambler(object): """ ret = {} - (exact, location) = list_utils.binary_search(self.sigs, sig) + (_, location) = list_utils.binary_search(self.sigs, sig) start = location - window_size - if start < 0: - start = 0 + start = max(start, 0) end = location + 1 + window_size - if end > len(self.words): - end = len(self.words) + end = min(end, len(self.words)) for x in range(start, end): word = self.words[x] diff --git a/waitable_presence.py b/waitable_presence.py index 46d7cbe..9247a67 100644 --- a/waitable_presence.py +++ b/waitable_presence.py @@ -74,7 +74,7 @@ class WaitablePresenceDetectorWithMemory(state_tracker.WaitableAutomaticStateTra raise Exception(f'Unknown update type {update_id} in {__file__}') def poll_presence(self, now: datetime.datetime) -> None: - logger.debug(f'Checking presence in {self.location} now...') + logger.debug('Checking presence in %s now...', self.location) self.detector.update() if self.detector.is_anyone_in_location_now(self.location): self.someone_is_home = True -- 2.47.1