#!/usr/bin/env python3
+"""This module defines various flavors of Access Control Lists."""
+
import enum
import fnmatch
import logging
def __call__(self, x: Any) -> bool:
"""Returns True if x is allowed, False otherwise."""
- logger.debug(f'SimpleACL checking {x}')
+ logger.debug('SimpleACL checking %s', x)
if self.order_to_check_allow_deny == Order.ALLOW_DENY:
logger.debug('Checking allowed first...')
if self.check_allowed(x):
- logger.debug(f'{x} was allowed explicitly.')
+ logger.debug('%s was allowed explicitly.', x)
return True
logger.debug('Checking denied next...')
if self.check_denied(x):
- logger.debug(f'{x} was denied explicitly.')
+ logger.debug('%s was denied explicitly.', x)
return False
elif self.order_to_check_allow_deny == Order.DENY_ALLOW:
logger.debug('Checking denied first...')
if self.check_denied(x):
- logger.debug(f'{x} was denied explicitly.')
+ logger.debug('%s was denied explicitly.', x)
return False
if self.check_allowed(x):
- logger.debug(f'{x} was allowed explicitly.')
+ logger.debug('%s was allowed explicitly.', x)
return True
logger.debug(
#!/usr/bin/env python3
+"""A bunch of color names mapped into RGB tuples and some methods for
+setting the text color, background, etc... using ANSI escape
+sequences."""
+
import difflib
import io
import logging
from overrides import overrides
import logging_utils
+import string_utils
+
logger = logging.getLogger(__name__)
def is_16color(num: int) -> bool:
- return num == 255 or num == 128
+ return num in (255, 128)
def is_216color(num: int) -> bool:
b'G1szODs1OzIxbQ==\\n'
"""
- import string_utils
-
if name is not None and name == 'reset':
return '\033[39m'
(0, 0, 0)
"""
- import string_utils
-
if name is not None and string_utils.is_full_string(name):
rgb = _find_color_by_name(name)
else:
max_ratio = r
best_guess = possibility
assert best_guess is not None
- logger.debug(f"Best guess at color name is {best_guess}")
+ logger.debug("Best guess at color name is %s", best_guess)
return best_guess
b'G1s0ODs1OzE5Nm0=\\n'
"""
- import string_utils
-
if name is not None and name == 'reset':
return '\033[49m'
class StdoutInterceptor(io.TextIOBase):
+ """An interceptor for data written to stdout. Use as a context.
+
+ """
def __init__(self):
+ super().__init__()
self.saved_stdout: io.TextIO = None
self.buf = ''
class ProgrammableColorizer(StdoutInterceptor):
+ """A colorizing interceptor; pass it re.Patterns -> methods that do
+ something (usually add color to) the match.
+
+ """
def __init__(
self,
patterns: Iterable[Tuple[re.Pattern, Callable[[Any, re.Pattern], str]]],
):
super().__init__()
- self.patterns = [_ for _ in patterns]
+ self.patterns = list(patterns)
@overrides
def write(self, s: str):
if __name__ == '__main__':
-
def main() -> None:
import doctest
#!/usr/bin/python3
+"""Helpers for commandline argument parsing."""
+
import argparse
import datetime
import logging
class ActionNoYes(argparse.Action):
+ """An argparse Action that allows for commandline arguments like this:
+
+ cfg.add_argument(
+ '--enable_the_thing',
+ action=ActionNoYes,
+ default=False,
+ help='Should we enable the thing?'
+ )
+
+ This creates cmdline arguments:
+
+ --enable_the_thing
+ --no_enable_the_thing
+
+ """
def __init__(self, option_strings, dest, default=None, required=False, help=None):
if default is None:
msg = 'You must provide a default with Yes/No action'
logger.critical(msg)
raise ValueError(msg)
if len(option_strings) != 1:
- msg = 'Only single argument is allowed with YesNo action'
+ msg = 'Only single argument is allowed with NoYes action'
logger.critical(msg)
raise ValueError(msg)
opt = option_strings[0]
try:
return to_bool(v)
- except Exception:
- raise argparse.ArgumentTypeError(v)
+ except Exception as e:
+ raise argparse.ArgumentTypeError(v) from e
def valid_ip(ip: str) -> str:
try:
secs = parse_duration(txt)
- except Exception as e:
- raise argparse.ArgumentTypeError(e)
- finally:
return datetime.timedelta(seconds=secs)
+ except Exception as e:
+ logger.exception(e)
+ raise argparse.ArgumentTypeError(e) from e
if __name__ == '__main__':
if len(self.state) < config.config['arper_min_entries_to_be_valid']:
raise Exception(f'Arper didn\'t find enough entries; only got {len(self.state)}.')
if cached_supplimental_state is not None:
- logger.debug(f'Also added {len(cached_supplimental_state)} supplimental entries.')
+ logger.debug('Also added %d supplimental entries.', len(cached_supplimental_state))
for mac, ip in cached_supplimental_state.items():
self.state[mac] = ip
for mac, ip in self.state.items():
- logger.debug(f'{mac} <-> {ip}')
+ logger.debug('%s <-> %s', mac, ip)
def update_from_arp_scan(self):
network_spec = site_config.get_config().network
mac = string_utils.extract_mac_address(line)
if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN':
mac = mac.lower()
- logger.debug(f'ARPER: {mac} => {ip}')
+ logger.debug('ARPER: %s => %s', mac, ip)
self.state[mac] = ip
def update_from_arp(self):
mac = string_utils.extract_mac_address(line)
if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN':
mac = mac.lower()
- logger.debug(f'ARPER: {mac} => {ip}')
+ logger.debug('ARPER: %s => %s', mac, ip)
self.state[mac] = ip
def get_ip_by_mac(self, mac: str) -> Optional[str]:
return self.state.inverse.get(ip, None)
@classmethod
- def load_state(cls, cache_file: str, freshness_threshold_sec: int, state: BiDict):
+ def load_state(
+ cls,
+ cache_file: str,
+ freshness_threshold_sec: int,
+ state: BiDict,
+ ):
if not file_utils.file_is_readable(cache_file):
- logger.debug(f'Can\'t read {cache_file}')
- return None
+ logger.debug('Can\'t read %s', cache_file)
+ return
if persistent.was_file_written_within_n_seconds(
cache_file,
freshness_threshold_sec,
):
- logger.debug(f'Loading state from {cache_file}')
+ logger.debug('Loading state from %s', cache_file)
count = 0
with open(cache_file, 'r') as rf:
contents = rf.readlines()
for line in contents:
line = line[:-1]
- logger.debug(f'ARPER:{cache_file}> {line}')
+ logger.debug('ARPER:%s> %s', cache_file, line)
(mac, ip) = line.split(',')
mac = mac.strip()
mac = mac.lower()
state[mac] = ip
count += 1
else:
- logger.debug(f'{cache_file} is too stale.')
+ logger.debug('%s is too stale.', cache_file)
@classmethod
@overrides
def load(cls) -> Any:
- local_state = BiDict()
+ local_state: BiDict = BiDict()
cache_file = config.config['arper_cache_location']
max_staleness = config.config['arper_cache_max_staleness'].total_seconds()
- logger.debug(f'Trying to load main arper cache from {cache_file}...')
+ logger.debug('Trying to load main arper cache from %s...', cache_file)
cls.load_state(cache_file, max_staleness, local_state)
if len(local_state) <= config.config['arper_min_entries_to_be_valid']:
msg = f'{cache_file} is invalid: only {len(local_state)} entries. Deleting it.'
except Exception:
pass
- supplimental_state = BiDict()
+ supplimental_state: BiDict = BiDict()
cache_file = config.config['arper_supplimental_cache_location']
max_staleness = config.config['arper_cache_max_staleness'].total_seconds()
- logger.debug(f'Trying to suppliment arper state from {cache_file}...')
+ logger.debug('Trying to suppliment arper state from %s', cache_file)
cls.load_state(cache_file, max_staleness, supplimental_state)
- if len(supplimental_state) == 0:
- supplimental_state = None
-
if len(local_state) > 0:
return cls(local_state, supplimental_state)
return None
@overrides
def save(self) -> bool:
if len(self.state) > config.config['arper_min_entries_to_be_valid']:
- logger.debug(f'Persisting state to {config.config["arper_cache_location"]}')
+ logger.debug('Persisting state to %s', config.config["arper_cache_location"])
with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
for (mac, ip) in self.state.items():
mac = mac.lower()
return True
else:
logger.warning(
- f'Only saw {len(self.state)} entries; needed at least {config.config["arper_min_entries_to_be_valid"]} to bother persisting.'
+ 'Only saw %d entries; needed at least %d to bother persisting.',
+ len(self.state),
+ config.config["arper_min_entries_to_be_valid"],
)
return False
],
}
self.run_location = site_config.get_location()
- logger.debug(f"run_location is {self.run_location}")
+ logger.debug("base_presence run_location is %s", self.run_location)
self.weird_mac_at_cabin = False
self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
self.names_by_mac: Dict[str, str] = {}
> config.config['presence_tolerable_staleness_seconds'].total_seconds()
):
logger.debug(
- f"It's been {delta.total_seconds()}s since last update; refreshing now."
+ "It's been %ss since last update; refreshing now.", delta.total_seconds()
)
self.update()
line = line.strip()
if len(line) == 0:
continue
- logger.debug(f'{location}> {line}')
+ logger.debug('%s> %s', location, line)
if "cabin_" in line:
continue
if location == Location.CABIN:
- logger.debug(f'Cabin count: {cabin_count}')
+ logger.debug('Cabin count: %d', cabin_count)
cabin_count += 1
try:
- (mac, count, ip_name, mfg, ts) = line.split(",")
+ (mac, _, ip_name, _, ts) = line.split(",") # type: ignore
except Exception as e:
- logger.error(f'SKIPPED BAD LINE> {line}')
logger.exception(e)
+ logger.error('SKIPPED BAD LINE> %s', line)
continue
mac = mac.strip()
(self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
- logger.debug(f'Looking for {name}...')
+ logger.debug('Looking for %s...', name)
if name is Person.UNKNOWN:
if self.weird_mac_at_cabin:
votes: Dict[Location, int] = {}
tiebreaks: Dict[Location, datetime.datetime] = {}
credit = 10000
+ location = None
for mac in self.devices_by_person[name]:
if mac not in self.names_by_mac:
continue
mac_name = self.names_by_mac[mac]
- logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
+ logger.debug(
+ 'Looking for %s... check for mac %s (%s)',
+ name, mac, mac_name
+ )
for location in self.location_ts_by_mac:
if mac in self.location_ts_by_mac[location]:
ts = (self.location_ts_by_mac[location])[mac]
- logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
+ logger.debug(
+ 'Seen %s (%s) at %s since %s',
+ mac, mac_name, location, ts
+ )
tiebreaks[location] = ts
(
most_recent_location,
- first_seen_ts,
+ _,
) = dict_utils.item_with_max_value(tiebreaks)
bonus = credit
v = votes.get(most_recent_location, 0)
votes[most_recent_location] = v + bonus
- logger.debug(f'{name}: {location} gets {bonus} votes.')
+ logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus)
credit = int(credit * 0.2) # Note: list most important devices first
if credit <= 0:
credit = 1
if len(votes) > 0:
(location, value) = dict_utils.item_with_max_value(votes)
if value > 2001:
+ assert location
return location
return Location.UNKNOWN
import os
import sys
from inspect import stack
-from typing import List
import config
import logging_utils
logger = logging.getLogger(__name__)
-args = config.add_commandline_args(
+cfg = config.add_commandline_args(
f'Bootstrap ({__file__})',
'Args related to python program bootstrapper and Swiss army knife',
)
-args.add_argument(
+cfg.add_argument(
'--debug_unhandled_exceptions',
action=ActionNoYes,
default=False,
help='Break into pdb on top level unhandled exceptions.',
)
-args.add_argument(
+cfg.add_argument(
'--show_random_seed',
action=ActionNoYes,
default=False,
help='Should we display (and log.debug) the global random seed?',
)
-args.add_argument(
+cfg.add_argument(
'--set_random_seed',
type=int,
nargs=1,
metavar='SEED_INT',
help='Override the global random seed with a particular number.',
)
-args.add_argument(
+cfg.add_argument(
'--dump_all_objects',
action=ActionNoYes,
default=False,
help='Should we dump the Python import tree before main?',
)
-args.add_argument(
+cfg.add_argument(
'--audit_import_events',
action=ActionNoYes,
default=False,
help='Should we audit all import events?',
)
-args.add_argument(
+cfg.add_argument(
'--run_profiler',
action=ActionNoYes,
default=False,
help='Should we run cProfile on this code?',
)
-args.add_argument(
+cfg.add_argument(
'--trace_memory',
action=ActionNoYes,
default=False,
help='Should we record/report on memory utilization?',
)
-original_hook = sys.excepthook
+ORIGINAL_EXCEPTION_HOOK = sys.excepthook
def handle_uncaught_exception(exc_type, exc_value, exc_tb):
maybe attaches a debugger.
"""
- global original_hook
msg = f'Unhandled top level exception {exc_type}'
logger.exception(msg)
print(msg, file=sys.stderr)
else:
if not sys.stderr.isatty() or not sys.stdin.isatty():
# stdin or stderr is redirected, just do the normal thing
- original_hook(exc_type, exc_value, exc_tb)
+ ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
else:
# a terminal is attached and stderr is not redirected, maybe debug.
import traceback
logger.info("Invoking the debugger...")
pdb.pm()
else:
- original_hook(exc_type, exc_value, exc_tb)
+ ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
class ImportInterceptor(importlib.abc.MetaPathFinder):
fname = 'unknown'
self.module_by_filename_cache[fname] = mod
- def should_ignore_filename(self, filename: str) -> bool:
+ @staticmethod
+ def should_ignore_filename(filename: str) -> bool:
return 'importlib' in filename or 'six.py' in filename
def find_module(self, fullname, path):
raise Exception("This method has been deprecated since Python 3.4, please upgrade.")
- def find_spec(self, loaded_module, path=None, target=None):
+ def find_spec(self, loaded_module, path=None, _=None):
s = stack()
for x in range(3, len(s)):
filename = s[x].filename
- if self.should_ignore_filename(filename):
+ if ImportInterceptor.should_ignore_filename(filename):
continue
loading_function = s[x].function
#
# Also note: move bootstrap up in the global import list to catch
# more import events and have a more complete record.
-import_interceptor = None
+IMPORT_INTERCEPTOR = None
for arg in sys.argv:
if arg == '--audit_import_events':
- import_interceptor = ImportInterceptor()
- sys.meta_path.insert(0, import_interceptor)
+ IMPORT_INTERCEPTOR = ImportInterceptor()
+ sys.meta_path.insert(0, IMPORT_INTERCEPTOR)
def dump_all_objects() -> None:
- global import_interceptor
messages = {}
all_modules = sys.modules
for obj in object.__subclasses__():
mod_file = mod.__file__
else:
mod_file = 'unknown'
- if import_interceptor is not None:
- import_path = import_interceptor.find_importer(mod_name)
+ if IMPORT_INTERCEPTOR is not None:
+ import_path = IMPORT_INTERCEPTOR.find_importer(mod_name)
else:
import_path = 'unknown'
msg = f'{class_mod_name}::{klass} ({mod_file})'
# Maybe log some info about the python interpreter itself.
logger.debug(
- f'Platform: {sys.platform}, maxint=0x{sys.maxsize:x}, byteorder={sys.byteorder}'
+ 'Platform: %s, maxint=0x%x, byteorder=%s',
+ sys.platform, sys.maxsize, sys.byteorder
)
- logger.debug(f'Python interpreter version: {sys.version}')
- logger.debug(f'Python implementation: {sys.implementation}')
- logger.debug(f'Python C API version: {sys.api_version}')
- logger.debug(f'Python path: {sys.path}')
+ logger.debug('Python interpreter version: %s', sys.version)
+ logger.debug('Python implementation: %s', sys.implementation)
+ logger.debug('Python C API version: %s', sys.api_version)
+ logger.debug('Python path: %s', sys.path)
# Log something about the site_config, many things use it.
import site_config
- logger.debug(f'Global site_config: {site_config.get_config()}')
+ logger.debug('Global site_config: %s', site_config.get_config())
# Allow programs that don't bother to override the random seed
# to be replayed via the commandline.
random.seed(random_seed)
# Do it, invoke the user's code. Pay attention to how long it takes.
- logger.debug(f'Starting {entry_point.__name__} (program entry point)')
+ logger.debug('Starting %s (program entry point)', entry_point.__name__)
ret = None
import stopwatch
with stopwatch.Timer() as t:
ret = entry_point(*args, **kwargs)
- logger.debug(f'{entry_point.__name__} (program entry point) returned {ret}.')
+ logger.debug('%s (program entry point) returned %s.', entry_point.__name__, ret)
if config.config['trace_memory']:
snapshot = tracemalloc.take_snapshot()
dump_all_objects()
if config.config['audit_import_events']:
- global import_interceptor
- if import_interceptor is not None:
- print(import_interceptor.tree)
+ if IMPORT_INTERCEPTOR is not None:
+ print(IMPORT_INTERCEPTOR.tree)
walltime = t()
(utime, stime, cutime, cstime, elapsed_time) = os.times()
logger.debug(
'\n'
- f'user: {utime}s\n'
- f'system: {stime}s\n'
- f'child user: {cutime}s\n'
- f'child system: {cstime}s\n'
- f'machine uptime: {elapsed_time}s\n'
- f'walltime: {walltime}s'
+ 'user: %.4fs\n'
+ 'system: %.4fs\n'
+ 'child user: %.4fs\n'
+ 'child system: %.4fs\n'
+ 'machine uptime: %.4fs\n'
+ 'walltime: %.4fs',
+ utime, stime, cutime, cstime, elapsed_time, walltime
)
# If it doesn't return cleanly, call attention to the return value.
if ret is not None and ret != 0:
- logger.error(f'Exit {ret}')
+ logger.error('Exit %s', ret)
else:
- logger.debug(f'Exit {ret}')
+ logger.debug('Exit %s', ret)
sys.exit(ret)
return initialize_wrapper
weird_orange_count += 1
elif is_near(pixel[0], 0) and is_near(pixel[1], 0):
hs_zero_count += 1
- logger.debug(f"hszero#={hs_zero_count}, weird_orange={weird_orange_count}")
+ logger.debug("hszero#=%d, weird_orange=%d", hs_zero_count, weird_orange_count)
return SanityCheckImageMetadata(
hs_zero_count > (num_pixels * 0.75),
weird_orange_count > (num_pixels * 0.75),
url = (
f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg"
)
- logger.debug(f'Fetching image from {url}')
+ logger.debug('Fetching image from %s', url)
try:
response = requests.get(url, stream=False, timeout=10.0)
if response.ok:
raw = response.content
- logger.debug(f'Read {len(response.content)} byte image from HTTP server')
+ logger.debug('Read %d byte image from HTTP server', len(response.content))
tmp = np.frombuffer(raw, dtype="uint8")
logger.debug(
- f'Translated raw content into {tmp.shape} {type(tmp)} with element type {type(tmp[0])}.'
+ 'Translated raw content into %s %s with element type %s',
+ tmp.shape, type(tmp), type(tmp[0]),
)
jpg = cv2.imdecode(tmp, cv2.IMREAD_COLOR)
logger.debug(
- f'Decoded into {jpg.shape} jpeg {type(jpg)} with element type {type(jpg[0][0])}'
+ 'Decoded into %s jpeg %s with element type %s',
+ jpg.shape, type(jpg), type(jpg[0][0])
)
hsv = cv2.cvtColor(jpg, cv2.COLOR_BGR2HSV)
logger.debug(
- f'Converted JPG into HSV {hsv.shape} HSV {type(hsv)} with element type {type(hsv[0][0])}'
+ 'Converted JPG into %s HSV HSV %s with element type %s',
+ hsv.shape, type(hsv), type(hsv[0][0])
)
(_, is_bad_image) = sanity_check_image(hsv)
if not is_bad_image:
"""Fetch the raw webcam image straight from the webcam's RTSP stream."""
hostname = camera_name_to_hostname(camera_name)
stream = f"rtsp://camera:IaLaIok@{hostname}:554/live"
- logger.debug(f'Fetching image from RTSP stream {stream}')
+ logger.debug('Fetching image from RTSP stream %s', stream)
try:
cmd = [
"/usr/bin/timeout",
# 52 bits
@staticmethod
- def _compute_word_fingerprint(word: str, population: Mapping[str, int]) -> int:
+ def _compute_word_fingerprint(population: Mapping[str, int]) -> int:
fp = 0
for pair in sorted(population.items(), key=lambda x: x[1], reverse=True):
letter = pair[0]
if letter in fprint_feature_bit:
- count = pair[1]
- if count > 3:
- count = 3
+ count = min(pair[1], 3)
shift = fprint_feature_bit[letter]
s = count << shift
fp |= s
# 32 bits
@staticmethod
def _compute_word_letter_sig(
- letter_sigs: Mapping[str, int],
+ lsigs: Mapping[str, int],
word: str,
population: Mapping[str, int],
) -> int:
sig = 0
for pair in sorted(population.items(), key=lambda x: x[1], reverse=True):
letter = pair[0]
- if letter not in letter_sigs:
+ if letter not in lsigs:
continue
- s = letter_sigs[letter]
+ s = lsigs[letter]
count = pair[1]
if count > 1:
s <<= count
s |= count
s &= letters_mask
sig ^= s
- length = len(word)
- if length > 31:
- length = 31
+ length = min(len(word), 31)
sig ^= length << 8
sig &= letters_mask
return sig
"""
population = list_utils.population_counts(word)
- fprint = Unscrambler._compute_word_fingerprint(word, population)
+ fprint = Unscrambler._compute_word_fingerprint(population)
letter_sig = Unscrambler._compute_word_letter_sig(letter_sigs, word, population)
assert fprint & letter_sig == 0
sig = fprint | letter_sig
@staticmethod
def repopulate(
- letter_sigs: Dict[str, int],
+ lsigs: Dict[str, int],
dictfile: str = '/usr/share/dict/words',
indexfile: str = '/usr/share/dict/sparse_index',
) -> None:
for word in f:
word = word.replace('\n', '')
word = word.lower()
- sig = Unscrambler.compute_word_sig(letter_sigs, word)
- logger.debug("%s => 0x%x" % (word, sig))
+ sig = Unscrambler.compute_word_sig(word)
+ logger.debug("%s => 0x%x", word, sig)
if word in seen:
continue
seen.add(word)
if sig in words_by_sigs:
- words_by_sigs[sig] += ",%s" % word
+ words_by_sigs[sig] += f",{word}"
else:
words_by_sigs[sig] = word
with open(indexfile, 'w') as f:
"""
ret = {}
- (exact, location) = list_utils.binary_search(self.sigs, sig)
+ (_, location) = list_utils.binary_search(self.sigs, sig)
start = location - window_size
- if start < 0:
- start = 0
+ start = max(start, 0)
end = location + 1 + window_size
- if end > len(self.words):
- end = len(self.words)
+ end = min(end, len(self.words))
for x in range(start, end):
word = self.words[x]
raise Exception(f'Unknown update type {update_id} in {__file__}')
def poll_presence(self, now: datetime.datetime) -> None:
- logger.debug(f'Checking presence in {self.location} now...')
+ logger.debug('Checking presence in %s now...', self.location)
self.detector.update()
if self.detector.is_anyone_in_location_now(self.location):
self.someone_is_home = True