#!/usr/bin/env python3

"""Utilities related to logging.

Registers the ``--logging_*`` commandline arguments with the project's
``config`` module and provides helpers that configure ``logging`` loggers
from those arguments, plus a small multi-destination output sink.
"""

import contextlib
import datetime
import logging
import logging.config
from logging.handlers import RotatingFileHandler, SysLogHandler
import os
import subprocess
import sys

import pytz

# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
import argparse_utils
import config

cfg = config.add_commandline_args(
    f'Logging ({__file__})',
    'Args related to logging')
cfg.add_argument(
    '--logging_config_file',
    type=argparse_utils.valid_filename,
    default=None,
    metavar='FILENAME',
    help='Config file containing the logging setup, see: https://docs.python.org/3/howto/logging.html#logging-advanced-tutorial',
)
cfg.add_argument(
    '--logging_level',
    type=str,
    default='INFO',
    choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
    metavar='LEVEL',
    help='The level below which to squelch log messages.',
)
cfg.add_argument(
    '--logging_format',
    type=str,
    default='%(levelname)s:%(asctime)s: %(message)s',
    help='The format for lines logged via the logger module.'
)
cfg.add_argument(
    '--logging_date_format',
    type=str,
    default='%Y/%m/%dT%H:%M:%S.%f%z',
    metavar='DATEFMT',
    help='The format of any dates in --logging_format.'
)
cfg.add_argument(
    '--logging_console',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Should we log to the console (stderr)',
)
cfg.add_argument(
    '--logging_filename',
    type=str,
    default=None,
    metavar='FILENAME',
    help='The filename of the logfile to write.'
)
cfg.add_argument(
    '--logging_filename_maxsize',
    type=int,
    default=(1024*1024),
    metavar='#BYTES',
    help='The maximum size (in bytes) to write to the logging_filename.'
)
cfg.add_argument(
    '--logging_filename_count',
    type=int,
    default=2,
    metavar='COUNT',
    help='The number of logging_filename copies to keep before deleting.'
)
cfg.add_argument(
    '--logging_syslog',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='Should we log to localhost\'s syslog.'
)
cfg.add_argument(
    '--logging_debug_threads',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='Should we prepend pid/tid data to all log messages?'
)
cfg.add_argument(
    '--logging_info_is_print',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='logging.info also prints to stdout.'
)

# Name of the attribute set on a logger once initialize_logging() has
# configured it, so that repeated calls do not stack duplicate handlers.
_INITIALIZED_MARKER = '_logging_utils_initialized'


class OnlyInfoFilter(logging.Filter):
    """Filter that passes only records whose level is exactly INFO."""

    def filter(self, record):
        return record.levelno == logging.INFO


class MillisecondAwareFormatter(logging.Formatter):
    """Formatter whose timestamps are rendered via datetime.strftime.

    Going through ``datetime`` (rather than ``time.strftime``) lets the
    date format use ``%f``.  Timestamps are converted to the hard-coded
    "US/Pacific" timezone.
    """

    converter = datetime.datetime.fromtimestamp

    def formatTime(self, record, datefmt=None):
        """Return record.created formatted with datefmt (or a default)."""
        ct = MillisecondAwareFormatter.converter(
            record.created, pytz.timezone("US/Pacific")
        )
        if datefmt:
            return ct.strftime(datefmt)
        t = ct.strftime("%Y-%m-%d %H:%M:%S")
        return "%s,%03d" % (t, record.msecs)


def _make_formatter(fmt: str) -> MillisecondAwareFormatter:
    """Build a formatter for fmt using the configured date format."""
    return MillisecondAwareFormatter(
        fmt=fmt,
        datefmt=config.config['logging_date_format'],
    )


