Adds unittest.
diff --git a/logging_utils.py b/logging_utils.py
index 0c4694e1056790af1db9dc53a5966a5392310110..ca1544150065b367b976d9ca43382fa1a6cff5e5 100644
@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+# -*- coding: utf-8 -*-
 
 """Utilities related to logging."""
 
@@ -8,14 +9,15 @@ import datetime
 import enum
 import io
 import logging
-from logging.handlers import RotatingFileHandler, SysLogHandler
 import os
 import random
 import sys
-from typing import Callable, Iterable, Mapping, Optional
+from logging.config import fileConfig
+from logging.handlers import RotatingFileHandler, SysLogHandler
+from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional
 
-from overrides import overrides
 import pytz
+from overrides import overrides
 
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
@@ -178,7 +180,7 @@ logging_initialized = False
 
 
 # A map from logging_callsite_id -> count of logged messages.
-squelched_logging_counts: Mapping[str, int] = {}
+squelched_logging_counts: Dict[str, int] = {}
 
 
 def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable:
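For reference, a usage sketch of the decorator whose counter map is retyped above. The names are illustrative, and it assumes the squelch machinery is active (i.e. SquelchRepeatedMessagesFilter has been installed on the logger, e.g. by initialize_logging):

import logging

from logging_utils import squelch_repeated_log_messages  # import path assumed


@squelch_repeated_log_messages(1)
def poll_device():  # hypothetical caller
    # Identical messages from this call site are squelched after one repeat.
    logging.info('Device is still offline.')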
@@ -222,8 +224,8 @@ class SquelchRepeatedMessagesFilter(logging.Filter):
     """
 
     def __init__(self) -> None:
-        self.counters = collections.Counter()
         super().__init__()
+        self.counters: collections.Counter = collections.Counter()
 
     @overrides
     def filter(self, record: logging.LogRecord) -> bool:
@@ -256,9 +258,7 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter):
         per_scope_logging_levels: str,
     ) -> None:
         super().__init__()
-        self.valid_levels = set(
-            ['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
-        )
+        self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
         self.default_logging_level = default_logging_level
         self.level_by_scope = {}
         if per_scope_logging_levels is not None:
@@ -285,9 +285,9 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter):
                         file=sys.stderr,
                     )
                     continue
-                self.level_by_scope[
-                    scope
-                ] = DynamicPerScopeLoggingLevelFilter.level_name_to_level(level)
+                self.level_by_scope[scope] = DynamicPerScopeLoggingLevelFilter.level_name_to_level(
+                    level
+                )
 
     @overrides
     def filter(self, record: logging.LogRecord) -> bool:
@@ -313,7 +313,7 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter):
 
 
 # A map from function_identifier -> probability of logging (0.0%..100.0%)
-probabilistic_logging_levels: Mapping[str, float] = {}
+probabilistic_logging_levels: Dict[str, float] = {}
 
 
 def logging_is_probabilistic(probability_of_logging: float) -> Callable:
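A similar sketch for the probabilistic decorator; per the comment on the map above, the argument is a percentage in 0.0..100.0 (names illustrative, filter assumed active):

import logging

from logging_utils import logging_is_probabilistic  # import path assumed


@logging_is_probabilistic(50.0)
def heartbeat():  # hypothetical caller
    # Roughly half of these messages are actually emitted.
    logging.debug('heartbeat')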
@@ -381,13 +381,11 @@ class MillisecondAwareFormatter(logging.Formatter):
 
     """
 
-    converter = datetime.datetime.fromtimestamp
+    converter = datetime.datetime.fromtimestamp  # type: ignore
 
     @overrides
     def formatTime(self, record, datefmt=None):
-        ct = MillisecondAwareFormatter.converter(
-            record.created, pytz.timezone("US/Pacific")
-        )
+        ct = MillisecondAwareFormatter.converter(record.created, pytz.timezone("US/Pacific"))
         if datefmt:
             s = ct.strftime(datefmt)
         else:
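Because this class's converter is datetime.datetime.fromtimestamp rather than the base Formatter's time.localtime, datefmt may use %f for sub-second precision. A sketch with assumed format strings:

import logging

from logging_utils import MillisecondAwareFormatter  # import path assumed

handler = logging.StreamHandler()
handler.setFormatter(
    MillisecondAwareFormatter(
        fmt='%(asctime)s %(levelname)s %(message)s',  # illustrative format
        datefmt='%Y-%m-%d %H:%M:%S.%f',  # %f works via datetime.strftime
    )
)
logging.getLogger().addHandler(handler)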
@@ -397,36 +395,29 @@ class MillisecondAwareFormatter(logging.Formatter):
 
 
 def log_about_logging(
-    logger, default_logging_level, preexisting_handlers_count, fmt, facility_name
+    logger,
+    default_logging_level,
+    preexisting_handlers_count,
+    fmt,
+    facility_name,
 ):
-    level_name = logging._levelToName.get(
-        default_logging_level, str(default_logging_level)
-    )
+    level_name = logging._levelToName.get(default_logging_level, str(default_logging_level))
     logger.debug(f'Initialized global logging; default logging level is {level_name}.')
-    if (
-        config.config['logging_clear_preexisting_handlers']
-        and preexisting_handlers_count > 0
-    ):
+    if config.config['logging_clear_preexisting_handlers'] and preexisting_handlers_count > 0:
         msg = f'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)'
         logger.warning(msg)
     logger.debug(f'Logging format specification is "{fmt}"')
     if config.config['logging_debug_threads']:
-        logger.debug(
-            '...Logging format spec captures tid/pid (--logging_debug_threads)'
-        )
+        logger.debug('...Logging format spec captures tid/pid (--logging_debug_threads)')
     if config.config['logging_debug_modules']:
         logger.debug(
             '...Logging format spec captures files/functions/lineno (--logging_debug_modules)'
         )
     if config.config['logging_syslog']:
-        logger.debug(
-            f'Logging to syslog as {facility_name} with priority mapping based on level'
-        )
+        logger.debug(f'Logging to syslog as {facility_name} with priority mapping based on level')
     if config.config['logging_filename']:
         logger.debug(f'Logging to filename {config.config["logging_filename"]}')
-        logger.debug(
-            f'...with {config.config["logging_filename_maxsize"]} bytes max file size.'
-        )
+        logger.debug(f'...with {config.config["logging_filename_maxsize"]} bytes max file size.')
         logger.debug(
             f'...and {config.config["logging_filename_count"]} rotating backup file count.'
         )
@@ -465,7 +456,7 @@ def log_about_logging(
 def initialize_logging(logger=None) -> logging.Logger:
     global logging_initialized
     if logging_initialized:
-        return
+        return logging.getLogger()
     logging_initialized = True
 
     if logger is None:
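With the early return fixed to hand back the root logger instead of None, repeated initialization is safe. A sketch, assuming the program's config flags have already been parsed (the function reads config.config):

import logging

import logging_utils  # import path assumed

logger = logging_utils.initialize_logging()  # first call performs full setup
again = logging_utils.initialize_logging()   # later calls return logging.getLogger()
assert again is logging.getLogger()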
@@ -479,15 +470,14 @@ def initialize_logging(logger=None) -> logging.Logger:
             preexisting_handlers_count += 1
 
     if config.config['logging_config_file'] is not None:
-        logging.config.fileConfig('logging.conf')
+        fileConfig(config.config['logging_config_file'])
         return logger
 
-    handlers = []
+    handlers: List[logging.Handler] = []
+    handler: Optional[logging.Handler] = None
 
     # Global default logging level (--logging_level)
-    default_logging_level = getattr(
-        logging, config.config['logging_level'].upper(), None
-    )
+    default_logging_level = getattr(logging, config.config['logging_level'].upper(), None)
     if not isinstance(default_logging_level, int):
         raise ValueError('Invalid level: %s' % config.config['logging_level'])
 
@@ -508,7 +498,8 @@ def initialize_logging(logger=None) -> logging.Logger:
         if sys.platform not in ('win32', 'cygwin'):
             if config.config['logging_syslog_facility']:
                 facility_name = 'LOG_' + config.config['logging_syslog_facility']
-            facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER)
+            facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER)  # type: ignore
+            assert facility is not None
             handler = SysLogHandler(facility=facility, address='/dev/log')
             handler.setFormatter(
                 MillisecondAwareFormatter(
@@ -666,13 +657,15 @@ class OutputMultiplexer(object):
             logger = logging.getLogger(None)
         self.logger = logger
 
+        self.f: Optional[List[Any]] = None
         if filenames is not None:
             self.f = [open(filename, 'wb', buffering=0) for filename in filenames]
         else:
-            if destination_bitv & OutputMultiplexer.FILENAMES:
+            if destination_bitv & OutputMultiplexer.Destination.FILENAMES:
                 raise ValueError("Filenames argument is required if bitv & FILENAMES")
             self.f = None
 
+        self.h: Optional[List[Any]] = None
         if handles is not None:
             self.h = [handle for handle in handles]
         else:
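The unittest named in the commit message is not part of the hunks shown here; below is a minimal, hypothetical sketch of the kind of check such a test could make against the now-annotated counters attribute:

import unittest

import logging_utils  # import path assumed


class SquelchRepeatedMessagesFilterTest(unittest.TestCase):
    # Hypothetical test; the actual unittest added by this commit is not shown.
    def test_counters_start_empty(self) -> None:
        filt = logging_utils.SquelchRepeatedMessagesFilter()
        self.assertEqual(len(filt.counters), 0)


if __name__ == '__main__':
    unittest.main()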