#!/usr/bin/env python3 import functools import importlib import logging import os import sys from inspect import stack from typing import List import config import logging_utils from argparse_utils import ActionNoYes # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. logger = logging.getLogger(__name__) args = config.add_commandline_args( f'Bootstrap ({__file__})', 'Args related to python program bootstrapper and Swiss army knife', ) args.add_argument( '--debug_unhandled_exceptions', action=ActionNoYes, default=False, help='Break into pdb on top level unhandled exceptions.', ) args.add_argument( '--show_random_seed', action=ActionNoYes, default=False, help='Should we display (and log.debug) the global random seed?', ) args.add_argument( '--set_random_seed', type=int, nargs=1, default=None, metavar='SEED_INT', help='Override the global random seed with a particular number.', ) args.add_argument( '--dump_all_objects', action=ActionNoYes, default=False, help='Should we dump the Python import tree before main?', ) args.add_argument( '--audit_import_events', action=ActionNoYes, default=False, help='Should we audit all import events?', ) args.add_argument( '--run_profiler', action=ActionNoYes, default=False, help='Should we run cProfile on this code?', ) args.add_argument( '--trace_memory', action=ActionNoYes, default=False, help='Should we record/report on memory utilization?', ) original_hook = sys.excepthook def handle_uncaught_exception(exc_type, exc_value, exc_tb): """ Top-level exception handler for exceptions that make it past any exception handlers in the python code being run. Logs the error and stacktrace then maybe attaches a debugger. """ global original_hook msg = f'Unhandled top level exception {exc_type}' logger.exception(msg) print(msg, file=sys.stderr) if issubclass(exc_type, KeyboardInterrupt): sys.__excepthook__(exc_type, exc_value, exc_tb) return else: if not sys.stderr.isatty() or not sys.stdin.isatty(): # stdin or stderr is redirected, just do the normal thing original_hook(exc_type, exc_value, exc_tb) else: # a terminal is attached and stderr is not redirected, maybe debug. import traceback traceback.print_exception(exc_type, exc_value, exc_tb) if config.config['debug_unhandled_exceptions']: import pdb logger.info("Invoking the debugger...") pdb.pm() else: original_hook(exc_type, exc_value, exc_tb) class ImportInterceptor(importlib.abc.MetaPathFinder): def __init__(self): import collect.trie self.module_by_filename_cache = {} self.repopulate_modules_by_filename() self.tree = collect.trie.Trie() self.tree_node_by_module = {} def repopulate_modules_by_filename(self): self.module_by_filename_cache.clear() for mod in sys.modules: if hasattr(sys.modules[mod], '__file__'): fname = getattr(sys.modules[mod], '__file__') else: fname = 'unknown' self.module_by_filename_cache[fname] = mod def should_ignore_filename(self, filename: str) -> bool: return 'importlib' in filename or 'six.py' in filename def find_module(self, fullname, path): raise Exception( "This method has been deprecated since Python 3.4, please upgrade." ) def find_spec(self, loaded_module, path=None, target=None): s = stack() for x in range(3, len(s)): filename = s[x].filename if self.should_ignore_filename(filename): continue loading_function = s[x].function if filename in self.module_by_filename_cache: loading_module = self.module_by_filename_cache[filename] else: self.repopulate_modules_by_filename() loading_module = self.module_by_filename_cache.get(filename, 'unknown') path = self.tree_node_by_module.get(loading_module, []) path.extend([loaded_module]) self.tree.insert(path) self.tree_node_by_module[loading_module] = path msg = f'*** Import {loaded_module} from {filename}:{s[x].lineno} in {loading_module}::{loading_function}' logger.debug(msg) print(msg) return msg = f'*** Import {loaded_module} from ?????' logger.debug(msg) print(msg) def invalidate_caches(self): pass def find_importer(self, module: str): if module in self.tree_node_by_module: node = self.tree_node_by_module[module] return node return [] # Audit import events? Note: this runs early in the lifetime of the # process (assuming that import bootstrap happens early); config has # (probably) not yet been loaded or parsed the commandline. Also, # some things have probably already been imported while we weren't # watching so this information may be incomplete. # # Also note: move bootstrap up in the global import list to catch # more import events and have a more complete record. import_interceptor = None for arg in sys.argv: if arg == '--audit_import_events': import_interceptor = ImportInterceptor() sys.meta_path.insert(0, import_interceptor) def dump_all_objects() -> None: global import_interceptor messages = {} all_modules = sys.modules for obj in object.__subclasses__(): if not hasattr(obj, '__name__'): continue klass = obj.__name__ if not hasattr(obj, '__module__'): continue class_mod_name = obj.__module__ if class_mod_name in all_modules: mod = all_modules[class_mod_name] if not hasattr(mod, '__name__'): mod_name = class_mod_name else: mod_name = mod.__name__ if hasattr(mod, '__file__'): mod_file = mod.__file__ else: mod_file = 'unknown' if import_interceptor is not None: import_path = import_interceptor.find_importer(mod_name) else: import_path = 'unknown' msg = f'{class_mod_name}::{klass} ({mod_file})' if import_path != 'unknown' and len(import_path) > 0: msg += f' imported by {import_path}' messages[f'{class_mod_name}::{klass}'] = msg for x in sorted(messages.keys()): logger.debug(messages[x]) print(messages[x]) def initialize(entry_point): """ Remember to initialize config, initialize logging, set/log a random seed, etc... before running main. """ @functools.wraps(entry_point) def initialize_wrapper(*args, **kwargs): # Hook top level unhandled exceptions, maybe invoke debugger. if sys.excepthook == sys.__excepthook__: sys.excepthook = handle_uncaught_exception # Try to figure out the name of the program entry point. Then # parse configuration (based on cmdline flags, environment vars # etc...) if ( '__globals__' in entry_point.__dict__ and '__file__' in entry_point.__globals__ ): config.parse(entry_point.__globals__['__file__']) else: config.parse(None) if config.config['trace_memory']: import tracemalloc tracemalloc.start() # Initialize logging... and log some remembered messages from # config module. logging_utils.initialize_logging(logging.getLogger()) config.late_logging() # Maybe log some info about the python interpreter itself. logger.debug( f'Platform: {sys.platform}, maxint=0x{sys.maxsize:x}, byteorder={sys.byteorder}' ) logger.debug(f'Python interpreter version: {sys.version}') logger.debug(f'Python implementation: {sys.implementation}') logger.debug(f'Python C API version: {sys.api_version}') logger.debug(f'Python path: {sys.path}') # Allow programs that don't bother to override the random seed # to be replayed via the commandline. import random random_seed = config.config['set_random_seed'] if random_seed is not None: random_seed = random_seed[0] else: random_seed = int.from_bytes(os.urandom(4), 'little') if config.config['show_random_seed']: msg = f'Global random seed is: {random_seed}' logger.debug(msg) print(msg) random.seed(random_seed) # Do it, invoke the user's code. Pay attention to how long it takes. logger.debug(f'Starting {entry_point.__name__} (program entry point)') ret = None import stopwatch if config.config['run_profiler']: import cProfile from pstats import SortKey with stopwatch.Timer() as t: cProfile.runctx( "ret = entry_point(*args, **kwargs)", globals(), locals(), None, SortKey.CUMULATIVE, ) else: with stopwatch.Timer() as t: ret = entry_point(*args, **kwargs) logger.debug(f'{entry_point.__name__} (program entry point) returned {ret}.') if config.config['trace_memory']: snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics('lineno') print() print("--trace_memory's top 10 memory using files:") for stat in top_stats[:10]: print(stat) if config.config['dump_all_objects']: dump_all_objects() if config.config['audit_import_events']: global import_interceptor if import_interceptor is not None: print(import_interceptor.tree) walltime = t() (utime, stime, cutime, cstime, elapsed_time) = os.times() logger.debug( '\n' f'user: {utime}s\n' f'system: {stime}s\n' f'child user: {cutime}s\n' f'child system: {cstime}s\n' f'machine uptime: {elapsed_time}s\n' f'walltime: {walltime}s' ) # If it doesn't return cleanly, call attention to the return value. if ret is not None and ret != 0: logger.error(f'Exit {ret}') else: logger.debug(f'Exit {ret}') sys.exit(ret) return initialize_wrapper