import datetime
import logging
import os
+from typing import Any
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
setattr(namespace, self.dest, True)
def valid_bool(v: Any) -> bool:
    """
    Validate a value as a bool and return it, raising an
    argparse.ArgumentTypeError if it cannot be converted.

    Values that are already bools are returned unchanged.  Anything
    else is handed to string_utils.to_bool.  If that call raises, the
    failure is re-raised as an argparse.ArgumentTypeError carrying the
    offending value, explicitly chained to the original exception so
    the root cause stays visible in tracebacks.

    NOTE(review): the to_bool shown elsewhere in this change returns
    False for unrecognized strings instead of raising, so a string
    such as 'huh?' comes back as False rather than being rejected --
    confirm that is intended.

    >>> valid_bool(True)
    True

    >>> valid_bool("true")
    True

    >>> valid_bool("yes")
    True

    >>> valid_bool("on")
    True

    >>> valid_bool("1")
    True

    >>> valid_bool(12345)
    Traceback (most recent call last):
    ...
    argparse.ArgumentTypeError: 12345

    """
    if isinstance(v, bool):
        return v

    # Imported here rather than at module scope; the header comment of
    # this module asks to avoid unnecessary dependencies on siblings.
    from string_utils import to_bool

    try:
        return to_bool(v)
    except Exception as e:
        # Deliberately broad: whatever to_bool objects to is reported
        # as an argument type error, chained to the underlying cause.
        raise argparse.ArgumentTypeError(v) from e
