#!/usr/bin/env python3 """Utilities related to logging.""" import collections import contextlib import datetime import enum import io import logging from logging.handlers import RotatingFileHandler, SysLogHandler import os import pytz import sys from typing import Iterable, Optional # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. import argparse_utils import config cfg = config.add_commandline_args( f'Logging ({__file__})', 'Args related to logging') cfg.add_argument( '--logging_config_file', type=argparse_utils.valid_filename, default=None, metavar='FILENAME', help='Config file containing the logging setup, see: https://docs.python.org/3/howto/logging.html#logging-advanced-tutorial', ) cfg.add_argument( '--logging_level', type=str, default='INFO', choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], metavar='LEVEL', help='The level below which to squelch log messages.', ) cfg.add_argument( '--logging_format', type=str, default='%(levelname).1s:%(asctime)s: %(message)s', help='The format for lines logged via the logger module.' ) cfg.add_argument( '--logging_date_format', type=str, default='%Y/%m/%dT%H:%M:%S.%f%z', metavar='DATEFMT', help='The format of any dates in --logging_format.' ) cfg.add_argument( '--logging_console', action=argparse_utils.ActionNoYes, default=True, help='Should we log to the console (stderr)', ) cfg.add_argument( '--logging_filename', type=str, default=None, metavar='FILENAME', help='The filename of the logfile to write.' ) cfg.add_argument( '--logging_filename_maxsize', type=int, default=(1024*1024), metavar='#BYTES', help='The maximum size (in bytes) to write to the logging_filename.' ) cfg.add_argument( '--logging_filename_count', type=int, default=2, metavar='COUNT', help='The number of logging_filename copies to keep before deleting.' ) cfg.add_argument( '--logging_syslog', action=argparse_utils.ActionNoYes, default=False, help='Should we log to localhost\'s syslog.' ) cfg.add_argument( '--logging_debug_threads', action=argparse_utils.ActionNoYes, default=False, help='Should we prepend pid/tid data to all log messages?' ) cfg.add_argument( '--logging_info_is_print', action=argparse_utils.ActionNoYes, default=False, help='logging.info also prints to stdout.' ) cfg.add_argument( '--logging_max_n_times_per_message', type=int, default=0, help='When set, ignore logged messages from the same site after N.' ) # See also: OutputMultiplexer cfg.add_argument( '--logging_captures_prints', action=argparse_utils.ActionNoYes, default=False, help='When calling print also log.info too' ) built_in_print = print class OnlyInfoFilter(logging.Filter): """ A filter that only logs messages produced at the INFO logging level. """ def filter(self, record): return record.levelno == logging.INFO class OnlyNTimesFilter(logging.Filter): """ A filter that only logs messages from a given site with the same message at the same logging level N times and ignores subsequent attempts to log. """ def __init__(self, maximum: int) -> None: self.maximum = maximum self.counters = collections.Counter() super().__init__() def filter(self, record: logging.LogRecord) -> bool: source = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}' count = self.counters[source] self.counters[source] += 1 return count < self.maximum class MillisecondAwareFormatter(logging.Formatter): """ A formatter for adding milliseconds to log messages. """ converter = datetime.datetime.fromtimestamp def formatTime(self, record, datefmt=None): ct = MillisecondAwareFormatter.converter( record.created, pytz.timezone("US/Pacific") ) if datefmt: s = ct.strftime(datefmt) else: t = ct.strftime("%Y-%m-%d %H:%M:%S") s = "%s,%03d" % (t, record.msecs) return s def initialize_logging(logger=None) -> logging.Logger: assert config.has_been_parsed() if logger is None: logger = logging.getLogger() # Root logger if config.config['logging_config_file'] is not None: logging.config.fileConfig('logging.conf') return logger handlers = [] numeric_level = getattr( logging, config.config['logging_level'].upper(), None ) if not isinstance(numeric_level, int): raise ValueError('Invalid level: %s' % config.config['logging_level']) fmt = config.config['logging_format'] if config.config['logging_debug_threads']: fmt = f'%(process)d.%(thread)d|{fmt}' if config.config['logging_syslog']: if sys.platform not in ('win32', 'cygwin'): handler = SysLogHandler() # for k, v in encoded_priorities.items(): # handler.encodePriority(k, v) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, datefmt=config.config['logging_date_format'], ) ) handler.setLevel(numeric_level) handlers.append(handler) if config.config['logging_filename']: handler = RotatingFileHandler( config.config['logging_filename'], maxBytes = config.config['logging_filename_maxsize'], backupCount = config.config['logging_filename_count'], ) handler.setLevel(numeric_level) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, datefmt=config.config['logging_date_format'], ) ) handlers.append(handler) if config.config['logging_console']: handler = logging.StreamHandler(sys.stderr) handler.setLevel(numeric_level) handler.setFormatter( MillisecondAwareFormatter( fmt=fmt, datefmt=config.config['logging_date_format'], ) ) handlers.append(handler) if len(handlers) == 0: handlers.append(logging.NullHandler()) for handler in handlers: logger.addHandler(handler) if config.config['logging_info_is_print']: handler = logging.StreamHandler(sys.stdout) handler.addFilter(OnlyInfoFilter()) logger.addHandler(handler) maximum = config.config['logging_max_n_times_per_message'] if maximum > 0: for handler in handlers: handler.addFilter(OnlyNTimesFilter(maximum)) logger.setLevel(numeric_level) logger.propagate = False if config.config['logging_captures_prints']: import builtins global built_in_print def print_and_also_log(*arg, **kwarg): f = kwarg.get('file', None) if f == sys.stderr: logger.warning(*arg) else: logger.info(*arg) built_in_print(*arg, **kwarg) builtins.print = print_and_also_log return logger def get_logger(name: str = ""): logger = logging.getLogger(name) return initialize_logging(logger) def tprint(*args, **kwargs) -> None: if config.config['logging_debug_threads']: from thread_utils import current_thread_id print(f'{current_thread_id()}', end="") print(*args, **kwargs) else: pass def dprint(*args, **kwargs) -> None: print(*args, file=sys.stderr, **kwargs) class OutputMultiplexer(object): class Destination(enum.IntEnum): """Bits in the destination_bitv bitvector. Used to indicate the output destination.""" LOG_DEBUG = 0x01 # -\ LOG_INFO = 0x02 # | LOG_WARNING = 0x04 # > Should provide logger to the c'tor. LOG_ERROR = 0x08 # | LOG_CRITICAL = 0x10 # _/ FILENAMES = 0x20 # Must provide a filename to the c'tor. FILEHANDLES = 0x40 # Must provide a handle to the c'tor. HLOG = 0x80 ALL_LOG_DESTINATIONS = ( LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL ) ALL_OUTPUT_DESTINATIONS = 0x8F def __init__(self, destination_bitv: int, *, logger=None, filenames: Optional[Iterable[str]] = None, handles: Optional[Iterable[io.TextIOWrapper]] = None): if logger is None: logger = logging.getLogger(None) self.logger = logger if filenames is not None: self.f = [ open(filename, 'wb', buffering=0) for filename in filenames ] else: if destination_bitv & OutputMultiplexer.FILENAMES: raise ValueError( "Filenames argument is required if bitv & FILENAMES" ) self.f = None if handles is not None: self.h = [handle for handle in handles] else: if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES: raise ValueError( "Handle argument is required if bitv & FILEHANDLES" ) self.h = None self.set_destination_bitv(destination_bitv) def get_destination_bitv(self): return self.destination_bitv def set_destination_bitv(self, destination_bitv: int): if destination_bitv & self.Destination.FILENAMES and self.f is None: raise ValueError( "Filename argument is required if bitv & FILENAMES" ) if destination_bitv & self.Destination.FILEHANDLES and self.h is None: raise ValueError( "Handle argument is required if bitv & FILEHANDLES" ) self.destination_bitv = destination_bitv def print(self, *args, **kwargs): from string_utils import sprintf, strip_escape_sequences end = kwargs.pop("end", None) if end is not None: if not isinstance(end, str): raise TypeError("end must be None or a string") sep = kwargs.pop("sep", None) if sep is not None: if not isinstance(sep, str): raise TypeError("sep must be None or a string") if kwargs: raise TypeError("invalid keyword arguments to print()") buf = sprintf(*args, end="", sep=sep) if sep is None: sep = " " if end is None: end = "\n" if end == '\n': buf += '\n' if ( self.destination_bitv & self.Destination.FILENAMES and self.f is not None ): for _ in self.f: _.write(buf.encode('utf-8')) _.flush() if ( self.destination_bitv & self.Destination.FILEHANDLES and self.h is not None ): for _ in self.h: _.write(buf) _.flush() buf = strip_escape_sequences(buf) if self.logger is not None: if self.destination_bitv & self.Destination.LOG_DEBUG: self.logger.debug(buf) if self.destination_bitv & self.Destination.LOG_INFO: self.logger.info(buf) if self.destination_bitv & self.Destination.LOG_WARNING: self.logger.warning(buf) if self.destination_bitv & self.Destination.LOG_ERROR: self.logger.error(buf) if self.destination_bitv & self.Destination.LOG_CRITICAL: self.logger.critical(buf) if self.destination_bitv & self.Destination.HLOG: hlog(buf) def close(self): if self.f is not None: for _ in self.f: _.close() class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator): def __init__(self, destination_bitv: OutputMultiplexer.Destination, *, logger = None, filenames = None, handles = None): super().__init__( destination_bitv, logger=logger, filenames=filenames, handles=handles) def __enter__(self): return self def __exit__(self, etype, value, traceback) -> bool: super().close() if etype is not None: return False return True def hlog(message: str) -> None: message = message.replace("'", "'\"'\"'") os.system(f"/usr/bin/logger -p local7.info -- '{message}'")