3 """Utilities related to logging."""
11 from logging.handlers import RotatingFileHandler, SysLogHandler
16 from typing import Callable, Iterable, Mapping, Optional
18 from overrides import overrides
20 # This module is commonly used by others in here and should avoid
21 # taking any unnecessary dependencies back on them.
25 cfg = config.add_commandline_args(
26 f'Logging ({__file__})',
27 'Args related to logging')
29 '--logging_config_file',
30 type=argparse_utils.valid_filename,
33 help='Config file containing the logging setup, see: https://docs.python.org/3/howto/logging.html#logging-advanced-tutorial',
39 choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
41 help='The global default level below which to squelch log messages; see also --lmodule',
46 default='%(levelname).1s:%(asctime)s: %(message)s',
47 help='The format for lines logged via the logger module.'
50 '--logging_date_format',
52 default='%Y/%m/%dT%H:%M:%S.%f%z',
54 help='The format of any dates in --logging_format.'
58 action=argparse_utils.ActionNoYes,
60 help='Should we log to the console (stderr)',
67 help='The filename of the logfile to write.'
70 '--logging_filename_maxsize',
74 help='The maximum size (in bytes) to write to the logging_filename.'
77 '--logging_filename_count',
81 help='The number of logging_filename copies to keep before deleting.'
85 action=argparse_utils.ActionNoYes,
87 help='Should we log to localhost\'s syslog.'
90 '--logging_debug_threads',
91 action=argparse_utils.ActionNoYes,
93 help='Should we prepend pid/tid data to all log messages?'
96 '--logging_info_is_print',
97 action=argparse_utils.ActionNoYes,
99 help='logging.info also prints to stdout.'
102 '--logging_squelch_repeats_enabled',
103 action=argparse_utils.ActionNoYes,
105 help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?'
108 '--logging_probabilistically_enabled',
109 action=argparse_utils.ActionNoYes,
111 help='Do we allow probabilistic logging (for code that wants it) or should we always log?'
113 # See also: OutputMultiplexer
115 '--logging_captures_prints',
116 action=argparse_utils.ActionNoYes,
118 help='When calling print, also log.info automatically.'
123 metavar='<SCOPE>=<LEVEL>[,<SCOPE>=<LEVEL>...]',
125 'Allows per-scope logging levels which override the global level set with --logging-level.' +
126 'Pass a space separated list of <scope>=<level> where <scope> is one of: module, ' +
127 'module:function, or :function and <level> is a logging level (e.g. INFO, DEBUG...)'
# Capture a reference to the real builtin print before
# initialize_logging() potentially replaces builtins.print (see
# --logging_captures_prints); print_and_also_log uses this to reach
# the original implementation.
built_in_print = print
def function_identifier(f: Callable) -> str:
    """
    Given a callable function, return a string that identifies it.
    Usually that string is just __module__:__name__ but there's a
    corner case: when __module__ is __main__ (i.e. the callable is
    defined in the same module as __main__).  In this case,
    f.__module__ returns "__main__" instead of the file that it is
    defined in.  Work around this using pathlib.Path (see below).

    >>> function_identifier(function_identifier)
    'logging_utils:function_identifier'
    """
    if f.__module__ == '__main__':
        # Restored: resolve the real module name from the filename of
        # the main script rather than reporting the unhelpful
        # '__main__'.
        import __main__
        from pathlib import Path

        module = Path(__main__.__file__).stem
        return f'{module}:{f.__name__}'
    return f'{f.__module__}:{f.__name__}'
