#!/usr/bin/env python3
+"""Helper methods dealing with functions."""
+
from typing import Callable
#!/usr/bin/env python3
+"""A module to serve as a local client library around HTTP calls to
+the Google Assistant via a local gateway."""
+
import logging
-import sys
import warnings
from typing import NamedTuple, Optional
class GoogleResponse(NamedTuple):
+ """A response wrapper."""
+
success: bool
response: str
audio_url: str
is True, perform speech recognition on the audio response from Google so as
to translate it into text (best effort, YMMV).
"""
- logging.debug(f"Asking google: '{cmd}'")
+ logging.debug("Asking google: '%s'", cmd)
payload = {
"command": cmd,
"user": config.config['google_assistant_username'],
if success:
logger.debug('Google request succeeded.')
if len(response) > 0:
- logger.debug(f"Google said: '{response}'")
+ logger.debug("Google said: '%s'", response)
audio = f"{config.config['google_assistant_bridge']}{j['audio']}"
if recognize_speech:
recognizer = sr.Recognizer()
audio_transcription = recognizer.recognize_google(
speech,
)
- logger.debug(f"Transcription: '{audio_transcription}'")
+ logger.debug("Transcription: '%s'", audio_transcription)
except sr.UnknownValueError as e:
logger.exception(e)
msg = 'Unable to parse Google assistant\'s response.'
audio_url=audio,
audio_transcription=audio_transcription,
)
- sys.exit(-1)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+"""A text-based simple histogram helper class."""
+
import math
-from numbers import Number
from typing import Dict, Generic, Iterable, List, Optional, Tuple, TypeVar
T = TypeVar("T", int, float)
class SimpleHistogram(Generic[T]):
+ """A simple histogram."""
+
# Useful in defining wide open bottom/top bucket bounds:
POSITIVE_INFINITY = math.inf
NEGATIVE_INFINITY = -math.inf
#!/usr/bin/env python3
+"""A helper class for generating thread safe monotonically increasing
+id numbers."""
+
import itertools
import logging
if name not in generators:
generators[name] = itertools.count(start, 1)
x = next(generators[name])
- logger.debug(f"Generated next id {x}")
+ logger.debug("Generated next id %d", x)
return x
try:
while True:
response = readchar.readchar()
- logger.debug(f'Keystroke: {ord(response)}')
+ logger.debug('Keystroke: 0x%x', ord(response))
if response in valid_responses:
break
if ord(response) in os_special_keystrokes:
#!/usr/bin/env python3
+"""A simple compression helper for lowercase ascii text."""
+
import bitstring
from collect.bidict import BiDict
"""
compressed = bitstring.BitArray()
- for (n, letter) in enumerate(uncompressed):
+ for letter in uncompressed:
if 'a' <= letter <= 'z':
bits = ord(letter) - ord('a') + 1 # 1..26
else:
if letter not in special_characters:
- raise Exception(
- f'"{uncompressed}" contains uncompressable char="{letter}"'
- )
+ raise Exception(f'"{uncompressed}" contains uncompressable char="{letter}"')
bits = special_characters[letter]
compressed.append(f"uint:5={bits}")
while len(compressed) % 8 != 0:
#!/usr/bin/env python3
+"""Some useful(?) utilities for dealing with Lists."""
+
from collections import Counter
from itertools import islice
-from typing import Any, Iterator, List, Mapping, Sequence, Tuple
+from typing import Any, Iterator, List, Sequence, Tuple
def shard(lst: List[Any], size: int) -> Iterator[Any]:
yield from _permute(cdr, path + car)
-def binary_search(
- lst: Sequence[Any], target: Any, *, sanity_check=False
-) -> Tuple[bool, int]:
+def binary_search(lst: Sequence[Any], target: Any, *, sanity_check=False) -> Tuple[bool, int]:
"""Performs a binary search on lst (which must already be sorted).
Returns a Tuple composed of a bool which indicates whether the
target was found and an int which indicates the index closest to
return _binary_search(lst, target, 0, len(lst) - 1)
-def _binary_search(
- lst: Sequence[Any], target: Any, low: int, high: int
-) -> Tuple[bool, int]:
+def _binary_search(lst: Sequence[Any], target: Any, low: int, high: int) -> Tuple[bool, int]:
if high >= low:
mid = (high + low) // 2
if lst[mid] == target:
#!/usr/bin/env python3
+"""File-based locking helper."""
+
import datetime
import json
import logging
class LockFileException(Exception):
+ """An exception related to lock files."""
+
pass
@dataclass
class LockFileContents:
+ """The contents we'll write to each lock file."""
+
pid: int
commandline: str
expiration_timestamp: Optional[float]
expiration_timestamp: Optional[float] = None,
override_command: Optional[str] = None,
) -> None:
- self.is_locked = False
- self.lockfile = lockfile_path
- self.override_command = override_command
+ self.is_locked: bool = False
+ self.lockfile: str = lockfile_path
+ self.locktime: Optional[int] = None
+ self.override_command: Optional[str] = override_command
if do_signal_cleanup:
signal.signal(signal.SIGINT, self._signal)
signal.signal(signal.SIGTERM, self._signal)
return not os.path.exists(self.lockfile)
def try_acquire_lock_once(self) -> bool:
- logger.debug(f"Trying to acquire {self.lockfile}.")
+ logger.debug("Trying to acquire %s.", self.lockfile)
try:
# Attempt to create the lockfile. These flags cause
# os.open to raise an OSError if the file already
contents = self._get_lockfile_contents()
logger.debug(contents)
f.write(contents)
- logger.debug(f'Success; I own {self.lockfile}.')
+ logger.debug('Success; I own %s.', self.lockfile)
self.is_locked = True
return True
except OSError:
pass
- msg = f'Could not acquire {self.lockfile}.'
- logger.warning(msg)
+ logger.warning('Couldn\'t acquire %s.', self.lockfile)
return False
def acquire_with_retries(
logger.warning(msg)
raise LockFileException(msg)
- def __exit__(self, type, value, traceback):
+ def __exit__(self, _, value, traceback):
if self.locktime:
ts = datetime.datetime.now().timestamp()
duration = ts - self.locktime
line = lines[0]
line_dict = json.loads(line)
contents = LockFileContents(**line_dict)
- logger.debug(f'Blocking lock contents="{contents}"')
+ logger.debug('Blocking lock contents="%s"', contents)
# Does the PID exist still?
try:
import sys
from logging.config import fileConfig
from logging.handlers import RotatingFileHandler, SysLogHandler
-from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional
import pytz
from overrides import overrides
),
)
-built_in_print = print
-logging_initialized = False
+BUILT_IN_PRINT = print
+LOGGING_INITIALIZED = False
# A map from logging_callsite_id -> count of logged messages.
s = ct.strftime(datefmt)
else:
t = ct.strftime("%Y-%m-%d %H:%M:%S")
- s = "%s,%03d" % (t, record.msecs)
+ s = f"{t},{record.msecs:%03d}"
return s
def initialize_logging(logger=None) -> logging.Logger:
- global logging_initialized
- if logging_initialized:
+ global LOGGING_INITIALIZED
+ if LOGGING_INITIALIZED:
return logging.getLogger()
- logging_initialized = True
+ LOGGING_INITIALIZED = True
if logger is None:
logger = logging.getLogger()
# Global default logging level (--logging_level)
default_logging_level = getattr(logging, config.config['logging_level'].upper(), None)
if not isinstance(default_logging_level, int):
- raise ValueError('Invalid level: %s' % config.config['logging_level'])
+ raise ValueError(f'Invalid level: {config.config["logging_level"]}')
if config.config['logging_format']:
fmt = config.config['logging_format']
logger.propagate = False
if config.config['logging_captures_prints']:
- global built_in_print
import builtins
def print_and_also_log(*arg, **kwarg):
logger.warning(*arg)
else:
logger.info(*arg)
- built_in_print(*arg, **kwarg)
+ BUILT_IN_PRINT(*arg, **kwarg)
builtins.print = print_and_also_log
self.h: Optional[List[Any]] = None
if handles is not None:
- self.h = [handle for handle in handles]
+ self.h = list(handles)
else:
if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES:
raise ValueError("Handle argument is required if bitv & FILEHANDLES")
#!/usr/bin/env python3
+"""This is a module concerned with the creation of and searching of a
+corpus of documents. The corpus is held in memory for fast
+searching."""
+
from __future__ import annotations
import enum
"""An error encountered while parsing a logical search expression."""
def __init__(self, message: str):
+ super().__init__()
self.message = message
#!/usr/bin/env python3
+"""Mathematical helpers."""
+
import functools
import math
from heapq import heappop, heappush
3.141
"""
- assert decimals > 0 and decimals < 10
- multiplier = 10 ** decimals
+ assert 0 < decimals < 10
+ multiplier = 10**decimals
return int(n * multiplier) / multiplier
#!/usr/bin/env python3
+"""Miscellaneous utilities."""
+
import os
class Method(Enum):
+ """How should we parallelize; by threads, processes or remote workers?"""
+
THREAD = 1
PROCESS = 2
REMOTE = 3
#!/usr/bin/env python3
+"""A Persistent is just a class with a load and save method. This
+module defines the Persistent base and a decorator that can be used to
+create a persistent singleton that autoloads and autosaves."""
+
import atexit
import datetime
import enum
# memory.
if self.instance is not None:
logger.debug(
- f'Returning already instantiated singleton instance of {cls.__name__}.'
+ 'Returning already instantiated singleton instance of %s.', cls.__name__
)
return self.instance
# Otherwise, try to load it from persisted state.
was_loaded = False
- logger.debug(f'Attempting to load {cls.__name__} from persisted state.')
+ logger.debug('Attempting to load %s from persisted state.', cls.__name__)
self.instance = cls.load()
if not self.instance:
msg = 'Loading from cache failed.'
logger.warning(msg)
- logger.debug(f'Attempting to instantiate {cls.__name__} directly.')
+ logger.debug('Attempting to instantiate %s directly.', cls.__name__)
self.instance = cls(*args, **kwargs)
else:
- logger.debug(f'Class {cls.__name__} was loaded from persisted state successfully.')
+ logger.debug('Class %s was loaded from persisted state successfully.', cls.__name__)
was_loaded = True
assert self.instance is not None
#!/usr/bin/env python3
+"""A helper to identify and optionally obscure some bad words."""
+
import logging
import random
import re
@decorator_utils.singleton
class ProfanityFilter(object):
+ """A helper to identify and optionally obscure some bad words."""
+
def __init__(self):
self.bad_words = set(
[
chunks = [self.stemmer.stem(word) for word in nltk.word_tokenize(result)]
return ' '.join(chunks)
- def tokenize(self, text: str):
+ @staticmethod
+ def tokenize(text: str):
for x in nltk.word_tokenize(text):
for y in re.split(r'\W+', x):
yield y
False
"""
- words = [word for word in self.tokenize(text)]
+ words = list(self.tokenize(text))
for word in words:
if self.is_bad_word(word):
- logger.debug(f'"{word}" is profanity')
+ logger.debug('"%s" is profanity', word)
return True
if len(words) > 1:
for bigram in string_utils.ngrams_presplit(words, 2):
bigram = ' '.join(bigram)
if self.is_bad_word(bigram):
- logger.debug(f'"{bigram}" is profanity')
+ logger.debug('"%s" is profanity', bigram)
return True
if len(words) > 2:
for trigram in string_utils.ngrams_presplit(words, 3):
trigram = ' '.join(trigram)
if self.is_bad_word(trigram):
- logger.debug(f'"{trigram}" is profanity')
+ logger.debug('"%s" is profanity', trigram)
return True
return False
break
return out
- words = [x for x in self.tokenize(text)]
+ words = list(self.tokenize(text))
words.append('')
words.append('')
words.append('')
for ancestor in ancestors:
name = ancestor.name()
pid = ancestor.pid
- logger.debug(f'Ancestor process {name} (pid={pid})')
+ logger.debug('Ancestor process %s (pid=%d)', name, pid)
if 'ssh' in name.lower():
saw_sshd = True
break
if not saw_sshd:
- logger.error(
- 'Did not see sshd in our ancestors list?! Committing suicide.'
- )
+ logger.error('Did not see sshd in our ancestors list?! Committing suicide.')
os.system('pstree')
os.kill(os.getpid(), signal.SIGTERM)
time.sleep(5.0)
if config.config['watch_for_cancel']:
(thread, stop_thread) = watch_for_cancel()
- logger.debug(f'Reading {in_file}.')
+ logger.debug('Reading %s.', in_file)
try:
with open(in_file, 'rb') as rb:
serialized = rb.read()
except Exception as e:
logger.exception(e)
- logger.critical(f'Problem reading {in_file}. Aborting.')
+ logger.critical('Problem reading %s. Aborting.', in_file)
cleanup_and_exit(thread, stop_thread, 1)
- logger.debug(f'Deserializing {in_file}.')
+ logger.debug('Deserializing %s', in_file)
try:
fun, args, kwargs = cloudpickle.loads(serialized)
except Exception as e:
logger.exception(e)
- logger.critical(f'Problem deserializing {in_file}. Aborting.')
+ logger.critical('Problem deserializing %s. Aborting.', in_file)
cleanup_and_exit(thread, stop_thread, 2)
logger.debug('Invoking user code...')
with Timer() as t:
ret = fun(*args, **kwargs)
- logger.debug(f'User code took {t():.1f}s')
+ logger.debug('User code took %.1fs', t())
logger.debug('Serializing results')
try:
serialized = cloudpickle.dumps(ret)
except Exception as e:
logger.exception(e)
- logger.critical(f'Could not serialize result ({type(ret)}). Aborting.')
+ logger.critical('Could not serialize result (%s). Aborting.', type(ret))
cleanup_and_exit(thread, stop_thread, 3)
- logger.debug(f'Writing {out_file}.')
+ logger.debug('Writing %s', out_file)
try:
with open(out_file, 'wb') as wb:
wb.write(serialized)
except Exception as e:
logger.exception(e)
- logger.critical(f'Error writing {out_file}. Aborting.')
+ logger.critical('Error writing %s. Aborting.', out_file)
cleanup_and_exit(thread, stop_thread, 4)
cleanup_and_exit(thread, stop_thread, 0)
#!/usr/bin/env python3
+"""Location/site dependent data."""
+
import logging
import platform
from dataclasses import dataclass
@dataclass
class SiteConfig(object):
+ """The set of information specific to where the program is running."""
+
location_name: str
location: Location
network: str
if location_override is None or location_override == 'NONE':
location = this_location()
else:
- logger.debug(f'site_config\'s location_override was set to: {location_override}')
+ logger.debug('site_config\'s location_override was set to: %s', location_override)
location = location_override
return location
#!/usr/bin/env python3
+"""A future that can be treated like the result that it contains and
+will not block until it is used. At that point, if the underlying
+value is not yet available, it will block until it becomes
+available."""
+
from __future__ import annotations
import concurrent
import concurrent.futures as fut
import logging
-import traceback
from typing import Callable, List, Set, TypeVar
from overrides import overrides
smart_future_by_real_future = {}
completed_futures: Set[fut.Future] = set()
for x in futures:
- assert type(x) == SmartFuture
+ assert isinstance(x, SmartFuture)
real_futures.append(x.wrapped_future)
smart_future_by_real_future[x.wrapped_future] = x
if log_exceptions and not f.cancelled():
exception = f.exception()
if exception is not None:
- logger.warning(
- f'Future {id(f)} raised an unhandled exception and exited.'
- )
+ logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
logger.exception(exception)
raise exception
yield smart_future_by_real_future[f]
if callback is not None:
callback()
- return
def wait_all(
) -> None:
real_futures = []
for x in futures:
- assert type(x) == SmartFuture
+ assert isinstance(x, SmartFuture)
real_futures.append(x.wrapped_future)
(done, not_done) = concurrent.futures.wait(
if not f.cancelled():
exception = f.exception()
if exception is not None:
- logger.warning(
- f'Future {id(f)} raised an unhandled exception and exited.'
- )
+ logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
logger.exception(exception)
raise exception
assert len(done) == len(real_futures)
"""
def __init__(self, wrapped_future: fut.Future) -> None:
- assert type(wrapped_future) == fut.Future
+ assert isinstance(wrapped_future, fut.Future)
self.wrapped_future = wrapped_future
self.id = id_generator.get("smart_future_id")
# You shouldn't have to call this; instead, have a look at defining a
# method on DeferredOperand base class.
@overrides
- def _resolve(self, *, timeout=None) -> T:
+ def _resolve(self, timeout=None) -> T:
return self.wrapped_future.result(timeout)
#!/usr/bin/env python3
+"""Several helpers to keep track of internal state via periodic
+polling. StateTracker expects to be invoked periodically to maintain
+state whereas the others automatically update themselves and,
+optionally, expose an event for client code to wait on state changes."""
+
import datetime
import logging
import threading
"""
self.update_ids_to_update_secs = update_ids_to_update_secs
self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
+ self.now: Optional[datetime.datetime] = None
for x in update_ids_to_update_secs.keys():
self.last_reminder_ts[x] = None
refresh_secs = self.update_ids_to_update_secs[update_id]
last_run = self.last_reminder_ts[update_id]
if last_run is None: # Never run before
- logger.debug(f'id {update_id} has never been run; running it now')
+ logger.debug('id %s has never been run; running it now', update_id)
self.update(update_id, self.now, self.last_reminder_ts[update_id])
self.last_reminder_ts[update_id] = self.now
else:
delta = self.now - last_run
if delta.total_seconds() >= refresh_secs: # Is overdue?
- logger.debug(f'id {update_id} is overdue; running it now')
+ logger.debug('id %s is overdue; running it now', update_id)
self.update(
update_id,
self.now,
logger.debug('pace_maker noticed event; shutting down')
return
self.heartbeat()
- logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s')
+ logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
time.sleep(self.sleep_delay)
def __init__(
super().__init__(update_ids_to_update_secs)
if override_sleep_delay is not None:
- logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
+ logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
self.sleep_delay = override_sleep_delay
else:
periods_list = list(update_ids_to_update_secs.values())
self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
- logger.info(f'Computed sleep_delay={self.sleep_delay}')
+ logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
(thread, stop_event) = self.pace_maker()
self.should_terminate = stop_event
self.updater_thread = thread
#!/usr/bin/env python3
+"""A simple stopwatch decorator / context for timing things."""
+
import time
from typing import Callable, Optional
"""
True if the string represents a valid date.
"""
- import dateparse
import dateparse.dateparse_utils as dp
try:
try:
d = dp.DateParser() # type: ignore
dt = d.parse(in_str)
- if type(dt) == datetime.datetime:
+ if isinstance(dt, datetime.datetime):
return dt
except ValueError:
msg = f'Unable to parse datetime {in_str}.'
for second in second_list:
# Disallow there're/where're. They're valid English
# but sound weird.
- if (first == 'there' or first == 'where') and second == 'a(re)':
+ if (first in ('there', 'where')) and second == 'a(re)':
continue
pattern = fr'\b({first})\s+{second}\b'
# Column specs map input lines' columns into outputs.
# [col1, col2...]
for spec in column_specs:
- chunk = ''
+ hunk = ''
for n in spec:
- chunk = chunk + delim + input_lines[n]
- chunk = chunk.strip(delim)
- out.append(chunk)
+ hunk = hunk + delim + input_lines[n]
+ hunk = hunk.strip(delim)
+ out.append(hunk)
return out
# Column specs map input lines' columns into outputs.
# "key", [col1, col2...]
for spec in column_specs:
- chunk = ''
+ hunk = ''
for n in spec[1]:
- chunk = chunk + delim + input_lines[n]
- chunk = chunk.strip(delim)
- out[spec[0]] = chunk
+ hunk = hunk + delim + input_lines[n]
+ hunk = hunk.strip(delim)
+ out[spec[0]] = hunk
return out
b'1, 2, 3'
"""
- if type(x) is str:
+ if isinstance(x, str):
return x.encode('ascii')
- if type(x) is bytes:
+ if isinstance(x, bytes):
return x
raise Exception('to_ascii works with strings and bytes')
class RowsColumns(NamedTuple):
+ """Row + Column"""
+
rows: int
columns: int
).split()
except Exception as e:
logger.exception(e)
- raise Exception('Can\'t determine console size?!')
+ raise Exception('Can\'t determine console size?!') from e
return RowsColumns(int(rows), int(columns))
barcount = len(_bar)
min_num, max_num = min(numbers), max(numbers)
span = max_num - min_num
- sparkline = ''.join(
+ sline = ''.join(
_bar[min([barcount - 1, int((n - min_num) / span * barcount)])] for n in numbers
)
- return min_num, max_num, sparkline
+ return min_num, max_num, sline
def distribute_strings(
#!/usr/bin/env python3
+"""Utilities for dealing with threads + threading."""
+
import functools
import logging
import os
kwargs=kwa,
)
thread.start()
- logger.debug(f'Started thread {thread.name} tid={thread.ident}')
+ logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
return (thread, should_terminate)
return inner_wrapper
newargs = (should_terminate, *args)
thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
thread.start()
- logger.debug(f'Started thread {thread.name} tid={thread.ident}')
+ logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
return (thread, should_terminate)
return wrapper_repeat
#!/usr/bin/env python3
+"""Utility functions for dealing with typing."""
+
import logging
from typing import Any, Optional
class PerfRegressionDataPersister(ABC):
+ """A base class for a signature dealing with persisting perf
+ regression data."""
+
def __init__(self):
pass
class FileBasedPerfRegressionDataPersister(PerfRegressionDataPersister):
+ """A perf regression data persister that uses files."""
+
def __init__(self, filename: str):
+ super().__init__()
self.filename = filename
self.traces_to_delete: List[str] = []
class DatabasePerfRegressionDataPersister(PerfRegressionDataPersister):
+ """A perf regression data persister that uses a database backend."""
+
def __init__(self, dbspec: str):
+ super().__init__()
self.dbspec = dbspec
self.engine = sa.create_engine(self.dbspec)
self.conn = self.engine.connect()
def save_performance_data(self, method_id: str, data: Dict[str, List[float]]):
self.delete_performance_data(method_id)
- for (method_id, perf_data) in data.items():
+ for (mid, perf_data) in data.items():
sql = 'INSERT INTO runtimes_by_function (function, runtime) VALUES '
for perf in perf_data:
- self.conn.execute(sql + f'("{method_id}", {perf});')
+ self.conn.execute(sql + f'("{mid}", {perf});')
def delete_performance_data(self, method_id: str):
sql = f'DELETE FROM runtimes_by_function WHERE function = "{method_id}"'
func_id = function_utils.function_identifier(func)
func_name = func.__name__
- logger.debug(f'Watching {func_name}\'s performance...')
- logger.debug(f'Canonical function identifier = {func_id}')
+ logger.debug('Watching %s\'s performance...', func_name)
+ logger.debug('Canonical function identifier = "%s"', func_id)
try:
perfdb = helper.load_performance_data(func_id)
hist = perfdb.get(func_id, [])
if len(hist) < config.config['unittests_num_perf_samples']:
hist.append(run_time)
- logger.debug(f'Still establishing a perf baseline for {func_name}')
+ logger.debug('Still establishing a perf baseline for %s', func_name)
else:
stdev = statistics.stdev(hist)
- logger.debug(f'For {func_name}, performance stdev={stdev}')
+ logger.debug('For %s, performance stdev=%.2f', func_name, stdev)
slowest = hist[-1]
- logger.debug(f'For {func_name}, slowest perf on record is {slowest:f}s')
+ logger.debug('For %s, slowest perf on record is %.2fs', func_name, slowest)
limit = slowest + stdev * 4
- logger.debug(f'For {func_name}, max acceptable runtime is {limit:f}s')
- logger.debug(f'For {func_name}, actual observed runtime was {run_time:f}s')
+ logger.debug('For %s, max acceptable runtime is %.2fs', func_name, limit)
+ logger.debug('For %s, actual observed runtime was %.2fs', func_name, run_time)
if run_time > limit:
msg = f'''{func_id} performance has regressed unacceptably.
{slowest:f}s is the slowest runtime on record in {len(hist)} perf samples.
for name, m in inspect.getmembers(cls, inspect.isfunction):
if name.startswith(prefix):
setattr(cls, name, check_method_for_perf_regressions(m))
- logger.debug(f'Wrapping {cls.__name__}:{name}.')
+ logger.debug('Wrapping %s:%s.', cls.__name__, name)
return cls
return decorate_the_testcase
#!/usr/bin/env python3
+"""A fast word unscrambler library."""
+
import logging
from typing import Dict, Mapping, Optional
logger = logging.getLogger(__name__)
letters_bits = 32
-letters_mask = 2 ** letters_bits - 1
+letters_mask = 2**letters_bits - 1
fprint_bits = 52
-fprint_mask = (2 ** fprint_bits - 1) << letters_bits
+fprint_mask = (2**fprint_bits - 1) << letters_bits
fprint_feature_bit = {
'e': 0,
self.sigs = []
self.words = []
- filename = self.get_indexfile(indexfile)
+ filename = Unscrambler.get_indexfile(indexfile)
with open(filename, 'r') as rf:
lines = rf.readlines()
for line in lines:
self.sigs.append(isig)
self.words.append(word)
- def get_indexfile(self, indexfile: Optional[str]) -> str:
+ @staticmethod
+ def get_indexfile(indexfile: Optional[str]) -> str:
if indexfile is None:
if 'unscrambler_default_indexfile' in config.config:
indexfile = config.config['unscramble_indexfile']
@staticmethod
def repopulate(
- lsigs: Dict[str, int],
dictfile: str = '/usr/share/dict/words',
indexfile: str = '/usr/share/dict/sparse_index',
) -> None: