3 # © Copyright 2021-2022, Scott Gasch
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel. When one job is running this code keeps a file on disk to
8 indicate a lock is held. Other copies will fail to start if they
9 detect this lock until the lock is released. There are provisions in
10 the code for timing out locks, cleaning up a lock when a signal is
11 received, gracefully retrying lock acquisition on failure, etc...
14 from __future__ import annotations
24 from dataclasses import dataclass
25 from typing import Literal, Optional
27 from pyutils import argparse_utils, config, decorator_utils
28 from pyutils.datetimes import datetime_utils
30 cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
32 "--lockfile_held_duration_warning_threshold",
33 type=argparse_utils.valid_duration,
34 default=datetime.timedelta(60.0),
36 help="If a lock is held for longer than this threshold we log a warning",
38 logger = logging.getLogger(__name__)
class LockFileException(Exception):
    """Error type used by this module to report lock file problems."""
48 class LockFileContents:
49 """The contents we'll write to each lock file."""
52 """The pid of the process that holds the lock"""
55 """The commandline of the process that holds the lock"""
57 expiration_timestamp: Optional[float]
58 """When this lock will expire as seconds since Epoch"""
61 class LockFile(contextlib.AbstractContextManager):
62 """A file locking mechanism that has context-manager support so you
63 can use it in a with statement. e.g.::
65 with LockFile('./foo.lock'):
66 # do a bunch of stuff... if the process dies we have a signal
67 # handler to do cleanup. Other code (in this process or another)
68 # that tries to take the same lockfile will block. There is also
69 # some logic for detecting stale locks.
76 do_signal_cleanup: bool = True,
77 expiration_timestamp: Optional[float] = None,
78 override_command: Optional[str] = None,
83 lockfile_path: path of the lockfile to acquire
84 do_signal_cleanup: handle SIGINT and SIGTERM events by
85 releasing the lock before exiting
86 expiration_timestamp: when our lease on the lock should
87 expire (as seconds since the Epoch). None means the
88 lock will not expire until we explicitly release it.
89 override_command: don't use argv to determine our commandline
90 rather use this instead if provided.
92 self.is_locked: bool = False
93 self.lockfile: str = lockfile_path
94 self.locktime: Optional[float] = None
95 self.override_command: Optional[str] = override_command
97 signal.signal(signal.SIGINT, self._signal)
98 signal.signal(signal.SIGTERM, self._signal)
99 self.expiration_timestamp = expiration_timestamp
def locked(self) -> bool:
    """Report whether this object currently holds the lock.

    Returns:
        The value of ``self.is_locked``, the flag this object keeps to
        track whether it holds the lock.
    """
    currently_held = self.is_locked
    return currently_held
def available(self) -> bool:
    """Report whether the lockfile path is currently free.

    This is purely an existence check on ``self.lockfile``; it does not
    read the file or inspect who wrote it.

    Returns:
        True if nothing exists at the lockfile path, False otherwise.
    """
    lockfile_present = os.path.exists(self.lockfile)
    return not lockfile_present
109 def try_acquire_lock_once(self) -> bool:
110 """Attempt to acquire the lock with no blocking.
113 True if the lock was acquired and False otherwise.
115 logger.debug("Trying to acquire %s.", self.lockfile)
117 # Attempt to create the lockfile. These flags cause
118 # os.open to raise an OSError if the file already
120 fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
121 with os.fdopen(fd, "a") as f:
122 contents = self._get_lockfile_contents()
123 logger.debug(contents)
125 self.locktime = datetime.datetime.now().timestamp()
126 logger.debug("Success; I own %s.", self.lockfile)
127 self.is_locked = True
131 logger.warning("Couldn't acquire %s.", self.lockfile)
134 def acquire_with_retries(
137 initial_delay: float = 1.0,
138 backoff_factor: float = 2.0,
141 """Attempt to acquire the lock repeatedly with retries and backoffs.
144 initial_delay: how long to wait before retrying the first time
145 backoff_factor: a float >= 1.0 that multiplies the current retry
146 delay each subsequent time we attempt to acquire and fail
148 max_attempts: maximum number of times to try before giving up
152 True if the lock was acquired and False otherwise.
155 @decorator_utils.retry_if_false(
156 tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
158 def _try_acquire_lock_with_retries() -> bool:
159 success = self.try_acquire_lock_once()
160 if not success and os.path.exists(self.lockfile):
161 self._detect_stale_lockfile()
164 if os.path.exists(self.lockfile):
165 self._detect_stale_lockfile()
166 return _try_acquire_lock_with_retries()
168 def release(self) -> None:
169 """Release the lock"""
171 os.unlink(self.lockfile)
172 except Exception as e:
174 self.is_locked = False
177 if self.acquire_with_retries():
179 raw_contents = self._read_lockfile()
181 contents = LockFileContents(**json.loads(raw_contents))
182 msg = f"Couldn't acquire {self.lockfile} after several attempts. It's held by pid={contents.pid} ({contents.commandline}). Giving up."
184 msg = "Couldn't acquire lockfile; giving up."
186 raise LockFileException(msg)
188 def __exit__(self, _, value, traceback) -> Literal[False]:
190 ts = datetime.datetime.now().timestamp()
191 duration = ts - self.locktime
195 "lockfile_held_duration_warning_threshold"
198 # Note: describe duration briefly only does 1s granularity...
199 str_duration = datetime_utils.describe_duration_briefly(int(duration))
200 msg = f"Held {self.lockfile} for {str_duration}"
202 warnings.warn(msg, stacklevel=2)
210 def _signal(self, *args):
214 def _get_lockfile_contents(self) -> str:
215 if self.override_command:
216 cmd = self.override_command
218 cmd = " ".join(sys.argv)
219 contents = LockFileContents(
222 expiration_timestamp=self.expiration_timestamp,
224 return json.dumps(contents.__dict__)
226 def _read_lockfile(self) -> Optional[str]:
228 with open(self.lockfile, "r") as rf:
229 lines = rf.readlines()
231 except Exception as e:
235 def _detect_stale_lockfile(self) -> None:
236 raw_contents = self._read_lockfile()
240 contents = LockFileContents(**json.loads(raw_contents))
241 logger.debug('Blocking lock contents="%s"', contents)
243 # Does the PID exist still?
245 os.kill(contents.pid, 0)
248 "Lockfile %s's pid (%d) is stale; force acquiring...",
254 # Has the lock expiration expired?
255 if contents.expiration_timestamp is not None:
256 now = datetime.datetime.now().timestamp()
257 if now > contents.expiration_timestamp:
259 "Lockfile %s's expiration time has passed; force acquiring",