Hook in pylint to the pre-commit hook and start to fix some of its
authorScott Gasch <[email protected]>
Tue, 8 Feb 2022 21:08:52 +0000 (13:08 -0800)
committerScott Gasch <[email protected]>
Tue, 8 Feb 2022 21:08:52 +0000 (13:08 -0800)
warnings.  It seems pickier than flake8 which is probably a good
thing.

acl.py
ansi.py
argparse_utils.py
arper.py
base_presence.py
bootstrap.py
camera_utils.py
unscrambler.py
waitable_presence.py

diff --git a/acl.py b/acl.py
index a1ff4051d80087b3c55ee085a1c1dd47c7611d73..cf59d6a87d8266fe55259e9584c403346d6404f7 100644 (file)
--- a/acl.py
+++ b/acl.py
@@ -1,5 +1,7 @@
 #!/usr/bin/env python3
 
+"""This module defines various flavors of Access Control Lists."""
+
 import enum
 import fnmatch
 import logging
@@ -41,23 +43,23 @@ class SimpleACL(ABC):
 
     def __call__(self, x: Any) -> bool:
         """Returns True if x is allowed, False otherwise."""
-        logger.debug(f'SimpleACL checking {x}')
+        logger.debug('SimpleACL checking %s', x)
         if self.order_to_check_allow_deny == Order.ALLOW_DENY:
             logger.debug('Checking allowed first...')
             if self.check_allowed(x):
-                logger.debug(f'{x} was allowed explicitly.')
+                logger.debug('%s was allowed explicitly.', x)
                 return True
             logger.debug('Checking denied next...')
             if self.check_denied(x):
-                logger.debug(f'{x} was denied explicitly.')
+                logger.debug('%s was denied explicitly.', x)
                 return False
         elif self.order_to_check_allow_deny == Order.DENY_ALLOW:
             logger.debug('Checking denied first...')
             if self.check_denied(x):
-                logger.debug(f'{x} was denied explicitly.')
+                logger.debug('%s was denied explicitly.', x)
                 return False
             if self.check_allowed(x):
