Used isort to sort imports. Also added to the git pre-commit hook.
[python_utils.git] / logging_utils.py
index 2d9d63b78f8543890bb25fa879667bcc59294af1..fdbb7a3d48daecb4e3b81ed4aad4bf0e11a79241 100644 (file)
@@ -8,14 +8,15 @@ import datetime
 import enum
 import io
 import logging
-from logging.handlers import RotatingFileHandler, SysLogHandler
 import os
 import random
 import sys
-from typing import Callable, Iterable, Mapping, Optional
+from logging.config import fileConfig
+from logging.handlers import RotatingFileHandler, SysLogHandler
+from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional
 
-from overrides import overrides
 import pytz
+from overrides import overrides
 
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
@@ -178,7 +179,7 @@ logging_initialized = False
 
 
 # A map from logging_callsite_id -> count of logged messages.
-squelched_logging_counts: Mapping[str, int] = {}
+squelched_logging_counts: Dict[str, int] = {}
 
 
 def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable:
@@ -222,7 +223,7 @@ class SquelchRepeatedMessagesFilter(logging.Filter):
     """
 
     def __init__(self) -> None:
-        self.counters = collections.Counter()
+        self.counters: collections.Counter = collections.Counter()
         super().__init__()
 
     @overrides
@@ -313,7 +314,7 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter):
 
 
 # A map from function_identifier -> probability of logging (0.0%..100.0%)
-probabilistic_logging_levels: Mapping[str, float] = {}
+probabilistic_logging_levels: Dict[str, float] = {}
 
 
 def logging_is_probabilistic(probability_of_logging: float) -> Callable:
@@ -381,7 +382,7 @@ class MillisecondAwareFormatter(logging.Formatter):
 
     """
 
-    converter = datetime.datetime.fromtimestamp
+    converter = datetime.datetime.fromtimestamp  # type: ignore
 
     @overrides
     def formatTime(self, record, datefmt=None):
@@ -396,10 +397,76 @@ class MillisecondAwareFormatter(logging.Formatter):
         return s
 
 
+def log_about_logging(
+    logger, default_logging_level, preexisting_handlers_count, fmt, facility_name
+):
+    level_name = logging._levelToName.get(
+        default_logging_level, str(default_logging_level)
+    )
+    logger.debug(f'Initialized global logging; default logging level is {level_name}.')
+    if (
+        config.config['logging_clear_preexisting_handlers']
+        and preexisting_handlers_count > 0
+    ):
+        msg = f'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)'
+        logger.warning(msg)
+    logger.debug(f'Logging format specification is "{fmt}"')
+    if config.config['logging_debug_threads']:
+        logger.debug(
+            '...Logging format spec captures tid/pid (--logging_debug_threads)'
+        )
+    if config.config['logging_debug_modules']:
+        logger.debug(
+            '...Logging format spec captures files/functions/lineno (--logging_debug_modules)'
+        )
+    if config.config['logging_syslog']:
+        logger.debug(
+            f'Logging to syslog as {facility_name} with priority mapping based on level'
+        )
+    if config.config['logging_filename']:
+        logger.debug(f'Logging to filename {config.config["logging_filename"]}')
+        logger.debug(
+            f'...with {config.config["logging_filename_maxsize"]} bytes max file size.'
+        )
+        logger.debug(
+            f'...and {config.config["logging_filename_count"]} rotating backup file count.'
+        )
+    if config.config['logging_console']:
+        logger.debug('Logging to the console (stderr).')
+    if config.config['logging_info_is_print']:
+        logger.debug(
+            'Logging logger.info messages will be repeated on stdout (--logging_info_is_print)'
+        )
+    if config.config['logging_squelch_repeats']:
+        logger.debug(
+            'Logging code allowed to request repeated messages be squelched (--logging_squelch_repeats)'
+        )
+    else:
+        logger.debug(
+            'Logging code forbidden to request messages be squelched; all messages logged (--no_logging_squelch_repeats)'
+        )
+    if config.config['logging_probabilistically']:
+        logger.debug(
+            'Logging code is allowed to request probabilistic logging (--logging_probabilistically)'
+        )
+    else:
+        logger.debug(
+            'Logging code is forbidden to request probabilistic logging; messages always logged (--no_logging_probabilistically)'
+        )
+    if config.config['lmodule']:
+        logger.debug(
+            f'Logging dynamic per-module logging enabled (--lmodule={config.config["lmodule"]})'
+        )
+    if config.config['logging_captures_prints']:
+        logger.debug(
+            'Logging will capture printed data as logger.info messages (--logging_captures_prints)'
+        )
+
+
 def initialize_logging(logger=None) -> logging.Logger:
     global logging_initialized
     if logging_initialized:
-        return
+        return logging.getLogger()
     logging_initialized = True
 
     if logger is None:
@@ -413,10 +480,11 @@ def initialize_logging(logger=None) -> logging.Logger:
             preexisting_handlers_count += 1
 
     if config.config['logging_config_file'] is not None:
-        logging.config.fileConfig('logging.conf')
+        fileConfig(config.config['logging_config_file'])
         return logger
 
