import os
import sys
from inspect import stack
-from typing import List
import config
import logging_utils
logger = logging.getLogger(__name__)
-args = config.add_commandline_args(
+cfg = config.add_commandline_args(
f'Bootstrap ({__file__})',
'Args related to python program bootstrapper and Swiss army knife',
)
-args.add_argument(
+cfg.add_argument(
'--debug_unhandled_exceptions',
action=ActionNoYes,
default=False,
help='Break into pdb on top level unhandled exceptions.',
)
-args.add_argument(
+cfg.add_argument(
'--show_random_seed',
action=ActionNoYes,
default=False,
help='Should we display (and log.debug) the global random seed?',
)
-args.add_argument(
+cfg.add_argument(
'--set_random_seed',
type=int,
nargs=1,
metavar='SEED_INT',
help='Override the global random seed with a particular number.',
)
-args.add_argument(
+cfg.add_argument(
'--dump_all_objects',
action=ActionNoYes,
default=False,
help='Should we dump the Python import tree before main?',
)
-args.add_argument(
+cfg.add_argument(
'--audit_import_events',
action=ActionNoYes,
default=False,
help='Should we audit all import events?',
)
-args.add_argument(
+cfg.add_argument(
'--run_profiler',
action=ActionNoYes,
default=False,
help='Should we run cProfile on this code?',
)
-args.add_argument(
+cfg.add_argument(
'--trace_memory',
action=ActionNoYes,
default=False,
help='Should we record/report on memory utilization?',
)
-original_hook = sys.excepthook
+ORIGINAL_EXCEPTION_HOOK = sys.excepthook
def handle_uncaught_exception(exc_type, exc_value, exc_tb):
maybe attaches a debugger.
"""
- global original_hook
msg = f'Unhandled top level exception {exc_type}'
logger.exception(msg)
print(msg, file=sys.stderr)
else:
if not sys.stderr.isatty() or not sys.stdin.isatty():
# stdin or stderr is redirected, just do the normal thing
- original_hook(exc_type, exc_value, exc_tb)
+ ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
else:
# a terminal is attached and stderr is not redirected, maybe debug.
import traceback
logger.info("Invoking the debugger...")
pdb.pm()
else:
- original_hook(exc_type, exc_value, exc_tb)
+ ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
class ImportInterceptor(importlib.abc.MetaPathFinder):
fname = 'unknown'
self.module_by_filename_cache[fname] = mod
- def should_ignore_filename(self, filename: str) -> bool:
+ @staticmethod
+ def should_ignore_filename(filename: str) -> bool:
return 'importlib' in filename or 'six.py' in filename
def find_module(self, fullname, path):
raise Exception("This method has been deprecated since Python 3.4, please upgrade.")
- def find_spec(self, loaded_module, path=None, target=None):
+ def find_spec(self, loaded_module, path=None, _=None):
s = stack()
for x in range(3, len(s)):
filename = s[x].filename
- if self.should_ignore_filename(filename):
+ if ImportInterceptor.should_ignore_filename(filename):
continue
loading_function = s[x].function
#
# Also note: move bootstrap up in the global import list to catch
# more import events and have a more complete record.
-import_interceptor = None
+IMPORT_INTERCEPTOR = None
for arg in sys.argv:
if arg == '--audit_import_events':
- import_interceptor = ImportInterceptor()
- sys.meta_path.insert(0, import_interceptor)
+ IMPORT_INTERCEPTOR = ImportInterceptor()
+ sys.meta_path.insert(0, IMPORT_INTERCEPTOR)
def dump_all_objects() -> None:
- global import_interceptor
messages = {}
all_modules = sys.modules
for obj in object.__subclasses__():
mod_file = mod.__file__
else:
mod_file = 'unknown'
- if import_interceptor is not None:
- import_path = import_interceptor.find_importer(mod_name)
+ if IMPORT_INTERCEPTOR is not None:
+ import_path = IMPORT_INTERCEPTOR.find_importer(mod_name)
else:
import_path = 'unknown'
msg = f'{class_mod_name}::{klass} ({mod_file})'
# Maybe log some info about the python interpreter itself.
logger.debug(
- f'Platform: {sys.platform}, maxint=0x{sys.maxsize:x}, byteorder={sys.byteorder}'
+ 'Platform: %s, maxint=0x%x, byteorder=%s',
+ sys.platform, sys.maxsize, sys.byteorder
)
- logger.debug(f'Python interpreter version: {sys.version}')
- logger.debug(f'Python implementation: {sys.implementation}')
- logger.debug(f'Python C API version: {sys.api_version}')
- logger.debug(f'Python path: {sys.path}')
+ logger.debug('Python interpreter version: %s', sys.version)
+ logger.debug('Python implementation: %s', sys.implementation)
+ logger.debug('Python C API version: %s', sys.api_version)
+ logger.debug('Python path: %s', sys.path)
# Log something about the site_config, many things use it.
import site_config
- logger.debug(f'Global site_config: {site_config.get_config()}')
+ logger.debug('Global site_config: %s', site_config.get_config())
# Allow programs that don't bother to override the random seed
# to be replayed via the commandline.
random.seed(random_seed)
# Do it, invoke the user's code. Pay attention to how long it takes.
- logger.debug(f'Starting {entry_point.__name__} (program entry point)')
+ logger.debug('Starting %s (program entry point)', entry_point.__name__)
ret = None
import stopwatch
with stopwatch.Timer() as t:
ret = entry_point(*args, **kwargs)
- logger.debug(f'{entry_point.__name__} (program entry point) returned {ret}.')
+ logger.debug('%s (program entry point) returned %s.', entry_point.__name__, ret)
if config.config['trace_memory']:
snapshot = tracemalloc.take_snapshot()
dump_all_objects()
if config.config['audit_import_events']:
- global import_interceptor
- if import_interceptor is not None:
- print(import_interceptor.tree)
+ if IMPORT_INTERCEPTOR is not None:
+ print(IMPORT_INTERCEPTOR.tree)
walltime = t()
(utime, stime, cutime, cstime, elapsed_time) = os.times()
logger.debug(
'\n'
- f'user: {utime}s\n'
- f'system: {stime}s\n'
- f'child user: {cutime}s\n'
- f'child system: {cstime}s\n'
- f'machine uptime: {elapsed_time}s\n'
- f'walltime: {walltime}s'
+ 'user: %.4fs\n'
+ 'system: %.4fs\n'
+ 'child user: %.4fs\n'
+ 'child system: %.4fs\n'
+ 'machine uptime: %.4fs\n'
+ 'walltime: %.4fs',
+ utime, stime, cutime, cstime, elapsed_time, walltime
)
# If it doesn't return cleanly, call attention to the return value.
if ret is not None and ret != 0:
- logger.error(f'Exit {ret}')
+ logger.error('Exit %s', ret)
else:
- logger.debug(f'Exit {ret}')
+ logger.debug('Exit %s', ret)
sys.exit(ret)
return initialize_wrapper