from logging.handlers import RotatingFileHandler, SysLogHandler
import os
import pytz
+import random
import sys
-from typing import Iterable, Optional
+from typing import Callable, Dict, Iterable, Optional
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
help='logging.info also prints to stdout.'
)
cfg.add_argument(
- '--logging_max_n_times_per_message',
- type=int,
- default=0,
- help='When set, ignore logged messages from the same site after N.'
+ '--logging_squelch_repeats_enabled',
+ action=argparse_utils.ActionNoYes,
+ default=True,
+ help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?'
+)
+cfg.add_argument(
+ '--logging_probabilistically_enabled',
+ action=argparse_utils.ActionNoYes,
+ default=True,
+ help='Do we allow probabilistic logging (for code that wants it) or should we always log?'
)
-
# See also: OutputMultiplexer
cfg.add_argument(
'--logging_captures_prints',
action=argparse_utils.ActionNoYes,
default=False,
- help='When calling print also log.info too'
+ help='When calling print, also log.info automatically.'
)
built_in_print = print
-class OnlyInfoFilter(logging.Filter):
+def function_identifier(f: Callable) -> str:
"""
- A filter that only logs messages produced at the INFO logging level.
+    Given a callable, return a string that identifies it.  Usually
+    that string is just __module__:__name__ but there is a corner
+    case: when the callable is defined in the file being run as
+    __main__, f.__module__ returns "__main__" rather than the name
+    of the file it lives in.  Work around this by deriving the module
+    name from the file's path with pathlib.Path (see below).
+
+ >>> function_identifier(function_identifier)
+ 'logging_utils:function_identifier'
+
"""
- def filter(self, record):
- return record.levelno == logging.INFO
+ if f.__module__ == '__main__':
+ from pathlib import Path
+ import __main__
+ module = __main__.__file__
+ module = Path(module).stem
+ return f'{module}:{f.__name__}'
+ else:
+ return f'{f.__module__}:{f.__name__}'
+
+# A map from function_identifier -> the number of times identical
+# messages from that function should be logged before being squelched.
+squelched_logging_counts: Dict[str, int] = {}
-class OnlyNTimesFilter(logging.Filter):
+
+def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable:
+ """
+    A decorator that marks a function as wanting the logging messages
+    it produces to be squelched (ignored) once it has logged the same
+    message more than N times.
+
+    Note: this decorator affects *ALL* logging messages produced
+    within the decorated function, but only identical messages are
+    squelched.  If the same line of code produces different messages
+    (because of, e.g., a format string), each distinct message is
+    counted separately.
+
+ """
+ def squelch_logging_wrapper(f: Callable):
+ identifier = function_identifier(f)
+ squelched_logging_counts[identifier] = squelch_after_n_repeats
+ return f
+ return squelch_logging_wrapper
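+
+# A minimal usage sketch (hypothetical example, not part of this module;
+# assumes a module-level logging.Logger named 'logger'):
+#
+#     @squelch_repeated_log_messages(squelch_after_n_repeats=5)
+#     def poll_backend():
+#         # Each distinct message below is logged at most 5 times as long
+#         # as --logging_squelch_repeats_enabled is on (the default).
+#         logger.info('backend unreachable; will retry')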
+
+
+class SquelchRepeatedMessagesFilter(logging.Filter):
"""
A filter that only logs messages from a given site with the same
- message at the same logging level N times and ignores subsequent
- attempts to log.
+ (exact) message at the same logging level N times and ignores
+ subsequent attempts to log.
+
+    This filter only squelches messages that repeat more than a
+    threshold number of times and that come from functions tagged
+    with the @logging_utils.squelch_repeated_log_messages decorator.
"""
- def __init__(self, maximum: int) -> None:
- self.maximum = maximum
+ def __init__(self) -> None:
self.counters = collections.Counter()
super().__init__()
def filter(self, record: logging.LogRecord) -> bool:
- source = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
- count = self.counters[source]
- self.counters[source] += 1
- return count < self.maximum
+ id1 = f'{record.module}:{record.funcName}'
+ if id1 not in squelched_logging_counts:
+ return True
+ threshold = squelched_logging_counts[id1]
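+        # Identical messages are keyed by source file, line number,
+        # logging level and (unformatted) message text.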
+ logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
+ count = self.counters[logsite]
+ self.counters[logsite] += 1
+ return count < threshold
+
+
+# A map from function_identifier -> probability of logging (0.0%..100.0%)
+probabilistic_logging_levels: Dict[str, float] = {}
+
+
+def logging_is_probabilistic(probability_of_logging: float) -> Callable:
+ """
+    A decorator that indicates that the logging statements within the
+    decorated function are probabilistic rather than deterministic:
+    each message is logged only probability_of_logging percent of the
+    time, chosen at random.
+
+ This affects *ALL* logging statements within the marked function.
+
+ """
+ def probabilistic_logging_wrapper(f: Callable):
+ identifier = function_identifier(f)
+ probabilistic_logging_levels[identifier] = probability_of_logging
+ return f
+ return probabilistic_logging_wrapper
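+
+# A minimal usage sketch (hypothetical example, not part of this module;
+# assumes a module-level logging.Logger named 'logger'):
+#
+#     @logging_is_probabilistic(10.0)
+#     def noisy_poll_loop():
+#         # With --logging_probabilistically_enabled on (the default),
+#         # this message is emitted roughly 10% of the time, at random.
+#         logger.debug('poll iteration complete')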
+
+
+class ProbabilisticFilter(logging.Filter):
+ """
+ A filter that logs messages probabilistically (i.e. randomly at some
+ percent chance).
+
+    This filter only affects logging messages from functions that have
+    been tagged with the @logging_utils.logging_is_probabilistic
+    decorator.
+
+ """
+ def filter(self, record: logging.LogRecord) -> bool:
+ id1 = f'{record.module}:{record.funcName}'
+ if id1 not in probabilistic_logging_levels:
+ return True
+ threshold = probabilistic_logging_levels[id1]
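+        # threshold is a percentage (0.0 .. 100.0); log only that
+        # percent of the time, chosen uniformly at random.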
+ return (random.random() * 100.0) <= threshold
+
+
+class OnlyInfoFilter(logging.Filter):
+ """
+ A filter that only logs messages produced at the INFO logging
+ level. This is used by the logging_info_is_print commandline
+ option to select a subset of the logging stream to send to a
+ stdout handler.
+
+ """
+ def filter(self, record):
+ return record.levelno == logging.INFO
class MillisecondAwareFormatter(logging.Formatter):
handler.addFilter(OnlyInfoFilter())
logger.addHandler(handler)
- maximum = config.config['logging_max_n_times_per_message']
- if maximum > 0:
+ if config.config['logging_squelch_repeats_enabled']:
+ for handler in handlers:
+ handler.addFilter(SquelchRepeatedMessagesFilter())
+
+ if config.config['logging_probabilistically_enabled']:
for handler in handlers:
- handler.addFilter(OnlyNTimesFilter(maximum))
+ handler.addFilter(ProbabilisticFilter())
logger.setLevel(numeric_level)
logger.propagate = False
def tprint(*args, **kwargs) -> None:
+ """Legacy function for printing a message augmented with thread id."""
+
if config.config['logging_debug_threads']:
from thread_utils import current_thread_id
print(f'{current_thread_id()}', end="")
def dprint(*args, **kwargs) -> None:
+ """Legacy function used to print to stderr."""
+
print(*args, file=sys.stderr, **kwargs)
class OutputMultiplexer(object):
+ """
+ A class that broadcasts printed messages to several sinks (including
+ various logging levels, different files, different file handles,
+    the house log, etc.).
+ """
class Destination(enum.IntEnum):
"""Bits in the destination_bitv bitvector. Used to indicate the
output destination."""
class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator):
+ """
+ A context that uses an OutputMultiplexer. e.g.
+
+ with OutputMultiplexerContext(
+        OutputMultiplexer.Destination.LOG_INFO |
+        OutputMultiplexer.Destination.LOG_DEBUG |
+        OutputMultiplexer.Destination.FILENAMES |
+        OutputMultiplexer.Destination.FILEHANDLES,
+ filenames = [ '/tmp/foo.log', '/var/log/bar.log' ],
+ handles = [ f, g ]
+ ) as mplex:
+ mplex.print("This is a log message!")
+
+ """
def __init__(self,
destination_bitv: OutputMultiplexer.Destination,
*,
def hlog(message: str) -> None:
+ """Write a message to the house log."""
+
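+    # Escape embedded single quotes so the message survives the
+    # single-quoted shell argument passed to /usr/bin/logger below.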
message = message.replace("'", "'\"'\"'")
os.system(f"/usr/bin/logger -p local7.info -- '{message}'")
+
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()