# A map from logging_callsite_id ("module:function") -> count of logged
# messages allowed before repeats are squelched.  Written at decoration
# time by @squelch_repeated_log_messages; read by
# SquelchRepeatedMessagesFilter.filter().
# NOTE(review): annotated as Mapping but it is mutated by the
# decorator; a MutableMapping/Dict annotation would be more accurate.
squelched_logging_counts: Mapping[str, int] = {}
def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable:
    """
    A decorator that marks a function as interested in having the logging
    messages that it produces be squelched (ignored) after it logs the
    same message more than N times.

    Note: this decorator affects *ALL* logging messages produced
    within the decorated function.  That said, messages must be
    identical in order to be squelched.  For example, if the same line
    of code produces different messages (because of, e.g., a format
    string), the messages are considered to be different.

    Args:
        squelch_after_n_repeats: how many identical messages to allow
            through before subsequent repeats are dropped.

    Returns:
        A decorator that records its target in squelched_logging_counts
        and returns the target unchanged.
    """
    def squelch_logging_wrapper(f: Callable) -> Callable:
        identifier = function_identifier(f)
        squelched_logging_counts[identifier] = squelch_after_n_repeats
        # Fix: the wrapper must hand back the wrapped function;
        # otherwise decorated callables are silently replaced by None.
        return f
    return squelch_logging_wrapper
class SquelchRepeatedMessagesFilter(logging.Filter):
    """
    A filter that only logs messages from a given site with the same
    (exact) message at the same logging level N times and ignores
    subsequent attempts to log.

    This filter only affects logging messages that repeat more than
    a threshold number of times from functions that are tagged with
    the @logging_utils.squelch_repeated_log_messages decorator; all
    other messages are logged unconditionally.
    """

    def __init__(self) -> None:
        # Fix: initialize base-class state (logging.Filter sets
        # self.name / self.nlen); the original skipped super().__init__.
        super().__init__()
        # Maps a callsite (path+line+level+msg) to how many times that
        # exact message has been seen.
        self.counters: collections.Counter = collections.Counter()

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record should be logged."""
        caller_id = f'{record.module}:{record.funcName}'
        if caller_id not in squelched_logging_counts:
            return True  # Caller did not opt in; always log.
        threshold = squelched_logging_counts[caller_id]
        logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
        count = self.counters[logsite]
        self.counters[logsite] += 1
        return count < threshold
class DynamicPerScopeLoggingLevelFilter(logging.Filter):
    """Only interested in seeing logging messages from an allow list of
    module names or module:function names.  Block others.

    The allow list (--lmodule) is a comma separated list of
    <scope>=<level> directives where <scope> is one of: module,
    module:function, or :function and <level> is a logging level name
    (e.g. INFO, DEBUG, ...).
    """

    @staticmethod
    def level_name_to_level(name: str) -> int:
        """Translate a logging level name into its numeric value.

        Raises:
            ValueError: name is not a known logging level.
        """
        numeric_level = getattr(
            logging,
            name,
            None,
        )
        if not isinstance(numeric_level, int):
            # Fix: this string was missing its f-prefix so the error
            # always read literally "Invalid level: {name}".
            raise ValueError(f'Invalid level: {name}')
        return numeric_level

    def __init__(
        self,
        default_logging_level: int,
        per_scope_logging_levels: str,
    ) -> None:
        """
        Args:
            default_logging_level: numeric global level used when no
                per-scope directive matches a record.
            per_scope_logging_levels: the raw --lmodule string (or
                None), e.g. "mymod=DEBUG,mymod:f=ERROR".
        """
        super().__init__()  # initialize logging.Filter state
        self.valid_levels = {'NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'}
        self.default_logging_level = default_logging_level
        self.level_by_scope = {}
        if per_scope_logging_levels is not None:
            for chunk in per_scope_logging_levels.split(','):
                if '=' not in chunk:
                    print(
                        f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.',
                        file=sys.stderr,
                    )
                    continue
                try:
                    (scope, level) = chunk.split('=')
                except ValueError:
                    # More than one '=' in the chunk.
                    print(
                        f'Malformed lmodule directive: "{chunk}". Ignored.',
                        file=sys.stderr,
                    )
                    continue
                scope = scope.strip()
                level = level.strip().upper()
                if level not in self.valid_levels:
                    print(
                        f'Malformed lmodule directive: "{chunk}", bad level. Ignored.',
                        file=sys.stderr,
                    )
                    continue
                self.level_by_scope[scope] = (
                    DynamicPerScopeLoggingLevelFilter.level_name_to_level(
                        level
                    )
                )

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record passes the effective level for its
        scope: the most permissive matching --lmodule directive if any
        match, otherwise the global default level."""
        # First try to find a logging level by scope (--lmodule)
        if len(self.level_by_scope) > 0:
            min_level = None
            for scope in (
                record.module,
                f'{record.module}:{record.funcName}',
                f':{record.funcName}',
            ):
                level = self.level_by_scope.get(scope, None)
                if level is not None:
                    if min_level is None or level < min_level:
                        min_level = level
            # If we found one, use it instead of the global default level.
            if min_level is not None:
                return record.levelno >= min_level
        # Otherwise, use the global logging level (--logging_level)
        return record.levelno >= self.default_logging_level
# A map from function_identifier -> probability of logging (0.0%..100.0%)
# Written at decoration time by @logging_is_probabilistic; read by
# ProbabilisticFilter.filter() to decide whether to emit each message.
# NOTE(review): annotated as Mapping but it is mutated by the
# decorator; a MutableMapping/Dict annotation would be more accurate.
probabilistic_logging_levels: Mapping[str, float] = {}
def logging_is_probabilistic(probability_of_logging: float) -> Callable:
    """
    A decorator that indicates that all logging statements within the
    scope of a particular (marked) function are not deterministic
    (i.e. they do not always unconditionally log) but rather are
    probabilistic (i.e. they log N% of the time randomly).

    This affects *ALL* logging statements within the marked function.

    Args:
        probability_of_logging: percent chance (0.0..100.0) that any
            given message produced by the decorated function is logged.

    Returns:
        A decorator that records its target in
        probabilistic_logging_levels and returns the target unchanged.
    """
    def probabilistic_logging_wrapper(f: Callable) -> Callable:
        identifier = function_identifier(f)
        probabilistic_logging_levels[identifier] = probability_of_logging
        # Fix: the wrapper must hand back the wrapped function;
        # otherwise decorated callables are silently replaced by None.
        return f
    return probabilistic_logging_wrapper
class ProbabilisticFilter(logging.Filter):
    """
    A filter that logs messages probabilistically (i.e. randomly at some
    percent chance).

    This filter only affects logging messages from functions that have
    been tagged with the @logging_utils.logging_is_probabilistic
    decorator; messages from untagged callsites are always logged.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record should be logged: always for
        untagged callsites, with the registered percent chance for
        tagged ones."""
        caller_id = f'{record.module}:{record.funcName}'
        if caller_id not in probabilistic_logging_levels:
            return True
        threshold = probabilistic_logging_levels[caller_id]
        return (random.random() * 100.0) <= threshold
class OnlyInfoFilter(logging.Filter):
    """
    A logging filter that admits only records logged at exactly the
    INFO level.  Used by the logging_info_is_print commandline option
    to pick out the subset of the logging stream that should also be
    echoed to stdout.
    """

    def filter(self, record: logging.LogRecord):
        """True iff this record was produced at INFO level."""
        return record.levelno == logging.INFO
class MillisecondAwareFormatter(logging.Formatter):
    """
    A formatter for adding milliseconds to log messages.

    NOTE(review): the timestamp is hard coded to the US/Pacific
    timezone (via pytz) rather than localtime or UTC -- confirm this
    is intended before reusing this class elsewhere.
    """

    converter = datetime.datetime.fromtimestamp

    def formatTime(self, record, datefmt=None):
        """Format record.created honoring datefmt if given; otherwise
        use the default logging timestamp layout with milliseconds
        appended."""
        ct = MillisecondAwareFormatter.converter(
            record.created, pytz.timezone("US/Pacific")
        )
        if datefmt:
            s = ct.strftime(datefmt)
        else:
            t = ct.strftime("%Y-%m-%d %H:%M:%S")
            s = "%s,%03d" % (t, record.msecs)
        return s