def valid_ip(ip: str) -> str:
+ """
+ If the string is a valid IPv4 address, return it. Otherwise raise
+ an ArgumentTypeError.
+
+ >>> valid_ip("1.2.3.4")
+ '1.2.3.4'
+
+ >>> valid_ip("localhost")
+ Traceback (most recent call last):
+ ...
+ argparse.ArgumentTypeError: localhost is an invalid IP address
+
+ """
from string_utils import extract_ip_v4
s = extract_ip_v4(ip.strip())
if s is not None:
def valid_mac(mac: str) -> str:
+ """
+ If the string is a valid MAC address, return it. Otherwise raise
+ an ArgumentTypeError.
+
+ >>> valid_mac('12:23:3A:4F:55:66')
+ '12:23:3A:4F:55:66'
+
+ >>> valid_mac('12-23-3A-4F-55-66')
+ '12-23-3A-4F-55-66'
+
+ >>> valid_mac('big')
+ Traceback (most recent call last):
+ ...
+ argparse.ArgumentTypeError: big is an invalid MAC address
+
+ """
from string_utils import extract_mac_address
s = extract_mac_address(mac)
if s is not None:
def valid_percentage(num: str) -> float:
+ """
+ If the string is a valid percentage, return it. Otherwise raise
+ an ArgumentTypeError.
+
+ >>> valid_percentage("15%")
+ 15.0
+
+ >>> valid_percentage('40')
+ 40.0
+
+ >>> valid_percentage('115')
+ Traceback (most recent call last):
+ ...
+ argparse.ArgumentTypeError: 115 is an invalid percentage; expected 0 <= n <= 100.0
+
+ """
num = num.strip('%')
n = float(num)
if 0.0 <= n <= 100.0:
def valid_filename(filename: str) -> str:
+ """
+ If the string is a valid filename, return it. Otherwise raise
+ an ArgumentTypeError.
+
+ >>> valid_filename('/tmp')
+ '/tmp'
+
+ >>> valid_filename('wfwefwefwefwefwefwefwefwef')
+ Traceback (most recent call last):
+ ...
+ argparse.ArgumentTypeError: wfwefwefwefwefwefwefwefwef was not found and is therefore invalid.
+
+ """
s = filename.strip()
if os.path.exists(s):
return s
def valid_date(txt: str) -> datetime.date:
+ """If the string is a valid date, return it. Otherwise raise
+ an ArgumentTypeError.
+
+ >>> valid_date('6/5/2021')
+ datetime.date(2021, 6, 5)
+
+ # Note: dates like 'next wednesday' work fine, they are just
+ # hard to test for without knowing when the testcase will be
+ # executed...
+ >>> valid_date('next wednesday') # doctest: +ELLIPSIS
+ -ANYTHING-
+ """
from string_utils import to_date
date = to_date(txt)
if date is not None:
def valid_datetime(txt: str) -> datetime.datetime:
+ """If the string is a valid datetime, return it. Otherwise raise
+ an ArgumentTypeError.
+
+ >>> valid_datetime('6/5/2021 3:01:02')
+ datetime.datetime(2021, 6, 5, 3, 1, 2)
+
+ # Again, these types of expressions work fine but are
+ # difficult to test with doctests because the answer is
+ # relative to the time the doctest is executed.
+ >>> valid_datetime('next christmas at 4:15am') # doctest: +ELLIPSIS
+ -ANYTHING-
+ """
from string_utils import to_datetime
dt = to_datetime(txt)
if dt is not None:
msg = f'Cannot parse argument as datetime: {txt}'
logger.warning(msg)
raise argparse.ArgumentTypeError(msg)
+
+
+if __name__ == '__main__':
+ import doctest
+ doctest.ELLIPSIS_MARKER = '-ANYTHING-'
+ doctest.testmod()
def handle_uncaught_exception(exc_type, exc_value, exc_tb):
+ """
+ Top-level exception handler for exceptions that make it past any exception
+ handlers in the python code being run. Logs the error and stacktrace then
+ maybe attaches a debugger.
+ """
global original_hook
msg = f'Unhandled top level exception {exc_type}'
logger.exception(msg)
# stdin or stderr is redirected, just do the normal thing
original_hook(exc_type, exc_value, exc_tb)
else:
- # a terminal is attached and stderr is not redirected, debug.
+ # a terminal is attached and stderr is not redirected, maybe debug.
+ import traceback
+ traceback.print_exception(exc_type, exc_value, exc_tb)
if config.config['debug_unhandled_exceptions']:
- import traceback
import pdb
- traceback.print_exception(exc_type, exc_value, exc_tb)
logger.info("Invoking the debugger...")
pdb.pm()
else:
def initialize(entry_point):
+ """
+ Remember to initialize config, initialize logging, set/log a random
+ seed, etc... before running main.
- """Remember to initialize config and logging before running main."""
+ """
@functools.wraps(entry_point)
def initialize_wrapper(*args, **kwargs):
+
+ # Hook top level unhandled exceptions, maybe invoke debugger.
if sys.excepthook == sys.__excepthook__:
sys.excepthook = handle_uncaught_exception
+
+ # Try to figure out the name of the program entry point. Then
+ # parse configuration (based on cmdline flags, environment vars
+ # etc...)
if (
'__globals__' in entry_point.__dict__ and
'__file__' in entry_point.__globals__
else:
config.parse(None)
+ # Initialize logging... and log some remembered messages from
+ # config module.
logging_utils.initialize_logging(logging.getLogger())
-
config.late_logging()
# Allow programs that don't bother to override the random seed
logger.debug(msg)
random.seed(random_seed)
+ # Do it, invoke the user's code. Pay attention to how long it takes.
logger.debug(f'Starting {entry_point.__name__} (program entry point)')
-
ret = None
import stopwatch
with stopwatch.Timer() as t:
f'child system: {cstime}s\n'
f'machine uptime: {elapsed_time}s\n'
f'walltime: {walltime}s')
+
+ # If it doesn't return cleanly, call attention to the return value.
if ret is not None and ret != 0:
logger.error(f'Exit {ret}')
else:
#!/usr/bin/env python3
-"""Global configuration driven by commandline arguments (even across
-different modules). Usage:
+"""Global configuration driven by commandline arguments, environment variables
+and saved configuration files. This works across several modules.
+
+Usage:
module.py:
----------
config.parse() # Very important, this must be invoked!
If you set this up and remember to invoke config.parse(), all commandline
- arguments will play nicely together:
+ arguments will play nicely together. This is done automatically for you
+ if you're using the bootstrap module's initialize wrapper.
% main.py -h
usage: main.py [-h]
False
>>> d
{'test': 2, 'ing': 1}
+
"""
if key in d.keys():
d[key] = inc_function(d[key])
>>> c = {'c': 1, 'd': 2}
>>> coalesce([a, b, c])
{'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
+
"""
out: Dict[Any, Any] = {}
for d in inputs:
Traceback (most recent call last):
...
ValueError: max() arg is an empty sequence
+
"""
return max(d.items(), key=lambda _: _[1])
>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> item_with_min_value(d)
('a', 1)
+
"""
return min(d.items(), key=lambda _: _[1])
>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> key_with_max_value(d)
'c'
+
"""
return item_with_max_value(d)[0]
>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> key_with_min_value(d)
'a'
+
"""
return item_with_min_value(d)[0]
>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> max_value(d)
3
+
"""
return item_with_max_value(d)[1]
>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> min_value(d)
1
+
"""
return item_with_min_value(d)[1]
>>> d = {'a': 3, 'b': 2, 'c': 1}
>>> max_key(d)
'c'
+
"""
return max(d.keys())
>>> d = {'a': 3, 'b': 2, 'c': 1}
>>> min_key(d)
'a'
+
"""
return min(d.keys())
default_response: str = None,
timeout_seconds: int = None,
) -> str:
+ """Get a single keystroke response to a prompt."""
+
def _handle_timeout(signum, frame) -> None:
raise exceptions.TimeoutError()
def yn_response(prompt: str = None, *, timeout_seconds=None) -> str:
    """
    Ask a yes/no question and return the reply in lowercase.

    Delegates to single_keystroke_response, passing the keys y, n, Y
    and N as its first argument (presumably the set of accepted
    keystrokes -- verify against that function), and lowercases
    whatever it returns.
    """
    yes_no_keys = ["y", "n", "Y", "N"]
    reply = single_keystroke_response(
        yes_no_keys,
        prompt=prompt,
        timeout_seconds=timeout_seconds,
    )
    return reply.lower()
