#!/usr/bin/env python3

"""File-based locking with stale-lock detection and context-manager support."""

from dataclasses import dataclass
import datetime
import json
import logging
import os
import signal
import sys
from typing import Optional

import config
import datetime_utils
import decorator_utils


cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles')
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    type=float,
    default=10.0,
    metavar='SECONDS',
    help='If a lock is held for longer than this threshold we log a warning'
)
logger = logging.getLogger(__name__)


class LockFileException(Exception):
    """Raised by LockFile.__enter__ when the lock cannot be acquired."""
    pass


@dataclass
class LockFileContents:
    """The record serialized (as one line of JSON) into a lockfile."""

    # PID of the process that created the lockfile.
    pid: int
    # Command line of the owner, or the caller-supplied override string.
    commandline: str
    # Epoch seconds after which the lock may be force-released; None
    # means the lock never expires.
    expiration_timestamp: Optional[float]


class LockFile(object):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

        with LockFile('./foo.lock'):
            # do a bunch of stuff... if the process dies we have a signal
            # handler to do cleanup.  Other code (in this process or another)
            # that tries to take the same lockfile will block.  There is also
            # some logic for detecting stale locks.
    """

    def __init__(
            self,
            lockfile_path: str,
            *,
            do_signal_cleanup: bool = True,
            expiration_timestamp: Optional[float] = None,
            override_command: Optional[str] = None,
    ) -> None:
        """Prepare (but do not acquire) a lock on lockfile_path.

        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: if True, install SIGINT and SIGTERM handlers
                that release the lock.  Note that signal.signal replaces
                whatever handler was previously installed for the process.
            expiration_timestamp: optional epoch-seconds time recorded in
                the lockfile; after it passes, other lockers may
                force-release this lock.
            override_command: string to record in the lockfile instead of
                this process' command line.
        """
        self.is_locked = False
        self.lockfile = lockfile_path
        self.override_command = override_command
        self.expiration_timestamp = expiration_timestamp
        # Set by __enter__ once the lock is held; read by __exit__.
        self.locktime: Optional[float] = None
        if do_signal_cleanup:
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)

    def locked(self) -> bool:
        """Return True if this instance currently holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Return True if the lockfile does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single attempt to create the lockfile.

        Returns:
            True if the lockfile was created and written by this call,
            False if an OSError (e.g. the file already exists) occurred.
        """
        logger.debug(f"Trying to acquire {self.lockfile}.")
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
            logger.debug(f'Success; I own {self.lockfile}.')
            self.is_locked = True
            return True
        except OSError:
            pass
        logger.warning(f'Could not acquire {self.lockfile}.')
        return False

    def acquire_with_retries(
            self,
            *,
            initial_delay: float = 1.0,
            backoff_factor: float = 2.0,
            max_attempts: int = 5
    ) -> bool:
        """Try to acquire the lock, retrying via decorator_utils.retry_if_false.

        Before the first attempt, and after every failed attempt, an
        existing lockfile is inspected and removed if it looks stale.

        Args:
            initial_delay: passed to the retry decorator as delay_sec.
            backoff_factor: passed to the retry decorator as backoff.
            max_attempts: passed to the retry decorator as tries.

        Returns:
            Whatever the retry-wrapped attempt returns; truthy on success.
        """
        @decorator_utils.retry_if_false(tries=max_attempts,
                                        delay_sec=initial_delay,
                                        backoff=backoff_factor)
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Delete the lockfile (logging, not raising, on failure) and
        mark this instance as unlocked."""
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, type, value, traceback):
        """Release the lock, warning if it was held longer than the
        --lockfile_held_duration_warning_threshold_sec setting."""
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                str_duration = datetime_utils.describe_duration_briefly(duration)
                logger.warning(f'Held {self.lockfile} for {str_duration}')
        self.release()

    def __del__(self):
        """Release the lock if this instance still holds it."""
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock if this instance holds it.

        NOTE(review): the handler only releases the lock; it neither exits
        nor re-raises, so while installed it replaces the default
        SIGINT/SIGTERM behavior.  Confirm that is intended.
        """
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Return the JSON string to write into the lockfile."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Inspect the existing lockfile and remove it if it is stale.

        A lockfile is considered stale when the PID recorded in it no
        longer exists or when its expiration timestamp has passed.  This
        is best-effort: any failure to read or parse the file is logged
        and otherwise ignored.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            # Only a single-line file is understood; anything else (e.g.
            # a file created but not yet written) is left alone.
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug(f'Blocking lock contents="{contents}"')

            # Does the PID exist still?  Signal 0 performs the existence
            # and permission checks without delivering a signal.
            try:
                os.kill(contents.pid, 0)
            except PermissionError:
                # The process exists but we may not signal it: the owner
                # is alive, so the lock is not stale on this account.
                pass
            except OSError:
                logger.warning(
                    f"Lockfile {self.lockfile}'s pid ({contents.pid}) is stale; "
                    "force acquiring"
                )
                self.release()
                # The file is gone; a second unlink would only log noise.
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.warning(
                        f'Lockfile {self.lockfile} expiration time has passed; '
                        'force acquiring'
                    )
                    self.release()
        except FileNotFoundError:
            # The owner released the lock between the caller's existence
            # check and our open(); nothing to clean up.
            logger.debug(f'{self.lockfile} vanished before it could be inspected.')
        except Exception:
            logger.warning(
                f'Unable to inspect {self.lockfile} for staleness; ignoring.',
                exc_info=True,
            )