import functools
import logging
import os
+import pdb
import sys
-import time
import traceback
# This module is commonly used by other modules in this project and
# should avoid taking unnecessary dependencies back on them.
-import argparse_utils
+
+from argparse_utils import ActionNoYes
import config
'Args related to python program bootstrapper and Swiss army knife')
args.add_argument(
'--debug_unhandled_exceptions',
- action=argparse_utils.ActionNoYes,
+ action=ActionNoYes,
default=False,
- help='Break into debugger on top level unhandled exceptions for interactive debugging'
+ help='Break into pdb on top level unhandled exceptions.'
)
traceback.print_exception(exc_type, exc_value, exc_traceback)
if config.config['debug_unhandled_exceptions']:
logger.info("Invoking the debugger...")
- breakpoint()
+ pdb.pm()
-def initialize(funct):
- import logging_utils
+def initialize(entry_point):
"""Remember to initialize config and logging before running main."""
- @functools.wraps(funct)
+ @functools.wraps(entry_point)
def initialize_wrapper(*args, **kwargs):
sys.excepthook = handle_uncaught_exception
- config.parse()
+ config.parse(entry_point.__globals__['__file__'])
+
+ import logging_utils
logging_utils.initialize_logging(logging.getLogger())
+
config.late_logging()
- logger.debug(f'Starting {funct.__name__}')
- start = time.perf_counter()
- ret = funct(*args, **kwargs)
- end = time.perf_counter()
- logger.debug(f'{funct} returned {ret}.')
+
+ logger.debug(f'Starting {entry_point.__name__} (program entry point)')
+
+ ret = None
+ import timer
+ with timer.Timer() as t:
+ ret = entry_point(*args, **kwargs)
+ logger.debug(
+ f'{entry_point.__name__} (program entry point) returned {ret}.'
+ )
+
+ walltime = t()
(utime, stime, cutime, cstime, elapsed_time) = os.times()
- logger.debug(f'\nuser: {utime}s\n'
+ logger.debug(f'\n'
+ f'user: {utime}s\n'
f'system: {stime}s\n'
f'child user: {cutime}s\n'
f'child system: {cstime}s\n'
f'elapsed: {elapsed_time}s\n'
- f'walltime: {end - start}s\n')
+ f'walltime: {walltime}s\n')
if ret != 0:
logger.info(f'Exit {ret}')
else:
# A global parser that we will collect arguments into.
+prog = os.path.basename(sys.argv[0])
args = argparse.ArgumentParser(
- description=f"This program uses config.py ({__file__}) for global, cross-module configuration.",
+ description=None,
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
fromfile_prefix_chars="@",
+ epilog=f'-----------------------------------------------------------------------------\n{prog} uses config.py ({__file__}) for global, cross-module configuration setup and parsing.\n-----------------------------------------------------------------------------'
)
config_parse_called = False
return False
-def parse() -> Dict[str, Any]:
- import string_utils
-
+def parse(entry_module: str) -> Dict[str, Any]:
"""Main program should call this early in main()"""
global config_parse_called
if config_parse_called:
config_parse_called = True
global saved_messages
+ # If we're about to do the usage message dump, put the main module's
+ # argument group first in the list, please.
+ reordered_action_groups = []
+ prog = sys.argv[0]
+ for arg in sys.argv:
+ if arg == '--help' or arg == '-h':
+ print(entry_module)
+ for group in args._action_groups:
+ if entry_module in group.title or prog in group.title:
+ reordered_action_groups.insert(0, group)
+ else:
+ reordered_action_groups.append(group)
+ args._action_groups = reordered_action_groups
+
-    # Examine the environment variables to settings that match
-    # known flags.
+    # Examine the environment for variables that match known flags.
+    # For a flag called --example_flag the corresponding environment
+    # variable would be called EXAMPLE_FLAG.
usage_message = args.format_usage()
optional = False
var = ''
saved_messages.append(
f'Initialized from environment: {var} = {value}'
)
- if len(chunks) == 1 and string_utils.to_bool(value):
+ from string_utils import to_bool
+ if len(chunks) == 1 and to_bool(value):
sys.argv.append(var)
elif len(chunks) > 1:
sys.argv.append(var)
-            next
# Parse (possibly augmented) commandline args with argparse normally.
- #config.update(vars(args.parse_args()))
known, unknown = args.parse_known_args()
config.update(vars(known))
# Reconstruct the argv with unrecognized flags for the benefit of
- # future argument parsers.
+    # future argument parsers. For example, unittest_main in Python
+    # has some of its own flags; if we don't recognize a flag, maybe
+    # another parser will.
sys.argv = sys.argv[:1] + unknown
if config['config_savefile']:
)
-def datetime_to_date(date: datetime.datetime) -> datetime.date:
- return datetime.date(
- date.year,
- date.month,
- date.day
- )
+def datetime_to_date_and_time(
+ dt: datetime.datetime
+) -> Tuple[datetime.date, datetime.time]:
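+    # e.g. datetime.datetime(2021, 9, 1, 13, 14) maps to
+    # (datetime.date(2021, 9, 1), datetime.time(13, 14)); timetz()
+    # preserves any tzinfo attached to the source datetime.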
+ return (dt.date(), dt.timetz())
+
+
+def datetime_to_date(dt: datetime.datetime) -> datetime.date:
+ return datetime_to_date_and_time(dt)[0]
+
+
+def datetime_to_time(dt: datetime.datetime) -> datetime.time:
+ return datetime_to_date_and_time(dt)[1]
# An enum to represent units with which we can compute deltas.
def parse_duration(duration: str) -> int:
"""Parse a duration in string form."""
+ if duration.isdigit():
+ return int(duration)
seconds = 0
m = re.search(r'(\d+) *d[ays]*', duration)
if m is not None:
tries: int,
*,
predicate: Callable[..., bool],
- delay_sec: float = 3,
+ delay_sec: float = 3.0,
backoff: float = 2.0,
):
"""Retries a function or method up to a certain number of times
delay_sec sets the initial delay period in seconds.
-    backoff is a multiplied (must be >1) used to modify the delay.
+    backoff is a multiplier (must be >= 1) used to modify the delay.
predicate is a function that will be passed the retval of the
- decorated function and must return True to stop or False to
- retry.
+ decorated function and must return True to stop or False to
+ retry.
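+
+    Example (a sketch; assumes this decorator factory is exposed as
+    retry_predicate, a name not shown in this hunk):
+
+        @retry_predicate(3, predicate=lambda ret: ret is not None)
+        def flaky_lookup():
+            ...  # hypothetical operation that sometimes returns None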
"""
- if backoff < 1:
+ if backoff < 1.0:
msg = f"backoff must be greater than or equal to 1, got {backoff}"
logger.critical(msg)
raise ValueError(msg)
@functools.wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay_sec # make mutable
+ logger.debug(f'deco_retry: will make up to {mtries} attempts...')
retval = f(*args, **kwargs)
while mtries > 0:
if predicate(retval) is True:
+ logger.debug('Predicate succeeded, deco_retry is done.')
return retval
logger.debug("Predicate failed, sleeping and retrying.")
mtries -= 1
import shlex
import subprocess
-from typing import List
+from typing import List, Optional
-def cmd_with_timeout(command: str, timeout_seconds: float) -> int:
+def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int:
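+    """Run command in a bash subshell and return 0 on success; raises
+    subprocess.CalledProcessError if the command exits nonzero and
+    subprocess.TimeoutExpired if timeout_seconds (None means no limit)
+    elapses first.
+    """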
return subprocess.check_call(
["/bin/bash", "-c", command], timeout=timeout_seconds
)
import hashlib
import logging
import os
+import pathlib
import time
from typing import Optional
import glob
return describe_file_timestamp(filename, lambda x: x.st_mtime, brief=brief)
+def touch_file(filename: str) -> bool:
+    """Touch filename; returns True unless pathlib.Path.touch() raises."""
+    pathlib.Path(filename).touch()
+    return True
+
+
def expand_globs(in_filename: str):
for filename in glob.glob(in_filename):
yield filename
--- /dev/null
+#!/usr/bin/env python3
+
+from dataclasses import dataclass
+import datetime
+import json
+import logging
+import os
+import signal
+import sys
+from typing import Optional
+
+import decorator_utils
+
+
+logger = logging.getLogger(__name__)
+
+
+class LockFileException(Exception):
+ pass
+
+
+@dataclass
+class LockFileContents:
+ pid: int
+ commandline: str
+    expiration_timestamp: Optional[float]
+
+
+class LockFile(object):
+ """A file locking mechanism that has context-manager support so you
+ can use it in a with statement.
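+
+    e.g. (an illustrative sketch; critical_section() is a hypothetical
+    helper that needs mutual exclusion):
+
+        with LockFile('/tmp/myprogram.lock'):
+            critical_section()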
+ """
+
+ def __init__(
+ self,
+ lockfile_path: str,
+ *,
+ do_signal_cleanup: bool = True,
+ expiration_timestamp: Optional[float] = None,
+ ) -> None:
+ self.is_locked = False
+ self.lockfile = lockfile_path
+ if do_signal_cleanup:
+ signal.signal(signal.SIGINT, self._signal)
+ signal.signal(signal.SIGTERM, self._signal)
+ self.expiration_timestamp = expiration_timestamp
+
+ def locked(self):
+ return self.is_locked
+
+ def available(self):
+ return not os.path.exists(self.lockfile)
+
+ def try_acquire_lock_once(self) -> bool:
+ logger.debug(f"Trying to acquire {self.lockfile}.")
+ try:
+ # Attempt to create the lockfile. These flags cause
+ # os.open to raise an OSError if the file already
+ # exists.
+ fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
+ with os.fdopen(fd, "a") as f:
+ contents = self._get_lockfile_contents()
+ logger.debug(contents)
+ f.write(contents)
+ logger.debug(f'Success; I own {self.lockfile}.')
+ self.is_locked = True
+ return True
+ except OSError:
+ pass
+ logger.debug(f'Failed; I could not acquire {self.lockfile}.')
+ return False
+
+ def acquire_with_retries(
+ self,
+ *,
+ initial_delay: float = 1.0,
+ backoff_factor: float = 2.0,
+        max_attempts: int = 5
+ ) -> bool:
+
+ @decorator_utils.retry_if_false(tries = max_attempts,
+ delay_sec = initial_delay,
+ backoff = backoff_factor)
+ def _try_acquire_lock_with_retries() -> bool:
+ success = self.try_acquire_lock_once()
+ if not success and os.path.exists(self.lockfile):
+ self._detect_stale_lockfile()
+ return success
+
+ if os.path.exists(self.lockfile):
+ self._detect_stale_lockfile()
+ return _try_acquire_lock_with_retries()
+
+ def release(self):
+ try:
+ os.unlink(self.lockfile)
+ except Exception as e:
+ logger.exception(e)
+ self.is_locked = False
+
+ def __enter__(self):
+ if self.acquire_with_retries():
+ return self
+ msg = f"Couldn't acquire {self.lockfile}; giving up."
+ logger.warning(msg)
+ raise LockFileException(msg)
+
+    def __exit__(self, exc_type, exc_value, exc_tb):
+ self.release()
+
+ def __del__(self):
+ if self.is_locked:
+ self.release()
+
+ def _signal(self, *args):
+ if self.is_locked:
+ self.release()
+
+ def _get_lockfile_contents(self) -> str:
+ contents = LockFileContents(
+ pid = os.getpid(),
+ commandline = ' '.join(sys.argv),
+ expiration_timestamp = self.expiration_timestamp
+ )
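+        # Serialized (with illustrative values) this looks something like:
+        #   {"pid": 1234, "commandline": "prog --flag", "expiration_timestamp": null}
+        # which _detect_stale_lockfile() parses back into a LockFileContents.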
+ return json.dumps(contents.__dict__)
+
+ def _detect_stale_lockfile(self) -> None:
+ try:
+ with open(self.lockfile, 'r') as rf:
+ lines = rf.readlines()
+ if len(lines) == 1:
+ line = lines[0]
+ line_dict = json.loads(line)
+ contents = LockFileContents(**line_dict)
+ logger.debug(f'Blocking lock contents="{contents}"')
+
+ # Does the PID exist still?
+ try:
+ os.kill(contents.pid, 0)
+ except OSError:
+ logger.debug('The pid seems stale; killing the lock.')
+ self.release()
+
+ # Has the lock expiration expired?
+ if contents.expiration_timestamp is not None:
+ now = datetime.datetime.now().timestamp()
+                    if now > contents.expiration_timestamp:
+ logger.debug('The expiration time has passed; ' +
+ 'killing the lock')
+ self.release()
+ except Exception:
+ pass
cfg.add_argument(
'--logging_format',
type=str,
- default='%(levelname)s:%(asctime)s: %(message)s',
+ default='%(levelname).1s:%(asctime)s: %(message)s',
help='The format for lines logged via the logger module.'
)
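+# With the default format above, a log record renders roughly like this
+# (illustrative): "I:2021-09-01 12:34:56,789: something happened"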
cfg.add_argument(
#!/usr/bin/env python3
+import functools
import math
from typing import List
from heapq import heappush, heappop
return int(n * multiplier) / multiplier
def is_prime(n: int) -> bool:
"""Returns True if n is prime and False otherwise"""
if not isinstance(n, int):
word = justify_string(word, width=width, alignment='l')
out += f'{word} '
yield out
+
+
+class Indenter:
+ """
+ with Indenter() as i:
+ i.print('test')
+ with i:
+ i.print('-ing')
+ with i:
+ i.print('1, 2, 3')
+ """
+ def __init__(self):
+ self.level = -1
+
+ def __enter__(self):
+ self.level += 1
+ return self
+
+ def __exit__(self, exc_type, exc_value, exc_tb):
+ self.level -= 1
+ if self.level < -1:
+ self.level = -1
+
+    def print(self, *args, **kwargs):
+        import string_utils
+        text = string_utils.sprintf(*args, **kwargs)
+        print(" " * self.level + text)
--- /dev/null
+#!/usr/bin/env python3
+
+import time
+from typing import Callable
+
+
+class Timer(object):
+ """
+ with timer.Timer() as t:
+ do_the_thing()
+
+ walltime = t()
+ print(f'That took {walltime}s.')
+ """
+
+    def __init__(self) -> None:
+        self.start = None
+        self.end = None
+
+ def __enter__(self) -> Callable[[], float]:
+ self.start = time.perf_counter()
+ self.end = 0.0
+ return lambda: self.end - self.start
+
+    def __exit__(self, *args) -> bool:
+        self.end = time.perf_counter()
+        # Return False so that exceptions raised inside the with block
+        # are not suppressed.
+        return False