def keystroke_helper() -> None:
+ """Misc util to watch keystrokes and report what they were."""
+
print("Watching for keystrokes; ^C to quit.")
while True:
key = readchar.readkey()
"""Utilities related to logging."""
+import collections
import contextlib
import datetime
import enum
default=False,
help='logging.info also prints to stdout.'
)
+cfg.add_argument(
+ '--logging_max_n_times_per_message',
+ type=int,
+ default=0,
+ help='When set, ignore logged messages from the same site after N.'
+)
# See also: OutputMultiplexer
cfg.add_argument(
class OnlyInfoFilter(logging.Filter):
    """
    A logging filter that lets a record through only when it was
    produced at exactly the INFO level.
    """

    def filter(self, record):
        """Return True iff the record's level number equals logging.INFO."""
        is_info_level = record.levelno == logging.INFO
        return is_info_level
class OnlyNTimesFilter(logging.Filter):
    """
    A logging filter that admits a given message at most N times.

    Records are bucketed by source pathname, line number, level number
    and the raw (unformatted) message.  The first `maximum` records in
    a bucket pass; every later one is rejected.

    """

    def __init__(self, maximum: int) -> None:
        super().__init__()
        # How many records per bucket are allowed through.
        self.maximum = maximum
        # Bucket key -> number of records seen so far (passed or not).
        self.counters = collections.Counter()

    def filter(self, record: logging.LogRecord) -> bool:
        """Count this record against its bucket; True while under the cap."""
        bucket = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
        seen_before = self.counters[bucket]
        self.counters[bucket] = seen_before + 1
        return seen_before < self.maximum
+
+
class MillisecondAwareFormatter(logging.Formatter):
+ """
+ A formatter for adding milliseconds to log messages.
+
+ """
converter = datetime.datetime.fromtimestamp
def formatTime(self, record, datefmt=None):
handler.addFilter(OnlyInfoFilter())
logger.addHandler(handler)
+ maximum = config.config['logging_max_n_times_per_message']
+ if maximum > 0:
+ for handler in handlers:
+ handler.addFilter(OnlyNTimesFilter(maximum))
+
logger.setLevel(numeric_level)
logger.propagate = False
... reference=None,
... )
... )
+ >>> c.add_doc(Document(
+ ... docid=3,
+ ... tags=set(['urgent']),
+ ... properties=[
+ ... ('author', 'Scott'),
+ ... ('subject', 'car turning in front of you')
+ ... ],
+ ... reference=None,
+ ... )
+ ... )
>>> c.query('author:Scott and important')
{1}
"""
>>> to_bool('True')
True
+
>>> to_bool('1')
True
+
>>> to_bool('yes')
True
+
>>> to_bool('no')
False
+
>>> to_bool('huh?')
False
+
+ >>> to_bool('on')
+ True
+
"""
if not is_string(in_str):
raise ValueError(in_str)
- return in_str.lower() in ("true", "1", "yes", "y", "t")
+ return in_str.lower() in ("true", "1", "yes", "y", "t", "on")
def to_date(in_str: str) -> Optional[datetime.date]:
BASE="${BASE} (doctest)"
make_header "${BASE}" "${CYAN}"
# Run the module's doctests, capturing stdout and stderr together.
OUT=$( python3 ${doctest} 2>&1 )
# Count the "***Test Failed***" lines in the captured output; a
# non-zero count means at least one doctest example failed.
FAILED=$( echo "${OUT}" | grep '\*\*\*Test Failed\*\*\*' | wc -l )
if [ $FAILED == 0 ]; then
    echo "OK"
