"""Utilities related to logging."""
+import collections
import contextlib
import datetime
import enum
+import io
import logging
from logging.handlers import RotatingFileHandler, SysLogHandler
import os
import pytz
+import random
import sys
+from typing import Callable, Iterable, Mapping, Optional
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
default=False,
help='logging.info also prints to stdout.'
)
-
-# See also: OutputMultiplexer/OutputContext
+cfg.add_argument(
+ '--logging_squelch_repeats_enabled',
+ action=argparse_utils.ActionNoYes,
+ default=True,
+ help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?'
+)
+cfg.add_argument(
+ '--logging_probabilistically_enabled',
+ action=argparse_utils.ActionNoYes,
+ default=True,
+ help='Do we allow probabilistic logging (for code that wants it) or should we always log?'
+)
+# See also: OutputMultiplexer
cfg.add_argument(
'--logging_captures_prints',
action=argparse_utils.ActionNoYes,
default=False,
- help='When calling print also log.info too'
+ help='When calling print, also log.info automatically.'
)
built_in_print = print
+def function_identifier(f: Callable) -> str:
+ """
+ Given a callable function, return a string that identifies it.
+ Usually that string is just __module__:__name__ but there's a
+ corner case: when __module__ is __main__ (i.e. the callable is
+ defined in the same module as __main__). In this case,
+ f.__module__ returns "__main__" instead of the file that it is
+ defined in. Work around this using pathlib.Path (see below).
+
+ >>> function_identifier(function_identifier)
+ 'logging_utils:function_identifier'
+
+ """
+ if f.__module__ == '__main__':
+ from pathlib import Path
+ import __main__
+ module = __main__.__file__
+ module = Path(module).stem
+ return f'{module}:{f.__name__}'
+ else:
+ return f'{f.__module__}:{f.__name__}'
+
+
+# A map from logging_callsite_id -> count of logged messages.
+squelched_logging_counts: Mapping[str, int] = {}
+
+
+def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable:
+ """
+ A decorator that marks a function as interested in having the logging
+ messages that it produces be squelched (ignored) after it logs the
+ same message more than N times.
+
+ Note: this decorator affects *ALL* logging messages produced
+ within the decorated function. That said, messages must be
+ identical in order to be squelched. For example, if the same line
+ of code produces different messages (because of, e.g., a format
+ string), the messages are considered to be different.
+
+ """
+ def squelch_logging_wrapper(f: Callable):
+ identifier = function_identifier(f)
+ squelched_logging_counts[identifier] = squelch_after_n_repeats
+ return f
+ return squelch_logging_wrapper
+
+
+class SquelchRepeatedMessagesFilter(logging.Filter):
+ """
+ A filter that only logs messages from a given site with the same
+ (exact) message at the same logging level N times and ignores
+ subsequent attempts to log.
+
+ This filter only affects logging messages that repeat more than
+ a threshold number of times from functions that are tagged with
+ the @logging_utils.squelched_logging_ok decorator.
+
+ """
+ def __init__(self) -> None:
+ self.counters = collections.Counter()
+ super().__init__()
+
+ def filter(self, record: logging.LogRecord) -> bool:
+ id1 = f'{record.module}:{record.funcName}'
+ if id1 not in squelched_logging_counts:
+ return True
+ threshold = squelched_logging_counts[id1]
+ logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
+ count = self.counters[logsite]
+ self.counters[logsite] += 1
+ return count < threshold
+
+
+# A map from function_identifier -> probability of logging (0.0%..100.0%)
+probabilistic_logging_levels: Mapping[str, float] = {}
+
+
+def logging_is_probabilistic(probability_of_logging: float) -> Callable:
+ """
+ A decorator that indicates that all logging statements within the
+ scope of a particular (marked) function are not deterministic
+ (i.e. they do not always unconditionally log) but rather are
+ probabilistic (i.e. they log N% of the time randomly).
+
+ This affects *ALL* logging statements within the marked function.
+
+ """
+ def probabilistic_logging_wrapper(f: Callable):
+ identifier = function_identifier(f)
+ probabilistic_logging_levels[identifier] = probability_of_logging
+ return f
+ return probabilistic_logging_wrapper
+
+
+class ProbabilisticFilter(logging.Filter):
+ """
+ A filter that logs messages probabilistically (i.e. randomly at some
+ percent chance).
+
+ This filter only affects logging messages from functions that have
+ been tagged with the @logging_utils.probabilistic_logging decorator.
+
+ """
+ def filter(self, record: logging.LogRecord) -> bool:
+ id1 = f'{record.module}:{record.funcName}'
+ if id1 not in probabilistic_logging_levels:
+ return True
+ threshold = probabilistic_logging_levels[id1]
+ return (random.random() * 100.0) <= threshold
+
+
class OnlyInfoFilter(logging.Filter):
+ """
+ A filter that only logs messages produced at the INFO logging
+ level. This is used by the logging_info_is_print commandline
+ option to select a subset of the logging stream to send to a
+ stdout handler.
+
+ """
def filter(self, record):
return record.levelno == logging.INFO
class MillisecondAwareFormatter(logging.Formatter):
+ """
+ A formatter for adding milliseconds to log messages.
+
+ """
converter = datetime.datetime.fromtimestamp
def formatTime(self, record, datefmt=None):
handler.addFilter(OnlyInfoFilter())
logger.addHandler(handler)
+ if config.config['logging_squelch_repeats_enabled']:
+ for handler in handlers:
+ handler.addFilter(SquelchRepeatedMessagesFilter())
+
+ if config.config['logging_probabilistically_enabled']:
+ for handler in handlers:
+ handler.addFilter(ProbabilisticFilter())
+
logger.setLevel(numeric_level)
logger.propagate = False
def tprint(*args, **kwargs) -> None:
+ """Legacy function for printing a message augmented with thread id."""
+
if config.config['logging_debug_threads']:
from thread_utils import current_thread_id
print(f'{current_thread_id()}', end="")
def dprint(*args, **kwargs) -> None:
+ """Legacy function used to print to stderr."""
+
print(*args, file=sys.stderr, **kwargs)
class OutputMultiplexer(object):
+ """
+ A class that broadcasts printed messages to several sinks (including
+ various logging levels, different files, different file handles,
+ the house log, etc...)
+ """
class Destination(enum.IntEnum):
"""Bits in the destination_bitv bitvector. Used to indicate the
output destination."""
- STDOUT = 0x1
- STDERR = 0x2
- LOG_DEBUG = 0x4 # -\
- LOG_INFO = 0x8 # |
- LOG_WARNING = 0x10 # > Should provide logger to the c'tor.
- LOG_ERROR = 0x20 # |
- LOG_CRITICAL = 0x40 # _/
- FILENAME = 0x80 # Must provide a filename to the c'tor.
- FILEHANDLE = 0x100 # Must provide a handle to the c'tor.
- HLOG = 0x200
+ LOG_DEBUG = 0x01 # ⎫
+ LOG_INFO = 0x02 # ⎪
+ LOG_WARNING = 0x04 # ⎬ Must provide logger to the c'tor.
+ LOG_ERROR = 0x08 # ⎪
+ LOG_CRITICAL = 0x10 # ⎭
+ FILENAMES = 0x20 # Must provide a filename to the c'tor.
+ FILEHANDLES = 0x40 # Must provide a handle to the c'tor.
+ HLOG = 0x80
ALL_LOG_DESTINATIONS = (
LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL
)
- ALL_OUTPUT_DESTINATIONS = 0x2FF
+ ALL_OUTPUT_DESTINATIONS = 0x8F
def __init__(self,
destination_bitv: int,
*,
logger=None,
- filename=None,
- handle=None):
+ filenames: Optional[Iterable[str]] = None,
+ handles: Optional[Iterable[io.TextIOWrapper]] = None):
if logger is None:
logger = logging.getLogger(None)
self.logger = logger
- if filename is not None:
- self.f = open(filename, "wb", buffering=0)
+ if filenames is not None:
+ self.f = [
+ open(filename, 'wb', buffering=0) for filename in filenames
+ ]
else:
- if self.destination_bitv & OutputMultiplexer.FILENAME:
+ if destination_bitv & OutputMultiplexer.FILENAMES:
raise ValueError(
- "Filename argument is required if bitv & FILENAME"
+ "Filenames argument is required if bitv & FILENAMES"
)
self.f = None
- if handle is not None:
- self.h = handle
+ if handles is not None:
+ self.h = [handle for handle in handles]
else:
- if self.destination_bitv & OutputMultiplexer.FILEHANDLE:
+ if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES:
raise ValueError(
- "Handle argument is required if bitv & FILEHANDLE"
+ "Handle argument is required if bitv & FILEHANDLES"
)
self.h = None
return self.destination_bitv
def set_destination_bitv(self, destination_bitv: int):
- if destination_bitv & self.Destination.FILENAME and self.f is None:
+ if destination_bitv & self.Destination.FILENAMES and self.f is None:
raise ValueError(
- "Filename argument is required if bitv & FILENAME"
+ "Filename argument is required if bitv & FILENAMES"
)
- if destination_bitv & self.Destination.FILEHANDLE and self.h is None:
+ if destination_bitv & self.Destination.FILEHANDLES and self.h is None:
raise ValueError(
- "Handle argument is required if bitv & FILEHANDLE"
+ "Handle argument is required if bitv & FILEHANDLES"
)
self.destination_bitv = destination_bitv
sep = " "
if end is None:
end = "\n"
- if self.destination_bitv & self.Destination.STDOUT:
- print(buf, file=sys.stdout, sep=sep, end=end)
- if self.destination_bitv & self.Destination.STDERR:
- print(buf, file=sys.stderr, sep=sep, end=end)
if end == '\n':
buf += '\n'
if (
- self.destination_bitv & self.Destination.FILENAME and
+ self.destination_bitv & self.Destination.FILENAMES and
self.f is not None
):
- self.f.write(buf.encode('utf-8'))
- self.f.flush()
+ for _ in self.f:
+ _.write(buf.encode('utf-8'))
+ _.flush()
if (
- self.destination_bitv & self.Destination.FILEHANDLE and
+ self.destination_bitv & self.Destination.FILEHANDLES and
self.h is not None
):
- self.h.write(buf)
- self.h.flush()
+ for _ in self.h:
+ _.write(buf)
+ _.flush()
buf = strip_escape_sequences(buf)
if self.logger is not None:
def close(self):
if self.f is not None:
- self.f.close()
+ for _ in self.f:
+ _.close()
+
+class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator):
+ """
+ A context that uses an OutputMultiplexer. e.g.
-class OutputContext(OutputMultiplexer, contextlib.ContextDecorator):
+ with OutputMultiplexerContext(
+ OutputMultiplexer.LOG_INFO |
+ OutputMultiplexer.LOG_DEBUG |
+ OutputMultiplexer.FILENAMES |
+ OutputMultiplexer.FILEHANDLES,
+ filenames = [ '/tmp/foo.log', '/var/log/bar.log' ],
+ handles = [ f, g ]
+ ) as mplex:
+ mplex.print("This is a log message!")
+
+ """
def __init__(self,
destination_bitv: OutputMultiplexer.Destination,
*,
- logger=None,
- filename=None,
- handle=None):
+ logger = None,
+ filenames = None,
+ handles = None):
super().__init__(
destination_bitv,
logger=logger,
- filename=filename,
- handle=handle)
+ filenames=filenames,
+ handles=handles)
def __enter__(self):
return self
def hlog(message: str) -> None:
+ """Write a message to the house log."""
+
message = message.replace("'", "'\"'\"'")
os.system(f"/usr/bin/logger -p local7.info -- '{message}'")
+
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()