#!/usr/bin/env python3
import functools
+import importlib
import logging
import os
-from inspect import stack
import sys
+from inspect import stack
+from typing import List
+
+import config
+import logging_utils
+from argparse_utils import ActionNoYes
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
-from argparse_utils import ActionNoYes
-import config
-import logging_utils
logger = logging.getLogger(__name__)
default=False,
help='Should we audit all import events?',
)
-
+args.add_argument(
+ '--run_profiler',
+ action=ActionNoYes,
+ default=False,
+ help='Should we run cProfile on this code?',
+)
+args.add_argument(
+ '--trace_memory',
+ action=ActionNoYes,
+ default=False,
+ help='Should we record/report on memory utilization?',
+)
original_hook = sys.excepthook
original_hook(exc_type, exc_value, exc_tb)
-class ImportInterceptor(object):
+class ImportInterceptor(importlib.abc.MetaPathFinder):
def __init__(self):
import collect.trie
def should_ignore_filename(self, filename: str) -> bool:
    """Report whether *filename* mentions one of the ignored fragments.

    Args:
        filename: The file name or path to inspect.

    Returns:
        True if 'importlib' or 'six.py' occurs anywhere in *filename*
        (plain substring match), False otherwise.
    """
    # NOTE(review): presumably used to skip import-machinery frames
    # while walking the call stack -- confirm against the callers.
    ignored_fragments = ('importlib', 'six.py')
    return any(fragment in filename for fragment in ignored_fragments)
+ def find_module(self, fullname, path):
+ raise Exception("This method has been deprecated since Python 3.4, please upgrade.")
+
def find_spec(self, loaded_module, path=None, target=None):
s = stack()
for x in range(3, len(s)):
logger.debug(msg)
print(msg)
+ def invalidate_caches(self):
+ pass
+
def find_importer(self, module: str):
if module in self.tree_node_by_module:
node = self.tree_node_by_module[module]
return []
-# # TODO: test this with python 3.8+
-# def audit_import_events(event, args):
-# if event == 'import':
-# module = args[0]
-# filename = args[1]
-# sys_path = args[2]
-# sys_meta_path = args[3]
-# sys_path_hooks = args[4]
-# logger.debug(msg)
-# print(msg)
-
-
# Audit import events?  Note: this runs early in the lifetime of the
# process (assuming that import bootstrap happens early); config has
# (probably) not yet been loaded, nor has the commandline been parsed.  Also,
for arg in sys.argv:
if arg == '--audit_import_events':
import_interceptor = ImportInterceptor()
- sys.meta_path = [import_interceptor] + sys.meta_path
- # if not hasattr(sys, 'frozen'):
- # if (
- # sys.version_info[0] == 3
- # and sys.version_info[1] >= 8
- # ):
- # sys.addaudithook(audit_import_events)
+ sys.meta_path.insert(0, import_interceptor)
def dump_all_objects() -> None:
# Try to figure out the name of the program entry point. Then
# parse configuration (based on cmdline flags, environment vars
# etc...)
- if (
- '__globals__' in entry_point.__dict__
- and '__file__' in entry_point.__globals__
- ):
+ if '__globals__' in entry_point.__dict__ and '__file__' in entry_point.__globals__:
config.parse(entry_point.__globals__['__file__'])
else:
config.parse(None)
+ if config.config['trace_memory']:
+ import tracemalloc
+
+ tracemalloc.start()
+
# Initialize logging... and log some remembered messages from
# config module.
logging_utils.initialize_logging(logging.getLogger())
config.late_logging()
+ # Maybe log some info about the python interpreter itself.
+ logger.debug(
+ f'Platform: {sys.platform}, maxint=0x{sys.maxsize:x}, byteorder={sys.byteorder}'
+ )
+ logger.debug(f'Python interpreter version: {sys.version}')
+ logger.debug(f'Python implementation: {sys.implementation}')
+ logger.debug(f'Python C API version: {sys.api_version}')
+ logger.debug(f'Python path: {sys.path}')
+
# Allow programs that don't bother to override the random seed
# to be replayed via the commandline.
import random
ret = None
import stopwatch
- with stopwatch.Timer() as t:
- ret = entry_point(*args, **kwargs)
+ if config.config['run_profiler']:
+ import cProfile
+ from pstats import SortKey
+
+ with stopwatch.Timer() as t:
+ cProfile.runctx(
+ "ret = entry_point(*args, **kwargs)",
+ globals(),
+ locals(),
+ None,
+ SortKey.CUMULATIVE,
+ )
+ else:
+ with stopwatch.Timer() as t:
+ ret = entry_point(*args, **kwargs)
+
logger.debug(f'{entry_point.__name__} (program entry point) returned {ret}.')
+ if config.config['trace_memory']:
+ snapshot = tracemalloc.take_snapshot()
+ top_stats = snapshot.statistics('lineno')
+ print()
+ print("--trace_memory's top 10 memory using files:")
+ for stat in top_stats[:10]:
+ print(stat)
+
if config.config['dump_all_objects']:
dump_all_objects()