From: Scott Gasch Date: Tue, 13 Jul 2021 03:52:49 +0000 (-0700) Subject: Random changes. X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=97fbe845e5dfdbda22521117c1783e1fd8515952;p=python_utils.git Random changes. --- diff --git a/bootstrap.py b/bootstrap.py index da421b6..3c886ef 100644 --- a/bootstrap.py +++ b/bootstrap.py @@ -3,13 +3,14 @@ import functools import logging import os +import pdb import sys -import time import traceback # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. -import argparse_utils + +from argparse_utils import ActionNoYes import config @@ -20,9 +21,9 @@ args = config.add_commandline_args( 'Args related to python program bootstrapper and Swiss army knife') args.add_argument( '--debug_unhandled_exceptions', - action=argparse_utils.ActionNoYes, + action=ActionNoYes, default=False, - help='Break into debugger on top level unhandled exceptions for interactive debugging' + help='Break into pdb on top level unhandled exceptions.' ) @@ -38,31 +39,41 @@ def handle_uncaught_exception( traceback.print_exception(exc_type, exc_value, exc_traceback) if config.config['debug_unhandled_exceptions']: logger.info("Invoking the debugger...") - breakpoint() + pdb.pm() -def initialize(funct): - import logging_utils +def initialize(entry_point): """Remember to initialize config and logging before running main.""" - @functools.wraps(funct) + @functools.wraps(entry_point) def initialize_wrapper(*args, **kwargs): sys.excepthook = handle_uncaught_exception - config.parse() + config.parse(entry_point.__globals__['__file__']) + + import logging_utils logging_utils.initialize_logging(logging.getLogger()) + config.late_logging() - logger.debug(f'Starting {funct.__name__}') - start = time.perf_counter() - ret = funct(*args, **kwargs) - end = time.perf_counter() - logger.debug(f'{funct} returned {ret}.') + + logger.debug(f'Starting {entry_point.__name__} (program entry point)') + + ret = None + import timer + with timer.Timer() as t: + ret = entry_point(*args, **kwargs) + logger.debug( + f'{entry_point.__name__} (program entry point) returned {ret}.' + ) + + walltime = t() (utime, stime, cutime, cstime, elapsed_time) = os.times() - logger.debug(f'\nuser: {utime}s\n' + logger.debug(f'\n' + f'user: {utime}s\n' f'system: {stime}s\n' f'child user: {cutime}s\n' f'child system: {cstime}s\n' f'elapsed: {elapsed_time}s\n' - f'walltime: {end - start}s\n') + f'walltime: {walltime}s\n') if ret != 0: logger.info(f'Exit {ret}') else: diff --git a/config.py b/config.py index 672e1ae..dccfc27 100644 --- a/config.py +++ b/config.py @@ -100,10 +100,12 @@ class LoadFromFile(argparse.Action): # A global parser that we will collect arguments into. +prog = os.path.basename(sys.argv[0]) args = argparse.ArgumentParser( - description=f"This program uses config.py ({__file__}) for global, cross-module configuration.", + description=None, formatter_class=argparse.ArgumentDefaultsHelpFormatter, fromfile_prefix_chars="@", + epilog=f'-----------------------------------------------------------------------------\n{prog} uses config.py ({__file__}) for global, cross-module configuration setup and parsing.\n-----------------------------------------------------------------------------' ) config_parse_called = False @@ -155,9 +157,7 @@ def is_flag_already_in_argv(var: str): return False -def parse() -> Dict[str, Any]: - import string_utils - +def parse(entry_module: str) -> Dict[str, Any]: """Main program should call this early in main()""" global config_parse_called if config_parse_called: @@ -165,8 +165,23 @@ def parse() -> Dict[str, Any]: config_parse_called = True global saved_messages + # If we're about to do the usage message dump, put the main module's + # argument group first in the list, please. + reordered_action_groups = [] + prog = sys.argv[0] + for arg in sys.argv: + if arg == '--help' or arg == '-h': + print(entry_module) + for group in args._action_groups: + if entry_module in group.title or prog in group.title: + reordered_action_groups.insert(0, group) + else: + reordered_action_groups.append(group) + args._action_groups = reordered_action_groups + # Examine the environment variables to settings that match - # known flags. + # known flags. For a flag called --example_flag the corresponding + # environment variable would be called EXAMPLE_FLAG. usage_message = args.format_usage() optional = False var = '' @@ -193,7 +208,8 @@ def parse() -> Dict[str, Any]: saved_messages.append( f'Initialized from environment: {var} = {value}' ) - if len(chunks) == 1 and string_utils.to_bool(value): + from string_utils import to_bool + if len(chunks) == 1 and to_bool(value): sys.argv.append(var) elif len(chunks) > 1: sys.argv.append(var) @@ -204,12 +220,13 @@ def parse() -> Dict[str, Any]: next # Parse (possibly augmented) commandline args with argparse normally. - #config.update(vars(args.parse_args())) known, unknown = args.parse_known_args() config.update(vars(known)) # Reconstruct the argv with unrecognized flags for the benefit of - # future argument parsers. + # future argument parsers. For example, unittest_main in python + # has some of its own flags. If we didn't recognize it, maybe + # someone else will. sys.argv = sys.argv[:1] + unknown if config['config_savefile']: diff --git a/datetime_utils.py b/datetime_utils.py index f2cae8b..7787c6f 100644 --- a/datetime_utils.py +++ b/datetime_utils.py @@ -51,12 +51,18 @@ def date_and_time_to_datetime(date: datetime.date, ) -def datetime_to_date(date: datetime.datetime) -> datetime.date: - return datetime.date( - date.year, - date.month, - date.day - ) +def datetime_to_date_and_time( + dt: datetime.datetime +) -> Tuple[datetime.date, datetime.time]: + return (dt.date(), dt.timetz()) + + +def datetime_to_date(dt: datetime.datetime) -> datetime.date: + return datetime_to_date_and_time(dt)[0] + + +def datetime_to_time(dt: datetime.datetime) -> datetime.time: + return datetime_to_date_and_time(dt)[1] # An enum to represent units with which we can compute deltas. @@ -330,6 +336,8 @@ def minute_number_to_time_string(minute_num: MinuteOfDay) -> str: def parse_duration(duration: str) -> int: """Parse a duration in string form.""" + if duration.isdigit(): + return int(duration) seconds = 0 m = re.search(r'(\d+) *d[ays]*', duration) if m is not None: diff --git a/decorator_utils.py b/decorator_utils.py index 2817239..0d5b3e3 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -192,7 +192,7 @@ def retry_predicate( tries: int, *, predicate: Callable[..., bool], - delay_sec: float = 3, + delay_sec: float = 3.0, backoff: float = 2.0, ): """Retries a function or method up to a certain number of times @@ -202,10 +202,10 @@ def retry_predicate( delay_sec sets the initial delay period in seconds. backoff is a multiplied (must be >1) used to modify the delay. predicate is a function that will be passed the retval of the - decorated function and must return True to stop or False to - retry. + decorated function and must return True to stop or False to + retry. """ - if backoff < 1: + if backoff < 1.0: msg = f"backoff must be greater than or equal to 1, got {backoff}" logger.critical(msg) raise ValueError(msg) @@ -225,9 +225,11 @@ def retry_predicate( @functools.wraps(f) def f_retry(*args, **kwargs): mtries, mdelay = tries, delay_sec # make mutable + logger.debug(f'deco_retry: will make up to {mtries} attempts...') retval = f(*args, **kwargs) while mtries > 0: if predicate(retval) is True: + logger.debug('Predicate succeeded, deco_retry is done.') return retval logger.debug("Predicate failed, sleeping and retrying.") mtries -= 1 diff --git a/exec_utils.py b/exec_utils.py index c669f54..1b58740 100644 --- a/exec_utils.py +++ b/exec_utils.py @@ -2,10 +2,10 @@ import shlex import subprocess -from typing import List +from typing import List, Optional -def cmd_with_timeout(command: str, timeout_seconds: float) -> int: +def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int: return subprocess.check_call( ["/bin/bash", "-c", command], timeout=timeout_seconds ) diff --git a/file_utils.py b/file_utils.py index d545124..464b0e7 100644 --- a/file_utils.py +++ b/file_utils.py @@ -7,6 +7,7 @@ import errno import hashlib import logging import os +import pathlib import time from typing import Optional import glob @@ -219,6 +220,10 @@ def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]: return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief) +def touch_file(filename: str) -> bool: + return pathlib.Path(filename).touch() + + def expand_globs(in_filename: str): for filename in glob.glob(in_filename): yield filename diff --git a/lockfile.py b/lockfile.py new file mode 100644 index 0000000..ee8c255 --- /dev/null +++ b/lockfile.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 + +from dataclasses import dataclass +import datetime +import json +import logging +import os +import signal +import sys +from typing import Optional + +import decorator_utils + + +logger = logging.getLogger(__name__) + + +class LockFileException(Exception): + pass + + +@dataclass +class LockFileContents: + pid: int + commandline: str + expiration_timestamp: float + + +class LockFile(object): + """A file locking mechanism that has context-manager support so you + can use it in a with statement. + """ + + def __init__( + self, + lockfile_path: str, + *, + do_signal_cleanup: bool = True, + expiration_timestamp: Optional[float] = None, + ) -> None: + self.is_locked = False + self.lockfile = lockfile_path + if do_signal_cleanup: + signal.signal(signal.SIGINT, self._signal) + signal.signal(signal.SIGTERM, self._signal) + self.expiration_timestamp = expiration_timestamp + + def locked(self): + return self.is_locked + + def available(self): + return not os.path.exists(self.lockfile) + + def try_acquire_lock_once(self) -> bool: + logger.debug(f"Trying to acquire {self.lockfile}.") + try: + # Attempt to create the lockfile. These flags cause + # os.open to raise an OSError if the file already + # exists. + fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR) + with os.fdopen(fd, "a") as f: + contents = self._get_lockfile_contents() + logger.debug(contents) + f.write(contents) + logger.debug(f'Success; I own {self.lockfile}.') + self.is_locked = True + return True + except OSError: + pass + logger.debug(f'Failed; I could not acquire {self.lockfile}.') + return False + + def acquire_with_retries( + self, + *, + initial_delay: float = 1.0, + backoff_factor: float = 2.0, + max_attempts = 5 + ) -> bool: + + @decorator_utils.retry_if_false(tries = max_attempts, + delay_sec = initial_delay, + backoff = backoff_factor) + def _try_acquire_lock_with_retries() -> bool: + success = self.try_acquire_lock_once() + if not success and os.path.exists(self.lockfile): + self._detect_stale_lockfile() + return success + + if os.path.exists(self.lockfile): + self._detect_stale_lockfile() + return _try_acquire_lock_with_retries() + + def release(self): + try: + os.unlink(self.lockfile) + except Exception as e: + logger.exception(e) + self.is_locked = False + + def __enter__(self): + if self.acquire_with_retries(): + return self + msg = f"Couldn't acquire {self.lockfile}; giving up." + logger.warning(msg) + raise LockFileException(msg) + + def __exit__(self, type, value, traceback): + self.release() + + def __del__(self): + if self.is_locked: + self.release() + + def _signal(self, *args): + if self.is_locked: + self.release() + + def _get_lockfile_contents(self) -> str: + contents = LockFileContents( + pid = os.getpid(), + commandline = ' '.join(sys.argv), + expiration_timestamp = self.expiration_timestamp + ) + return json.dumps(contents.__dict__) + + def _detect_stale_lockfile(self) -> None: + try: + with open(self.lockfile, 'r') as rf: + lines = rf.readlines() + if len(lines) == 1: + line = lines[0] + line_dict = json.loads(line) + contents = LockFileContents(**line_dict) + logger.debug(f'Blocking lock contents="{contents}"') + + # Does the PID exist still? + try: + os.kill(contents.pid, 0) + except OSError: + logger.debug('The pid seems stale; killing the lock.') + self.release() + + # Has the lock expiration expired? + if contents.expiration_timestamp is not None: + now = datetime.datetime.now().timestamp() + if now > contents.expiration_datetime: + logger.debug('The expiration time has passed; ' + + 'killing the lock') + self.release() + except Exception: + pass diff --git a/logging_utils.py b/logging_utils.py index 9c78f3f..328ea6f 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -36,7 +36,7 @@ cfg.add_argument( cfg.add_argument( '--logging_format', type=str, - default='%(levelname)s:%(asctime)s: %(message)s', + default='%(levelname).1s:%(asctime)s: %(message)s', help='The format for lines logged via the logger module.' ) cfg.add_argument( diff --git a/math_utils.py b/math_utils.py index 2e12699..56fb707 100644 --- a/math_utils.py +++ b/math_utils.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import functools import math from typing import List from heapq import heappush, heappop @@ -60,6 +61,7 @@ def truncate_float(n: float, decimals: int = 2): return int(n * multiplier) / multiplier +@functools.lru_cache(maxsize=1024, typed=True) def is_prime(n: int) -> bool: """Returns True if n is prime and False otherwise""" if not isinstance(n, int): diff --git a/text_utils.py b/text_utils.py index 76b5db6..93e4b63 100644 --- a/text_utils.py +++ b/text_utils.py @@ -167,3 +167,30 @@ def generate_padded_columns(text: List[str]) -> str: word = justify_string(word, width=width, alignment='l') out += f'{word} ' yield out + + +class Indenter: + """ + with Indenter() as i: + i.print('test') + with i: + i.print('-ing') + with i: + i.print('1, 2, 3') + """ + def __init__(self): + self.level = -1 + + def __enter__(self): + self.level += 1 + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self.level -= 1 + if self.level < -1: + self.level = -1 + + def print(self, *arg, **kwargs): + import string_utils + text = string_utils.sprintf(*arg, **kwargs) + print(" " * self.level + text) diff --git a/timer.py b/timer.py new file mode 100644 index 0000000..752c7ed --- /dev/null +++ b/timer.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 + +import time +from typing import Callable + + +class Timer(object): + """ + with timer.Timer() as t: + do_the_thing() + + walltime = t() + print(f'That took {walltime}s.') + """ + + def __init__(self) -> None: + self.start = None + self.end = None + pass + + def __enter__(self) -> Callable[[], float]: + self.start = time.perf_counter() + self.end = 0.0 + return lambda: self.end - self.start + + def __exit__(self, *args) -> bool: + self.end = time.perf_counter() + return True