3 """Utilities related to logging."""
from logging.handlers import RotatingFileHandler, SysLogHandler
from typing import Callable, Dict, Iterable, Mapping, Optional
18 # This module is commonly used by others in here and should avoid
19 # taking any unnecessary dependencies back on them.
# Register this module's commandline flags with the project config system.
# NOTE(review): several `cfg.add_argument(...)` openers / defaults were lost
# in extraction; the defaults below are reconstructed — confirm against the
# upstream source before relying on them.
cfg = config.add_commandline_args(
    f'Logging ({__file__})',
    'Args related to logging')
cfg.add_argument(
    '--logging_config_file',
    type=argparse_utils.valid_filename,
    default=None,
    metavar='FILENAME',
    help='Config file containing the logging setup, see: https://docs.python.org/3/howto/logging.html#logging-advanced-tutorial',
)
cfg.add_argument(
    '--logging_level',
    type=str,
    default='INFO',
    choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
    metavar='LEVEL',
    help='The level below which to squelch log messages.',
)
cfg.add_argument(
    '--logging_format',
    type=str,
    default='%(levelname).1s:%(asctime)s: %(message)s',
    help='The format for lines logged via the logger module.'
)
cfg.add_argument(
    '--logging_date_format',
    type=str,
    default='%Y/%m/%dT%H:%M:%S.%f%z',
    help='The format of any dates in --logging_format.'
)
cfg.add_argument(
    '--logging_console',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Should we log to the console (stderr)',
)
cfg.add_argument(
    '--logging_filename',
    type=str,
    default=None,
    help='The filename of the logfile to write.'
)
cfg.add_argument(
    '--logging_filename_maxsize',
    type=int,
    default=(1024 * 1024),
    help='The maximum size (in bytes) to write to the logging_filename.'
)
cfg.add_argument(
    '--logging_filename_count',
    type=int,
    default=2,
    help='The number of logging_filename copies to keep before deleting.'
)
cfg.add_argument(
    '--logging_syslog',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='Should we log to localhost\'s syslog.'
)
cfg.add_argument(
    '--logging_debug_threads',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='Should we prepend pid/tid data to all log messages?'
)
cfg.add_argument(
    '--logging_info_is_print',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='logging.info also prints to stdout.'
)
cfg.add_argument(
    '--logging_squelch_repeats_enabled',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?'
)
cfg.add_argument(
    '--logging_probabilistically_enabled',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Do we allow probabilistic logging (for code that wants it) or should we always log?'
)
# See also: OutputMultiplexer
cfg.add_argument(
    '--logging_captures_prints',
    action=argparse_utils.ActionNoYes,
    default=False,
    help='When calling print, also log.info automatically.'
)
119 built_in_print = print
def function_identifier(f: Callable) -> str:
    """
    Given a callable function, return a string that identifies it.
    Usually that string is just __module__:__name__ but there's a
    corner case: when __module__ is __main__ (i.e. the callable is
    defined in the same module as __main__).  In this case,
    f.__module__ returns "__main__" instead of the file that it is
    defined in.  Work around this using pathlib.Path (see below).

    >>> function_identifier(function_identifier)
    'logging_utils:function_identifier'

    """
    if f.__module__ == '__main__':
        # BUG FIX: __main__ must be imported before its __file__ is read.
        import __main__
        from pathlib import Path
        module = __main__.__file__
        module = Path(module).stem  # filename without directory or suffix
        return f'{module}:{f.__name__}'
    else:
        return f'{f.__module__}:{f.__name__}'
# A map from logging_callsite_id -> count of logged messages allowed before
# squelching.  BUG FIX: this is mutated by @squelch_repeated_log_messages, so
# annotate it as a mutable Dict rather than the read-only Mapping protocol.
squelched_logging_counts: Dict[str, int] = {}
def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable:
    """
    A decorator that marks a function as interested in having the logging
    messages that it produces be squelched (ignored) after it logs the
    same message more than N times.

    Note: this decorator affects *ALL* logging messages produced
    within the decorated function.  That said, messages must be
    identical in order to be squelched.  For example, if the same line
    of code produces different messages (because of, e.g., a format
    string), the messages are considered to be different.
    """
    def squelch_logging_wrapper(f: Callable):
        identifier = function_identifier(f)
        squelched_logging_counts[identifier] = squelch_after_n_repeats
        # BUG FIX: the decorator must return the decorated function;
        # without this, decorated functions were replaced with None.
        return f
    return squelch_logging_wrapper