else
    # Print the captured doctest report, not the line count, so the
    # failing examples are actually visible to whoever runs this.
    echo -e "${OUT}"
    FAILURES=$((FAILURES+1))
fi
done
fi
# Final summary: a red failure count (singular or plural wording) when
# anything failed, otherwise a green all-clear message.
if [ ${FAILURES} -ne 0 ]; then
    if [ ${FAILURES} -eq 1 ]; then
        SUMMARY_TEXT="There was ${FAILURES} failure."
    else
        SUMMARY_TEXT="There were ${FAILURES} failures."
    fi
    echo -e "${RED}${SUMMARY_TEXT}${NC}"
else
    echo -e "${GREEN}Everything looks good.${NC}"
fi
def check_method_for_perf_regressions(func: Callable) -> Callable:
- """This is meant to be used on a method in a class that subclasses
+ """
+ This is meant to be used on a method in a class that subclasses
unittest.TestCase. When thus decorated it will time the execution
of the code in the method, compare it with a database of
historical performance, and fail the test with a perf-related
message if it has become too slow.
- """
+ """
def load_known_test_performance_characteristics():
with open(_db, 'rb') as f:
return pickle.load(f)
)
else:
stdev = statistics.stdev(hist)
- limit = hist[-1] + stdev * 3
+ limit = hist[-1] + stdev * 5
logger.debug(
f'Max acceptable performace for {func.__name__} is {limit:f}s'
)
):
msg = f'''{func_id} performance has regressed unacceptably.
{hist[-1]:f}s is the slowest record in {len(hist)} db perf samples.
-It just ran in {run_time:f}s which is >3 stdevs slower than the slowest sample.
+It just ran in {run_time:f}s which is >5 stdevs slower than the slowest sample.
Here is the current, full db perf timing distribution:
-{hist}'''
- slf = args[0]
+'''
+ for x in hist:
+ msg += f'{x:f}\n'
logger.error(msg)
+ slf = args[0]
slf.fail(msg)
else:
hist.append(run_time)