import decorator_utils
-cfg = config.add_commandline_args(
- f'Lockfile ({__file__})',
- 'Args related to lockfiles')
+cfg = config.add_commandline_args(f'Lockfile ({__file__})', 'Args related to lockfiles')
cfg.add_argument(
'--lockfile_held_duration_warning_threshold_sec',
type=float,
default=10.0,
metavar='SECONDS',
- help='If a lock is held for longer than this threshold we log a warning'
+ help='If a lock is held for longer than this threshold we log a warning',
)
logger = logging.getLogger(__name__)
# some logic for detecting stale locks.
"""
+
def __init__(
- self,
- lockfile_path: str,
- *,
- do_signal_cleanup: bool = True,
- expiration_timestamp: Optional[float] = None,
- override_command: Optional[str] = None,
+ self,
+ lockfile_path: str,
+ *,
+ do_signal_cleanup: bool = True,
+ expiration_timestamp: Optional[float] = None,
+ override_command: Optional[str] = None,
) -> None:
self.is_locked = False
self.lockfile = lockfile_path
return False
def acquire_with_retries(
- self,
- *,
- initial_delay: float = 1.0,
- backoff_factor: float = 2.0,
- max_attempts = 5
+ self,
+ *,
+ initial_delay: float = 1.0,
+ backoff_factor: float = 2.0,
+ max_attempts=5,
) -> bool:
-
- @decorator_utils.retry_if_false(tries = max_attempts,
- delay_sec = initial_delay,
- backoff = backoff_factor)
+ @decorator_utils.retry_if_false(
+ tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
+ )
def _try_acquire_lock_with_retries() -> bool:
success = self.try_acquire_lock_once()
if not success and os.path.exists(self.lockfile):
if self.locktime:
ts = datetime.datetime.now().timestamp()
duration = ts - self.locktime
- if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
+ if (
+ duration
+ >= config.config['lockfile_held_duration_warning_threshold_sec']
+ ):
str_duration = datetime_utils.describe_duration_briefly(duration)
msg = f'Held {self.lockfile} for {str_duration}'
logger.warning(msg)
else:
cmd = ' '.join(sys.argv)
contents = LockFileContents(
- pid = os.getpid(),
- commandline = cmd,
- expiration_timestamp = self.expiration_timestamp,
+ pid=os.getpid(),
+ commandline=cmd,
+ expiration_timestamp=self.expiration_timestamp,
)
return json.dumps(contents.__dict__)