# Keep a reference to the builtin print function under a second name.
# NOTE(review): presumably so the original stays reachable if `print` is
# rebound elsewhere in this module -- confirm against the rest of the file.
built_in_print = print
# Module-level flag, initially False.
# NOTE(review): presumably flipped once logging has been set up -- the code
# that does so is not visible in this section.
logging_initialized = False
+
+
# A map from function identifier -> squelch threshold, i.e. the value of
# squelch_after_n_repeats given to @squelch_repeated_log_messages for that
# function.  SquelchRepeatedMessagesFilter reads it to decide how many times
# an identical message from that function may pass before being dropped.
# NOTE(review): annotated as (read-only) Mapping although the decorator
# writes into it; a mutable mapping type would describe it more accurately.
squelched_logging_counts: Mapping[str, int] = {}
+
+
def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable:
    """
    Decorator factory that opts a function in to having its repeated
    logging messages squelched (dropped) once the same message has
    been logged squelch_after_n_repeats times.

    Decorating a function does not wrap or otherwise change it: the
    function is returned as-is and the threshold is simply recorded
    under the function's identifier in squelched_logging_counts, which
    is where SquelchRepeatedMessagesFilter looks it up.

    Note: this affects *ALL* logging statements inside the decorated
    function.  Only identical messages count as repeats, though: if
    the same line of code logs different text (e.g. because the
    message is built with an f-string), each distinct message is
    counted separately.

    """

    def register_squelch_threshold(func: Callable) -> Callable:
        # Imported here, when the decorator is applied, rather than at
        # module import time.
        from function_utils import function_identifier

        squelched_logging_counts[function_identifier(func)] = squelch_after_n_repeats
        return func

    return register_squelch_threshold
+
+
class SquelchRepeatedMessagesFilter(logging.Filter):
    """
    A logging filter that lets the same (exact) message from the same
    log site at the same logging level through only N times and drops
    every later repeat.

    Only records produced inside functions tagged with the
    squelch_repeated_log_messages decorator are subject to this; N is
    the threshold given to that decorator.  Records from any other
    function always pass.

    This functionality is enabled by default but can be disabled via
    the --no_logging_squelch_repeats commandline flag (which is
    handled outside of this class).

    """

    def __init__(self) -> None:
        super().__init__()
        # Number of records seen so far per log site; see filter() for
        # how a log site key is built.
        self.counters = collections.Counter()

    @overrides
    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record should be logged, False to drop it."""
        origin = f'{record.module}:{record.funcName}'
        try:
            limit = squelched_logging_counts[origin]
        except KeyError:
            # The emitting function never opted in to squelching.
            return True

        # A log site is the file, line, level and message taken together.
        site_key = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
        already_seen = self.counters[site_key]
        self.counters[site_key] = already_seen + 1
        return already_seen < limit
+
+
class DynamicPerScopeLoggingLevelFilter(logging.Filter):
    """A logging filter that applies per-scope minimum logging levels.

    A scope is one of "module", "module:function" or ":function" (a
    function name in any module).  A record whose module / function
    matches at least one configured scope passes when its level is at
    least the lowest (most verbose) level among the matching scopes.
    A record that matches no configured scope passes when its level is
    at least the default logging level.

    """

    @staticmethod
    def level_name_to_level(name: str) -> int:
        """Translate a level name such as 'DEBUG' into its numeric value.

        Args:
            name: name of an integer level attribute of the logging
                module, e.g. 'INFO'.

        Returns:
            The numeric logging level.

        Raises:
            ValueError: if name does not refer to an integer level.
        """
        numeric_level = getattr(logging, name, None)
        # bool is a subclass of int, so it has to be excluded explicitly;
        # otherwise flag attributes of the logging module (for example
        # logging.raiseExceptions) would be accepted as "levels".
        if isinstance(numeric_level, bool) or not isinstance(numeric_level, int):
            raise ValueError(f'Invalid level: {name}')
        return numeric_level

    def __init__(
        self,
        default_logging_level: int,
        per_scope_logging_levels: str,
    ) -> None:
        """Parse the per-scope directives and remember the default level.

        Args:
            default_logging_level: numeric level applied to records that
                match no configured scope (--logging_level).
            per_scope_logging_levels: comma separated "scope=LEVEL"
                directives (--lmodule); None is accepted and means "no
                directives".  Malformed directives are reported on
                stderr and ignored.  Empty chunks (an empty string,
                doubled or trailing commas) are skipped silently.
        """
        super().__init__()
        self.valid_levels = {
            'NOTSET',
            'DEBUG',
            'INFO',
            'WARNING',
            'ERROR',
            'CRITICAL',
        }
        self.default_logging_level = default_logging_level
        # scope string -> numeric minimum level for that scope.
        self.level_by_scope = {}
        if per_scope_logging_levels is None:
            return

        for chunk in per_scope_logging_levels.split(','):
            if not chunk.strip():
                # Nothing was written between the commas; that is not a
                # malformed directive, there is simply nothing to parse.
                continue
            parts = chunk.split('=')
            if len(parts) == 1:
                print(
                    f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.',
                    file=sys.stderr,
                )
                continue
            if len(parts) != 2:
                # More than one "=" in a single directive.
                print(
                    f'Malformed lmodule directive: "{chunk}". Ignored.',
                    file=sys.stderr,
                )
                continue
            scope = parts[0].strip()
            level = parts[1].strip().upper()
            if level not in self.valid_levels:
                print(
                    f'Malformed lmodule directive: "{chunk}", bad level. Ignored.',
                    file=sys.stderr,
                )
                continue
            self.level_by_scope[
                scope
            ] = DynamicPerScopeLoggingLevelFilter.level_name_to_level(level)

    @overrides
    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record should be logged, False to drop it."""
        # First try to find a logging level by scope (--lmodule)
        if self.level_by_scope:
            candidate_scopes = (
                record.module,
                f'{record.module}:{record.funcName}',
                f':{record.funcName}',
            )
            matching_levels = [
                self.level_by_scope[scope]
                for scope in candidate_scopes
                if scope in self.level_by_scope
            ]

            # If any scope matched, the most verbose (lowest) of the
            # matching levels replaces the global default level.
            if matching_levels:
                return record.levelno >= min(matching_levels)

        # Otherwise, use the global logging level (--logging_level)
        return record.levelno >= self.default_logging_level