-    handlers = []
+    handlers: List[logging.Handler] = []
+    handler: Optional[logging.Handler] = None
 
     # Global default logging level (--logging_level)
     default_logging_level = getattr(
@@ -437,11 +505,13 @@ def initialize_logging(logger=None) -> logging.Logger:
     if config.config['logging_debug_modules']:
         fmt = f'%(filename)s:%(funcName)s:%(lineno)s|{fmt}'
 
+    facility_name = None
     if config.config['logging_syslog']:
         if sys.platform not in ('win32', 'cygwin'):
             if config.config['logging_syslog_facility']:
                 facility_name = 'LOG_' + config.config['logging_syslog_facility']
-            facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER)
+            facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER)  # type: ignore
+            assert facility
             handler = SysLogHandler(facility=facility, address='/dev/log')
             handler.setFormatter(
                 MillisecondAwareFormatter(
@@ -505,9 +575,8 @@ def initialize_logging(logger=None) -> logging.Logger:
     logger.propagate = False
 
     if config.config['logging_captures_prints']:
-        import builtins
-
         global built_in_print
+        import builtins
 
         def print_and_also_log(*arg, **kwarg):
             f = kwarg.get('file', None)
@@ -521,68 +590,13 @@ def initialize_logging(logger=None) -> logging.Logger:
 
     # At this point the logger is ready, handlers are set up,
     # etc... so log about the logging configuration.
-
-    level_name = logging._levelToName.get(
-        default_logging_level, str(default_logging_level)
+    log_about_logging(
+        logger,
+        default_logging_level,
+        preexisting_handlers_count,
+        fmt,
+        facility_name,
     )
-    logger.debug(f'Initialized global logging; default logging level is {level_name}.')
-    if (
-        config.config['logging_clear_preexisting_handlers']
-        and preexisting_handlers_count > 0
-    ):
-        msg = f'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)'
-        logger.warning(msg)
-    logger.debug(f'Logging format specification is "{fmt}"')
-    if config.config['logging_debug_threads']:
-        logger.debug(
-            '...Logging format spec captures tid/pid (--logging_debug_threads)'
-        )
-    if config.config['logging_debug_modules']:
-        logger.debug(
-            '...Logging format spec captures files/functions/lineno (--logging_debug_modules)'
-        )
-    if config.config['logging_syslog']:
-        logger.debug(
-            f'Logging to syslog as {facility_name} with priority mapping based on level'
-        )
-    if config.config['logging_filename']:
-        logger.debug(f'Logging to filename {config.config["logging_filename"]}')
-        logger.debug(
-            f'...with {config.config["logging_filename_maxsize"]} bytes max file size.'
-        )
-        logger.debug(
-            f'...and {config.config["logging_filename_count"]} rotating backup file count.'
-        )
-    if config.config['logging_console']:
-        logger.debug('Logging to the console (stderr).')
-    if config.config['logging_info_is_print']:
-        logger.debug(
-            'Logging logger.info messages will be repeated on stdout (--logging_info_is_print)'
-        )
-    if config.config['logging_squelch_repeats']:
-        logger.debug(
-            'Logging code allowed to request repeated messages be squelched (--logging_squelch_repeats)'
-        )
-    else:
-        logger.debug(
-            'Logging code forbidden to request messages be squelched; all messages logged (--no_logging_squelch_repeats)'
-        )
-    if config.config['logging_probabilistically']:
-        logger.debug(
-            'Logging code is allowed to request probabilistic logging (--logging_probabilistically)'
-        )
-    else:
-        logger.debug(
-            'Logging code is forbidden to request probabilistic logging; messages always logged (--no_logging_probabilistically)'
-        )
-    if config.config['lmodule']:
-        logger.debug(
-            f'Logging dynamic per-module logging enabled (--lmodule={config.config["lmodule"]})'
-        )
-    if config.config['logging_captures_prints']:
-        logger.debug(
-            'Logging will capture printed data as logger.info messages (--logging_captures_prints)'
-        )
     return logger
 
 
@@ -628,18 +642,20 @@ class OutputMultiplexer(object):
         """Bits in the destination_bitv bitvector.  Used to indicate the
         output destination."""
 
-        LOG_DEBUG = 0x01  #  ⎫
-        LOG_INFO = 0x02  #  ⎪
-        LOG_WARNING = 0x04  #  ⎬ Must provide logger to the c'tor.
-        LOG_ERROR = 0x08  #  ⎪
+        # fmt: off
+        LOG_DEBUG = 0x01     #  ⎫
+        LOG_INFO = 0x02      #  ⎪
+        LOG_WARNING = 0x04   #  ⎬ Must provide logger to the c'tor.
+        LOG_ERROR = 0x08     #  ⎪
         LOG_CRITICAL = 0x10  #  ⎭
-        FILENAMES = 0x20  # Must provide a filename to the c'tor.
-        FILEHANDLES = 0x40  # Must provide a handle to the c'tor.
+        FILENAMES = 0x20     # Must provide a filename to the c'tor.
+        FILEHANDLES = 0x40   # Must provide a handle to the c'tor.
         HLOG = 0x80
         ALL_LOG_DESTINATIONS = (
             LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL
         )
         ALL_OUTPUT_DESTINATIONS = 0x8F
+        # fmt: on
 
     def __init__(
         self,
@@ -653,13 +669,15 @@ class OutputMultiplexer(object):
             logger = logging.getLogger(None)
         self.logger = logger
 
+        self.f: Optional[List[Any]] = None
         if filenames is not None:
             self.f = [open(filename, 'wb', buffering=0) for filename in filenames]
         else:
-            if destination_bitv & OutputMultiplexer.FILENAMES:
+            if destination_bitv & OutputMultiplexer.Destination.FILENAMES:
                 raise ValueError("Filenames argument is required if bitv & FILENAMES")
             self.f = None
 
+        self.h: Optional[List[Any]] = None
         if handles is not None:
             self.h = [handle for handle in handles]
         else: