3 """Utilities related to logging."""
from logging.handlers import RotatingFileHandler, SysLogHandler
from typing import Callable, Dict, Iterable, Mapping, Optional

from overrides import overrides
20 # This module is commonly used by others in here and should avoid
21 # taking any unnecessary dependencies back on them.
# Register this module's commandline arguments with the central config
# machinery.  NOTE(review): several `default=` values below were elided in
# the source under review and are reconstructed; confirm the TODO-marked
# ones against the original.
cfg = config.add_commandline_args(
    f'Logging ({__file__})',
    'Args related to logging')
cfg.add_argument(
    '--logging_config_file',
    type=argparse_utils.valid_filename,
    default=None,
    metavar='FILENAME',
    help='Config file containing the logging setup, see: https://docs.python.org/3/howto/logging.html#logging-advanced-tutorial',
)
cfg.add_argument(
    '--logging_level',
    type=str,
    default='INFO',
    choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
    metavar='LEVEL',
    help='The global default level below which to squelch log messages; see also --lmodule',
)
cfg.add_argument(
    '--logging_format',
    type=str,
    default='%(levelname).1s:%(asctime)s: %(message)s',
    help='The format for lines logged via the logger module.'
)
cfg.add_argument(
    '--logging_date_format',
    type=str,
    default='%Y/%m/%dT%H:%M:%S.%f%z',
    help='The format of any dates in --logging_format.'
)
cfg.add_argument(
    '--logging_console',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Should we log to the console (stderr)',
)
cfg.add_argument(
    '--logging_filename',
    type=str,
    default=None,
    metavar='FILENAME',
    help='The filename of the logfile to write.'
)
cfg.add_argument(
    '--logging_filename_maxsize',
    type=int,
    default=(1024 * 1024),  # TODO confirm original default
    metavar='#BYTES',
    help='The maximum size (in bytes) to write to the logging_filename.'
)
cfg.add_argument(
    '--logging_filename_count',
    type=int,
    default=2,  # TODO confirm original default
    metavar='COUNT',
    help='The number of logging_filename copies to keep before deleting.'
)
cfg.add_argument(
    '--logging_syslog',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='Should we log to localhost\'s syslog.'
)
cfg.add_argument(
    '--logging_debug_threads',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='Should we prepend pid/tid data to all log messages?'
)
cfg.add_argument(
    '--logging_debug_modules',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='Should we prepend module/function data to all log messages?'
)
cfg.add_argument(
    '--logging_info_is_print',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='logging.info also prints to stdout.'
)
cfg.add_argument(
    '--logging_squelch_repeats_enabled',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?'
)
cfg.add_argument(
    '--logging_probabilistically_enabled',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Do we allow probabilistic logging (for code that wants it) or should we always log?'
)
# See also: OutputMultiplexer
cfg.add_argument(
    '--logging_captures_prints',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='When calling print, also log.info automatically.'
)
cfg.add_argument(
    '--lmodule',
    type=str,
    metavar='<SCOPE>=<LEVEL>[,<SCOPE>=<LEVEL>...]',
    # Bug fix: the two concatenated sentences previously ran together
    # ("--logging-level.Pass") and the text said "space separated" even
    # though DynamicPerScopeLoggingLevelFilter splits the value on ','.
    help=(
        'Allows per-scope logging levels which override the global level set with --logging-level. ' +
        'Pass a comma separated list of <scope>=<level> where <scope> is one of: module, ' +
        'module:function, or :function and <level> is a logging level (e.g. INFO, DEBUG...)'
    )
)

# Remember the real print() so we can restore/delegate to it after
# --logging_captures_prints monkeypatches builtins.print.
built_in_print = print
def function_identifier(f: Callable) -> str:
    """
    Given a callable function, return a string that identifies it.
    Usually that string is just __module__:__name__ but there's a
    corner case: when __module__ is __main__ (i.e. the callable is
    defined in the same module as __main__).  In this case,
    f.__module__ returns "__main__" instead of the file that it is
    defined in.  Work around this using pathlib.Path (see below).

    >>> function_identifier(function_identifier)
    'logging_utils:function_identifier'
    """
    if f.__module__ == '__main__':
        from pathlib import Path

        import __main__

        # Use the entry-point file's stem as the module name so that the
        # identifier is stable no matter how the program was invoked.
        module = __main__.__file__
        module = Path(module).stem
        return f'{module}:{f.__name__}'
    else:
        return f'{f.__module__}:{f.__name__}'
