#!/usr/bin/env python3 """Utilities related to logging.""" import collections import contextlib import datetime import enum import io import logging from logging.handlers import RotatingFileHandler, SysLogHandler import os import pytz import random import sys from typing import Callable, Iterable, Mapping, Optional from overrides import overrides # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. import argparse_utils import config cfg = config.add_commandline_args( f'Logging ({__file__})', 'Args related to logging') cfg.add_argument( '--logging_config_file', type=argparse_utils.valid_filename, default=None, metavar='FILENAME', help='Config file containing the logging setup, see: https://docs.python.org/3/howto/logging.html#logging-advanced-tutorial', ) cfg.add_argument( '--logging_level', type=str, default='INFO', choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], metavar='LEVEL', help='The global default level below which to squelch log messages; see also --lmodule', ) cfg.add_argument( '--logging_format', type=str, default=None, help='The format for lines logged via the logger module. See: https://docs.python.org/3/library/logging.html#formatter-objects' ) cfg.add_argument( '--logging_date_format', type=str, default='%Y/%m/%dT%H:%M:%S.%f%z', metavar='DATEFMT', help='The format of any dates in --logging_format.' ) cfg.add_argument( '--logging_console', action=argparse_utils.ActionNoYes, default=True, help='Should we log to the console (stderr)', ) cfg.add_argument( '--logging_filename', type=str, default=None, metavar='FILENAME', help='The filename of the logfile to write.' ) cfg.add_argument( '--logging_filename_maxsize', type=int, default=(1024*1024), metavar='#BYTES', help='The maximum size (in bytes) to write to the logging_filename.' ) cfg.add_argument( '--logging_filename_count', type=int, default=7, metavar='COUNT', help='The number of logging_filename copies to keep before deleting.' ) cfg.add_argument( '--logging_syslog', action=argparse_utils.ActionNoYes, default=False, help='Should we log to localhost\'s syslog.' ) cfg.add_argument( '--logging_syslog_facility', type=str, default = 'USER', choices=['NOTSET', 'AUTH', 'AUTH_PRIV', 'CRON', 'DAEMON', 'FTP', 'KERN', 'LPR', 'MAIL', 'NEWS', 'SYSLOG', 'USER', 'UUCP', 'LOCAL0', 'LOCAL1', 'LOCAL2', 'LOCAL3', 'LOCAL4', 'LOCAL5', 'LOCAL6', 'LOCAL7'], metavar='SYSLOG_FACILITY_LIST', help='The default syslog message facility identifier', ) cfg.add_argument( '--logging_debug_threads', action=argparse_utils.ActionNoYes, default=False, help='Should we prepend pid/tid data to all log messages?' ) cfg.add_argument( '--logging_debug_modules', action=argparse_utils.ActionNoYes, default=False, help='Should we prepend module/function data to all log messages?' ) cfg.add_argument( '--logging_info_is_print', action=argparse_utils.ActionNoYes, default=False, help='logging.info also prints to stdout.' ) cfg.add_argument( '--logging_squelch_repeats_enabled', action=argparse_utils.ActionNoYes, default=True, help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?' ) cfg.add_argument( '--logging_probabilistically_enabled', action=argparse_utils.ActionNoYes, default=True, help='Do we allow probabilistic logging (for code that wants it) or should we always log?' ) # See also: OutputMultiplexer cfg.add_argument( '--logging_captures_prints', action=argparse_utils.ActionNoYes, default=False, help='When calling print, also log.info automatically.' ) cfg.add_argument( '--lmodule', type=str, metavar='=[,=...]', help=( 'Allows per-scope logging levels which override the global level set with --logging-level.' + 'Pass a space separated list of = where is one of: module, ' + 'module:function, or :function and is a logging level (e.g. INFO, DEBUG...)' ) ) built_in_print = print def function_identifier(f: Callable) -> str: """ Given a callable function, return a string that identifies it. Usually that string is just __module__:__name__ but there's a corner case: when __module__ is __main__ (i.e. the callable is defined in the same module as __main__). In this case, f.__module__ returns "__main__" instead of the file that it is defined in. Work around this using pathlib.Path (see below). >>> function_identifier(function_identifier) 'logging_utils:function_identifier' """ if f.__module__ == '__main__': from pathlib import Path import __main__ module = __main__.__file__ module = Path(module).stem return f'{module}:{f.__name__}' else: return f'{f.__module__}:{f.__name__}' # A map from logging_callsite_id -> count of logged messages. squelched_logging_counts: Mapping[str, int] = {} def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable: """ A decorator that marks a function as interested in having the logging messages that it produces be squelched (ignored) after it logs the same message more than N times. Note: this decorator affects *ALL* logging messages produced within the decorated function. That said, messages must be identical in order to be squelched. For example, if the same line of code produces different messages (because of, e.g., a format string), the messages are considered to be different. """ def squelch_logging_wrapper(f: Callable): identifier = function_identifier(f) squelched_logging_counts[identifier] = squelch_after_n_repeats return f return squelch_logging_wrapper class SquelchRepeatedMessagesFilter(logging.Filter): """ A filter that only logs messages from a given site with the same (exact) message at the same logging level N times and ignores subsequent attempts to log. This filter only affects logging messages that repeat more than a threshold number of times from functions that are tagged with the @logging_utils.squelched_logging_ok decorator. """ def __init__(self) -> None: self.counters = collections.Counter() super().__init__() @overrides def filter(self, record: logging.LogRecord) -> bool: id1 = f'{record.module}:{record.funcName}' if id1 not in squelched_logging_counts: return True threshold = squelched_logging_counts[id1] logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}' count = self.counters[logsite] self.counters[logsite] += 1 return count < threshold class DynamicPerScopeLoggingLevelFilter(logging.Filter): """Only interested in seeing logging messages from an allow list of module names or module:function names. Block others. """ @staticmethod def level_name_to_level(name: str) -> int: numeric_level = getattr( logging, name, None ) if not isinstance(numeric_level, int): raise ValueError('Invalid level: {name}') return numeric_level def __init__( self, default_logging_level: int, per_scope_logging_levels: str, ) -> None: super().__init__() self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) self.default_logging_level = default_logging_level self.level_by_scope = {} if per_scope_logging_levels is not None: for chunk in per_scope_logging_levels.split(','): if '=' not in chunk: print( f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.', file=sys.stderr ) continue try: (scope, level) = chunk.split('=') except ValueError: print( f'Malformed lmodule directive: "{chunk}". Ignored.', file=sys.stderr ) continue scope = scope.strip() level = level.strip().upper() if level not in self.valid_levels: print( f'Malformed lmodule directive: "{chunk}", bad level. Ignored.', file=sys.stderr ) continue self.level_by_scope[scope] = ( DynamicPerScopeLoggingLevelFilter.level_name_to_level( level ) ) @overrides def filter(self, record: logging.LogRecord) -> bool: # First try to find a logging level by scope (--lmodule) if len(self.level_by_scope) > 0: min_level = None for scope in ( record.module, f'{record.module}:{record.funcName}', f':{record.funcName}' ): level = self.level_by_scope.get(scope, None) if level is not None: if min_level is None or level < min_level: min_level = level # If we found one, use it instead of the global default level. if min_level is not None: return record.levelno >= min_level # Otherwise, use the global logging level (--logging_level) return record.levelno >= self.default_logging_level # A map from function_identifier -> probability of logging (0.0%..100.0%) probabilistic_logging_levels: Mapping[str, float] = {} def logging_is_probabilistic(probability_of_logging: float) -> Callable: """ A decorator that indicates that all logging statements within the scope of a particular (marked) function are not deterministic (i.e. they do not always unconditionally log) but rather are probabilistic (i.e. they log N% of the time randomly). This affects *ALL* logging statements within the marked function. """ def probabilistic_logging_wrapper(f: Callable): identifier = function_identifier(f) probabilistic_logging_levels[identifier] = probability_of_logging return f return probabilistic_logging_wrapper class ProbabilisticFilter(logging.Filter): """ A filter that logs messages probabilistically (i.e. randomly at some percent chance). This filter only affects logging messages from functions that have been tagged with the @logging_utils.probabilistic_logging decorator. """ @overrides def filter(self, record: logging.LogRecord) -> bool: id1 = f'{record.module}:{record.funcName}' if id1 not in probabilistic_logging_levels: return True threshold = probabilistic_logging_levels[id1] return (random.random() * 100.0) <= threshold class OnlyInfoFilter(logging.Filter): """ A filter that only logs messages produced at the INFO logging level. This is used by the logging_info_is_print commandline option to select a subset of the logging stream to send to a stdout handler. """ @overrides def filter(self, record: logging.LogRecord): return record.levelno == logging.INFO class MillisecondAwareFormatter(logging.Formatter): """ A formatter for adding milliseconds to log messages. """ converter = datetime.datetime.fromtimestamp @overrides def formatTime(self, record, datefmt=None): ct = MillisecondAwareFormatter.converter( record.created, pytz.timezone("US/Pacific") ) if datefmt: s = ct.strftime(datefmt) else: t = ct.strftime("%Y-%m-%d %H:%M:%S") s = "%s,%03d" % (t, record.msecs) return s def initialize_logging(logger=None) -> logging.Logger: assert config.has_been_parsed() if logger is None: logger = logging.getLogger() # Root logger if config.config['logging_config_file'] is not None: logging.config.fileConfig('logging.conf') return logger handlers = [] # Global default logging level (--logging_level) default_logging_level = getattr( logging, config.config['logging_level'].upper(), None ) if not isinstance(default_logging_level, int): raise ValueError('Invalid level: %s' % config.config['logging_level']) if config.config['logging_format']: fmt = config.config['logging_format'] else: if config.config['logging_syslog']: fmt = '%(levelname).1s:%(filename)s[%(process)d]: %(message)s' else: fmt = '%(levelname).1s:%(asctime)s: %(message)s' if config.config['logging_debug_threads']: fmt = f'%(process)d.%(thread)d|{fmt}' if config.config['logging_debug_modules']: fmt = f'%(filename)s:%(funcName)s:%(lineno)s|{fmt}' if config.config['logging_syslog']: if sys.platform not in ('win32', 'cygwin'): if config.config['logging_syslog_facility']: facility_name = 'LOG_' + config.config['logging_syslog_facility'] facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER) handler = SysLogHandler(facility=SysLogHandler.LOG_CRON, address='/dev/log') handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, datefmt=config.config['logging_date_format'], ) ) handlers.append(handler) if config.config['logging_filename']: handler = RotatingFileHandler( config.config['logging_filename'], maxBytes = config.config['logging_filename_maxsize'], backupCount = config.config['logging_filename_count'], ) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, datefmt=config.config['logging_date_format'], ) ) handlers.append(handler) if config.config['logging_console']: handler = logging.StreamHandler(sys.stderr) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, datefmt=config.config['logging_date_format'], ) ) handlers.append(handler) if len(handlers) == 0: handlers.append(logging.NullHandler()) for handler in handlers: logger.addHandler(handler) if config.config['logging_info_is_print']: handler = logging.StreamHandler(sys.stdout) handler.addFilter(OnlyInfoFilter()) logger.addHandler(handler) if config.config['logging_squelch_repeats_enabled']: for handler in handlers: handler.addFilter(SquelchRepeatedMessagesFilter()) if config.config['logging_probabilistically_enabled']: for handler in handlers: handler.addFilter(ProbabilisticFilter()) for handler in handlers: handler.addFilter( DynamicPerScopeLoggingLevelFilter( default_logging_level, config.config['lmodule'], ) ) logger.setLevel(0) logger.propagate = False if config.config['logging_captures_prints']: import builtins global built_in_print def print_and_also_log(*arg, **kwarg): f = kwarg.get('file', None) if f == sys.stderr: logger.warning(*arg) else: logger.info(*arg) built_in_print(*arg, **kwarg) builtins.print = print_and_also_log return logger def get_logger(name: str = ""): logger = logging.getLogger(name) return initialize_logging(logger) def tprint(*args, **kwargs) -> None: """Legacy function for printing a message augmented with thread id.""" if config.config['logging_debug_threads']: from thread_utils import current_thread_id print(f'{current_thread_id()}', end="") print(*args, **kwargs) else: pass def dprint(*args, **kwargs) -> None: """Legacy function used to print to stderr.""" print(*args, file=sys.stderr, **kwargs) class OutputMultiplexer(object): """ A class that broadcasts printed messages to several sinks (including various logging levels, different files, different file handles, the house log, etc...) """ class Destination(enum.IntEnum): """Bits in the destination_bitv bitvector. Used to indicate the output destination.""" LOG_DEBUG = 0x01 # ⎫ LOG_INFO = 0x02 # ⎪ LOG_WARNING = 0x04 # ⎬ Must provide logger to the c'tor. LOG_ERROR = 0x08 # ⎪ LOG_CRITICAL = 0x10 # ⎭ FILENAMES = 0x20 # Must provide a filename to the c'tor. FILEHANDLES = 0x40 # Must provide a handle to the c'tor. HLOG = 0x80 ALL_LOG_DESTINATIONS = ( LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL ) ALL_OUTPUT_DESTINATIONS = 0x8F def __init__(self, destination_bitv: int, *, logger=None, filenames: Optional[Iterable[str]] = None, handles: Optional[Iterable[io.TextIOWrapper]] = None): if logger is None: logger = logging.getLogger(None) self.logger = logger if filenames is not None: self.f = [ open(filename, 'wb', buffering=0) for filename in filenames ] else: if destination_bitv & OutputMultiplexer.FILENAMES: raise ValueError( "Filenames argument is required if bitv & FILENAMES" ) self.f = None if handles is not None: self.h = [handle for handle in handles] else: if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES: raise ValueError( "Handle argument is required if bitv & FILEHANDLES" ) self.h = None self.set_destination_bitv(destination_bitv) def get_destination_bitv(self): return self.destination_bitv def set_destination_bitv(self, destination_bitv: int): if destination_bitv & self.Destination.FILENAMES and self.f is None: raise ValueError( "Filename argument is required if bitv & FILENAMES" ) if destination_bitv & self.Destination.FILEHANDLES and self.h is None: raise ValueError( "Handle argument is required if bitv & FILEHANDLES" ) self.destination_bitv = destination_bitv def print(self, *args, **kwargs): from string_utils import sprintf, strip_escape_sequences end = kwargs.pop("end", None) if end is not None: if not isinstance(end, str): raise TypeError("end must be None or a string") sep = kwargs.pop("sep", None) if sep is not None: if not isinstance(sep, str): raise TypeError("sep must be None or a string") if kwargs: raise TypeError("invalid keyword arguments to print()") buf = sprintf(*args, end="", sep=sep) if sep is None: sep = " " if end is None: end = "\n" if end == '\n': buf += '\n' if ( self.destination_bitv & self.Destination.FILENAMES and self.f is not None ): for _ in self.f: _.write(buf.encode('utf-8')) _.flush() if ( self.destination_bitv & self.Destination.FILEHANDLES and self.h is not None ): for _ in self.h: _.write(buf) _.flush() buf = strip_escape_sequences(buf) if self.logger is not None: if self.destination_bitv & self.Destination.LOG_DEBUG: self.logger.debug(buf) if self.destination_bitv & self.Destination.LOG_INFO: self.logger.info(buf) if self.destination_bitv & self.Destination.LOG_WARNING: self.logger.warning(buf) if self.destination_bitv & self.Destination.LOG_ERROR: self.logger.error(buf) if self.destination_bitv & self.Destination.LOG_CRITICAL: self.logger.critical(buf) if self.destination_bitv & self.Destination.HLOG: hlog(buf) def close(self): if self.f is not None: for _ in self.f: _.close() class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator): """ A context that uses an OutputMultiplexer. e.g. with OutputMultiplexerContext( OutputMultiplexer.LOG_INFO | OutputMultiplexer.LOG_DEBUG | OutputMultiplexer.FILENAMES | OutputMultiplexer.FILEHANDLES, filenames = [ '/tmp/foo.log', '/var/log/bar.log' ], handles = [ f, g ] ) as mplex: mplex.print("This is a log message!") """ def __init__(self, destination_bitv: OutputMultiplexer.Destination, *, logger = None, filenames = None, handles = None): super().__init__( destination_bitv, logger=logger, filenames=filenames, handles=handles) def __enter__(self): return self def __exit__(self, etype, value, traceback) -> bool: super().close() if etype is not None: return False return True def hlog(message: str) -> None: """Write a message to the house log.""" message = message.replace("'", "'\"'\"'") os.system(f"/usr/bin/logger -p local7.info -- '{message}'") if __name__ == '__main__': import doctest doctest.testmod()