def _build_handlers(fmt: str, numeric_level: int) -> list:
    """Create the syslog/file/console handlers selected on the commandline.

    Returns a list holding a single NullHandler when no destination is
    enabled.
    """
    handlers = []

    if config.config['logging_syslog']:
        if sys.platform in ('win32', 'cygwin'):
            print(
                "WARNING: Current platform does not support syslog; IGNORING.",
                file=sys.stderr
            )
        else:
            handler = SysLogHandler()
            handler.setFormatter(_make_formatter(fmt))
            handler.setLevel(numeric_level)
            handlers.append(handler)

    if config.config['logging_filename'] is not None:
        handler = RotatingFileHandler(
            config.config['logging_filename'],
            maxBytes=config.config['logging_filename_maxsize'],
            backupCount=config.config['logging_filename_count'],
        )
        handler.setLevel(numeric_level)
        handler.setFormatter(_make_formatter(fmt))
        handlers.append(handler)

    if config.config['logging_console']:
        handler = logging.StreamHandler(sys.stderr)
        handler.setLevel(numeric_level)
        handler.setFormatter(_make_formatter(fmt))
        handlers.append(handler)

    if not handlers:
        handlers.append(logging.NullHandler())
    return handlers


def initialize_logging(logger=None) -> logging.Logger:
    """Configure logger (default: the root logger) from the parsed config.

    If --logging_config_file was given, that file is handed to
    logging.config.fileConfig and nothing else is done.  Otherwise
    handlers are attached according to the other --logging_* arguments,
    the logger's level is set and propagation is turned off.

    Calling this more than once for the same logger is a no-op after the
    first call; without that guard every call would attach another full
    set of handlers and each message would be emitted multiple times.

    Args:
        logger: the logger to configure, or None for the root logger.

    Returns:
        The configured logger.

    Raises:
        ValueError: if the configured logging level is not a valid level
            name.
    """
    assert config.has_been_parsed()
    if logger is None:
        logger = logging.getLogger()  # Root logger

    if getattr(logger, _INITIALIZED_MARKER, False):
        return logger

    config_file = config.config['logging_config_file']
    if config_file is not None:
        # Use the file the user actually asked for.
        logging.config.fileConfig(config_file)
        setattr(logger, _INITIALIZED_MARKER, True)
        return logger

    numeric_level = getattr(
        logging,
        config.config['logging_level'].upper(),
        None
    )
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid level: %s' % config.config['logging_level'])

    fmt = config.config['logging_format']
    if config.config['logging_debug_threads']:
        fmt = f'%(process)d.%(thread)d|{fmt}'

    for handler in _build_handlers(fmt, numeric_level):
        logger.addHandler(handler)

    if config.config['logging_info_is_print']:
        # Unformatted echo of INFO records (only) to stdout.
        handler = logging.StreamHandler(sys.stdout)
        handler.addFilter(OnlyInfoFilter())
        logger.addHandler(handler)

    logger.setLevel(numeric_level)
    logger.propagate = False
    setattr(logger, _INITIALIZED_MARKER, True)
    return logger


def get_logger(name: str = "") -> logging.Logger:
    """Return the logger called name, configured via initialize_logging."""
    logger = logging.getLogger(name)
    return initialize_logging(logger)


def tprint(*args, **kwargs) -> None:
    """Print prefixed with the current thread id.

    Does nothing unless --logging_debug_threads is enabled.
    """
    if config.config['logging_debug_threads']:
        from thread_utils import current_thread_id
        print(f'{current_thread_id()}', end="")
        print(*args, **kwargs)


def dprint(*args, **kwargs) -> None:
    """Like print() but writes to stderr."""
    print(*args, file=sys.stderr, **kwargs)


