from typing import Literal, Optional
from pyutils import argparse_utils, config, decorator_utils
-from pyutils.datetimez import datetime_utils
+from pyutils.datetimes import datetime_utils
-cfg = config.add_commandline_args(f'Lockfile ({__file__})', 'Args related to lockfiles')
+cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
- '--lockfile_held_duration_warning_threshold',
+ "--lockfile_held_duration_warning_threshold",
type=argparse_utils.valid_duration,
default=datetime.timedelta(60.0),
- metavar='DURATION',
- help='If a lock is held for longer than this threshold we log a warning',
+ metavar="DURATION",
+ help="If a lock is held for longer than this threshold we log a warning",
)
logger = logging.getLogger(__name__)
"""
self.is_locked: bool = False
self.lockfile: str = lockfile_path
- self.locktime: Optional[int] = None
+ self.locktime: Optional[float] = None
self.override_command: Optional[str] = override_command
if do_signal_cleanup:
signal.signal(signal.SIGINT, self._signal)
contents = self._get_lockfile_contents()
logger.debug(contents)
f.write(contents)
- logger.debug('Success; I own %s.', self.lockfile)
+ self.locktime = datetime.datetime.now().timestamp()
+ logger.debug("Success; I own %s.", self.lockfile)
self.is_locked = True
return True
except OSError:
pass
- logger.warning('Couldn\'t acquire %s.', self.lockfile)
+ logger.warning("Couldn't acquire %s.", self.lockfile)
return False
def acquire_with_retries(
def __enter__(self):
if self.acquire_with_retries():
- self.locktime = datetime.datetime.now().timestamp()
return self
- msg = f"Couldn't acquire {self.lockfile}; giving up."
+ contents = self._get_lockfile_contents()
+ msg = f"Couldn't acquire {self.lockfile} after several attempts. It's held by pid={contents.pid} ({contents.commandline}). Giving up."
logger.warning(msg)
raise LockFileException(msg)
if (
duration
>= config.config[
- 'lockfile_held_duration_warning_threshold'
+ "lockfile_held_duration_warning_threshold"
].total_seconds()
):
# Note: describe duration briefly only does 1s granularity...
str_duration = datetime_utils.describe_duration_briefly(int(duration))
- msg = f'Held {self.lockfile} for {str_duration}'
+ msg = f"Held {self.lockfile} for {str_duration}"
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
self.release()
if self.override_command:
cmd = self.override_command
else:
- cmd = ' '.join(sys.argv)
+ cmd = " ".join(sys.argv)
contents = LockFileContents(
pid=os.getpid(),
commandline=cmd,
def _detect_stale_lockfile(self) -> None:
try:
- with open(self.lockfile, 'r') as rf:
+ with open(self.lockfile, "r") as rf:
lines = rf.readlines()
if len(lines) == 1:
line = lines[0]
os.kill(contents.pid, 0)
except OSError:
logger.warning(
- 'Lockfile %s\'s pid (%d) is stale; force acquiring...',
+ "Lockfile %s's pid (%d) is stale; force acquiring...",
self.lockfile,
contents.pid,
)
now = datetime.datetime.now().timestamp()
if now > contents.expiration_timestamp:
logger.warning(
- 'Lockfile %s\'s expiration time has passed; force acquiring',
+ "Lockfile %s's expiration time has passed; force acquiring",
self.lockfile,
)
self.release()