import logging
import os
import sys
+import uuid
from inspect import stack
from pyutils import config, logging_utils
logger = logging.getLogger(__name__)
cfg = config.add_commandline_args(
- f'Bootstrap ({__file__})',
- 'Args related to python program bootstrapper and Swiss army knife',
+ f"Bootstrap ({__file__})",
+ "Args related to python program bootstrapper and Swiss army knife",
)
cfg.add_argument(
- '--debug_unhandled_exceptions',
+ "--debug_unhandled_exceptions",
action=ActionNoYes,
default=False,
- help='Break into pdb on top level unhandled exceptions.',
+ help="Break into pdb on top level unhandled exceptions.",
)
cfg.add_argument(
- '--show_random_seed',
+ "--show_random_seed",
action=ActionNoYes,
default=False,
- help='Should we display (and log.debug) the global random seed?',
+ help="Should we display (and log.debug) the global random seed?",
)
cfg.add_argument(
- '--set_random_seed',
+ "--set_random_seed",
type=int,
nargs=1,
default=None,
- metavar='SEED_INT',
- help='Override the global random seed with a particular number.',
+ metavar="SEED_INT",
+ help="Override the global random seed with a particular number.",
)
cfg.add_argument(
- '--dump_all_objects',
+ "--dump_all_objects",
action=ActionNoYes,
default=False,
- help='Should we dump the Python import tree before main?',
+ help="Should we dump the Python import tree before main?",
)
cfg.add_argument(
- '--audit_import_events',
+ "--audit_import_events",
action=ActionNoYes,
default=False,
- help='Should we audit all import events?',
+ help="Should we audit all import events?",
)
cfg.add_argument(
- '--run_profiler',
+ "--run_profiler",
action=ActionNoYes,
default=False,
- help='Should we run cProfile on this code?',
+ help="Should we run cProfile on this code?",
)
cfg.add_argument(
- '--trace_memory',
+ "--trace_memory",
action=ActionNoYes,
default=False,
- help='Should we record/report on memory utilization?',
+ help="Should we record/report on memory utilization?",
)
ORIGINAL_EXCEPTION_HOOK = sys.excepthook
maybe attaches a debugger.
"""
- msg = f'Unhandled top level exception {exc_type}'
+ msg = f"Unhandled top level exception {exc_type}"
logger.exception(msg)
print(msg, file=sys.stderr)
if issubclass(exc_type, KeyboardInterrupt):
ORIGINAL_EXCEPTION_HOOK(exc_type, exc_value, exc_tb)
else: # a terminal is attached and stderr isn't redirected, maybe debug.
- if config.config['debug_unhandled_exceptions']:
+ if config.config["debug_unhandled_exceptions"]:
logger.info("Invoking the debugger...")
import pdb
_,
mod,
) in sys.modules.copy().items(): # copy here because modules is volatile
- if hasattr(mod, '__file__'):
- fname = getattr(mod, '__file__')
+ if hasattr(mod, "__file__"):
+ fname = getattr(mod, "__file__")
else:
- fname = 'unknown'
+ fname = "unknown"
self.module_by_filename_cache[fname] = mod
@staticmethod
def should_ignore_filename(filename: str) -> bool:
- return 'importlib' in filename or 'six.py' in filename
+ return "importlib" in filename or "six.py" in filename
def find_module(self, fullname, path):
raise Exception(
loading_module = self.module_by_filename_cache[filename]
else:
self.repopulate_modules_by_filename()
- loading_module = self.module_by_filename_cache.get(filename, 'unknown')
+ loading_module = self.module_by_filename_cache.get(filename, "unknown")
path = self.tree_node_by_module.get(loading_module, [])
path.extend([loaded_module])
self.tree.insert(path)
self.tree_node_by_module[loading_module] = path
- msg = f'*** Import {loaded_module} from {filename}:{s[x].lineno} in {loading_module}::{loading_function}'
+ msg = f"*** Import {loaded_module} from {filename}:{s[x].lineno} in {loading_module}::{loading_function}"
logger.debug(msg)
print(msg)
return
- msg = f'*** Import {loaded_module} from ?????'
+ msg = f"*** Import {loaded_module} from ?????"
logger.debug(msg)
print(msg)
# more import events and have a more complete record.
IMPORT_INTERCEPTOR = None
for arg in sys.argv:
- if arg == '--audit_import_events':
+ if arg == "--audit_import_events":
IMPORT_INTERCEPTOR = ImportInterceptor()
sys.meta_path.insert(0, IMPORT_INTERCEPTOR)
messages = {}
all_modules = sys.modules
for obj in object.__subclasses__():
- if not hasattr(obj, '__name__'):
+ if not hasattr(obj, "__name__"):
continue
klass = obj.__name__
- if not hasattr(obj, '__module__'):
+ if not hasattr(obj, "__module__"):
continue
class_mod_name = obj.__module__
if class_mod_name in all_modules:
mod = all_modules[class_mod_name]
- if not hasattr(mod, '__name__'):
+ if not hasattr(mod, "__name__"):
mod_name = class_mod_name
else:
mod_name = mod.__name__
- if hasattr(mod, '__file__'):
+ if hasattr(mod, "__file__"):
mod_file = mod.__file__
else:
- mod_file = 'unknown'
+ mod_file = "unknown"
if IMPORT_INTERCEPTOR is not None:
import_path = IMPORT_INTERCEPTOR.find_importer(mod_name)
else:
- import_path = 'unknown'
- msg = f'{class_mod_name}::{klass} ({mod_file})'
- if import_path != 'unknown' and len(import_path) > 0:
- msg += f' imported by {import_path}'
- messages[f'{class_mod_name}::{klass}'] = msg
+ import_path = "unknown"
+ msg = f"{class_mod_name}::{klass} ({mod_file})"
+ if import_path != "unknown" and len(import_path) > 0:
+ msg += f" imported by {import_path}"
+ messages[f"{class_mod_name}::{klass}"] = msg
for x in sorted(messages.keys()):
logger.debug(messages[x])
print(messages[x])
entry_descr = entry_point.__code__.__repr__()
except Exception:
if (
- '__globals__' in entry_point.__dict__
- and '__file__' in entry_point.__globals__
+ "__globals__" in entry_point.__dict__
+ and "__file__" in entry_point.__globals__
):
- entry_filename = entry_point.__globals__['__file__']
+ entry_filename = entry_point.__globals__["__file__"]
entry_descr = entry_filename
config.parse(entry_filename)
- if config.config['trace_memory']:
+ if config.config["trace_memory"]:
import tracemalloc
tracemalloc.start()
# Initialize logging... and log some remembered messages from
- # config module.
+ # config module. Also logs about the logging config if we're
+ # in debug mode.
logging_utils.initialize_logging(logging.getLogger())
config.late_logging()
- # Maybe log some info about the python interpreter itself.
+ # Log some info about the python interpreter itself if we're
+ # in debug mode.
logger.debug(
- 'Platform: %s, maxint=0x%x, byteorder=%s',
+ "Platform: %s, maxint=0x%x, byteorder=%s",
sys.platform,
sys.maxsize,
sys.byteorder,
)
- logger.debug('Python interpreter version: %s', sys.version)
- logger.debug('Python implementation: %s', sys.implementation)
- logger.debug('Python C API version: %s', sys.api_version)
+ logger.debug("Python interpreter version: %s", sys.version)
+ logger.debug("Python implementation: %s", sys.implementation)
+ logger.debug("Python C API version: %s", sys.api_version)
if __debug__:
- logger.debug('Python interpreter running in __debug__ mode.')
+ logger.debug("Python interpreter running in __debug__ mode.")
else:
- logger.debug('Python interpreter running in optimized mode.')
- logger.debug('Python path: %s', sys.path)
+ logger.debug("Python interpreter running in optimized mode.")
+ logger.debug("Python path: %s", sys.path)
+
+ # Dump some info about the physical machine we're running on
+ # if we're ing debug mode.
+ if "SC_PAGE_SIZE" in os.sysconf_names and "SC_PHYS_PAGES" in os.sysconf_names:
+ logger.debug(
+ "Physical memory: %.1fGb",
+ os.sysconf("SC_PAGE_SIZE")
+ * os.sysconf("SC_PHYS_PAGES")
+ / float(1024**3),
+ )
+ logger.debug("Logical processors: %s", os.cpu_count())
# Allow programs that don't bother to override the random seed
# to be replayed via the commandline.
import random
- random_seed = config.config['set_random_seed']
+ random_seed = config.config["set_random_seed"]
if random_seed is not None:
random_seed = random_seed[0]
else:
- random_seed = int.from_bytes(os.urandom(4), 'little')
-
- if config.config['show_random_seed']:
- msg = f'Global random seed is: {random_seed}'
+ random_seed = int.from_bytes(os.urandom(4), "little")
+ if config.config["show_random_seed"]:
+ msg = f"Global random seed is: {random_seed}"
logger.debug(msg)
print(msg)
random.seed(random_seed)
+ # Give each run a unique identifier if we're in debug mode.
+ logger.debug("This run's UUID: %s", str(uuid.uuid4()))
+
# Do it, invoke the user's code. Pay attention to how long it takes.
- logger.debug('Starting %s (program entry point)', entry_descr)
+ logger.debug(
+ "Starting %s (program entry point) ---------------------- ", entry_descr
+ )
ret = None
from pyutils import stopwatch
- if config.config['run_profiler']:
+ if config.config["run_profiler"]:
import cProfile
from pstats import SortKey
with stopwatch.Timer() as t:
ret = entry_point(*args, **kwargs)
- logger.debug('%s (program entry point) returned %s.', entry_descr, ret)
+ logger.debug("%s (program entry point) returned %s.", entry_descr, ret)
- if config.config['trace_memory']:
+ if config.config["trace_memory"]:
snapshot = tracemalloc.take_snapshot()
- top_stats = snapshot.statistics('lineno')
+ top_stats = snapshot.statistics("lineno")
print()
print("--trace_memory's top 10 memory using files:")
for stat in top_stats[:10]:
print(stat)
- if config.config['dump_all_objects']:
+ if config.config["dump_all_objects"]:
dump_all_objects()
- if config.config['audit_import_events']:
+ if config.config["audit_import_events"]:
if IMPORT_INTERCEPTOR is not None:
print(IMPORT_INTERCEPTOR.tree)
walltime = t()
(utime, stime, cutime, cstime, elapsed_time) = os.times()
logger.debug(
- '\n'
- 'user: %.4fs\n'
- 'system: %.4fs\n'
- 'child user: %.4fs\n'
- 'child system: %.4fs\n'
- 'machine uptime: %.4fs\n'
- 'walltime: %.4fs',
+ "\n"
+ "user: %.4fs\n"
+ "system: %.4fs\n"
+ "child user: %.4fs\n"
+ "child system: %.4fs\n"
+ "machine uptime: %.4fs\n"
+ "walltime: %.4fs",
utime,
stime,
cutime,
# If it doesn't return cleanly, call attention to the return value.
if ret is not None and ret != 0:
- logger.error('Exit %s', ret)
+ logger.error("Exit %s", ret)
else:
- logger.debug('Exit %s', ret)
+ logger.debug("Exit %s", ret)
sys.exit(ret)
return initialize_wrapper