class OutputSink(object):
    """Print-like object that fans output out to several destinations.

    The destinations are selected by OR-ing together the bit constants
    below into a destination bitvector.
    """

    # Bits in the destination_bitv bitvector.  Used to indicate the
    # output destination.
    STDOUT = 0x1
    STDERR = 0x2
    LOG_DEBUG = 0x4          # -\
    LOG_INFO = 0x8           #  |
    LOG_WARNING = 0x10       #   > Should provide logger to the c'tor.
    LOG_ERROR = 0x20         #  |
    LOG_CRITICAL = 0x40      # _/
    FILENAME = 0x80          # Must provide a filename to the c'tor.
    HLOG = 0x100

    ALL_LOG_DESTINATIONS = (
        LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL
    )
    ALL_OUTPUT_DESTINATIONS = 0x1FF

    def __init__(self,
                 destination_bitv: int,
                 *,
                 logger=None,
                 filename=None):
        """Create a sink.

        Args:
            destination_bitv: OR of the destination bits to write to.
            logger: logger used for the LOG_* destinations; defaults to
                the root logger.
            filename: file to (over)write for the FILENAME destination.

        Raises:
            ValueError: if FILENAME is requested without a filename.
        """
        if logger is None:
            logger = logging.getLogger(None)
        self.logger = logger

        # Validate against the argument (self.destination_bitv does not
        # exist yet) and before opening anything so no handle can leak.
        if filename is None and destination_bitv & OutputSink.FILENAME:
            raise ValueError(
                "Filename argument is required if bitv & FILENAME"
            )
        if filename is not None:
            self.f = open(filename, "wb", buffering=0)
        else:
            self.f = None
        self.set_destination_bitv(destination_bitv)

    def get_destination_bitv(self):
        """Return the current destination bitvector."""
        return self.destination_bitv

    def set_destination_bitv(self, destination_bitv: int):
        """Replace the destination bitvector.

        Raises:
            ValueError: if FILENAME is requested but no file is open.
        """
        if destination_bitv & self.FILENAME and self.f is None:
            raise ValueError(
                "Filename argument is required if bitv & FILENAME"
            )
        self.destination_bitv = destination_bitv

    def print(self, *args, **kwargs):
        """Write args to every enabled destination.

        Accepts the ``sep`` and ``end`` keyword arguments (each None or a
        string); any other keyword raises TypeError.
        """
        from string_utils import sprintf, strip_escape_sequences
        end = kwargs.pop("end", None)
        if end is not None and not isinstance(end, str):
            raise TypeError("end must be None or a string")
        sep = kwargs.pop("sep", None)
        if sep is not None and not isinstance(sep, str):
            raise TypeError("sep must be None or a string")
        if kwargs:
            raise TypeError("invalid keyword arguments to print()")

        # NOTE(review): presumably sprintf renders args the way print()
        # would, into a string -- confirm against string_utils.
        buf = sprintf(*args, end="", sep=sep)
        if sep is None:
            sep = " "
        if end is None:
            end = "\n"

        if self.destination_bitv & self.STDOUT:
            print(buf, file=sys.stdout, sep=sep, end=end)
        if self.destination_bitv & self.STDERR:
            print(buf, file=sys.stderr, sep=sep, end=end)

        # Only a newline terminator is carried over to the file / logger
        # payload; any other `end` value is used for stdout/stderr only.
        if end == '\n':
            buf += '\n'
        if self.destination_bitv & self.FILENAME and self.f is not None:
            self.f.write(buf.encode('utf-8'))
            self.f.flush()

        buf = strip_escape_sequences(buf)
        if self.logger is not None:
            if self.destination_bitv & self.LOG_DEBUG:
                self.logger.debug(buf)
            if self.destination_bitv & self.LOG_INFO:
                self.logger.info(buf)
            if self.destination_bitv & self.LOG_WARNING:
                self.logger.warning(buf)
            if self.destination_bitv & self.LOG_ERROR:
                self.logger.error(buf)
            if self.destination_bitv & self.LOG_CRITICAL:
                self.logger.critical(buf)
        if self.destination_bitv & self.HLOG:
            hlog(buf)

    def close(self):
        """Close the output file, if one was opened."""
        if self.f is not None:
            self.f.close()


class OutputContext(OutputSink, contextlib.ContextDecorator):
    """An OutputSink usable as a context manager or decorator.

    The sink is closed on exit; exceptions raised inside the context are
    not suppressed.
    """

    def __init__(self,
                 destination_bitv: int,
                 *,
                 logger=None,
                 filename=None):
        super().__init__(destination_bitv, logger=logger, filename=filename)

    def __enter__(self):
        return self

    def __exit__(self, etype, value, traceback):
        super().close()
        # False (do not swallow) when an exception is in flight.
        return etype is None


def hlog(message: str) -> None:
    """Send message to the system log via /usr/bin/logger (local7.info).

    The command is run with an argument list rather than through a
    shell, so the message needs no quoting.  Failure to launch the
    logger binary is reported on stderr and otherwise ignored.
    """
    try:
        subprocess.run(
            ['/usr/bin/logger', '-p', 'local7.info', '--', message],
            check=False,
        )
    except OSError as e:
        print(f'WARNING: unable to run /usr/bin/logger: {e}', file=sys.stderr)