def initialize_logging(logger=None) -> logging.Logger:
    """Set up logging per the --logging_* commandline flags and return
    the configured logger (the root logger by default).

    If --logging_config_file was given, that file-based configuration
    wins and the remaining flags are ignored.
    """
    # Imported first so that (a) logging.config is guaranteed bound and
    # (b) the local binding of 'logging' is established before any use.
    import logging.config

    assert config.has_been_parsed()
    if logger is None:
        logger = logging.getLogger()  # Root logger

    if config.config['logging_config_file'] is not None:
        # Fix: previously this always read the hardcoded file
        # 'logging.conf', ignoring the value of --logging_config_file.
        logging.config.fileConfig(config.config['logging_config_file'])
        return logger

    handlers = []

    # Global default logging level (--logging_level)
    default_logging_level = getattr(
        logging,
        config.config['logging_level'].upper(),
        None,
    )
    if not isinstance(default_logging_level, int):
        raise ValueError('Invalid level: %s' % config.config['logging_level'])

    fmt = config.config['logging_format']
    if config.config['logging_debug_threads']:
        # Prepend pid/tid to every message (--logging_debug_threads).
        fmt = f'%(process)d.%(thread)d|{fmt}'

    if config.config['logging_syslog']:
        if sys.platform not in ('win32', 'cygwin'):
            handler = SysLogHandler()
            handler.setFormatter(
                MillisecondAwareFormatter(
                    fmt=fmt,
                    datefmt=config.config['logging_date_format'],
                )
            )
            handlers.append(handler)

    if config.config['logging_filename']:
        handler = RotatingFileHandler(
            config.config['logging_filename'],
            maxBytes=config.config['logging_filename_maxsize'],
            backupCount=config.config['logging_filename_count'],
        )
        handler.setFormatter(
            MillisecondAwareFormatter(
                fmt=fmt,
                datefmt=config.config['logging_date_format'],
            )
        )
        handlers.append(handler)

    if config.config['logging_console']:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            MillisecondAwareFormatter(
                fmt=fmt,
                datefmt=config.config['logging_date_format'],
            )
        )
        handlers.append(handler)

    if not handlers:
        # Nothing was requested; swallow messages rather than falling
        # back to lastResort behavior.
        handlers.append(logging.NullHandler())

    for handler in handlers:
        logger.addHandler(handler)

    if config.config['logging_info_is_print']:
        # Echo INFO-level messages to stdout too.
        handler = logging.StreamHandler(sys.stdout)
        handler.addFilter(OnlyInfoFilter())
        logger.addHandler(handler)

    if config.config['logging_squelch_repeats_enabled']:
        for handler in handlers:
            handler.addFilter(SquelchRepeatedMessagesFilter())

    if config.config['logging_probabilistically_enabled']:
        for handler in handlers:
            handler.addFilter(ProbabilisticFilter())

    for handler in handlers:
        handler.addFilter(
            DynamicPerScopeLoggingLevelFilter(
                default_logging_level,
                config.config['lmodule'],
            )
        )
    # Let every record through the logger itself; the per-handler
    # DynamicPerScopeLoggingLevelFilter decides what is emitted.
    logger.setLevel(0)
    logger.propagate = False

    if config.config['logging_captures_prints']:
        # See also: OutputMultiplexer
        import builtins

        def print_and_also_log(*arg, **kwarg):
            # prints aimed at stderr log as warnings; others as info.
            f = kwarg.get('file', None)
            if f == sys.stderr:
                logger.warning(*arg)
            else:
                logger.info(*arg)
            built_in_print(*arg, **kwarg)
        builtins.print = print_and_also_log

    return logger
