X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=src%2Fpyutils%2Ffiles%2Flockfile.py;h=0febca6bf5b754682f5e91f8604c70e566fd8a0d;hb=993b0992473c12294ed659e52b532e1c8cf9cd1e;hp=11bb1001156127eaa5ded750d6fadb74f77884fe;hpb=69566c003b4f1c3a4905f37d3735d7921502d14a;p=pyutils.git diff --git a/src/pyutils/files/lockfile.py b/src/pyutils/files/lockfile.py index 11bb100..0febca6 100644 --- a/src/pyutils/files/lockfile.py +++ b/src/pyutils/files/lockfile.py @@ -2,7 +2,14 @@ # © Copyright 2021-2022, Scott Gasch -"""File-based locking helper.""" +"""This is a lockfile implementation I created for use with cronjobs +on my machine to prevent multiple copies of a job from running in +parallel. When one job is running this code keeps a file on disk to +indicate a lock is held. Other copies will fail to start if they +detect this lock until the lock is released. There are provisions in +the code for timing out locks, cleaning up a lock when a signal is +received, gracefully retrying lock acquisition on failure, etc... +""" from __future__ import annotations @@ -17,15 +24,15 @@ import warnings from dataclasses import dataclass from typing import Literal, Optional -from pyutils import config, decorator_utils +from pyutils import argparse_utils, config, decorator_utils from pyutils.datetimez import datetime_utils cfg = config.add_commandline_args(f'Lockfile ({__file__})', 'Args related to lockfiles') cfg.add_argument( - '--lockfile_held_duration_warning_threshold_sec', - type=float, - default=60.0, - metavar='SECONDS', + '--lockfile_held_duration_warning_threshold', + type=argparse_utils.valid_duration, + default=datetime.timedelta(60.0), + metavar='DURATION', help='If a lock is held for longer than this threshold we log a warning', ) logger = logging.getLogger(__name__) @@ -91,11 +98,11 @@ class LockFile(contextlib.AbstractContextManager): signal.signal(signal.SIGTERM, self._signal) self.expiration_timestamp = expiration_timestamp - def locked(self): + def locked(self) -> bool: """Is it locked currently?""" return self.is_locked - def available(self): + def available(self) -> bool: """Is it available currently?""" return not os.path.exists(self.lockfile) @@ -157,7 +164,7 @@ class LockFile(contextlib.AbstractContextManager): self._detect_stale_lockfile() return _try_acquire_lock_with_retries() - def release(self): + def release(self) -> None: """Release the lock""" try: os.unlink(self.lockfile) @@ -179,7 +186,9 @@ class LockFile(contextlib.AbstractContextManager): duration = ts - self.locktime if ( duration - >= config.config['lockfile_held_duration_warning_threshold_sec'] + >= config.config[ + 'lockfile_held_duration_warning_threshold' + ].total_seconds() ): # Note: describe duration briefly only does 1s granularity... str_duration = datetime_utils.describe_duration_briefly(int(duration))