cfg.add_argument(
"--presence_macs_file",
type=argparse_utils.valid_filename,
- default = "/home/scott/cron/persisted_mac_addresses.txt",
+ default="/home/scott/cron/persisted_mac_addresses.txt",
metavar="FILENAME",
- help="The location of persisted_mac_addresses.txt to use."
+ help="The location of persisted_mac_addresses.txt to use.",
)
cfg.add_argument(
'--presence_tolerable_staleness_seconds',
type=argparse_utils.valid_duration,
default=datetime.timedelta(seconds=60 * 5),
metavar='DURATION',
- help='Max acceptable age of location data before auto-refreshing'
+ help='Max acceptable age of location data before auto-refreshing',
)
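+ # Example invocation (hypothetical values; the exact duration syntax
+ # accepted by argparse_utils.valid_duration is not shown in this hunk):
+ #
+ #   --presence_macs_file=/tmp/macs.txt --presence_tolerable_staleness_seconds=300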
# Note: list most important devices first.
self.devices_by_person: Dict[Person, List[str]] = {
Person.SCOTT: [
- "DC:E5:5B:0F:03:3D", # pixel6
- "6C:40:08:AE:DC:2E", # laptop
+ "DC:E5:5B:0F:03:3D", # pixel6
+ "6C:40:08:AE:DC:2E", # laptop
],
Person.LYNN: [
- "08:CC:27:63:26:14", # motog7
- "B8:31:B5:9A:4F:19", # laptop
+ "08:CC:27:63:26:14", # motog7
+ "B8:31:B5:9A:4F:19", # laptop
],
Person.ALEX: [
- "0C:CB:85:0C:8B:AE", # phone
- "D0:C6:37:E3:36:9A", # laptop
+ "0C:CB:85:0C:8B:AE", # phone
+ "D0:C6:37:E3:36:9A", # laptop
],
Person.AARON_AND_DANA: [
"98:B6:E9:E5:5A:7C",
else:
now = datetime.datetime.now()
delta = now - self.last_update
- if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds():
+ if (
+ delta.total_seconds()
+ > config.config['presence_tolerable_staleness_seconds'].total_seconds()
+ ):
logger.debug(
f"It's been {delta.total_seconds()}s since last update; refreshing now."
)
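+ # e.g. with the 5-minute default staleness, a snapshot that is 301
+ # seconds old triggers a refresh here.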
def update_from_house(self) -> None:
from exec_utils import cmd
+
try:
persisted_macs = config.config['presence_macs_file']
except KeyError:
def update_from_cabin(self) -> None:
from exec_utils import cmd
+
try:
persisted_macs = config.config['presence_macs_file']
except KeyError:
warnings.warn(msg, stacklevel=2)
self.dark_locations.add(Location.HOUSE)
- def read_persisted_macs_file(
- self, filename: str, location: Location
- ) -> None:
+ def read_persisted_macs_file(self, filename: str, location: Location) -> None:
if location is Location.UNKNOWN:
return
with open(filename, "r") as rf:
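+ # Each line is assumed to carry a MAC address, a UNIX timestamp, and an
+ # "ip (name)" field (the field split itself is elided from this hunk), e.g.:
+ #   aa:bb:cc:dd:ee:ff, 1637118000, 10.0.0.1 (pixel6)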
logger.exception(e)
continue
mac = mac.strip()
- (self.location_ts_by_mac[location])[
- mac
- ] = datetime.datetime.fromtimestamp(int(ts.strip()))
+ (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
+ int(ts.strip())
+ )
ip_name = ip_name.strip()
match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
if match is not None:
def where_is_person_now(self, name: Person) -> Location:
self.maybe_update()
if len(self.dark_locations) > 0:
- msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
+ msg = (
+ f"Can't see {self.dark_locations} right now; answer confidence impacted"
+ )
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
logger.debug(f'Looking for {name}...')
return Location.UNKNOWN
import dict_utils
+
votes: Dict[Location, int] = {}
tiebreaks: Dict[Location, datetime.datetime] = {}
credit = 10000
logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
tiebreaks[location] = ts
- (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
+ (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(
+ tiebreaks
+ )
bonus = credit
v = votes.get(most_recent_location, 0)
votes[most_recent_location] = v + bonus
logger.debug(f'{name}: {location} gets {bonus} votes.')
- credit = int(
- credit * 0.2
- ) # Note: list most important devices first
+ credit = int(credit * 0.2) # Note: list most important devices first
if credit <= 0:
credit = 1
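+ # With an initial credit of 10000 and the 0.2 decay, successive devices
+ # are worth 10000, 2000, 400, 80, 16, ... votes (floored at 1), so the
+ # first listed (most important) device dominates the tally.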
if len(votes) > 0:
for person in Person:
print(f'{person} => {p.where_is_person_now(person)}')
print()
+
+
# for location in Location:
# print(f'{location} => {p.is_anyone_in_location_now(location)}')
import argparse_utils
import config
-cfg = config.add_commandline_args(
- f'Logging ({__file__})',
- 'Args related to logging')
+cfg = config.add_commandline_args(f'Logging ({__file__})', 'Args related to logging')
cfg.add_argument(
'--logging_config_file',
type=argparse_utils.valid_filename,
'--logging_format',
type=str,
default=None,
- help='The format for lines logged via the logger module. See: https://docs.python.org/3/library/logging.html#formatter-objects'
+ help='The format for lines logged via the logger module. See: https://docs.python.org/3/library/logging.html#formatter-objects',
)
cfg.add_argument(
'--logging_date_format',
type=str,
default='%Y/%m/%dT%H:%M:%S.%f%z',
metavar='DATEFMT',
- help='The format of any dates in --logging_format.'
+ help='The format of any dates in --logging_format.',
)
cfg.add_argument(
'--logging_console',
type=str,
default=None,
metavar='FILENAME',
- help='The filename of the logfile to write.'
+ help='The filename of the logfile to write.',
)
cfg.add_argument(
'--logging_filename_maxsize',
type=int,
- default=(1024*1024),
+ default=(1024 * 1024),
metavar='#BYTES',
- help='The maximum size (in bytes) to write to the logging_filename.'
+ help='The maximum size (in bytes) to write to the logging_filename.',
)
cfg.add_argument(
'--logging_filename_count',
type=int,
default=7,
metavar='COUNT',
- help='The number of logging_filename copies to keep before deleting.'
+ help='The number of logging_filename copies to keep before deleting.',
)
cfg.add_argument(
'--logging_syslog',
action=argparse_utils.ActionNoYes,
default=False,
- help='Should we log to localhost\'s syslog.'
+ help='Should we log to localhost\'s syslog?',
)
cfg.add_argument(
'--logging_syslog_facility',
type=str,
- default = 'USER',
- choices=['NOTSET', 'AUTH', 'AUTH_PRIV', 'CRON', 'DAEMON', 'FTP', 'KERN', 'LPR', 'MAIL', 'NEWS',
- 'SYSLOG', 'USER', 'UUCP', 'LOCAL0', 'LOCAL1', 'LOCAL2', 'LOCAL3', 'LOCAL4', 'LOCAL5',
- 'LOCAL6', 'LOCAL7'],
+ default='USER',
+ choices=[
+ 'NOTSET',
+ 'AUTH',
+ 'AUTH_PRIV',
+ 'CRON',
+ 'DAEMON',
+ 'FTP',
+ 'KERN',
+ 'LPR',
+ 'MAIL',
+ 'NEWS',
+ 'SYSLOG',
+ 'USER',
+ 'UUCP',
+ 'LOCAL0',
+ 'LOCAL1',
+ 'LOCAL2',
+ 'LOCAL3',
+ 'LOCAL4',
+ 'LOCAL5',
+ 'LOCAL6',
+ 'LOCAL7',
+ ],
metavar='SYSLOG_FACILITY_LIST',
help='The default syslog message facility identifier.',
)
'--logging_debug_threads',
action=argparse_utils.ActionNoYes,
default=False,
- help='Should we prepend pid/tid data to all log messages?'
+ help='Should we prepend pid/tid data to all log messages?',
)
cfg.add_argument(
'--logging_debug_modules',
action=argparse_utils.ActionNoYes,
default=False,
- help='Should we prepend module/function data to all log messages?'
+ help='Should we prepend module/function data to all log messages?',
)
cfg.add_argument(
'--logging_info_is_print',
action=argparse_utils.ActionNoYes,
default=False,
- help='logging.info also prints to stdout.'
+ help='logging.info also prints to stdout.',
)
cfg.add_argument(
'--logging_squelch_repeats',
action=argparse_utils.ActionNoYes,
default=True,
- help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?'
+ help='Do we allow code to indicate that it wants to squelch repeated logging messages, or should we always log?',
)
cfg.add_argument(
'--logging_probabilistically',
action=argparse_utils.ActionNoYes,
default=True,
- help='Do we allow probabilistic logging (for code that wants it) or should we always log?'
+ help='Do we allow probabilistic logging (for code that wants it), or should we always log?',
)
# See also: OutputMultiplexer
cfg.add_argument(
'--logging_captures_prints',
action=argparse_utils.ActionNoYes,
default=False,
- help='When calling print, also log.info automatically.'
+ help='When calling print, also log.info automatically.',
)
cfg.add_argument(
'--lmodule',
type=str,
metavar='<SCOPE>=<LEVEL>[,<SCOPE>=<LEVEL>...]',
help=(
- 'Allows per-scope logging levels which override the global level set with --logging-level.' +
- 'Pass a space separated list of <scope>=<level> where <scope> is one of: module, ' +
- 'module:function, or :function and <level> is a logging level (e.g. INFO, DEBUG...)'
- )
+ 'Allows per-scope logging levels which override the global level set with --logging-level. '
+ + 'Pass a space-separated list of <scope>=<level> where <scope> is one of: module, '
+ + 'module:function, or :function and <level> is a logging level (e.g. INFO, DEBUG...)'
+ ),
)
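+ # Example (module/function names here are hypothetical):
+ #
+ #   --lmodule='presence=DEBUG utils:update=INFO :maybe_update=WARNING'
+ #
+ # i.e. DEBUG for the presence module, INFO for utils.update, and WARNING
+ # for any function named maybe_update regardless of module.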
cfg.add_argument(
'--logging_clear_preexisting_handlers',
action=argparse_utils.ActionNoYes,
default=True,
help=(
- 'Should logging code clear preexisting global logging handlers and thus insist that is ' +
- 'alone can add handlers. Use this to work around annoying modules that insert global ' +
- 'handlers with formats and logging levels you might now want. Caveat emptor, this may ' +
- 'cause you to miss logging messages.'
- )
+ 'Should logging code clear preexisting global logging handlers and thus insist that it '
+ + 'alone can add handlers? Use this to work around annoying modules that insert global '
+ + 'handlers with formats and logging levels you might not want. Caveat emptor: this may '
+ + 'cause you to miss logging messages.'
+ ),
)
built_in_print = print
string), the messages are considered to be different.
"""
+
def squelch_logging_wrapper(f: Callable):
import function_utils
+
identifier = function_utils.function_identifier(f)
squelched_logging_counts[identifier] = squelch_after_n_repeats
return f
+
return squelch_logging_wrapper
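+ # Usage sketch; the enclosing decorator's name is not visible in this hunk,
+ # so "squelch_repeated_log_messages" is assumed:
+ #
+ #   @logging_utils.squelch_repeated_log_messages(squelch_after_n_repeats=2)
+ #   def noisy():
+ #       logger.info('Identical copies of this message beyond the 2nd are dropped.')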
the --no_logging_squelch_repeats commandline flag.
"""
+
def __init__(self) -> None:
self.counters = collections.Counter()
super().__init__()
module names or module:function names. Blocks others.
"""
+
@staticmethod
def level_name_to_level(name: str) -> int:
- numeric_level = getattr(
- logging,
- name,
- None
- )
+ numeric_level = getattr(logging, name, None)
if not isinstance(numeric_level, int):
raise ValueError(f'Invalid level: {name}')
return numeric_level
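+ # e.g. level_name_to_level('DEBUG') returns logging.DEBUG (10); an unknown
+ # name such as 'VERBOSE' raises ValueError.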
def __init__(
- self,
- default_logging_level: int,
- per_scope_logging_levels: str,
+ self,
+ default_logging_level: int,
+ per_scope_logging_levels: str,
) -> None:
super().__init__()
- self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
+ self.valid_levels = set(
+ ['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
+ )
self.default_logging_level = default_logging_level
self.level_by_scope = {}
if per_scope_logging_levels is not None:
if '=' not in chunk:
print(
f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.',
- file=sys.stderr
+ file=sys.stderr,
)
continue
try:
except ValueError:
print(
f'Malformed lmodule directive: "{chunk}". Ignored.',
- file=sys.stderr
+ file=sys.stderr,
)
continue
scope = scope.strip()
if level not in self.valid_levels:
print(
f'Malformed lmodule directive: "{chunk}", bad level. Ignored.',
- file=sys.stderr
+ file=sys.stderr,
)
continue
- self.level_by_scope[scope] = (
- DynamicPerScopeLoggingLevelFilter.level_name_to_level(
- level
- )
- )
+ self.level_by_scope[
+ scope
+ ] = DynamicPerScopeLoggingLevelFilter.level_name_to_level(level)
@overrides
def filter(self, record: logging.LogRecord) -> bool:
if len(self.level_by_scope) > 0:
min_level = None
for scope in (
- record.module,
- f'{record.module}:{record.funcName}',
- f':{record.funcName}'
+ record.module,
+ f'{record.module}:{record.funcName}',
+ f':{record.funcName}',
):
level = self.level_by_scope.get(scope, None)
if level is not None:
This affects *ALL* logging statements within the marked function.
"""
+
def probabilistic_logging_wrapper(f: Callable):
import function_utils
+
identifier = function_utils.function_identifier(f)
probabilistic_logging_levels[identifier] = probability_of_logging
return f
+
return probabilistic_logging_wrapper
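+ # Usage sketch (decorator and parameter names taken from this file):
+ #
+ #   @logging_utils.probabilistic_logging(probability_of_logging=0.1)
+ #   def chatty():
+ #       logger.debug('Only ~10% of these messages are actually emitted.')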
been tagged with the @logging_utils.probabilistic_logging decorator.
"""
+
@overrides
def filter(self, record: logging.LogRecord) -> bool:
id1 = f'{record.module}:{record.funcName}'
stdout handler.
"""
+
@overrides
def filter(self, record: logging.LogRecord):
return record.levelno == logging.INFO
whatever reason, the default python logger doesn't do.
"""
+
converter = datetime.datetime.fromtimestamp
@overrides
# Global default logging level (--logging_level)
default_logging_level = getattr(
- logging,
- config.config['logging_level'].upper(),
- None
+ logging, config.config['logging_level'].upper(), None
)
if not isinstance(default_logging_level, int):
raise ValueError('Invalid level: %s' % config.config['logging_level'])
if config.config['logging_filename']:
handler = RotatingFileHandler(
config.config['logging_filename'],
- maxBytes = config.config['logging_filename_maxsize'],
- backupCount = config.config['logging_filename_count'],
+ maxBytes=config.config['logging_filename_maxsize'],
+ backupCount=config.config['logging_filename_count'],
)
handler.setFormatter(
MillisecondAwareFormatter(
if config.config['logging_captures_prints']:
import builtins
+
global built_in_print
def print_and_also_log(*arg, **kwarg):
else:
logger.info(*arg)
built_in_print(*arg, **kwarg)
+
builtins.print = print_and_also_log
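+ # The original print remains available as built_in_print (saved above), so
+ # capture can presumably be undone later by assigning it back to builtins.print.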
# At this point the logger is ready, handlers are set up,
# etc... so log about the logging configuration.
- level_name = logging._levelToName.get(default_logging_level, str(default_logging_level))
- logger.debug(
- f'Initialized global logging; default logging level is {level_name}.'
+ level_name = logging._levelToName.get(
+ default_logging_level, str(default_logging_level)
)
- if config.config['logging_clear_preexisting_handlers'] and preexisting_handlers_count > 0:
+ logger.debug(f'Initialized global logging; default logging level is {level_name}.')
+ if (
+ config.config['logging_clear_preexisting_handlers']
+ and preexisting_handlers_count > 0
+ ):
msg = f'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)'
logger.warning(msg)
logger.debug(f'Logging format specification is "{fmt}"')
if config.config['logging_debug_threads']:
- logger.debug('...Logging format spec captures tid/pid (--logging_debug_threads)')
+ logger.debug(
+ '...Logging format spec captures tid/pid (--logging_debug_threads)'
+ )
if config.config['logging_debug_modules']:
- logger.debug('...Logging format spec captures files/functions/lineno (--logging_debug_modules)')
+ logger.debug(
+ '...Logging format spec captures files/functions/lineno (--logging_debug_modules)'
+ )
if config.config['logging_syslog']:
- logger.debug(f'Logging to syslog as {facility_name} with priority mapping based on level')
+ logger.debug(
+ f'Logging to syslog as {facility_name} with priority mapping based on level'
+ )
if config.config['logging_filename']:
logger.debug(f'Logging to filename {config.config["logging_filename"]}')
- logger.debug(f'...with {config.config["logging_filename_maxsize"]} bytes max file size.')
- logger.debug(f'...and {config.config["logging_filename_count"]} rotating backup file count.')
+ logger.debug(
+ f'...with {config.config["logging_filename_maxsize"]} bytes max file size.'
+ )
+ logger.debug(
+ f'...and {config.config["logging_filename_count"]} rotating backup file count.'
+ )
if config.config['logging_console']:
logger.debug('Logging to the console (stderr).')
if config.config['logging_info_is_print']:
f'Logging dynamic per-module logging enabled (--lmodule={config.config["lmodule"]})'
)
if config.config['logging_captures_prints']:
- logger.debug('Logging will capture printed data as logger.info messages (--logging_captures_prints)')
+ logger.debug(
+ 'Logging will capture printed data as logger.info messages (--logging_captures_prints)'
+ )
return logger
"""
if config.config['logging_debug_threads']:
from thread_utils import current_thread_id
+
print(f'{current_thread_id()}', end="")
print(*args, **kwargs)
else:
easy usage pattern.
"""
+
class Destination(enum.IntEnum):
"""Bits in the destination_bitv bitvector. Used to indicate the
output destination."""
- LOG_DEBUG = 0x01 # ⎫
- LOG_INFO = 0x02 # ⎪
- LOG_WARNING = 0x04 # ⎬ Must provide logger to the c'tor.
- LOG_ERROR = 0x08 # ⎪
- LOG_CRITICAL = 0x10 # ⎭
- FILENAMES = 0x20 # Must provide a filename to the c'tor.
- FILEHANDLES = 0x40 # Must provide a handle to the c'tor.
+
+ LOG_DEBUG = 0x01 # ⎫
+ LOG_INFO = 0x02 # ⎪
+ LOG_WARNING = 0x04 # ⎬ Must provide logger to the c'tor.
+ LOG_ERROR = 0x08 # ⎪
+ LOG_CRITICAL = 0x10 # ⎭
+ FILENAMES = 0x20 # Must provide a filename to the c'tor.
+ FILEHANDLES = 0x40 # Must provide a handle to the c'tor.
HLOG = 0x80
ALL_LOG_DESTINATIONS = (
LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL
)
ALL_OUTPUT_DESTINATIONS = 0x8F
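+ # Note: 0x8F == HLOG | LOG_ERROR | LOG_WARNING | LOG_INFO | LOG_DEBUG;
+ # LOG_CRITICAL (0x10) is not part of ALL_OUTPUT_DESTINATIONS, which may be
+ # unintentional.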
- def __init__(self,
- destination_bitv: int,
- *,
- logger=None,
- filenames: Optional[Iterable[str]] = None,
- handles: Optional[Iterable[io.TextIOWrapper]] = None):
+ def __init__(
+ self,
+ destination_bitv: int,
+ *,
+ logger=None,
+ filenames: Optional[Iterable[str]] = None,
+ handles: Optional[Iterable[io.TextIOWrapper]] = None,
+ ):
if logger is None:
logger = logging.getLogger(None)
self.logger = logger
if filenames is not None:
- self.f = [
- open(filename, 'wb', buffering=0) for filename in filenames
- ]
+ self.f = [open(filename, 'wb', buffering=0) for filename in filenames]
else:
if destination_bitv & OutputMultiplexer.FILENAMES:
- raise ValueError(
- "Filenames argument is required if bitv & FILENAMES"
- )
+ raise ValueError("Filenames argument is required if bitv & FILENAMES")
self.f = None
if handles is not None:
self.h = [handle for handle in handles]
else:
if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES:
- raise ValueError(
- "Handle argument is required if bitv & FILEHANDLES"
- )
+ raise ValueError("Handle argument is required if bitv & FILEHANDLES")
self.h = None
self.set_destination_bitv(destination_bitv)
def set_destination_bitv(self, destination_bitv: int):
if destination_bitv & self.Destination.FILENAMES and self.f is None:
- raise ValueError(
- "Filename argument is required if bitv & FILENAMES"
- )
+ raise ValueError("Filename argument is required if bitv & FILENAMES")
if destination_bitv & self.Destination.FILEHANDLES and self.h is None:
- raise ValueError(
- "Handle argument is required if bitv & FILEHANDLES"
- )
+ raise ValueError("Handle argument is required if bitv & FILEHANDLES")
self.destination_bitv = destination_bitv
def print(self, *args, **kwargs):
from string_utils import sprintf, strip_escape_sequences
+
end = kwargs.pop("end", None)
if end is not None:
if not isinstance(end, str):
end = "\n"
if end == '\n':
buf += '\n'
- if (
- self.destination_bitv & self.Destination.FILENAMES and
- self.f is not None
- ):
+ if self.destination_bitv & self.Destination.FILENAMES and self.f is not None:
for _ in self.f:
_.write(buf.encode('utf-8'))
_.flush()
- if (
- self.destination_bitv & self.Destination.FILEHANDLES and
- self.h is not None
- ):
+ if self.destination_bitv & self.Destination.FILEHANDLES and self.h is not None:
for _ in self.h:
_.write(buf)
_.flush()
mplex.print("This is a log message!")
"""
- def __init__(self,
- destination_bitv: OutputMultiplexer.Destination,
- *,
- logger = None,
- filenames = None,
- handles = None):
+
+ def __init__(
+ self,
+ destination_bitv: OutputMultiplexer.Destination,
+ *,
+ logger=None,
+ filenames=None,
+ handles=None,
+ ):
super().__init__(
- destination_bitv,
- logger=logger,
- filenames=filenames,
- handles=handles)
+ destination_bitv, logger=logger, filenames=filenames, handles=handles
+ )
def __enter__(self):
return self
if __name__ == '__main__':
import doctest
+
doctest.testmod()