def get_logger(name: str = ""):
    """Fetch the named logger and run our logging initialization on it."""
    return initialize_logging(logging.getLogger(name))
def tprint(*args, **kwargs) -> None:
    """Legacy function for printing a message augmented with thread id."""

    if config.config['logging_debug_threads']:
        from thread_utils import current_thread_id
        tid = current_thread_id()
        print(f'{tid}', end="")
    print(*args, **kwargs)
def dprint(*args, **kwargs) -> None:
    """Legacy function used to print to stderr."""

    print(*args, **kwargs, file=sys.stderr)
class OutputMultiplexer(object):
    """
    A class that broadcasts printed messages to several sinks (including
    various logging levels, different files, different file handles,
    the house log, etc...)
    """

    class Destination(enum.IntEnum):
        """Bits in the destination_bitv bitvector.  Used to indicate the
        output destination."""
        LOG_DEBUG = 0x01     # ⎫
        LOG_INFO = 0x02      # ⎪
        LOG_WARNING = 0x04   # ⎬ Must provide logger to the c'tor.
        LOG_ERROR = 0x08     # ⎪
        LOG_CRITICAL = 0x10  # ⎭
        FILENAMES = 0x20     # Must provide a filename to the c'tor.
        FILEHANDLES = 0x40   # Must provide a handle to the c'tor.
        HLOG = 0x80          # The house log (see hlog()).
        ALL_LOG_DESTINATIONS = (
            LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL
        )
        # NOTE(review): 0x8F omits LOG_CRITICAL (0x10) and the file
        # bits; confirm whether 0x9F (or 0xFF) was intended.
        ALL_OUTPUT_DESTINATIONS = 0x8F

    def __init__(
        self,
        destination_bitv: int,
        *,
        logger=None,
        filenames: Optional[Iterable[str]] = None,
        handles: Optional[Iterable[io.TextIOWrapper]] = None,
    ):
        """
        Args:
            destination_bitv: bitvector of Destination bits saying
                where print()ed messages should be sent.
            logger: logger used for the LOG_* destinations; defaults
                to the root logger.
            filenames: files to open (binary, unbuffered) for the
                FILENAMES destination.
            handles: already-open handles for FILEHANDLES.

        Raises:
            ValueError: a destination bit is set but its required
                constructor argument was not provided.
        """
        if logger is None:
            logger = logging.getLogger(None)
        self.logger = logger

        if filenames is not None:
            self.f = [
                open(filename, 'wb', buffering=0) for filename in filenames
            ]
        else:
            # Fix: this previously read OutputMultiplexer.FILENAMES, an
            # attribute that does not exist (the bit lives on the
            # nested Destination enum) and would raise AttributeError.
            if destination_bitv & OutputMultiplexer.Destination.FILENAMES:
                raise ValueError(
                    "Filenames argument is required if bitv & FILENAMES"
                )
            self.f = None

        if handles is not None:
            self.h = [handle for handle in handles]
        else:
            if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES:
                raise ValueError(
                    "Handle argument is required if bitv & FILEHANDLES"
                )
            self.h = None

        self.set_destination_bitv(destination_bitv)

    def get_destination_bitv(self):
        """Return the current destination bitvector."""
        return self.destination_bitv

    def set_destination_bitv(self, destination_bitv: int):
        """Change where subsequent print() calls are routed; validates
        that the required sinks were supplied at construction time."""
        if destination_bitv & self.Destination.FILENAMES and self.f is None:
            raise ValueError(
                "Filename argument is required if bitv & FILENAMES"
            )
        if destination_bitv & self.Destination.FILEHANDLES and self.h is None:
            raise ValueError(
                "Handle argument is required if bitv & FILEHANDLES"
            )
        self.destination_bitv = destination_bitv

    def print(self, *args, **kwargs):
        """print()-alike that fans the rendered message out to every
        destination enabled in the bitvector.  Accepts only the end=
        and sep= keyword arguments, like builtin print."""
        from string_utils import sprintf, strip_escape_sequences
        end = kwargs.pop("end", None)
        if end is not None:
            if not isinstance(end, str):
                raise TypeError("end must be None or a string")
        sep = kwargs.pop("sep", None)
        if sep is not None:
            if not isinstance(sep, str):
                raise TypeError("sep must be None or a string")
        if kwargs:
            raise TypeError("invalid keyword arguments to print()")
        buf = sprintf(*args, end="", sep=sep)
        if sep is None:
            sep = " "
        if end is None:
            end = "\n"
        if end == '\n':
            buf += '\n'
        if (
            self.destination_bitv & self.Destination.FILENAMES and
            self.f is not None
        ):
            for fileobj in self.f:
                fileobj.write(buf.encode('utf-8'))
                fileobj.flush()

        if (
            self.destination_bitv & self.Destination.FILEHANDLES and
            self.h is not None
        ):
            for handle in self.h:
                handle.write(buf)
                handle.flush()

        # The logging destinations get the message with ANSI escape
        # sequences stripped out.
        buf = strip_escape_sequences(buf)
        if self.logger is not None:
            if self.destination_bitv & self.Destination.LOG_DEBUG:
                self.logger.debug(buf)
            if self.destination_bitv & self.Destination.LOG_INFO:
                self.logger.info(buf)
            if self.destination_bitv & self.Destination.LOG_WARNING:
                self.logger.warning(buf)
            if self.destination_bitv & self.Destination.LOG_ERROR:
                self.logger.error(buf)
            if self.destination_bitv & self.Destination.LOG_CRITICAL:
                self.logger.critical(buf)
        if self.destination_bitv & self.Destination.HLOG:
            hlog(buf)

    def close(self):
        """Close any files this multiplexer opened in its constructor."""
        if self.f is not None:
            for fileobj in self.f:
                fileobj.close()