-                logger.debug(f'{x} was allowed explicitly.')
+                logger.debug('%s was allowed explicitly.', x)
                 return True
 
         logger.debug(
diff --git a/ansi.py b/ansi.py
index 1633fddbcb31714d3ae7342daca37c1c4034b4c6..03f8fd27473c4e295cdafbb9e7122b4cec296453 100755 (executable)
--- a/ansi.py
+++ b/ansi.py
@@ -1,5 +1,9 @@
 #!/usr/bin/env python3
 
+"""A bunch of color names mapped into RGB tuples and some methods for
+setting the text color, background, etc... using ANSI escape
+sequences."""
+
 import difflib
 import io
 import logging
@@ -11,6 +15,8 @@ from typing import Any, Callable, Dict, Iterable, Optional, Tuple
 from overrides import overrides
 
 import logging_utils
+import string_utils
+
 
 logger = logging.getLogger(__name__)
 
@@ -1641,7 +1647,7 @@ def strike_through() -> str:
 
 
 def is_16color(num: int) -> bool:
-    return num == 255 or num == 128
+    return num in (255, 128)
 
 
 def is_216color(num: int) -> bool:
@@ -1751,8 +1757,6 @@ def fg(
     b'G1szODs1OzIxbQ==\\n'
 
     """
-    import string_utils
-
     if name is not None and name == 'reset':
         return '\033[39m'
 
@@ -1810,8 +1814,6 @@ def pick_contrasting_color(
     (0, 0, 0)
 
     """
-    import string_utils
-
     if name is not None and string_utils.is_full_string(name):
         rgb = _find_color_by_name(name)
     else:
@@ -1832,7 +1834,7 @@ def guess_name(name: str) -> str:
             max_ratio = r
             best_guess = possibility
     assert best_guess is not None
-    logger.debug(f"Best guess at color name is {best_guess}")
+    logger.debug("Best guess at color name is %s", best_guess)
     return best_guess
 
 
@@ -1854,8 +1856,6 @@ def bg(
     b'G1s0ODs1OzE5Nm0=\\n'
 
     """
-    import string_utils
-
     if name is not None and name == 'reset':
         return '\033[49m'
 
@@ -1886,7 +1886,11 @@ def bg(
 
 
 class StdoutInterceptor(io.TextIOBase):
+    """An interceptor for data written to stdout.  Use as a context.
+
+    """
     def __init__(self):
+        super().__init__()
         self.saved_stdout: io.TextIO = None
         self.buf = ''
 
@@ -1906,12 +1910,16 @@ class StdoutInterceptor(io.TextIOBase):
 
 
 class ProgrammableColorizer(StdoutInterceptor):
+    """A colorizing interceptor; pass it re.Patterns -> methods that do
+    something (usually add color to) the match.
+
+    """
     def __init__(
         self,
         patterns: Iterable[Tuple[re.Pattern, Callable[[Any, re.Pattern], str]]],
     ):
         super().__init__()
-        self.patterns = [_ for _ in patterns]
+        self.patterns = list(patterns)
 
     @overrides
     def write(self, s: str):
@@ -1921,7 +1929,6 @@ class ProgrammableColorizer(StdoutInterceptor):
 
 
 if __name__ == '__main__':
-
     def main() -> None:
         import doctest
 
index 5a270f6ef22c1845be1bd6c59ab7fde1cb4cfe21..045d882a2b6c4b18f4a27fd0267a65dd2b4444e4 100644 (file)
@@ -1,5 +1,7 @@
 #!/usr/bin/python3
 
+"""Helpers for commandline argument parsing."""
+
 import argparse
 import datetime
 import logging
@@ -15,13 +17,28 @@ logger = logging.getLogger(__name__)
 
 
 class ActionNoYes(argparse.Action):
+    """An argparse Action that allows for commandline arguments like this:
+
+        cfg.add_argument(
+            '--enable_the_thing',
+            action=ActionNoYes,
+            default=False,
+            help='Should we enable the thing?'
+        )
+
+    This creates cmdline arguments:
+
+        --enable_the_thing
+        --no_enable_the_thing
+
+    """
     def __init__(self, option_strings, dest, default=None, required=False, help=None):
         if default is None:
             msg = 'You must provide a default with Yes/No action'
             logger.critical(msg)
             raise ValueError(msg)
         if len(option_strings) != 1:
-            msg = 'Only single argument is allowed with YesNo action'
+            msg = 'Only single argument is allowed with NoYes action'
             logger.critical(msg)
             raise ValueError(msg)
         opt = option_strings[0]
@@ -81,8 +98,8 @@ def valid_bool(v: Any) -> bool:
 
     try:
         return to_bool(v)
-    except Exception:
-        raise argparse.ArgumentTypeError(v)
+    except Exception as e:
+        raise argparse.ArgumentTypeError(v) from e
 
 
 def valid_ip(ip: str) -> str:
@@ -247,10 +264,10 @@ def valid_duration(txt: str) -> datetime.timedelta:
 
     try:
         secs = parse_duration(txt)
-    except Exception as e:
-        raise argparse.ArgumentTypeError(e)
-    finally:
         return datetime.timedelta(seconds=secs)
+    except Exception as e:
+        logger.exception(e)
+        raise argparse.ArgumentTypeError(e) from e
 
 
 if __name__ == '__main__':
index 3b308ca038ad405d1d37671f4c558343d84e4593..a665137c1c71fd90637add9cba91e125e44a31a4 100644 (file)
--- a/arper.py
+++ b/arper.py
@@ -71,11 +71,11 @@ class Arper(persistent.Persistent):
         if len(self.state) < config.config['arper_min_entries_to_be_valid']:
             raise Exception(f'Arper didn\'t find enough entries; only got {len(self.state)}.')
         if cached_supplimental_state is not None:
-            logger.debug(f'Also added {len(cached_supplimental_state)} supplimental entries.')
+            logger.debug('Also added %d supplimental entries.', len(cached_supplimental_state))
             for mac, ip in cached_supplimental_state.items():
                 self.state[mac] = ip
         for mac, ip in self.state.items():
-            logger.debug(f'{mac} <-> {ip}')
+            logger.debug('%s <-> %s', mac, ip)
 
     def update_from_arp_scan(self):
         network_spec = site_config.get_config().network
@@ -92,7 +92,7 @@ class Arper(persistent.Persistent):
             mac = string_utils.extract_mac_address(line)
             if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN':
                 mac = mac.lower()
-                logger.debug(f'ARPER: {mac} => {ip}')
+                logger.debug('ARPER: %s => %s', mac, ip)
                 self.state[mac] = ip
 
     def update_from_arp(self):
@@ -106,7 +106,7 @@ class Arper(persistent.Persistent):
             mac = string_utils.extract_mac_address(line)
             if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN':
                 mac = mac.lower()
-                logger.debug(f'ARPER: {mac} => {ip}')
+                logger.debug('ARPER: %s => %s', mac, ip)
                 self.state[mac] = ip
 
     def get_ip_by_mac(self, mac: str) -> Optional[str]:
@@ -117,21 +117,26 @@ class Arper(persistent.Persistent):
         return self.state.inverse.get(ip, None)
 
     @classmethod
-    def load_state(cls, cache_file: str, freshness_threshold_sec: int, state: BiDict):
+    def load_state(
+            cls,
+            cache_file: str,
+            freshness_threshold_sec: int,
+            state: BiDict,
+    ):
         if not file_utils.file_is_readable(cache_file):
-            logger.debug(f'Can\'t read {cache_file}')
-            return None
+            logger.debug('Can\'t read %s', cache_file)
+            return
         if persistent.was_file_written_within_n_seconds(
             cache_file,
             freshness_threshold_sec,
         ):
-            logger.debug(f'Loading state from {cache_file}')
+            logger.debug('Loading state from %s', cache_file)
             count = 0
             with open(cache_file, 'r') as rf:
                 contents = rf.readlines()
                 for line in contents:
                     line = line[:-1]
-                    logger.debug(f'ARPER:{cache_file}> {line}')
+                    logger.debug('ARPER:%s> %s', cache_file, line)
                     (mac, ip) = line.split(',')
                     mac = mac.strip()
                     mac = mac.lower()
@@ -139,15 +144,15 @@ class Arper(persistent.Persistent):
                     state[mac] = ip
                     count += 1
         else:
-            logger.debug(f'{cache_file} is too stale.')
+            logger.debug('%s is too stale.', cache_file)
 
     @classmethod
     @overrides
     def load(cls) -> Any:
-        local_state = BiDict()
+        local_state: BiDict = BiDict()
         cache_file = config.config['arper_cache_location']
         max_staleness = config.config['arper_cache_max_staleness'].total_seconds()
-        logger.debug(f'Trying to load main arper cache from {cache_file}...')
+        logger.debug('Trying to load main arper cache from %s...', cache_file)
         cls.load_state(cache_file, max_staleness, local_state)
         if len(local_state) <= config.config['arper_min_entries_to_be_valid']:
             msg = f'{cache_file} is invalid: only {len(local_state)} entries.  Deleting it.'
@@ -158,14 +163,11 @@ class Arper(persistent.Persistent):
             except Exception:
                 pass
 
-        supplimental_state = BiDict()
+        supplimental_state: BiDict = BiDict()
         cache_file = config.config['arper_supplimental_cache_location']
         max_staleness = config.config['arper_cache_max_staleness'].total_seconds()
-        logger.debug(f'Trying to suppliment arper state from {cache_file}...')
+        logger.debug('Trying to suppliment arper state from %s', cache_file)
         cls.load_state(cache_file, max_staleness, supplimental_state)
-        if len(supplimental_state) == 0:
-            supplimental_state = None
-
         if len(local_state) > 0:
             return cls(local_state, supplimental_state)
         return None
@@ -173,7 +175,7 @@ class Arper(persistent.Persistent):
     @overrides
     def save(self) -> bool:
         if len(self.state) > config.config['arper_min_entries_to_be_valid']:
-            logger.debug(f'Persisting state to {config.config["arper_cache_location"]}')
+            logger.debug('Persisting state to %s', config.config["arper_cache_location"])
             with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
                 for (mac, ip) in self.state.items():
                     mac = mac.lower()
@@ -181,6 +183,8 @@ class Arper(persistent.Persistent):
             return True
         else:
             logger.warning(
-                f'Only saw {len(self.state)} entries; needed at least {config.config["arper_min_entries_to_be_valid"]} to bother persisting.'
+                'Only saw %d entries; needed at least %d to bother persisting.',
+                len(self.state),
+                config.config["arper_min_entries_to_be_valid"],
             )
             return False
index fa035fd28f5645147dc0b5f68e2ee6dfe9589f88..f18b870daceaa4e97cf47d5c7b68f90da1a2ed24 100755 (executable)
@@ -67,7 +67,7 @@ class PresenceDetection(object):
             ],
         }
         self.run_location = site_config.get_location()
-        logger.debug(f"run_location is {self.run_location}")
+        logger.debug("base_presence run_location is %s", self.run_location)
         self.weird_mac_at_cabin = False
         self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
         self.names_by_mac: Dict[str, str] = {}
@@ -85,7 +85,7 @@ class PresenceDetection(object):
                 > config.config['presence_tolerable_staleness_seconds'].total_seconds()
             ):
                 logger.debug(
-                    f"It's been {delta.total_seconds()}s since last update; refreshing now."
+                    "It's been %ss since last update; refreshing now.", delta.total_seconds()
                 )
                 self.update()
 
@@ -157,17 +157,17 @@ class PresenceDetection(object):
             line = line.strip()
             if len(line) == 0:
                 continue
-            logger.debug(f'{location}> {line}')
+            logger.debug('%s> %s', location, line)
             if "cabin_" in line:
                 continue
             if location == Location.CABIN:
-                logger.debug(f'Cabin count: {cabin_count}')
+                logger.debug('Cabin count: %d', cabin_count)
                 cabin_count += 1
             try:
-                (mac, count, ip_name, mfg, ts) = line.split(",")
+                (mac, _, ip_name, _, ts) = line.split(",")  # type: ignore
             except Exception as e:
-                logger.error(f'SKIPPED BAD LINE> {line}')
                 logger.exception(e)
+                logger.error('SKIPPED BAD LINE> %s', line)
                 continue
             mac = mac.strip()
             (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
@@ -201,7 +201,7 @@ class PresenceDetection(object):
             msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
             logger.warning(msg)
             warnings.warn(msg, stacklevel=2)
-        logger.debug(f'Looking for {name}...')
+        logger.debug('Looking for %s...', name)
 
         if name is Person.UNKNOWN:
             if self.weird_mac_at_cabin:
@@ -214,31 +214,39 @@ class PresenceDetection(object):
         votes: Dict[Location, int] = {}
         tiebreaks: Dict[Location, datetime.datetime] = {}
         credit = 10000
+        location = None
         for mac in self.devices_by_person[name]:
             if mac not in self.names_by_mac:
                 continue
             mac_name = self.names_by_mac[mac]
-            logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
+            logger.debug(
+                'Looking for %s... check for mac %s (%s)',
+                name, mac, mac_name
+            )
             for location in self.location_ts_by_mac:
                 if mac in self.location_ts_by_mac[location]:
                     ts = (self.location_ts_by_mac[location])[mac]
-                    logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
+                    logger.debug(
+                        'Seen %s (%s) at %s since %s',
+                        mac, mac_name, location, ts
+                    )
                     tiebreaks[location] = ts
 
             (
                 most_recent_location,
-                first_seen_ts,
+                _,
             ) = dict_utils.item_with_max_value(tiebreaks)
             bonus = credit
             v = votes.get(most_recent_location, 0)
             votes[most_recent_location] = v + bonus
-            logger.debug(f'{name}: {location} gets {bonus} votes.')
+            logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus)
             credit = int(credit * 0.2)  # Note: list most important devices first
             if credit <= 0:
                 credit = 1
         if len(votes) > 0:
             (location, value) = dict_utils.item_with_max_value(votes)
             if value > 2001:
+                assert location
                 return location
         return Location.UNKNOWN
 
index 2df95884957624f28ddec9cf36d46d9486234d1d..35747865b7aacc38fa61f6005af6c5bd5833ff69 100644 (file)
@@ -6,7 +6,6 @@ import logging
 import os
 import sys
 from inspect import stack
-from typing import List
 
 import config
 import logging_utils
@@ -18,23 +17,23 @@ from argparse_utils import ActionNoYes
 
 logger = logging.getLogger(__name__)
 
-args = config.add_commandline_args(
+cfg = config.add_commandline_args(
     f'Bootstrap ({__file__})',
     'Args related to python program bootstrapper and Swiss army knife',
 )
-args.add_argument(
+cfg.add_argument(
     '--debug_unhandled_exceptions',
     action=ActionNoYes,
     default=False,
     help='Break into pdb on top level unhandled exceptions.',
 )
-args.add_argument(
+cfg.add_argument(
     '--show_random_seed',
     action=ActionNoYes,
     default=False,
     help='Should we display (and log.debug) the global random seed?',
 )
-args.add_argument(
+cfg.add_argument(
     '--set_random_seed',
     type=int,
     nargs=1,
@@ -42,32 +41,32 @@ args.add_argument(
     metavar='SEED_INT',
     help='Override the global random seed with a particular number.',
 )
-args.add_argument(
+cfg.add_argument(
     '--dump_all_objects',
     action=ActionNoYes,
     default=False,
     help='Should we dump the Python import tree before main?',
 )
-args.add_argument(
+cfg.add_argument(
     '--audit_import_events',
     action=ActionNoYes,
     default=False,
     help='Should we audit all import events?',
 )
-args.add_argument(
+cfg.add_argument(
     '--run_profiler',
     action=ActionNoYes,
     default=False,
     help='Should we run cProfile on this code?',
 )
-args.add_argument(
+cfg.add_argument(
     '--trace_memory',
     action=ActionNoYes,
     default=False,
     help='Should we record/report on memory utilization?',
 )
 
-original_hook = sys.excepthook
+ORIGINAL_EXCEPTION_HOOK = sys.excepthook
 
 
 def handle_uncaught_exception(exc_type, exc_value, exc_tb):
@@ -77,7 +76,6 @@ def handle_uncaught_exception(exc_type, exc_value, exc_tb):
     maybe attaches a debugger.
 
     """
-    global original_hook
     msg = f'Unhandled top level exception {exc_type}'
     logger.exception(msg)
     print(msg, file=sys.stderr)
@@ -87,7 +85,7 @@ def handle_uncaught_exception(exc_type, exc_value, exc_tb):
     else:
         if not sys.stderr.isatty() or not sys.stdin.isatty():
             # stdin or stderr is redirected, just do the normal thing
-            original_hook(exc_type, exc_value, exc_tb)
+            ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
         else:
             # a terminal is attached and stderr is not redirected, maybe debug.
             import traceback
@@ -99,7 +97,7 @@ def handle_uncaught_exception(exc_type, exc_value, exc_tb):
                 logger.info("Invoking the debugger...")
                 pdb.pm()
             else:
-                original_hook(exc_type, exc_value, exc_tb)
+                ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
 
 
 class ImportInterceptor(importlib.abc.MetaPathFinder):
@@ -120,17 +118,18 @@ class ImportInterceptor(importlib.abc.MetaPathFinder):
                 fname = 'unknown'
             self.module_by_filename_cache[fname] = mod
 
-    def should_ignore_filename(self, filename: str) -> bool:
+    @staticmethod
+    def should_ignore_filename(filename: str) -> bool:
         return 'importlib' in filename or 'six.py' in filename
 
     def find_module(self, fullname, path):
         raise Exception("This method has been deprecated since Python 3.4, please upgrade.")
 
-    def find_spec(self, loaded_module, path=None, target=None):
+    def find_spec(self, loaded_module, path=None, _=None):
         s = stack()
         for x in range(3, len(s)):
             filename = s[x].filename
-            if self.should_ignore_filename(filename):
+            if ImportInterceptor.should_ignore_filename(filename):
                 continue
 
             loading_function = s[x].function
@@ -171,15 +170,14 @@ class ImportInterceptor(importlib.abc.MetaPathFinder):
 #
 # Also note: move bootstrap up in the global import list to catch
 # more import events and have a more complete record.
-import_interceptor = None
+IMPORT_INTERCEPTOR = None
 for arg in sys.argv:
     if arg == '--audit_import_events':
-        import_interceptor = ImportInterceptor()
-        sys.meta_path.insert(0, import_interceptor)
+        IMPORT_INTERCEPTOR = ImportInterceptor()
+        sys.meta_path.insert(0, IMPORT_INTERCEPTOR)
 
 
 def dump_all_objects() -> None:
-    global import_interceptor
     messages = {}
     all_modules = sys.modules
     for obj in object.__subclasses__():
@@ -199,8 +197,8 @@ def dump_all_objects() -> None:
                 mod_file = mod.__file__
             else:
                 mod_file = 'unknown'
-            if import_interceptor is not None:
-                import_path = import_interceptor.find_importer(mod_name)
+            if IMPORT_INTERCEPTOR is not None:
+                import_path = IMPORT_INTERCEPTOR.find_importer(mod_name)
             else:
                 import_path = 'unknown'
             msg = f'{class_mod_name}::{klass} ({mod_file})'
@@ -245,17 +243,18 @@ def initialize(entry_point):
 
         # Maybe log some info about the python interpreter itself.
         logger.debug(
-            f'Platform: {sys.platform}, maxint=0x{sys.maxsize:x}, byteorder={sys.byteorder}'
+            'Platform: %s, maxint=0x%x, byteorder=%s',
+            sys.platform, sys.maxsize, sys.byteorder
         )
-        logger.debug(f'Python interpreter version: {sys.version}')
-        logger.debug(f'Python implementation: {sys.implementation}')
-        logger.debug(f'Python C API version: {sys.api_version}')
-        logger.debug(f'Python path: {sys.path}')
+        logger.debug('Python interpreter version: %s', sys.version)
+        logger.debug('Python implementation: %s', sys.implementation)
+        logger.debug('Python C API version: %s', sys.api_version)
+        logger.debug('Python path: %s', sys.path)
 
         # Log something about the site_config, many things use it.
         import site_config
 
-        logger.debug(f'Global site_config: {site_config.get_config()}')
+        logger.debug('Global site_config: %s', site_config.get_config())
 
         # Allow programs that don't bother to override the random seed
         # to be replayed via the commandline.
@@ -274,7 +273,7 @@ def initialize(entry_point):
         random.seed(random_seed)
 
         # Do it, invoke the user's code.  Pay attention to how long it takes.
-        logger.debug(f'Starting {entry_point.__name__} (program entry point)')
+        logger.debug('Starting %s (program entry point)', entry_point.__name__)
         ret = None
         import stopwatch
 
@@ -294,7 +293,7 @@ def initialize(entry_point):
             with stopwatch.Timer() as t:
                 ret = entry_point(*args, **kwargs)
 
-        logger.debug(f'{entry_point.__name__} (program entry point) returned {ret}.')
+        logger.debug('%s (program entry point) returned %s.', entry_point.__name__, ret)
 
         if config.config['trace_memory']:
             snapshot = tracemalloc.take_snapshot()
@@ -308,27 +307,27 @@ def initialize(entry_point):
             dump_all_objects()
 
         if config.config['audit_import_events']:
-            global import_interceptor
-            if import_interceptor is not None:
-                print(import_interceptor.tree)
+            if IMPORT_INTERCEPTOR is not None:
+                print(IMPORT_INTERCEPTOR.tree)
 
         walltime = t()
         (utime, stime, cutime, cstime, elapsed_time) = os.times()
         logger.debug(
             '\n'
-            f'user: {utime}s\n'
-            f'system: {stime}s\n'
-            f'child user: {cutime}s\n'
-            f'child system: {cstime}s\n'
-            f'machine uptime: {elapsed_time}s\n'
-            f'walltime: {walltime}s'
+            'user: %.4fs\n'
+            'system: %.4fs\n'
+            'child user: %.4fs\n'
+            'child system: %.4fs\n'
+            'machine uptime: %.4fs\n'
+            'walltime: %.4fs',
+            utime, stime, cutime, cstime, elapsed_time, walltime
         )
 
         # If it doesn't return cleanly, call attention to the return value.
         if ret is not None and ret != 0:
-            logger.error(f'Exit {ret}')
+            logger.error('Exit %s', ret)
         else:
-            logger.debug(f'Exit {ret}')
+            logger.debug('Exit %s', ret)
         sys.exit(ret)
 
     return initialize_wrapper
index f5d295b6ed7477815fe502eed82eaea8a2c4831c..c789ed61e19d304391c9d10393bcb22094b0b7d7 100644 (file)
@@ -50,7 +50,7 @@ def sanity_check_image(hsv: np.ndarray) -> SanityCheckImageMetadata:
                 weird_orange_count += 1
             elif is_near(pixel[0], 0) and is_near(pixel[1], 0):
                 hs_zero_count += 1
-    logger.debug(f"hszero#={hs_zero_count}, weird_orange={weird_orange_count}")
+    logger.debug("hszero#=%d, weird_orange=%d", hs_zero_count, weird_orange_count)
     return SanityCheckImageMetadata(
         hs_zero_count > (num_pixels * 0.75),
         weird_orange_count > (num_pixels * 0.75),
@@ -67,23 +67,26 @@ def fetch_camera_image_from_video_server(
     url = (
         f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg"
     )
-    logger.debug(f'Fetching image from {url}')
+    logger.debug('Fetching image from %s', url)
     try:
         response = requests.get(url, stream=False, timeout=10.0)
         if response.ok:
             raw = response.content
-            logger.debug(f'Read {len(response.content)} byte image from HTTP server')
+            logger.debug('Read %d byte image from HTTP server', len(response.content))
             tmp = np.frombuffer(raw, dtype="uint8")
             logger.debug(
-                f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.'
+                'Translated raw content into %s %s with element type %s',
+                tmp.shape, type(tmp), type(tmp[0]),
             )
             jpg = cv2.imdecode(tmp, cv2.IMREAD_COLOR)
             logger.debug(
-                f'Decoded into {jpg.shape} jpeg {type(jpg)} with element type {type(jpg[0][0])}'
+                'Decoded into %s jpeg %s with element type %s',
+                jpg.shape, type(jpg), type(jpg[0][0])
             )
             hsv = cv2.cvtColor(jpg, cv2.COLOR_BGR2HSV)
             logger.debug(
-                f'Converted JPG into HSV {hsv.shape} HSV {type(hsv)} with element type {type(hsv[0][0])}'
+                'Converted JPG into %s HSV HSV %s with element type %s',
+                hsv.shape, type(hsv), type(hsv[0][0])
             )
             (_, is_bad_image) = sanity_check_image(hsv)
             if not is_bad_image:
@@ -125,7 +128,7 @@ def fetch_camera_image_from_rtsp_stream(camera_name: str, *, width: int = 256) -
     """Fetch the raw webcam image straight from the webcam's RTSP stream."""
     hostname = camera_name_to_hostname(camera_name)
     stream = f"rtsp://camera:IaLaIok@{hostname}:554/live"
-    logger.debug(f'Fetching image from RTSP stream {stream}')
+    logger.debug('Fetching image from RTSP stream %s', stream)
     try:
         cmd = [
             "/usr/bin/timeout",
index 9df82f61a516b78541feddf864ca49f9ce31668b..c5bc9b5f15200c0f909af2bd5f849be6fa60b809 100644 (file)
@@ -128,14 +128,12 @@ class Unscrambler(object):
 
     # 52 bits
     @staticmethod
-    def _compute_word_fingerprint(word: str, population: Mapping[str, int]) -> int:
+    def _compute_word_fingerprint(population: Mapping[str, int]) -> int:
         fp = 0
         for pair in sorted(population.items(), key=lambda x: x[1], reverse=True):
             letter = pair[0]
             if letter in fprint_feature_bit:
-                count = pair[1]
-                if count > 3:
-                    count = 3
+                count = min(pair[1], 3)
                 shift = fprint_feature_bit[letter]
                 s = count << shift
                 fp |= s
@@ -144,25 +142,23 @@ class Unscrambler(object):
     # 32 bits
     @staticmethod
     def _compute_word_letter_sig(
-        letter_sigs: Mapping[str, int],
+        lsigs: Mapping[str, int],
         word: str,
         population: Mapping[str, int],
     ) -> int:
         sig = 0
         for pair in sorted(population.items(), key=lambda x: x[1], reverse=True):
             letter = pair[0]
-            if letter not in letter_sigs:
+            if letter not in lsigs:
                 continue
-            s = letter_sigs[letter]
+            s = lsigs[letter]
             count = pair[1]
             if count > 1:
                 s <<= count
                 s |= count
             s &= letters_mask
             sig ^= s
-        length = len(word)
-        if length > 31:
-            length = 31
+        length = min(len(word), 31)
         sig ^= length << 8
         sig &= letters_mask
         return sig
@@ -189,7 +185,7 @@ class Unscrambler(object):
 
         """
         population = list_utils.population_counts(word)
-        fprint = Unscrambler._compute_word_fingerprint(word, population)
+        fprint = Unscrambler._compute_word_fingerprint(population)
         letter_sig = Unscrambler._compute_word_letter_sig(letter_sigs, word, population)
         assert fprint & letter_sig == 0
         sig = fprint | letter_sig
@@ -197,7 +193,7 @@ class Unscrambler(object):
 
     @staticmethod
     def repopulate(
-        letter_sigs: Dict[str, int],
+        lsigs: Dict[str, int],
         dictfile: str = '/usr/share/dict/words',
         indexfile: str = '/usr/share/dict/sparse_index',
     ) -> None:
@@ -211,13 +207,13 @@ class Unscrambler(object):
             for word in f:
                 word = word.replace('\n', '')
                 word = word.lower()
-                sig = Unscrambler.compute_word_sig(letter_sigs, word)
-                logger.debug("%s => 0x%x" % (word, sig))
+                sig = Unscrambler.compute_word_sig(word)
+                logger.debug("%s => 0x%x", word, sig)
                 if word in seen:
                     continue
                 seen.add(word)
                 if sig in words_by_sigs:
-                    words_by_sigs[sig] += ",%s" % word
+                    words_by_sigs[sig] += f",{word}"
                 else:
                     words_by_sigs[sig] = word
         with open(indexfile, 'w') as f:
@@ -252,13 +248,11 @@ class Unscrambler(object):
 
         """
         ret = {}
-        (exact, location) = list_utils.binary_search(self.sigs, sig)
+        (_, location) = list_utils.binary_search(self.sigs, sig)
         start = location - window_size
-        if start < 0:
-            start = 0
+        start = max(start, 0)
         end = location + 1 + window_size
-        if end > len(self.words):
-            end = len(self.words)
+        end = min(end, len(self.words))
 
         for x in range(start, end):
             word = self.words[x]
index 46d7cbe04898c18cd72019c32c4dd1e5545c322e..9247a67891307fb9b93e5695df68347577f86c9b 100644 (file)
@@ -74,7 +74,7 @@ class WaitablePresenceDetectorWithMemory(state_tracker.WaitableAutomaticStateTra
             raise Exception(f'Unknown update type {update_id} in {__file__}')
 
     def poll_presence(self, now: datetime.datetime) -> None:
-        logger.debug(f'Checking presence in {self.location} now...')
+        logger.debug('Checking presence in %s now...', self.location)
         self.detector.update()
         if self.detector.is_anyone_in_location_now(self.location):
             self.someone_is_home = True