6 from inspect import stack
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
12 from argparse_utils import ActionNoYes
# Module-level logger named after this module.
16 logger = logging.getLogger(__name__)
# Ask the config module for a command-line argument group describing this
# module.  The fragments that follow belong to the option registrations for
# that group.
# NOTE(review): the embedded line numbers skip, so the individual
# add_argument(...) calls are only partially visible here -- confirm flag
# names, actions and defaults against the full file.
18 args = config.add_commandline_args(
19 f'Bootstrap ({__file__})',
20 'Args related to python program bootstrapper and Swiss army knife')
22 '--debug_unhandled_exceptions',
25 help='Break into pdb on top level unhandled exceptions.'
31 help='Should we display (and log.debug) the global random seed?'
39 help='Override the global random seed with a particular number.'
45 help='Should we dump the Python import tree before main?'
48 '--audit_import_events',
51 help='Should we audit all import events?',
# Remember whatever excepthook was installed when this module was imported;
# handle_uncaught_exception() delegates to it.
55 original_hook = sys.excepthook
# Replacement for sys.excepthook; receives the usual
# (exc_type, exc_value, exc_tb) triple.  initialize() installs it when the
# default hook is still in place.
# NOTE(review): this listing omits some lines of the function (the embedded
# line numbers skip), so the branch structure hinted at below is inferred
# from the visible lines only -- confirm against the full file.
58 def handle_uncaught_exception(exc_type, exc_value, exc_tb):
60 Top-level exception handler for exceptions that make it past any exception
61 handlers in the python code being run. Logs the error and stacktrace then
62 maybe attaches a debugger.
65 msg = f'Unhandled top level exception {exc_type}'
# Echo the message to stderr.
67 print(msg, file=sys.stderr)
# KeyboardInterrupt goes to Python's built-in hook (sys.__excepthook__), not
# to the saved original_hook.
68 if issubclass(exc_type, KeyboardInterrupt):
69 sys.__excepthook__(exc_type, exc_value, exc_tb)
# The two isatty() tests detect stdin/stderr not being attached to a terminal.
73 not sys.stderr.isatty() or
74 not sys.stdin.isatty()
76 # stdin or stderr is redirected, just do the normal thing
77 original_hook(exc_type, exc_value, exc_tb)
79 # a terminal is attached and stderr is not redirected, maybe debug.
81 traceback.print_exception(exc_type, exc_value, exc_tb)
# The debugger is only considered when the 'debug_unhandled_exceptions'
# config value is truthy.
82 if config.config['debug_unhandled_exceptions']:
84 logger.info("Invoking the debugger...")
# Presumably the fallback when the debugger was not requested -- the
# intervening lines are not visible here; confirm.
87 original_hook(exc_type, exc_value, exc_tb)
# Import-auditing hook object.  Module-level code later in this file puts an
# instance at the front of sys.meta_path when the --audit_import_events flag
# is seen, so the import machinery calls find_spec() on it.
# NOTE(review): several lines of this class (including the __init__ header)
# are not visible in this listing; the comments describe only what is shown.
90 class ImportInterceptor(object):
# Maps a module's __file__ to its name (its key in sys.modules).
93 self.module_by_filename_cache = {}
94 self.repopulate_modules_by_filename()
# A trie of import paths, plus a lookup from an importing module's name to
# the list most recently inserted into the trie on its behalf.
95 self.tree = collect.trie.Trie()
96 self.tree_node_by_module = {}
# Rebuild the filename -> module-name cache from scratch using sys.modules.
98 def repopulate_modules_by_filename(self):
99 self.module_by_filename_cache.clear()
100 for mod in sys.modules:
101 if hasattr(sys.modules[mod], '__file__'):
102 fname = getattr(sys.modules[mod], '__file__')
# NOTE(review): what fname is for modules without __file__ is decided on
# lines not visible here.
105 self.module_by_filename_cache[fname] = mod
# True for stack-frame filenames that contain 'importlib' or 'six.py'.
107 def should_ignore_filename(self, filename: str) -> bool:
108 return 'importlib' in filename or 'six.py' in filename
# Walks the call stack to work out which file/function triggered the import
# of loaded_module, records the relationship and builds a message about it.
# NOTE(review): the line that binds `s` is not visible; presumably
# s = stack() (inspect.stack is imported at the top of the file) -- confirm.
# The return value is likewise not visible.
110 def find_spec(self, loaded_module, path=None, target=None):
# Start at frame index 3, i.e. skip the innermost frames.
112 for x in range(3, len(s)):
113 filename = s[x].filename
# What happens to an ignored frame (presumably `continue`) is on a line
# not visible in this listing.
114 if self.should_ignore_filename(filename):
117 loading_function = s[x].function
# Translate the frame's filename into a module name.  It appears that on a
# cache miss the cache is rebuilt once and 'unknown' is used if the file
# still is not found.
118 if filename in self.module_by_filename_cache:
119 loading_module = self.module_by_filename_cache[filename]
121 self.repopulate_modules_by_filename()
122 loading_module = self.module_by_filename_cache.get(filename, 'unknown')
# NOTE(review): this rebinds the `path` parameter.  Also, .get() hands back
# the list already stored for loading_module (when there is one), so extend()
# grows that stored list in place for every import attributed to the same
# module before it is inserted into the trie again.
124 path = self.tree_node_by_module.get(loading_module, [])
125 path.extend([loaded_module])
126 self.tree.insert(path)
127 self.tree_node_by_module[loading_module] = path
129 msg = f'*** Import {loaded_module} from {filename}:{s[x].lineno} in {loading_module}::{loading_function}'
# Message used when the importer is not identified; where either message is
# emitted is not visible here.
133 msg = f'*** Import {loaded_module} from ?????'
# Look up the list recorded for `module` in tree_node_by_module.
# NOTE(review): the return statements are not visible in this listing.
137 def find_importer(self, module: str):
138 if module in self.tree_node_by_module:
139 node = self.tree_node_by_module[module]
144 # # TODO: test this with python 3.8+
145 # def audit_import_events(event, args):
146 # if event == 'import':
150 # sys_meta_path = args[3]
151 # sys_path_hooks = args[4]
156 # Audit import events? Note: this runs early in the lifetime of the
157 # process (assuming that import bootstrap happens early); config has
158 # (probably) not yet been loaded, nor has it parsed the commandline. Also,
159 # some things have probably already been imported while we weren't
160 # watching so this information may be incomplete.
162 # Also note: move bootstrap up in the global import list to catch
163 # more import events and have a more complete record.
# Module-level handle on the interceptor; it is only replaced with an
# instance inside the flag check below.
164 import_interceptor = None
# NOTE(review): the loop that binds `arg` is not visible in this listing.
# Presumably it walks the raw command line, since config has not parsed
# anything yet at this point -- confirm.
166 if arg == '--audit_import_events':
167 import_interceptor = ImportInterceptor()
# Prepend, so the interceptor sits ahead of the finders already registered
# in sys.meta_path.
168 sys.meta_path = [import_interceptor] + sys.meta_path
# The remaining lines are the disabled sys.addaudithook() variant.
169 # if not hasattr(sys, 'frozen'):
171 # sys.version_info[0] == 3
172 # and sys.version_info[1] >= 8
174 # sys.addaudithook(audit_import_events)
# Builds one message per class returned by object.__subclasses__() and logs
# them at debug level, ordered by their '<module>::<class>' key.
# NOTE(review): the lines that bind `klass` and `messages`, and whatever
# handles objects lacking __name__/__module__ or modules lacking __file__,
# are not visible in this listing -- confirm against the full file.
177 def dump_all_objects() -> None:
178 global import_interceptor
180 all_modules = sys.modules
# object.__subclasses__() yields the immediate subclasses of object.
181 for obj in object.__subclasses__():
182 if not hasattr(obj, '__name__'):
185 if not hasattr(obj, '__module__'):
187 class_mod_name = obj.__module__
# Only classes whose defining module is present in sys.modules get the
# module-name / module-file lookup below.
188 if class_mod_name in all_modules:
189 mod = all_modules[class_mod_name]
190 if not hasattr(mod, '__name__'):
191 mod_name = class_mod_name
193 mod_name = mod.__name__
194 if hasattr(mod, '__file__'):
195 mod_file = mod.__file__
# When the import interceptor is active, ask it what it recorded for this
# module; otherwise the string 'unknown' is used as a placeholder.
198 if import_interceptor is not None:
199 import_path = import_interceptor.find_importer(mod_name)
201 import_path = 'unknown'
202 msg = f'{class_mod_name}::{klass} ({mod_file})'
203 if import_path != 'unknown' and len(import_path) > 0:
204 msg += f' imported by {import_path}'
# Keyed by '<module>::<class>' so a repeated key overwrites the earlier entry.
205 messages[f'{class_mod_name}::{klass}'] = msg
206 for x in sorted(messages.keys()):
207 logger.debug(messages[x])
# Decorator for a program's entry point: returns initialize_wrapper, which
# performs the setup steps below, then calls entry_point and reports on it.
# NOTE(review): this listing omits a number of lines (the embedded line
# numbers skip), so some branches and calls are only partially visible.
211 def initialize(entry_point):
213 Remember to initialize config, initialize logging, set/log a random
214 seed, etc... before running main.
217 @functools.wraps(entry_point)
218 def initialize_wrapper(*args, **kwargs):
219 # Hook top level unhandled exceptions, maybe invoke debugger.
# Only replaces the hook when nothing else has already replaced the default.
220 if sys.excepthook == sys.__excepthook__:
221 sys.excepthook = handle_uncaught_exception
223 # Try to figure out the name of the program entry point. Then
224 # parse configuration (based on cmdline flags, environment vars
# NOTE(review): for an ordinary function, __globals__ is an attribute but
# does not appear to be a key of the function's __dict__, so the first test
# below looks like it would be False and the __file__ branch never taken --
# confirm; hasattr(entry_point, '__globals__') may have been intended.
227 '__globals__' in entry_point.__dict__ and
228 '__file__' in entry_point.__globals__
230 config.parse(entry_point.__globals__['__file__'])
234 # Initialize logging... and log some remembered messages from
236 logging_utils.initialize_logging(logging.getLogger())
237 config.late_logging()
239 # Allow programs that don't bother to override the random seed
240 # to be replayed via the commandline.
# The configured value is indexed with [0], so it is presumably a
# one-element list when set -- confirm against the argument definition.
242 random_seed = config.config['set_random_seed']
243 if random_seed is not None:
244 random_seed = random_seed[0]
# Otherwise (the branch keyword is not visible here) draw 4 bytes from
# os.urandom and read them as a little-endian integer.
246 random_seed = int.from_bytes(os.urandom(4), 'little')
248 if config.config['show_random_seed']:
249 msg = f'Global random seed is: {random_seed}'
252 random.seed(random_seed)
254 # Do it, invoke the user's code. Pay attention to how long it takes.
255 logger.debug(f'Starting {entry_point.__name__} (program entry point)')
258 with stopwatch.Timer() as t:
259 ret = entry_point(*args, **kwargs)
261 f'{entry_point.__name__} (program entry point) returned {ret}.'
264 if config.config['dump_all_objects']:
# Print the recorded import tree when auditing was requested and the
# interceptor was actually installed.
267 if config.config['audit_import_events']:
268 global import_interceptor
269 if import_interceptor is not None:
270 print(import_interceptor.tree)
# NOTE(review): os.times() documents its fifth field as elapsed real time
# since a fixed point in the past; the message below labels it 'machine
# uptime'.  The line that binds `walltime` is not visible in this listing.
273 (utime, stime, cutime, cstime, elapsed_time) = os.times()
276 f'system: {stime}s\n'
277 f'child user: {cutime}s\n'
278 f'child system: {cstime}s\n'
279 f'machine uptime: {elapsed_time}s\n'
280 f'walltime: {walltime}s')
282 # If it doesn't return cleanly, call attention to the return value.
283 if ret is not None and ret != 0:
284 logger.error(f'Exit {ret}')
286 logger.debug(f'Exit {ret}')
# What the wrapper does with `ret` after logging is not visible here.
288 return initialize_wrapper