# A map from logging_callsite_id -> count of logged messages.
# Dict (not Mapping): squelch_repeated_log_messages() writes into it.
squelched_logging_counts: Dict[str, int] = {}
def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable:
    """
    A decorator that marks a function as interested in having the logging
    messages that it produces be squelched (ignored) after it logs the
    same message more than N times.

    Note: this decorator affects *ALL* logging messages produced
    within the decorated function.  That said, messages must be
    identical in order to be squelched.  For example, if the same line
    of code produces different messages (because of, e.g., a format
    string), the messages are considered to be different.

    Args:
        squelch_after_n_repeats: how many identical messages to allow
            through before repeats are dropped.

    Returns:
        A decorator that registers the function's threshold and returns
        the function unchanged.
    """
    def squelch_logging_wrapper(f: Callable):
        # Record the threshold keyed by callsite; consulted at filter
        # time by SquelchRepeatedMessagesFilter.
        identifier = function_identifier(f)
        squelched_logging_counts[identifier] = squelch_after_n_repeats
        # Bug fix: return the original function so the decorator does not
        # replace it with None.
        return f
    return squelch_logging_wrapper
class SquelchRepeatedMessagesFilter(logging.Filter):
    """
    A filter that only logs messages from a given site with the same
    (exact) message at the same logging level N times and ignores
    subsequent attempts to log.

    This filter only affects logging messages that repeat more than
    a threshold number of times from functions that are tagged with
    the @logging_utils.squelched_logging_ok decorator.
    """

    def __init__(self) -> None:
        super().__init__()
        # Per-logsite count of messages seen so far.
        self.counters: collections.Counter = collections.Counter()

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True to log the record, False to squelch it."""
        id1 = f'{record.module}:{record.funcName}'
        if id1 not in squelched_logging_counts:
            # Callsite never opted in to squelching; always log.
            return True
        threshold = squelched_logging_counts[id1]
        # Identity of a "repeated" message: same file+line+level+format.
        logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
        count = self.counters[logsite]
        self.counters[logsite] += 1
        return count < threshold
class DynamicPerScopeLoggingLevelFilter(logging.Filter):
    """Only interested in seeing logging messages from an allow list of
    module names or module:function names.  Block others.
    """

    @staticmethod
    def level_name_to_level(name: str) -> int:
        """Map a level name (e.g. 'INFO') to its numeric logging level.

        Raises:
            ValueError: if name is not a valid logging level name.
        """
        numeric_level = getattr(
            logging,
            name,
            None
        )
        if not isinstance(numeric_level, int):
            # Bug fix: this message was a plain string literal, so the
            # offending level name was never interpolated.
            raise ValueError(f'Invalid level: {name}')
        return numeric_level

    def __init__(
        self,
        default_logging_level: int,
        per_scope_logging_levels: Optional[str],
    ) -> None:
        super().__init__()
        self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
        self.default_logging_level = default_logging_level
        # scope ('module', 'module:function' or ':function') -> level.
        self.level_by_scope: Dict[str, int] = {}
        if per_scope_logging_levels is not None:
            for chunk in per_scope_logging_levels.split(','):
                if '=' not in chunk:
                    print(
                        f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.',
                        file=sys.stderr,
                    )
                    continue
                try:
                    (scope, level) = chunk.split('=')
                except ValueError:
                    print(
                        f'Malformed lmodule directive: "{chunk}". Ignored.',
                        file=sys.stderr,
                    )
                    continue
                scope = scope.strip()
                level = level.strip().upper()
                if level not in self.valid_levels:
                    print(
                        f'Malformed lmodule directive: "{chunk}", bad level. Ignored.',
                        file=sys.stderr,
                    )
                    continue
                self.level_by_scope[scope] = (
                    DynamicPerScopeLoggingLevelFilter.level_name_to_level(
                        level
                    )
                )

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if record passes the (scoped or global) level."""
        # First try to find a logging level by scope (--lmodule)
        if len(self.level_by_scope) > 0:
            min_level = None
            for scope in (
                record.module,
                f'{record.module}:{record.funcName}',
                f':{record.funcName}',
            ):
                level = self.level_by_scope.get(scope, None)
                if level is not None:
                    if min_level is None or level < min_level:
                        min_level = level
            # If we found one, use it instead of the global default level.
            if min_level is not None:
                return record.levelno >= min_level
        # Otherwise, use the global logging level (--logging_level)
        return record.levelno >= self.default_logging_level