class SquelchRepeatedMessagesFilter(logging.Filter):
    """
    A filter that only logs messages from a given site with the same
    (exact) message at the same logging level N times and ignores
    subsequent attempts to log.

    This filter only affects logging messages that repeat more than
    a threshold number of times from functions that are tagged with
    the @logging_utils.squelched_logging_ok decorator.
    """

    def __init__(self) -> None:
        super().__init__()
        # logsite (pathname+lineno+level+msg) -> times seen so far.
        self.counters: collections.Counter = collections.Counter()

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True iff this record should be emitted."""
        id1 = f'{record.module}:{record.funcName}'
        if id1 not in squelched_logging_counts:
            # BUG FIX: callsites that never opted in are always logged.
            return True
        threshold = squelched_logging_counts[id1]
        logsite = f'{record.pathname}+{record.lineno}+{record.levelno}+{record.msg}'
        count = self.counters[logsite]
        self.counters[logsite] += 1
        return count < threshold
# A map from function_identifier -> probability of logging (0.0%..100.0%).
# BUG FIX: mutated by @logging_is_probabilistic, so annotate as a mutable
# Dict rather than the read-only Mapping protocol.
probabilistic_logging_levels: Dict[str, float] = {}
def logging_is_probabilistic(probability_of_logging: float) -> Callable:
    """
    A decorator that indicates that all logging statements within the
    scope of a particular (marked) function are not deterministic
    (i.e. they do not always unconditionally log) but rather are
    probabilistic (i.e. they log N% of the time randomly).

    This affects *ALL* logging statements within the marked function.
    """
    def probabilistic_logging_wrapper(f: Callable):
        identifier = function_identifier(f)
        probabilistic_logging_levels[identifier] = probability_of_logging
        # BUG FIX: the decorator must return the decorated function;
        # without this, decorated functions were replaced with None.
        return f
    return probabilistic_logging_wrapper
class ProbabilisticFilter(logging.Filter):
    """
    A filter that logs messages probabilistically (i.e. randomly at some
    percentage of the time).

    This filter only affects logging messages from functions that have
    been tagged with the @logging_utils.probabilistic_logging decorator.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True iff this record should be emitted."""
        id1 = f'{record.module}:{record.funcName}'
        if id1 not in probabilistic_logging_levels:
            # BUG FIX: callsites that never opted in are always logged.
            return True
        threshold = probabilistic_logging_levels[id1]
        # Log iff a uniform draw in [0, 100) falls under the threshold %.
        return (random.random() * 100.0) <= threshold
class OnlyInfoFilter(logging.Filter):
    """
    A filter that only logs messages produced at the INFO logging
    level.  This is used by the logging_info_is_print commandline
    option to select a subset of the logging stream to send to a
    stdout handler.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True only for records logged at exactly INFO level."""
        return record.levelno == logging.INFO
class MillisecondAwareFormatter(logging.Formatter):
    """
    A formatter for adding milliseconds to log messages.
    """

    # NOTE(review): hardcodes the US/Pacific zone and relies on the
    # third-party pytz module (imported at file scope) — confirm intent.
    converter = datetime.datetime.fromtimestamp

    def formatTime(self, record, datefmt=None):
        """Format record.created preserving millisecond precision.

        If datefmt is provided it is passed to strftime verbatim;
        otherwise fall back to 'YYYY-mm-dd HH:MM:SS,mmm'.
        """
        ct = MillisecondAwareFormatter.converter(
            record.created, pytz.timezone("US/Pacific")
        )
        # BUG FIX: restore the datefmt/else split and the return; as
        # written, both branches ran and the method returned None.
        if datefmt:
            s = ct.strftime(datefmt)
        else:
            t = ct.strftime("%Y-%m-%d %H:%M:%S")
            s = "%s,%03d" % (t, record.msecs)
        return s
def initialize_logging(logger=None) -> logging.Logger:
    """Set up handlers, formatters, and filters on *logger* (the root
    logger when None) according to this module's commandline flags and
    return the configured logger.  Must be called after config parsing.
    """
    assert config.has_been_parsed()
    if logger is None:
        logger = logging.getLogger()  # Root logger

    if config.config['logging_config_file'] is not None:
        # BUG FIX: honor the file the user gave us via
        # --logging_config_file instead of a hardcoded 'logging.conf'.
        logging.config.fileConfig(config.config['logging_config_file'])
        return logger

    handlers = []

    # Translate the textual level (e.g. 'INFO') into logging's numeric one.
    numeric_level = getattr(
        logging,
        config.config['logging_level'].upper(),
        None,
    )
    if not isinstance(numeric_level, int):
        raise ValueError('Invalid level: %s' % config.config['logging_level'])

    fmt = config.config['logging_format']
    if config.config['logging_debug_threads']:
        # Prefix every message with "pid.tid|" for thread debugging.
        fmt = f'%(process)d.%(thread)d|{fmt}'

    if config.config['logging_syslog']:
        if sys.platform not in ('win32', 'cygwin'):
            handler = SysLogHandler()
            handler.setFormatter(
                MillisecondAwareFormatter(
                    fmt=fmt,
                    datefmt=config.config['logging_date_format'],
                )
            )
            handler.setLevel(numeric_level)
            handlers.append(handler)

    if config.config['logging_filename']:
        handler = RotatingFileHandler(
            config.config['logging_filename'],
            maxBytes=config.config['logging_filename_maxsize'],
            backupCount=config.config['logging_filename_count'],
        )
        handler.setLevel(numeric_level)
        handler.setFormatter(
            MillisecondAwareFormatter(
                fmt=fmt,
                datefmt=config.config['logging_date_format'],
            )
        )
        handlers.append(handler)

    if config.config['logging_console']:
        handler = logging.StreamHandler(sys.stderr)
        handler.setLevel(numeric_level)
        handler.setFormatter(
            MillisecondAwareFormatter(
                fmt=fmt,
                datefmt=config.config['logging_date_format'],
            )
        )
        handlers.append(handler)

    if len(handlers) == 0:
        handlers.append(logging.NullHandler())

    for handler in handlers:
        logger.addHandler(handler)

    if config.config['logging_info_is_print']:
        handler = logging.StreamHandler(sys.stdout)
        handler.addFilter(OnlyInfoFilter())
        logger.addHandler(handler)

    if config.config['logging_squelch_repeats_enabled']:
        for handler in handlers:
            handler.addFilter(SquelchRepeatedMessagesFilter())

    if config.config['logging_probabilistically_enabled']:
        for handler in handlers:
            handler.addFilter(ProbabilisticFilter())

    logger.setLevel(numeric_level)
    logger.propagate = False

    if config.config['logging_captures_prints']:
        import builtins
        global built_in_print

        def print_and_also_log(*arg, **kwarg):
            # Mirror print() output into the logger: stderr prints become
            # warnings, everything else becomes info.
            f = kwarg.get('file', None)
            if f == sys.stderr:
                logger.warning(*arg)
            else:
                logger.info(*arg)
            built_in_print(*arg, **kwarg)
        builtins.print = print_and_also_log

    # BUG FIX: the signature promises a logging.Logger; return it.
    return logger
def get_logger(name: str = ""):
    """Fetch the logger called *name* and run it through this module's
    commandline-driven initialization before handing it back."""
    return initialize_logging(logging.getLogger(name))
def tprint(*args, **kwargs) -> None:
    """Legacy function for printing a message augmented with thread id."""

    # NOTE(review): as visible here, nothing at all is printed unless
    # --logging_debug_threads is enabled — confirm that is intended.
    if config.config['logging_debug_threads']:
        from thread_utils import current_thread_id
        # Prefix the message with the current thread's id (no newline).
        print(f'{current_thread_id()}', end="")
        print(*args, **kwargs)
def dprint(*args, **kwargs) -> None:
    """Legacy helper: behaves like print() but writes to stderr."""
    sink = sys.stderr
    print(*args, file=sink, **kwargs)
class OutputMultiplexer(object):
    """
    A class that broadcasts printed messages to several sinks (including
    various logging levels, different files, different file handles,
    the house log, etc...)
    """

    class Destination(enum.IntEnum):
        """Bits in the destination_bitv bitvector.  Used to indicate the
        output destination."""
        LOG_DEBUG = 0x01     # ⎫
        LOG_INFO = 0x02      # ⎪
        LOG_WARNING = 0x04   # ⎬ Must provide logger to the c'tor.
        LOG_ERROR = 0x08     # ⎪
        LOG_CRITICAL = 0x10  # ⎭
        FILENAMES = 0x20     # Must provide a filename to the c'tor.
        FILEHANDLES = 0x40   # Must provide a handle to the c'tor.
        HLOG = 0x80          # Write to the house log (see hlog()).
        ALL_LOG_DESTINATIONS = (
            LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL
        )
        # NOTE(review): 0x8F omits LOG_CRITICAL, FILENAMES and FILEHANDLES
        # bits; preserved as found — confirm whether 0xFF was intended.
        ALL_OUTPUT_DESTINATIONS = 0x8F

    def __init__(self,
                 destination_bitv: int,
                 *,
                 logger=None,
                 filenames: Optional[Iterable[str]] = None,
                 handles: Optional[Iterable[io.TextIOWrapper]] = None):
        """Remember the requested sinks; open any named files for append.

        Raises ValueError if a destination bit is set but the matching
        constructor argument was not supplied.
        """
        if logger is None:
            logger = logging.getLogger(None)
        self.logger = logger

        if filenames is not None:
            # Unbuffered binary mode so writes hit the disk immediately.
            self.f = [
                open(filename, 'wb', buffering=0) for filename in filenames
            ]
        else:
            # BUG FIX: qualify FILENAMES with .Destination; the enum member
            # does not live directly on OutputMultiplexer.
            if destination_bitv & OutputMultiplexer.Destination.FILENAMES:
                raise ValueError(
                    "Filenames argument is required if bitv & FILENAMES"
                )
            self.f = None

        if handles is not None:
            self.h = [handle for handle in handles]
        else:
            if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES:
                raise ValueError(
                    "Handle argument is required if bitv & FILEHANDLES"
                )
            self.h = None

        self.set_destination_bitv(destination_bitv)

    def get_destination_bitv(self):
        """Return the current destination bitvector."""
        return self.destination_bitv

    def set_destination_bitv(self, destination_bitv: int):
        """Change the destination bitvector; the needed sinks must exist."""
        if destination_bitv & self.Destination.FILENAMES and self.f is None:
            raise ValueError(
                "Filename argument is required if bitv & FILENAMES"
            )
        if destination_bitv & self.Destination.FILEHANDLES and self.h is None:
            raise ValueError(
                "Handle argument is required if bitv & FILEHANDLES"
            )
        self.destination_bitv = destination_bitv

    def print(self, *args, **kwargs):
        """Format *args* print()-style and broadcast to all enabled sinks."""
        from string_utils import sprintf, strip_escape_sequences
        end = kwargs.pop("end", None)
        if end is not None:
            if not isinstance(end, str):
                raise TypeError("end must be None or a string")
        sep = kwargs.pop("sep", None)
        if sep is not None:
            if not isinstance(sep, str):
                raise TypeError("sep must be None or a string")
        if kwargs:
            raise TypeError("invalid keyword arguments to print()")
        buf = sprintf(*args, end="", sep=sep)
        if sep is None:
            sep = " "
        if end is None:
            end = "\n"
        if end == '\n':
            buf += '\n'
        if (
            self.destination_bitv & self.Destination.FILENAMES and
            self.f is not None
        ):
            for _ in self.f:
                _.write(buf.encode('utf-8'))
                _.flush()
        if (
            self.destination_bitv & self.Destination.FILEHANDLES and
            self.h is not None
        ):
            for handle in self.h:
                handle.write(buf)
                handle.flush()
        # Strip ANSI escapes before logging / house log.
        buf = strip_escape_sequences(buf)
        if self.logger is not None:
            if self.destination_bitv & self.Destination.LOG_DEBUG:
                self.logger.debug(buf)
            if self.destination_bitv & self.Destination.LOG_INFO:
                self.logger.info(buf)
            if self.destination_bitv & self.Destination.LOG_WARNING:
                self.logger.warning(buf)
            if self.destination_bitv & self.Destination.LOG_ERROR:
                self.logger.error(buf)
            if self.destination_bitv & self.Destination.LOG_CRITICAL:
                self.logger.critical(buf)
        if self.destination_bitv & self.Destination.HLOG:
            hlog(buf)

    def close(self):
        """Close any files this multiplexer opened."""
        if self.f is not None:
            for file in self.f:
                file.close()
class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator):
    """
    A context that uses an OutputMultiplexer. e.g.

        with OutputMultiplexerContext(
                OutputMultiplexer.Destination.LOG_INFO |
                OutputMultiplexer.Destination.LOG_DEBUG |
                OutputMultiplexer.Destination.FILENAMES |
                OutputMultiplexer.Destination.FILEHANDLES,
                filenames = [ '/tmp/foo.log', '/var/log/bar.log' ],
                handles = [ f, g ]) as mplex:
            mplex.print("This is a log message!")
    """

    def __init__(self,
                 destination_bitv: OutputMultiplexer.Destination,
                 *,
                 logger=None,
                 filenames=None,
                 handles=None):
        """Forward all sink arguments to OutputMultiplexer."""
        super().__init__(
            destination_bitv,
            logger=logger,
            filenames=filenames,
            handles=handles,
        )

    def __enter__(self):
        return self

    def __exit__(self, etype, value, traceback) -> bool:
        # Always release file resources, then re-raise any in-context error.
        super().close()
        if etype is not None:
            raise etype(value)
        return False
def hlog(message: str) -> None:
    """Write a message to the house log (syslog facility local7, via
    /usr/bin/logger).  Best effort: failures are silently ignored, just
    as os.system's ignored exit status was.
    """
    import subprocess
    try:
        # SECURITY: pass an argument list with no shell so the message
        # cannot inject shell syntax; this replaces the quote-escaping
        # + os.system() string construction.
        subprocess.run(
            ['/usr/bin/logger', '-p', 'local7.info', '--', message],
            check=False,
        )
    except OSError:
        # e.g. /usr/bin/logger not installed; drop the message quietly.
        pass
555 if __name__ == '__main__':