+
+
# A map from function_identifier -> probability of logging (0.0%..100.0%).
# Entries are added by the @logging_is_probabilistic decorator and read by
# ProbabilisticFilter.
# NOTE(review): annotated as (read-only) Mapping although the decorator
# writes into it; a mutable mapping type would describe it more accurately.
probabilistic_logging_levels: Mapping[str, float] = {}
+
+
def logging_is_probabilistic(probability_of_logging: float) -> Callable:
    """
    Decorator factory that marks every logging statement within the
    decorated function as probabilistic: rather than always producing
    output, each one logs randomly, probability_of_logging percent of
    the time (0.0..100.0).

    Decorating a function does not wrap or otherwise change it: the
    function is returned as-is and the probability is simply recorded
    under the function's identifier in probabilistic_logging_levels,
    which is where ProbabilisticFilter looks it up.

    Note that this functionality can be disabled (forcing all logged
    messages to produce output) via the --no_logging_probabilistically
    cmdline argument.

    This affects *ALL* logging statements within the marked function.

    """

    def register_logging_probability(func: Callable) -> Callable:
        # Imported here, when the decorator is applied, rather than at
        # module import time.
        from function_utils import function_identifier

        probabilistic_logging_levels[function_identifier(func)] = probability_of_logging
        return func

    return register_logging_probability
+
+
class ProbabilisticFilter(logging.Filter):
    """
    A logging filter that lets messages through probabilistically
    (i.e. randomly, with some percent chance).

    Only records produced inside functions tagged with the
    logging_is_probabilistic decorator are subject to this; the chance
    of logging is the percentage given to that decorator.  Records
    from any other function always pass.

    """

    @overrides
    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record should be logged, False to drop it."""
        origin = f'{record.module}:{record.funcName}'
        try:
            percent_chance = probabilistic_logging_levels[origin]
        except KeyError:
            # The emitting function was not marked as probabilistic.
            return True

        # Draw a uniform value in [0.0, 100.0) and log when it falls at
        # or below the configured percentage.
        roll = random.random() * 100.0
        return roll <= percent_chance
+