# A map from function_identifier -> probability of logging (0.0%..100.0%)
# Dict (not Mapping): logging_is_probabilistic() writes into it.
probabilistic_logging_levels: Dict[str, float] = {}
def logging_is_probabilistic(probability_of_logging: float) -> Callable:
    """
    A decorator that indicates that all logging statements within the
    scope of a particular (marked) function are not deterministic
    (i.e. they do not always unconditionally log) but rather are
    probabilistic (i.e. they log N% of the time randomly).

    This affects *ALL* logging statements within the marked function.

    Args:
        probability_of_logging: percent chance (0.0..100.0) that a
            message from the marked function is logged.

    Returns:
        A decorator that registers the probability and returns the
        function unchanged.
    """
    def probabilistic_logging_wrapper(f: Callable):
        # Record the probability keyed by callsite; consulted at filter
        # time by ProbabilisticFilter.
        identifier = function_identifier(f)
        probabilistic_logging_levels[identifier] = probability_of_logging
        # Bug fix: return the original function so the decorator does not
        # replace it with None.
        return f
    return probabilistic_logging_wrapper
class ProbabilisticFilter(logging.Filter):
    """
    A filter that logs messages probabilistically (i.e. randomly at some
    percent chance).

    This filter only affects logging messages from functions that have
    been tagged with the @logging_utils.probabilistic_logging decorator.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True (log) with the callsite's registered probability."""
        id1 = f'{record.module}:{record.funcName}'
        if id1 not in probabilistic_logging_levels:
            # Callsite never opted in; always log.
            return True
        threshold = probabilistic_logging_levels[id1]
        return (random.random() * 100.0) <= threshold
class OnlyInfoFilter(logging.Filter):
    """Filter that passes only records logged at exactly INFO level.

    Used by the --logging_info_is_print commandline option to tee the
    INFO subset of the logging stream to stdout.
    """

    def filter(self, record: logging.LogRecord):
        # Anything other than INFO (DEBUG, WARNING, ...) is blocked.
        return record.levelno == logging.INFO
class MillisecondAwareFormatter(logging.Formatter):
    """
    A formatter for adding milliseconds to log messages.
    """

    # NOTE(review): timezone is hard-coded to US/Pacific via pytz here;
    # consider parameterizing / migrating to stdlib zoneinfo.
    converter = datetime.datetime.fromtimestamp

    def formatTime(self, record, datefmt=None):
        """Format record.created, appending milliseconds when no explicit
        datefmt is given (mirrors logging.Formatter.formatTime)."""
        ct = MillisecondAwareFormatter.converter(
            record.created, pytz.timezone("US/Pacific")
        )
        if datefmt:
            s = ct.strftime(datefmt)
        else:
            t = ct.strftime("%Y-%m-%d %H:%M:%S")
            s = "%s,%03d" % (t, record.msecs)
        return s
