X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=logging_utils.py;h=819e3d3ee780a78cc903a890eba03e533b608870;hb=ed8fa2b10b0177b15b7423263bdd390efde2f0c8;hp=0c7d19362d7ed59cf9009053465763e47e6e4709;hpb=c79ecbf708a63a54a9c3e8d189b65d4794930082;p=python_utils.git diff --git a/logging_utils.py b/logging_utils.py index 0c7d193..819e3d3 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -2,6 +2,7 @@ """Utilities related to logging.""" +import collections import contextlib import datetime import enum @@ -10,8 +11,11 @@ import logging from logging.handlers import RotatingFileHandler, SysLogHandler import os import pytz +import random import sys -from typing import Iterable, Optional +from typing import Callable, Iterable, Mapping, Optional + +from overrides import overrides # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. @@ -34,7 +38,7 @@ cfg.add_argument( default='INFO', choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], metavar='LEVEL', - help='The level below which to squelch log messages.', + help='The global default level below which to squelch log messages; see also --lmodule', ) cfg.add_argument( '--logging_format', @@ -72,7 +76,7 @@ cfg.add_argument( cfg.add_argument( '--logging_filename_count', type=int, - default=2, + default=7, metavar='COUNT', help='The number of logging_filename copies to keep before deleting.' ) @@ -88,32 +92,264 @@ cfg.add_argument( default=False, help='Should we prepend pid/tid data to all log messages?' ) +cfg.add_argument( + '--logging_debug_modules', + action=argparse_utils.ActionNoYes, + default=False, + help='Should we prepend module/function data to all log messages?' +) cfg.add_argument( '--logging_info_is_print', action=argparse_utils.ActionNoYes, default=False, help='logging.info also prints to stdout.' ) - -# See also: OutputMultiplexer/OutputContext +cfg.add_argument( + '--logging_squelch_repeats_enabled', + action=argparse_utils.ActionNoYes, + default=True, + help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?' +) +cfg.add_argument( + '--logging_probabilistically_enabled', + action=argparse_utils.ActionNoYes, + default=True, + help='Do we allow probabilistic logging (for code that wants it) or should we always log?' +) +# See also: OutputMultiplexer cfg.add_argument( '--logging_captures_prints', action=argparse_utils.ActionNoYes, default=False, - help='When calling print also log.info too' + help='When calling print, also log.info automatically.' +) +cfg.add_argument( + '--lmodule', + type=str, + metavar='=[,=...]', + help=( + 'Allows per-scope logging levels which override the global level set with --logging-level.' + + 'Pass a space separated list of = where is one of: module, ' + + 'module:function, or :function and is a logging level (e.g. INFO, DEBUG...)' + ) ) + built_in_print = print +def function_identifier(f: Callable) -> str: + """ + Given a callable function, return a string that identifies it. + Usually that string is just __module__:__name__ but there's a + corner case: when __module__ is __main__ (i.e. the callable is + defined in the same module as __main__). In this case, + f.__module__ returns "__main__" instead of the file that it is + defined in. Work around this using pathlib.Path (see below). + + >>> function_identifier(function_identifier) + 'logging_utils:function_identifier' + + """ + if f.__module__ == '__main__': + from pathlib import Path + import __main__ + module = __main__.__file__ + module = Path(module).stem + return f'{module}:{f.__name__}' + else: + return f'{f.__module__}:{f.__name__}' + + +# A map from logging_callsite_id -> count of logged messages. +squelched_logging_counts: Mapping[str, int] = {} + + +def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable: + """ + A decorator that marks a function as interested in having the logging + messages that it produces be squelched (ignored) after it logs the + same message more than N times. + + Note: this decorator affects *ALL* logging messages produced + within the decorated function. That said, messages must be + identical in order to be squelched. For example, if the same line + of code produces different messages (because of, e.g., a format + string), the messages are considered to be different. + + """ + def squelch_logging_wrapper(f: Callable): + identifier = function_identifier(f) + squelched_logging_counts[identifier] = squelch_after_n_repeats + return f + return squelch_logging_wrapper + + +class SquelchRepeatedMessagesFilter(logging.Filter): + """ + A filter that only logs messages from a given site with the same + (exact) message at the same logging level N times and ignores + subsequent attempts to log. + + This filter only affects logging messages that repeat more than + a threshold number of times from functions that are tagged with + the @logging_utils.squelched_logging_ok decorator. + + """ + def __init__(self) -> None: + self.counters = collections.Counter() + super().__init__() + + @overrides + def filter(self, record: logging.LogRecord) -> bool: + id1 = f'{record.module}:{record.funcName}' + if id1 not in squelched_logging_counts: + return True + threshold = squelched_logging_counts[id1] + logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}' + count = self.counters[logsite] + self.counters[logsite] += 1 + return count < threshold + + +class DynamicPerScopeLoggingLevelFilter(logging.Filter): + """Only interested in seeing logging messages from an allow list of + module names or module:function names. Block others. + + """ + @staticmethod + def level_name_to_level(name: str) -> int: + numeric_level = getattr( + logging, + name, + None + ) + if not isinstance(numeric_level, int): + raise ValueError('Invalid level: {name}') + return numeric_level + + def __init__( + self, + default_logging_level: int, + per_scope_logging_levels: str, + ) -> None: + super().__init__() + self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) + self.default_logging_level = default_logging_level + self.level_by_scope = {} + if per_scope_logging_levels is not None: + for chunk in per_scope_logging_levels.split(','): + if '=' not in chunk: + print( + f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.', + file=sys.stderr + ) + continue + try: + (scope, level) = chunk.split('=') + except ValueError: + print( + f'Malformed lmodule directive: "{chunk}". Ignored.', + file=sys.stderr + ) + continue + scope = scope.strip() + level = level.strip().upper() + if level not in self.valid_levels: + print( + f'Malformed lmodule directive: "{chunk}", bad level. Ignored.', + file=sys.stderr + ) + continue + self.level_by_scope[scope] = ( + DynamicPerScopeLoggingLevelFilter.level_name_to_level( + level + ) + ) + + @overrides + def filter(self, record: logging.LogRecord) -> bool: + # First try to find a logging level by scope (--lmodule) + if len(self.level_by_scope) > 0: + min_level = None + for scope in ( + record.module, + f'{record.module}:{record.funcName}', + f':{record.funcName}' + ): + level = self.level_by_scope.get(scope, None) + if level is not None: + if min_level is None or level < min_level: + min_level = level + + # If we found one, use it instead of the global default level. + if min_level is not None: + return record.levelno >= min_level + + # Otherwise, use the global logging level (--logging_level) + return record.levelno >= self.default_logging_level + + +# A map from function_identifier -> probability of logging (0.0%..100.0%) +probabilistic_logging_levels: Mapping[str, float] = {} + + +def logging_is_probabilistic(probability_of_logging: float) -> Callable: + """ + A decorator that indicates that all logging statements within the + scope of a particular (marked) function are not deterministic + (i.e. they do not always unconditionally log) but rather are + probabilistic (i.e. they log N% of the time randomly). + + This affects *ALL* logging statements within the marked function. + + """ + def probabilistic_logging_wrapper(f: Callable): + identifier = function_identifier(f) + probabilistic_logging_levels[identifier] = probability_of_logging + return f + return probabilistic_logging_wrapper + + +class ProbabilisticFilter(logging.Filter): + """ + A filter that logs messages probabilistically (i.e. randomly at some + percent chance). + + This filter only affects logging messages from functions that have + been tagged with the @logging_utils.probabilistic_logging decorator. + + """ + @overrides + def filter(self, record: logging.LogRecord) -> bool: + id1 = f'{record.module}:{record.funcName}' + if id1 not in probabilistic_logging_levels: + return True + threshold = probabilistic_logging_levels[id1] + return (random.random() * 100.0) <= threshold + + class OnlyInfoFilter(logging.Filter): - def filter(self, record): + """ + A filter that only logs messages produced at the INFO logging + level. This is used by the logging_info_is_print commandline + option to select a subset of the logging stream to send to a + stdout handler. + + """ + @overrides + def filter(self, record: logging.LogRecord): return record.levelno == logging.INFO class MillisecondAwareFormatter(logging.Formatter): + """ + A formatter for adding milliseconds to log messages. + + """ converter = datetime.datetime.fromtimestamp + @overrides def formatTime(self, record, datefmt=None): ct = MillisecondAwareFormatter.converter( record.created, pytz.timezone("US/Pacific") @@ -136,30 +372,31 @@ def initialize_logging(logger=None) -> logging.Logger: return logger handlers = [] - numeric_level = getattr( + + # Global default logging level (--logging_level) + default_logging_level = getattr( logging, config.config['logging_level'].upper(), None ) - if not isinstance(numeric_level, int): + if not isinstance(default_logging_level, int): raise ValueError('Invalid level: %s' % config.config['logging_level']) fmt = config.config['logging_format'] if config.config['logging_debug_threads']: fmt = f'%(process)d.%(thread)d|{fmt}' + if config.config['logging_debug_modules']: + fmt = f'%(filename)s:%(funcName)s:%(lineno)s|{fmt}' if config.config['logging_syslog']: if sys.platform not in ('win32', 'cygwin'): handler = SysLogHandler() -# for k, v in encoded_priorities.items(): -# handler.encodePriority(k, v) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, datefmt=config.config['logging_date_format'], ) ) - handler.setLevel(numeric_level) handlers.append(handler) if config.config['logging_filename']: @@ -168,7 +405,6 @@ def initialize_logging(logger=None) -> logging.Logger: maxBytes = config.config['logging_filename_maxsize'], backupCount = config.config['logging_filename_count'], ) - handler.setLevel(numeric_level) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, @@ -179,7 +415,6 @@ def initialize_logging(logger=None) -> logging.Logger: if config.config['logging_console']: handler = logging.StreamHandler(sys.stderr) - handler.setLevel(numeric_level) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, @@ -199,7 +434,22 @@ def initialize_logging(logger=None) -> logging.Logger: handler.addFilter(OnlyInfoFilter()) logger.addHandler(handler) - logger.setLevel(numeric_level) + if config.config['logging_squelch_repeats_enabled']: + for handler in handlers: + handler.addFilter(SquelchRepeatedMessagesFilter()) + + if config.config['logging_probabilistically_enabled']: + for handler in handlers: + handler.addFilter(ProbabilisticFilter()) + + for handler in handlers: + handler.addFilter( + DynamicPerScopeLoggingLevelFilter( + default_logging_level, + config.config['lmodule'], + ) + ) + logger.setLevel(0) logger.propagate = False if config.config['logging_captures_prints']: @@ -224,6 +474,8 @@ def get_logger(name: str = ""): def tprint(*args, **kwargs) -> None: + """Legacy function for printing a message augmented with thread id.""" + if config.config['logging_debug_threads']: from thread_utils import current_thread_id print(f'{current_thread_id()}', end="") @@ -233,19 +485,26 @@ def tprint(*args, **kwargs) -> None: def dprint(*args, **kwargs) -> None: + """Legacy function used to print to stderr.""" + print(*args, file=sys.stderr, **kwargs) class OutputMultiplexer(object): + """ + A class that broadcasts printed messages to several sinks (including + various logging levels, different files, different file handles, + the house log, etc...) + """ class Destination(enum.IntEnum): """Bits in the destination_bitv bitvector. Used to indicate the output destination.""" - LOG_DEBUG = 0x01 # -\ - LOG_INFO = 0x02 # | - LOG_WARNING = 0x04 # > Should provide logger to the c'tor. - LOG_ERROR = 0x08 # | - LOG_CRITICAL = 0x10 # _/ + LOG_DEBUG = 0x01 # ⎫ + LOG_INFO = 0x02 # ⎪ + LOG_WARNING = 0x04 # ⎬ Must provide logger to the c'tor. + LOG_ERROR = 0x08 # ⎪ + LOG_CRITICAL = 0x10 # ⎭ FILENAMES = 0x20 # Must provide a filename to the c'tor. FILEHANDLES = 0x40 # Must provide a handle to the c'tor. HLOG = 0x80 @@ -269,7 +528,7 @@ class OutputMultiplexer(object): open(filename, 'wb', buffering=0) for filename in filenames ] else: - if self.destination_bitv & OutputMultiplexer.FILENAMES: + if destination_bitv & OutputMultiplexer.FILENAMES: raise ValueError( "Filenames argument is required if bitv & FILENAMES" ) @@ -278,7 +537,7 @@ class OutputMultiplexer(object): if handles is not None: self.h = [handle for handle in handles] else: - if self.destination_bitv & OutputMultiplexer.FILEHANDLES: + if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES: raise ValueError( "Handle argument is required if bitv & FILEHANDLES" ) @@ -357,6 +616,20 @@ class OutputMultiplexer(object): class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator): + """ + A context that uses an OutputMultiplexer. e.g. + + with OutputMultiplexerContext( + OutputMultiplexer.LOG_INFO | + OutputMultiplexer.LOG_DEBUG | + OutputMultiplexer.FILENAMES | + OutputMultiplexer.FILEHANDLES, + filenames = [ '/tmp/foo.log', '/var/log/bar.log' ], + handles = [ f, g ] + ) as mplex: + mplex.print("This is a log message!") + + """ def __init__(self, destination_bitv: OutputMultiplexer.Destination, *, @@ -380,5 +653,12 @@ class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator): def hlog(message: str) -> None: + """Write a message to the house log.""" + message = message.replace("'", "'\"'\"'") os.system(f"/usr/bin/logger -p local7.info -- '{message}'") + + +if __name__ == '__main__': + import doctest + doctest.testmod()