X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=logging_utils.py;h=183e1f0109eb8b8dc243b23e9cceb431b01073e0;hb=9484900f080e16f118806fe54973e69d36b043e8;hp=b7fd11fd8168cfbc985fa1fa60e28704b86075bd;hpb=1574e8a3a8982fab9278ad534f9427d464e4bffb;p=python_utils.git diff --git a/logging_utils.py b/logging_utils.py index b7fd11f..183e1f0 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -2,14 +2,18 @@ """Utilities related to logging.""" +import collections import contextlib import datetime import enum +import io import logging from logging.handlers import RotatingFileHandler, SysLogHandler import os import pytz +import random import sys +from typing import Callable, Iterable, Mapping, Optional # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. @@ -92,24 +96,157 @@ cfg.add_argument( default=False, help='logging.info also prints to stdout.' ) - -# See also: OutputMultiplexer/OutputContext +cfg.add_argument( + '--logging_squelch_repeats_enabled', + action=argparse_utils.ActionNoYes, + default=True, + help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?' +) +cfg.add_argument( + '--logging_probabilistically_enabled', + action=argparse_utils.ActionNoYes, + default=True, + help='Do we allow probabilistic logging (for code that wants it) or should we always log?' +) +# See also: OutputMultiplexer cfg.add_argument( '--logging_captures_prints', action=argparse_utils.ActionNoYes, default=False, - help='When calling print also log.info too' + help='When calling print, also log.info automatically.' ) built_in_print = print +def function_identifier(f: Callable) -> str: + """ + Given a callable function, return a string that identifies it. + Usually that string is just __module__:__name__ but there's a + corner case: when __module__ is __main__ (i.e. the callable is + defined in the same module as __main__). In this case, + f.__module__ returns "__main__" instead of the file that it is + defined in. Work around this using pathlib.Path (see below). + + >>> function_identifier(function_identifier) + 'logging_utils:function_identifier' + + """ + if f.__module__ == '__main__': + from pathlib import Path + import __main__ + module = __main__.__file__ + module = Path(module).stem + return f'{module}:{f.__name__}' + else: + return f'{f.__module__}:{f.__name__}' + + +# A map from logging_callsite_id -> count of logged messages. +squelched_logging_counts: Mapping[str, int] = {} + + +def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable: + """ + A decorator that marks a function as interested in having the logging + messages that it produces be squelched (ignored) after it logs the + same message more than N times. + + Note: this decorator affects *ALL* logging messages produced + within the decorated function. That said, messages must be + identical in order to be squelched. For example, if the same line + of code produces different messages (because of, e.g., a format + string), the messages are considered to be different. + + """ + def squelch_logging_wrapper(f: Callable): + identifier = function_identifier(f) + squelched_logging_counts[identifier] = squelch_after_n_repeats + return f + return squelch_logging_wrapper + + +class SquelchRepeatedMessagesFilter(logging.Filter): + """ + A filter that only logs messages from a given site with the same + (exact) message at the same logging level N times and ignores + subsequent attempts to log. + + This filter only affects logging messages that repeat more than + a threshold number of times from functions that are tagged with + the @logging_utils.squelched_logging_ok decorator. + + """ + def __init__(self) -> None: + self.counters = collections.Counter() + super().__init__() + + def filter(self, record: logging.LogRecord) -> bool: + id1 = f'{record.module}:{record.funcName}' + if id1 not in squelched_logging_counts: + return True + threshold = squelched_logging_counts[id1] + logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}' + count = self.counters[logsite] + self.counters[logsite] += 1 + return count < threshold + + +# A map from function_identifier -> probability of logging (0.0%..100.0%) +probabilistic_logging_levels: Mapping[str, float] = {} + + +def logging_is_probabilistic(probability_of_logging: float) -> Callable: + """ + A decorator that indicates that all logging statements within the + scope of a particular (marked) function are not deterministic + (i.e. they do not always unconditionally log) but rather are + probabilistic (i.e. they log N% of the time randomly). + + This affects *ALL* logging statements within the marked function. + + """ + def probabilistic_logging_wrapper(f: Callable): + identifier = function_identifier(f) + probabilistic_logging_levels[identifier] = probability_of_logging + return f + return probabilistic_logging_wrapper + + +class ProbabilisticFilter(logging.Filter): + """ + A filter that logs messages probabilistically (i.e. randomly at some + percent chance). + + This filter only affects logging messages from functions that have + been tagged with the @logging_utils.probabilistic_logging decorator. + + """ + def filter(self, record: logging.LogRecord) -> bool: + id1 = f'{record.module}:{record.funcName}' + if id1 not in probabilistic_logging_levels: + return True + threshold = probabilistic_logging_levels[id1] + return (random.random() * 100.0) <= threshold + + class OnlyInfoFilter(logging.Filter): + """ + A filter that only logs messages produced at the INFO logging + level. This is used by the logging_info_is_print commandline + option to select a subset of the logging stream to send to a + stdout handler. + + """ def filter(self, record): return record.levelno == logging.INFO class MillisecondAwareFormatter(logging.Formatter): + """ + A formatter for adding milliseconds to log messages. + + """ converter = datetime.datetime.fromtimestamp def formatTime(self, record, datefmt=None): @@ -197,6 +334,14 @@ def initialize_logging(logger=None) -> logging.Logger: handler.addFilter(OnlyInfoFilter()) logger.addHandler(handler) + if config.config['logging_squelch_repeats_enabled']: + for handler in handlers: + handler.addFilter(SquelchRepeatedMessagesFilter()) + + if config.config['logging_probabilistically_enabled']: + for handler in handlers: + handler.addFilter(ProbabilisticFilter()) + logger.setLevel(numeric_level) logger.propagate = False @@ -222,6 +367,8 @@ def get_logger(name: str = ""): def tprint(*args, **kwargs) -> None: + """Legacy function for printing a message augmented with thread id.""" + if config.config['logging_debug_threads']: from thread_utils import current_thread_id print(f'{current_thread_id()}', end="") @@ -231,54 +378,61 @@ def tprint(*args, **kwargs) -> None: def dprint(*args, **kwargs) -> None: + """Legacy function used to print to stderr.""" + print(*args, file=sys.stderr, **kwargs) class OutputMultiplexer(object): + """ + A class that broadcasts printed messages to several sinks (including + various logging levels, different files, different file handles, + the house log, etc...) + """ class Destination(enum.IntEnum): """Bits in the destination_bitv bitvector. Used to indicate the output destination.""" - STDOUT = 0x1 - STDERR = 0x2 - LOG_DEBUG = 0x4 # -\ - LOG_INFO = 0x8 # | - LOG_WARNING = 0x10 # > Should provide logger to the c'tor. - LOG_ERROR = 0x20 # | - LOG_CRITICAL = 0x40 # _/ - FILENAME = 0x80 # Must provide a filename to the c'tor. - FILEHANDLE = 0x100 # Must provide a handle to the c'tor. - HLOG = 0x200 + LOG_DEBUG = 0x01 # ⎫ + LOG_INFO = 0x02 # ⎪ + LOG_WARNING = 0x04 # ⎬ Must provide logger to the c'tor. + LOG_ERROR = 0x08 # ⎪ + LOG_CRITICAL = 0x10 # ⎭ + FILENAMES = 0x20 # Must provide a filename to the c'tor. + FILEHANDLES = 0x40 # Must provide a handle to the c'tor. + HLOG = 0x80 ALL_LOG_DESTINATIONS = ( LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL ) - ALL_OUTPUT_DESTINATIONS = 0x2FF + ALL_OUTPUT_DESTINATIONS = 0x8F def __init__(self, destination_bitv: int, *, logger=None, - filename=None, - handle=None): + filenames: Optional[Iterable[str]] = None, + handles: Optional[Iterable[io.TextIOWrapper]] = None): if logger is None: logger = logging.getLogger(None) self.logger = logger - if filename is not None: - self.f = open(filename, "wb", buffering=0) + if filenames is not None: + self.f = [ + open(filename, 'wb', buffering=0) for filename in filenames + ] else: - if self.destination_bitv & OutputMultiplexer.FILENAME: + if destination_bitv & OutputMultiplexer.FILENAMES: raise ValueError( - "Filename argument is required if bitv & FILENAME" + "Filenames argument is required if bitv & FILENAMES" ) self.f = None - if handle is not None: - self.h = handle + if handles is not None: + self.h = [handle for handle in handles] else: - if self.destination_bitv & OutputMultiplexer.FILEHANDLE: + if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES: raise ValueError( - "Handle argument is required if bitv & FILEHANDLE" + "Handle argument is required if bitv & FILEHANDLES" ) self.h = None @@ -288,13 +442,13 @@ class OutputMultiplexer(object): return self.destination_bitv def set_destination_bitv(self, destination_bitv: int): - if destination_bitv & self.Destination.FILENAME and self.f is None: + if destination_bitv & self.Destination.FILENAMES and self.f is None: raise ValueError( - "Filename argument is required if bitv & FILENAME" + "Filename argument is required if bitv & FILENAMES" ) - if destination_bitv & self.Destination.FILEHANDLE and self.h is None: + if destination_bitv & self.Destination.FILEHANDLES and self.h is None: raise ValueError( - "Handle argument is required if bitv & FILEHANDLE" + "Handle argument is required if bitv & FILEHANDLES" ) self.destination_bitv = destination_bitv @@ -315,25 +469,23 @@ class OutputMultiplexer(object): sep = " " if end is None: end = "\n" - if self.destination_bitv & self.Destination.STDOUT: - print(buf, file=sys.stdout, sep=sep, end=end) - if self.destination_bitv & self.Destination.STDERR: - print(buf, file=sys.stderr, sep=sep, end=end) if end == '\n': buf += '\n' if ( - self.destination_bitv & self.Destination.FILENAME and + self.destination_bitv & self.Destination.FILENAMES and self.f is not None ): - self.f.write(buf.encode('utf-8')) - self.f.flush() + for _ in self.f: + _.write(buf.encode('utf-8')) + _.flush() if ( - self.destination_bitv & self.Destination.FILEHANDLE and + self.destination_bitv & self.Destination.FILEHANDLES and self.h is not None ): - self.h.write(buf) - self.h.flush() + for _ in self.h: + _.write(buf) + _.flush() buf = strip_escape_sequences(buf) if self.logger is not None: @@ -352,21 +504,36 @@ class OutputMultiplexer(object): def close(self): if self.f is not None: - self.f.close() + for _ in self.f: + _.close() + +class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator): + """ + A context that uses an OutputMultiplexer. e.g. -class OutputContext(OutputMultiplexer, contextlib.ContextDecorator): + with OutputMultiplexerContext( + OutputMultiplexer.LOG_INFO | + OutputMultiplexer.LOG_DEBUG | + OutputMultiplexer.FILENAMES | + OutputMultiplexer.FILEHANDLES, + filenames = [ '/tmp/foo.log', '/var/log/bar.log' ], + handles = [ f, g ] + ) as mplex: + mplex.print("This is a log message!") + + """ def __init__(self, destination_bitv: OutputMultiplexer.Destination, *, - logger=None, - filename=None, - handle=None): + logger = None, + filenames = None, + handles = None): super().__init__( destination_bitv, logger=logger, - filename=filename, - handle=handle) + filenames=filenames, + handles=handles) def __enter__(self): return self @@ -379,5 +546,12 @@ class OutputContext(OutputMultiplexer, contextlib.ContextDecorator): def hlog(message: str) -> None: + """Write a message to the house log.""" + message = message.replace("'", "'\"'\"'") os.system(f"/usr/bin/logger -p local7.info -- '{message}'") + + +if __name__ == '__main__': + import doctest + doctest.testmod()