def initialize_logging(logger=None) -> logging.Logger:
    """Initialize logging for the program based on commandline flags.

    Installs handlers (stderr console, rotating logfile, syslog) with
    millisecond-aware formatting, plus the filters implementing
    --lmodule, --logging_squelch_repeats_enabled and
    --logging_probabilistically_enabled.

    Args:
        logger: the logger to configure; defaults to the root logger.

    Returns:
        The configured logger.

    Raises:
        ValueError: if --logging_level is not a valid level name.
    """
    assert config.has_been_parsed()
    if logger is None:
        logger = logging.getLogger()  # Root logger

    # If the user supplied a logging config file, defer to it entirely.
    # Bug fix: this used to load a hard-coded 'logging.conf' instead of
    # the file named by --logging_config_file.
    if config.config['logging_config_file'] is not None:
        logging.config.fileConfig(config.config['logging_config_file'])
        return logger

    handlers = []

    # Global default logging level (--logging_level)
    default_logging_level = getattr(
        logging,
        config.config['logging_level'].upper(),
        None
    )
    if not isinstance(default_logging_level, int):
        raise ValueError('Invalid level: %s' % config.config['logging_level'])

    fmt = config.config['logging_format']
    if config.config['logging_debug_threads']:
        fmt = f'%(process)d.%(thread)d|{fmt}'
    if config.config['logging_debug_modules']:
        fmt = f'%(filename)s:%(funcName)s:%(lineno)s|{fmt}'

    if config.config['logging_syslog']:
        # SysLogHandler is not available on Windows.
        if sys.platform not in ('win32', 'cygwin'):
            handler = SysLogHandler()
            handler.setFormatter(
                MillisecondAwareFormatter(
                    fmt=fmt,
                    datefmt=config.config['logging_date_format'],
                )
            )
            handlers.append(handler)

    if config.config['logging_filename']:
        handler = RotatingFileHandler(
            config.config['logging_filename'],
            maxBytes = config.config['logging_filename_maxsize'],
            backupCount = config.config['logging_filename_count'],
        )
        handler.setFormatter(
            MillisecondAwareFormatter(
                fmt=fmt,
                datefmt=config.config['logging_date_format'],
            )
        )
        handlers.append(handler)

    if config.config['logging_console']:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            MillisecondAwareFormatter(
                fmt=fmt,
                datefmt=config.config['logging_date_format'],
            )
        )
        handlers.append(handler)

    if len(handlers) == 0:
        handlers.append(logging.NullHandler())

    for handler in handlers:
        logger.addHandler(handler)

    # Tee the INFO subset of the stream to stdout when asked to.
    if config.config['logging_info_is_print']:
        handler = logging.StreamHandler(sys.stdout)
        handler.addFilter(OnlyInfoFilter())
        logger.addHandler(handler)

    if config.config['logging_squelch_repeats_enabled']:
        for handler in handlers:
            handler.addFilter(SquelchRepeatedMessagesFilter())

    if config.config['logging_probabilistically_enabled']:
        for handler in handlers:
            handler.addFilter(ProbabilisticFilter())

    # Level gating is delegated to this filter (global default plus any
    # per-scope overrides from --lmodule), so leave the logger itself
    # wide open (NOTSET).
    for handler in handlers:
        handler.addFilter(
            DynamicPerScopeLoggingLevelFilter(
                default_logging_level,
                config.config['lmodule'],
            )
        )
    logger.setLevel(0)
    logger.propagate = False

    # Monkeypatch builtins.print so that print() calls also log.
    if config.config['logging_captures_prints']:
        import builtins
        global built_in_print

        def print_and_also_log(*arg, **kwarg):
            f = kwarg.get('file', None)
            if f == sys.stderr:
                logger.warning(*arg)
            else:
                logger.info(*arg)
            built_in_print(*arg, **kwarg)
        builtins.print = print_and_also_log

    return logger
def get_logger(name: str = ""):
    """Return the named logger, fully configured via initialize_logging."""
    return initialize_logging(logging.getLogger(name))
def tprint(*args, **kwargs) -> None:
    """Legacy print wrapper: prefixes output with the current thread id
    when --logging_debug_threads is enabled."""
    debug_threads = config.config['logging_debug_threads']
    if debug_threads:
        from thread_utils import current_thread_id
        print(f'{current_thread_id()}', end="")
    print(*args, **kwargs)
def dprint(*args, **kwargs) -> None:
    """Legacy helper: behaves like print() but writes to stderr."""
    print(*args, file=sys.stderr, **kwargs)