class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator):
    """
    A context that uses an OutputMultiplexer.  e.g.

        with OutputMultiplexerContext(
                OutputMultiplexer.Destination.LOG_INFO |
                OutputMultiplexer.Destination.LOG_DEBUG |
                OutputMultiplexer.Destination.FILENAMES |
                OutputMultiplexer.Destination.FILEHANDLES,
                filenames = [ '/tmp/foo.log', '/var/log/bar.log' ],
                handles = [ f, g ],
            ) as mplex:
                mplex.print("This is a log message!")

    (Doc fix: the example previously referenced OutputMultiplexer.LOG_INFO
    etc., attributes that do not exist -- the bits live on the nested
    Destination enum.)
    """

    def __init__(
        self,
        destination_bitv: OutputMultiplexer.Destination,
        *,
        logger=None,
        filenames=None,
        handles=None,
    ):
        """Arguments mirror OutputMultiplexer's constructor."""
        super().__init__(
            destination_bitv,
            logger=logger,
            filenames=filenames,
            handles=handles,
        )

    def __enter__(self):
        return self

    def __exit__(self, etype, value, traceback) -> bool:
        # Always close owned files; never suppress exceptions.
        super().close()
        if etype is not None:
            return False
        return True
def hlog(message: str) -> None:
    """Write a message to the house log (syslog facility local7, level
    info) via the system logger binary.

    Security/robustness fix: invoke /usr/bin/logger with
    subprocess.run and an argument list instead of interpolating the
    message into an os.system() shell string (the old code escaped
    single quotes by hand; the list form needs no escaping at all).
    Failures remain best-effort and ignored, matching the old
    os.system behavior.
    """
    import subprocess
    try:
        subprocess.run(
            ['/usr/bin/logger', '-p', 'local7.info', '--', message],
            check=False,
        )
    except OSError:
        pass  # best effort, like os.system which ignored failures
654 if __name__ == '__main__':