#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
+"""This is a module for wrapping around python programs and doing some
+minor setup and tear down work for them. With it, you will get:
+
+* The ability to break into pdb on unhandled exceptions,
+* automatic support for :file:`config.py` (argument parsing)
+* automatic logging support for :file:`logging.py`,
+* the ability to enable code profiling,
+* the ability to enable module import auditing,
+* optional memory profiling for your program,
+* ability to set random seed via commandline,
+* automatic program timing and reporting,
+* more verbose error handling and reporting,
+
+Most of these are enabled and/or configured via commandline flags
+(see below).
+
+"""
+
import functools
+import importlib
+import importlib.abc
import logging
import os
-from inspect import stack
import sys
+from inspect import stack
+
+import config
+import logging_utils
+from argparse_utils import ActionNoYes
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
-from argparse_utils import ActionNoYes
-import config
-import logging_utils
logger = logging.getLogger(__name__)
-args = config.add_commandline_args(
+cfg = config.add_commandline_args(
f'Bootstrap ({__file__})',
- 'Args related to python program bootstrapper and Swiss army knife')
-args.add_argument(
+ 'Args related to python program bootstrapper and Swiss army knife',
+)
+cfg.add_argument(
'--debug_unhandled_exceptions',
action=ActionNoYes,
default=False,
- help='Break into pdb on top level unhandled exceptions.'
+ help='Break into pdb on top level unhandled exceptions.',
)
-args.add_argument(
+cfg.add_argument(
'--show_random_seed',
action=ActionNoYes,
default=False,
- help='Should we display (and log.debug) the global random seed?'
+ help='Should we display (and log.debug) the global random seed?',
)
-args.add_argument(
+cfg.add_argument(
'--set_random_seed',
type=int,
nargs=1,
default=None,
metavar='SEED_INT',
- help='Override the global random seed with a particular number.'
+ help='Override the global random seed with a particular number.',
)
-args.add_argument(
+cfg.add_argument(
'--dump_all_objects',
action=ActionNoYes,
default=False,
- help='Should we dump the Python import tree before main?'
+ help='Should we dump the Python import tree before main?',
)
-args.add_argument(
+cfg.add_argument(
'--audit_import_events',
action=ActionNoYes,
default=False,
help='Should we audit all import events?',
)
+cfg.add_argument(
+ '--run_profiler',
+ action=ActionNoYes,
+ default=False,
+ help='Should we run cProfile on this code?',
+)
+cfg.add_argument(
+ '--trace_memory',
+ action=ActionNoYes,
+ default=False,
+ help='Should we record/report on memory utilization?',
+)
-
-original_hook = sys.excepthook
+ORIGINAL_EXCEPTION_HOOK = sys.excepthook
def handle_uncaught_exception(exc_type, exc_value, exc_tb):
maybe attaches a debugger.
"""
- global original_hook
msg = f'Unhandled top level exception {exc_type}'
logger.exception(msg)
print(msg, file=sys.stderr)
sys.__excepthook__(exc_type, exc_value, exc_tb)
return
else:
- if (
- not sys.stderr.isatty() or
- not sys.stdin.isatty()
- ):
- # stdin or stderr is redirected, just do the normal thing
- original_hook(exc_type, exc_value, exc_tb)
- else:
- # a terminal is attached and stderr is not redirected, maybe debug.
- import traceback
- traceback.print_exception(exc_type, exc_value, exc_tb)
+ import io
+ import traceback
+
+ tb_output = io.StringIO()
+ traceback.print_tb(exc_tb, None, tb_output)
+ print(tb_output.getvalue(), file=sys.stderr)
+ logger.error(tb_output.getvalue())
+ tb_output.close()
+
+ # stdin or stderr is redirected, just do the normal thing
+ if not sys.stderr.isatty() or not sys.stdin.isatty():
+ ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
+
+ else: # a terminal is attached and stderr isn't redirected, maybe debug.
if config.config['debug_unhandled_exceptions']:
- import pdb
logger.info("Invoking the debugger...")
+ import pdb
+
pdb.pm()
else:
- original_hook(exc_type, exc_value, exc_tb)
+ ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
+
+
+class ImportInterceptor(importlib.abc.MetaPathFinder):
+ """An interceptor that always allows module load events but dumps a
+ record into the log and onto stdout when modules are loaded and
+ produces an audit of who imported what at the end of the run. It
+ can't see any load events that happen before it, though, so move
+ bootstrap up in your __main__'s import list just temporarily to
+ get a good view.
+ """
-class ImportInterceptor(object):
def __init__(self):
import collect.trie
+
self.module_by_filename_cache = {}
self.repopulate_modules_by_filename()
self.tree = collect.trie.Trie()
def repopulate_modules_by_filename(self):
self.module_by_filename_cache.clear()
- for mod in sys.modules:
- if hasattr(sys.modules[mod], '__file__'):
- fname = getattr(sys.modules[mod], '__file__')
+ for _, mod in sys.modules.copy().items(): # copy here because modules is volatile
+ if hasattr(mod, '__file__'):
+ fname = getattr(mod, '__file__')
else:
fname = 'unknown'
self.module_by_filename_cache[fname] = mod
- def should_ignore_filename(self, filename: str) -> bool:
+ @staticmethod
+ def should_ignore_filename(filename: str) -> bool:
return 'importlib' in filename or 'six.py' in filename
- def find_spec(self, loaded_module, path=None, target=None):
+ def find_module(self, fullname, path):
+ raise Exception("This method has been deprecated since Python 3.4, please upgrade.")
+
+ def find_spec(self, loaded_module, path=None, _=None):
s = stack()
for x in range(3, len(s)):
filename = s[x].filename
- if self.should_ignore_filename(filename):
+ if ImportInterceptor.should_ignore_filename(filename):
continue
loading_function = s[x].function
logger.debug(msg)
print(msg)
+ def invalidate_caches(self):
+ pass
+
def find_importer(self, module: str):
if module in self.tree_node_by_module:
node = self.tree_node_by_module[module]
return []
-# # TODO: test this with python 3.8+
-# def audit_import_events(event, args):
-# if event == 'import':
-# module = args[0]
-# filename = args[1]
-# sys_path = args[2]
-# sys_meta_path = args[3]
-# sys_path_hooks = args[4]
-# logger.debug(msg)
-# print(msg)
-
-
# Audit import events? Note: this runs early in the lifetime of the
# process (assuming that import bootstrap happens early); config has
# (probably) not yet been loaded or parsed the commandline. Also,
#
# Also note: move bootstrap up in the global import list to catch
# more import events and have a more complete record.
-import_interceptor = None
+IMPORT_INTERCEPTOR = None
for arg in sys.argv:
if arg == '--audit_import_events':
- import_interceptor = ImportInterceptor()
- sys.meta_path = [import_interceptor] + sys.meta_path
- # if not hasattr(sys, 'frozen'):
- # if (
- # sys.version_info[0] == 3
- # and sys.version_info[1] >= 8
- # ):
- # sys.addaudithook(audit_import_events)
+ IMPORT_INTERCEPTOR = ImportInterceptor()
+ sys.meta_path.insert(0, IMPORT_INTERCEPTOR)
def dump_all_objects() -> None:
- global import_interceptor
+ """Helper code to dump all known python objects."""
+
messages = {}
all_modules = sys.modules
for obj in object.__subclasses__():
mod_file = mod.__file__
else:
mod_file = 'unknown'
- if import_interceptor is not None:
- import_path = import_interceptor.find_importer(mod_name)
+ if IMPORT_INTERCEPTOR is not None:
+ import_path = IMPORT_INTERCEPTOR.find_importer(mod_name)
else:
import_path = 'unknown'
msg = f'{class_mod_name}::{klass} ({mod_file})'
def initialize(entry_point):
"""
Remember to initialize config, initialize logging, set/log a random
- seed, etc... before running main.
+ seed, etc... before running main. If you use this decorator around
+ your main, like this::
+ import bootstrap
+
+ @bootstrap.initialize
+ def main():
+ whatever
+
+ if __name__ == '__main__':
+ main()
+
+ You get:
+
+ * The ability to break into pdb on unhandled exceptions,
+ * automatic support for :file:`config.py` (argument parsing)
+ * automatic logging support for :file:`logging.py`,
+ * the ability to enable code profiling,
+ * the ability to enable module import auditing,
+ * optional memory profiling for your program,
+ * ability to set random seed via commandline,
+ * automatic program timing and reporting,
+ * more verbose error handling and reporting,
+
+ Most of these are enabled and/or configured via commandline flags
+ (see below).
"""
+
@functools.wraps(entry_point)
def initialize_wrapper(*args, **kwargs):
# Hook top level unhandled exceptions, maybe invoke debugger.
# Try to figure out the name of the program entry point. Then
# parse configuration (based on cmdline flags, environment vars
# etc...)
- if (
- '__globals__' in entry_point.__dict__ and
- '__file__' in entry_point.__globals__
- ):
- config.parse(entry_point.__globals__['__file__'])
- else:
- config.parse(None)
+ entry_filename = None
+ entry_descr = None
+ try:
+ entry_filename = entry_point.__code__.co_filename
+ entry_descr = entry_point.__code__.__repr__()
+ except Exception:
+ if '__globals__' in entry_point.__dict__ and '__file__' in entry_point.__globals__:
+ entry_filename = entry_point.__globals__['__file__']
+ entry_descr = entry_filename
+ config.parse(entry_filename)
+
+ if config.config['trace_memory']:
+ import tracemalloc
+
+ tracemalloc.start()
# Initialize logging... and log some remembered messages from
# config module.
logging_utils.initialize_logging(logging.getLogger())
config.late_logging()
+ # Maybe log some info about the python interpreter itself.
+ logger.debug(
+ 'Platform: %s, maxint=0x%x, byteorder=%s', sys.platform, sys.maxsize, sys.byteorder
+ )
+ logger.debug('Python interpreter version: %s', sys.version)
+ logger.debug('Python implementation: %s', sys.implementation)
+ logger.debug('Python C API version: %s', sys.api_version)
+ if __debug__:
+ logger.debug('Python interpreter running in __debug__ mode.')
+ else:
+ logger.debug('Python interpreter running in optimized mode.')
+ logger.debug('Python path: %s', sys.path)
+
+ # Log something about the site_config, many things use it.
+ import site_config
+
+ logger.debug('Global site_config: %s', site_config.get_config())
+
# Allow programs that don't bother to override the random seed
# to be replayed via the commandline.
import random
+
random_seed = config.config['set_random_seed']
if random_seed is not None:
random_seed = random_seed[0]
random.seed(random_seed)
# Do it, invoke the user's code. Pay attention to how long it takes.
- logger.debug(f'Starting {entry_point.__name__} (program entry point)')
+ logger.debug('Starting %s (program entry point)', entry_descr)
ret = None
import stopwatch
- with stopwatch.Timer() as t:
- ret = entry_point(*args, **kwargs)
- logger.debug(
- f'{entry_point.__name__} (program entry point) returned {ret}.'
- )
+
+ if config.config['run_profiler']:
+ import cProfile
+ from pstats import SortKey
+
+ with stopwatch.Timer() as t:
+ cProfile.runctx(
+ "ret = entry_point(*args, **kwargs)",
+ globals(),
+ locals(),
+ None,
+ SortKey.CUMULATIVE,
+ )
+ else:
+ with stopwatch.Timer() as t:
+ ret = entry_point(*args, **kwargs)
+
+ logger.debug('%s (program entry point) returned %s.', entry_descr, ret)
+
+ if config.config['trace_memory']:
+ snapshot = tracemalloc.take_snapshot()
+ top_stats = snapshot.statistics('lineno')
+ print()
+ print("--trace_memory's top 10 memory using files:")
+ for stat in top_stats[:10]:
+ print(stat)
if config.config['dump_all_objects']:
dump_all_objects()
if config.config['audit_import_events']:
- global import_interceptor
- if import_interceptor is not None:
- print(import_interceptor.tree)
+ if IMPORT_INTERCEPTOR is not None:
+ print(IMPORT_INTERCEPTOR.tree)
walltime = t()
(utime, stime, cutime, cstime, elapsed_time) = os.times()
- logger.debug('\n'
- f'user: {utime}s\n'
- f'system: {stime}s\n'
- f'child user: {cutime}s\n'
- f'child system: {cstime}s\n'
- f'machine uptime: {elapsed_time}s\n'
- f'walltime: {walltime}s')
+ logger.debug(
+ '\n'
+ 'user: %.4fs\n'
+ 'system: %.4fs\n'
+ 'child user: %.4fs\n'
+ 'child system: %.4fs\n'
+ 'machine uptime: %.4fs\n'
+ 'walltime: %.4fs',
+ utime,
+ stime,
+ cutime,
+ cstime,
+ elapsed_time,
+ walltime,
+ )
# If it doesn't return cleanly, call attention to the return value.
if ret is not None and ret != 0:
- logger.error(f'Exit {ret}')
+ logger.error('Exit %s', ret)
else:
- logger.debug(f'Exit {ret}')
+ logger.debug('Exit %s', ret)
sys.exit(ret)
+
return initialize_wrapper