class OutputMultiplexer(object):
    """
    A class that broadcasts printed messages to several sinks (including
    various logging levels, different files, different file handles,
    the house log, etc...)
    """

    class Destination(enum.IntEnum):
        """Bits in the destination_bitv bitvector.  Used to indicate the
        output destination."""
        LOG_DEBUG = 0x01     # ⎫
        LOG_INFO = 0x02      # ⎪
        LOG_WARNING = 0x04   # ⎬ Must provide logger to the c'tor.
        LOG_ERROR = 0x08     # ⎪
        LOG_CRITICAL = 0x10  # ⎭
        FILENAMES = 0x20     # Must provide a filename to the c'tor.
        FILEHANDLES = 0x40   # Must provide a handle to the c'tor.
        HLOG = 0x80          # The house log (see hlog, below).
        ALL_LOG_DESTINATIONS = (
            LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL
        )
        ALL_OUTPUT_DESTINATIONS = 0x8F

    def __init__(self,
                 destination_bitv: int,
                 *,
                 logger=None,
                 filenames: Optional[Iterable[str]] = None,
                 handles: Optional[Iterable[io.TextIOWrapper]] = None):
        """Args:
            destination_bitv: bitvector of Destination flags.
            logger: logger to use for the LOG_* destinations (defaults
                to the root logger).
            filenames: files to open (required iff bitv & FILENAMES).
            handles: open handles to write to (required iff bitv &
                FILEHANDLES).

        Raises:
            ValueError: a destination bit is set without its required
                filenames/handles argument.
        """
        if logger is None:
            logger = logging.getLogger(None)
        self.logger = logger

        if filenames is not None:
            # Unbuffered binary mode so output hits disk immediately.
            self.f = [
                open(filename, 'wb', buffering=0) for filename in filenames
            ]
        else:
            # Bug fix: this previously read OutputMultiplexer.FILENAMES
            # (an AttributeError); the flag lives on Destination.
            if destination_bitv & OutputMultiplexer.Destination.FILENAMES:
                raise ValueError(
                    "Filenames argument is required if bitv & FILENAMES"
                )
            self.f = None

        if handles is not None:
            self.h = [handle for handle in handles]
        else:
            if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES:
                raise ValueError(
                    "Handle argument is required if bitv & FILEHANDLES"
                )
            self.h = None

        self.set_destination_bitv(destination_bitv)

    def get_destination_bitv(self):
        """Return the current destination bitvector."""
        return self.destination_bitv

    def set_destination_bitv(self, destination_bitv: int):
        """Change the destination bitvector, validating that required
        sinks were provided at construction time."""
        if destination_bitv & self.Destination.FILENAMES and self.f is None:
            raise ValueError(
                "Filename argument is required if bitv & FILENAMES"
            )
        if destination_bitv & self.Destination.FILEHANDLES and self.h is None:
            raise ValueError(
                "Handle argument is required if bitv & FILEHANDLES"
            )
        self.destination_bitv = destination_bitv

    def print(self, *args, **kwargs):
        """print()-alike that fans the rendered message out to every
        destination enabled in the bitvector.

        Raises:
            TypeError: on invalid end/sep keyword arguments.
        """
        from string_utils import sprintf, strip_escape_sequences
        end = kwargs.pop("end", None)
        if end is not None:
            if not isinstance(end, str):
                raise TypeError("end must be None or a string")
        sep = kwargs.pop("sep", None)
        if sep is not None:
            if not isinstance(sep, str):
                raise TypeError("sep must be None or a string")
        if kwargs:
            raise TypeError("invalid keyword arguments to print()")
        buf = sprintf(*args, end="", sep=sep)
        if sep is None:
            sep = " "
        if end is None:
            end = "\n"
        if end == '\n':
            buf += '\n'
        if (
                self.destination_bitv & self.Destination.FILENAMES and
                self.f is not None
        ):
            for _ in self.f:
                _.write(buf.encode('utf-8'))
                _.flush()

        if (
                self.destination_bitv & self.Destination.FILEHANDLES and
                self.h is not None
        ):
            for _ in self.h:
                _.write(buf)
                _.flush()
        # Logging/syslog sinks get the message sans terminal escapes.
        buf = strip_escape_sequences(buf)
        if self.logger is not None:
            if self.destination_bitv & self.Destination.LOG_DEBUG:
                self.logger.debug(buf)
            if self.destination_bitv & self.Destination.LOG_INFO:
                self.logger.info(buf)
            if self.destination_bitv & self.Destination.LOG_WARNING:
                self.logger.warning(buf)
            if self.destination_bitv & self.Destination.LOG_ERROR:
                self.logger.error(buf)
            if self.destination_bitv & self.Destination.LOG_CRITICAL:
                self.logger.critical(buf)
        if self.destination_bitv & self.Destination.HLOG:
            hlog(buf)

    def close(self):
        """Close any files opened by the constructor."""
        if self.f is not None:
            for _ in self.f:
                _.close()
class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator):
    """
    A context that uses an OutputMultiplexer.  e.g.

        with OutputMultiplexerContext(
                OutputMultiplexer.LOG_INFO |
                OutputMultiplexer.LOG_DEBUG |
                OutputMultiplexer.FILENAMES |
                OutputMultiplexer.FILEHANDLES,
                filenames = [ '/tmp/foo.log', '/var/log/bar.log' ],
                handles = [ handle ],
            ) as mplex:
                mplex.print("This is a log message!")
    """
    def __init__(self,
                 destination_bitv: OutputMultiplexer.Destination,
                 *,
                 logger=None,
                 filenames=None,
                 handles=None):
        super().__init__(
            destination_bitv,
            logger=logger,
            filenames=filenames,
            handles=handles,
        )

    def __enter__(self):
        return self

    def __exit__(self, etype, value, traceback) -> bool:
        # Always close opened files, then re-raise any in-flight
        # exception rather than suppressing it.
        super().close()
        if etype is not None:
            raise etype(value)
        return False
def hlog(message: str) -> None:
    """Write a message to the house log."""
    # Escape embedded single quotes so the message survives being wrapped
    # in single quotes on the shell command line.
    escaped = message.replace("'", "'\"'\"'")
    os.system(f"/usr/bin/logger -p local7.info -- '{escaped}'")
662 if __name__ == '__main__':