3 # © Copyright 2021-2023, Scott Gasch
5 """This is a lockfile implementation I created for use with cronjobs
on my machine to prevent multiple copies of a job from running in
parallel.

For local operations, when one job is running this code keeps a file
on disk to indicate a lock is held. Other copies will fail to start
if they detect this lock until the lock is released. There are
provisions in the code for timing out locks, cleaning up a lock when a
signal is received, gracefully retrying lock acquisition on failure,
etc.

Also allows for Zookeeper-based locks when lockfile path is prefixed
with 'zk:' in order to synchronize processes across different
machines.
"""
22 from __future__ import annotations
33 from dataclasses import dataclass
34 from typing import Literal, Optional
38 from pyutils import argparse_utils, config, decorator_utils, zookeeper
39 from pyutils.datetimes import datetime_utils
# Register this module's commandline flags with the pyutils config module.
cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    type=argparse_utils.valid_duration,
    # NOTE(review): timedelta's first positional argument is *days*, so this
    # default is 60 days.  The keyword is spelled out so the unit is visible;
    # confirm that days (rather than seconds) is what was intended.
    default=datetime.timedelta(days=60.0),
    help="If a lock is held for longer than this threshold we log a warning",
)

# Module-level logger used by the code in this file.
logger = logging.getLogger(__name__)
class LockFileException(Exception):
    """Error raised for lock file failures, e.g. a lock that cannot be acquired."""
@dataclass
class LocalLockFileContents:
    """The contents we'll write to each lock file.

    Instances are serialized to JSON via their ``__dict__`` when a local
    lock is taken and rebuilt with ``LocalLockFileContents(**parsed_json)``
    when an existing lockfile is inspected, so the field names below are
    also the keys of the on-disk JSON document.
    """

    pid: int
    """The pid of the process that holds the lock"""

    commandline: str
    """The commandline of the process that holds the lock"""

    expiration_timestamp: Optional[float]
    """When this lock will expire as seconds since Epoch"""
72 class LockFile(contextlib.AbstractContextManager):
73 """A file locking mechanism that has context-manager support so you
74 can use it in a with statement. e.g.::
76 with LockFile('./foo.lock'):
77 # do a bunch of stuff... if the process dies we have a signal
78 # handler to do cleanup. Other code (in this process or another)
79 # that tries to take the same lockfile will block. There is also
80 # some logic for detecting stale locks.
87 do_signal_cleanup: bool = True,
88 expiration_timestamp: Optional[float] = None,
89 override_command: Optional[str] = None,
94 lockfile_path: path of the lockfile to acquire; may begin
95 with zk: to indicate a path in zookeeper rather than
96 on the local filesystem. Note that zookeeper-based
97 locks require an expiration_timestamp as the stale
98 detection semantics are skipped for non-local locks.
99 do_signal_cleanup: handle SIGINT and SIGTERM events by
100 releasing the lock before exiting
101 expiration_timestamp: when our lease on the lock should
102 expire (as seconds since the Epoch). None means the
lock will not expire until we explicitly release it.
104 Note that this is required for zookeeper based locks.
105 override_command: don't use argv to determine our commandline
106 rather use this instead if provided.
109 Exception: Zookeeper lock path without an expiration timestamp
111 self.is_locked: bool = False
112 self.lockfile: str = ""
113 self.zk_client: Optional[kazoo.client.KazooClient] = None
114 self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None
116 if lockfile_path.startswith("zk:"):
117 logger.debug("Lockfile is on Zookeeper.")
118 if expiration_timestamp is None:
119 raise Exception("Zookeeper locks require an expiration timestamp")
120 self.lockfile = lockfile_path[3:]
121 if not self.lockfile.startswith("/leases"):
122 self.lockfile = "/leases" + self.lockfile
123 self.zk_client = zookeeper.get_started_zk_client()
125 logger.debug("Lockfile is local.")
126 self.lockfile = lockfile_path
127 self.locktime: Optional[float] = None
128 self.override_command: Optional[str] = override_command
129 if do_signal_cleanup:
130 signal.signal(signal.SIGINT, self._signal)
131 signal.signal(signal.SIGTERM, self._signal)
132 self.expiration_timestamp = expiration_timestamp
def locked(self) -> bool:
    """Report whether this object currently holds the lock.

    Returns:
        The current value of the ``is_locked`` flag.
    """
    return self.is_locked
def _try_acquire_local_filesystem_lock(self) -> bool:
    """Try to take the lock by exclusively creating the lockfile on disk.

    The ``O_CREAT | O_EXCL`` flags cause ``os.open`` to raise an
    ``OSError`` if the file already exists, so creating the file doubles
    as the test for whether somebody else holds the lock.

    Returns:
        True if the lockfile was created and its contents written,
        False if it could not be created.
    """
    logger.debug("Trying to acquire local lock %s.", self.lockfile)
    exclusive_create = os.O_CREAT | os.O_EXCL | os.O_RDWR
    try:
        fd = os.open(self.lockfile, exclusive_create)
        with os.fdopen(fd, "a") as lock_fh:
            payload = self._construct_local_lockfile_contents()
            logger.debug(payload)
            lock_fh.write(payload)
    except OSError:
        logger.warning("Couldn't acquire local lock %s.", self.lockfile)
        return False
    return True
154 def _try_acquire_zk_lock(self) -> bool:
155 assert self.expiration_timestamp
156 identifier = f"Lockfile for pid={os.getpid()} on machine {platform.node()}"
157 if self.override_command:
158 identifier += f" running {self.override_command}"
159 expiration_delta_seconds_from_now = (
160 self.expiration_timestamp - datetime.datetime.now().timestamp()
162 self.zk_lease = zookeeper.RenewableReleasableLease(
165 datetime.timedelta(seconds=expiration_delta_seconds_from_now),
def try_acquire_lock_once(self) -> bool:
    """Make a single attempt to acquire the lock, without retrying.

    The Zookeeper lock is attempted when a Zookeeper client is present,
    the local lockfile otherwise.  On success the acquisition time is
    recorded in ``locktime`` and ``is_locked`` is set.

    Returns:
        True if the lock was acquired and False otherwise.
    """
    if self.zk_client:
        acquired = bool(self._try_acquire_zk_lock())
    else:
        acquired = self._try_acquire_local_filesystem_lock()

    if acquired:
        # Remember when we took the lock so the hold duration can be
        # computed later.
        self.locktime = datetime.datetime.now().timestamp()
        logger.debug("Success; I own %s.", self.lockfile)
        self.is_locked = True
    return acquired
def acquire_with_retries(
    self,
    initial_delay: float = 1.0,
    backoff_factor: float = 2.0,
    max_attempts: int = 5,
) -> bool:
    """Attempt to acquire the lock repeatedly with retries and backoffs.

    Args:
        initial_delay: how long to wait before retrying the first time
        backoff_factor: a float >= 1.0 that multiplies the current retry
            delay each subsequent time we attempt to acquire and fail
        max_attempts: maximum number of times to try before giving up

    Returns:
        True if the lock was acquired and False otherwise.
    """

    @decorator_utils.retry_if_false(
        tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
    )
    def _try_acquire_lock_with_retries() -> bool:
        acquired = self.try_acquire_lock_once()
        # A failed attempt with the lockfile still present may mean the
        # holder is gone; run stale-lock detection before the next retry.
        if not acquired and os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return acquired

    # Run stale-lock detection up front too, before the first attempt.
    if os.path.exists(self.lockfile):
        self._detect_stale_lockfile()
    return _try_acquire_lock_with_retries()
def release(self) -> None:
    """Release the lock.

    For a local lock the lockfile is unlinked; a failure to unlink is
    logged and otherwise ignored.  For a Zookeeper lock the lease (if one
    has been created) is released and the Zookeeper client is stopped.
    In both cases ``is_locked`` is cleared afterwards.
    """
    if not self.zk_client:
        try:
            os.unlink(self.lockfile)
        except Exception:
            # Best effort: there is nothing more useful to do than report it.
            logger.exception("Failed to unlink path %s; giving up.", self.lockfile)
    else:
        # zk_lease stays None until a lease object has been created, so
        # releasing before that point must not dereference it; the client
        # still has to be stopped either way.
        if self.zk_lease is not None:
            self.zk_lease.release()
        self.zk_client.stop()
    self.is_locked = False
def __enter__(self):
    """Acquire the lock on entry to a ``with`` block.

    Returns:
        This object, once the lock is held.

    Raises:
        LockFileException: the lock could not be acquired.  For local
            locks the message names the holder's pid and commandline when
            the existing lockfile can be read and parsed.
    """
    if self.acquire_with_retries():
        return self

    msg = "Couldn't acquire lockfile; giving up."
    if not self.zk_client:
        raw_contents = self._read_lockfile()
        # The lockfile may be unreadable, empty, or not in the expected
        # format; in any of those cases fall back to the generic message
        # rather than letting a parse error mask the real failure.
        if raw_contents:
            try:
                contents = LocalLockFileContents(**json.loads(raw_contents))
            except (ValueError, TypeError):
                pass
            else:
                msg = f"Couldn't acquire {self.lockfile} after several attempts. It's held by pid={contents.pid} ({contents.commandline}). Giving up."
    raise LockFileException(msg)
def __exit__(self, _, value, traceback) -> Literal[False]:
    """Release the lock on exit from a ``with`` block.

    If the lock was held for at least the configured
    ``lockfile_held_duration_warning_threshold``, a warning describing the
    hold duration is issued before the lock is released.

    Returns:
        False, so that any exception raised inside the ``with`` body
        propagates to the caller.
    """
    # locktime is only set once a lock has actually been acquired; skip the
    # duration check rather than subtracting from None.
    if self.locktime is not None:
        held_seconds = datetime.datetime.now().timestamp() - self.locktime
        warning_threshold = config.config[
            "lockfile_held_duration_warning_threshold"
        ]
        assert warning_threshold
        if held_seconds >= warning_threshold.total_seconds():
            # Note: describe duration briefly only does second-level granularity...
            str_duration = datetime_utils.describe_duration_briefly(int(held_seconds))
            msg = f"Held {self.lockfile} for {str_duration}"
            warnings.warn(msg, stacklevel=2)
    self.release()
    return False
271 def _signal(self, *unused_args):
def _construct_local_lockfile_contents(self) -> str:
    """Build the JSON text that is written into a local lockfile.

    The document records this process's pid, its commandline (or
    ``override_command`` when one was supplied) and the lock's expiration
    timestamp.

    Returns:
        The JSON-encoded lockfile contents.

    Raises:
        Exception: called on a lock that has a Zookeeper client.
    """
    if self.zk_client:
        raise Exception("Non-local lockfiles should not call this?!")

    # Prefer the caller-supplied command; otherwise fall back to argv.
    cmd = self.override_command or " ".join(sys.argv)
    contents = LocalLockFileContents(
        pid=os.getpid(),
        commandline=cmd,
        expiration_timestamp=self.expiration_timestamp,
    )
    return json.dumps(contents.__dict__)
289 def _read_lockfile(self) -> Optional[str]:
290 if not self.zk_client:
292 with open(self.lockfile, "r") as rf:
293 lines = rf.readlines()
297 "Failed to read from path %s; giving up.", self.lockfile
301 def _detect_stale_lockfile(self) -> None:
302 if not self.zk_client:
303 raw_contents = self._read_lockfile()
306 contents = LocalLockFileContents(**json.loads(raw_contents))
307 logger.debug('Blocking lock contents="%s"', contents)
309 # Does the PID exist still?
311 os.kill(contents.pid, 0)
314 "Lockfile %s's pid (%d) is stale; force acquiring...",
320 # Has the lock expiration expired?
321 if contents.expiration_timestamp is not None:
322 now = datetime.datetime.now().timestamp()
323 if now > contents.expiration_timestamp:
325 "Lockfile %s's expiration time has passed; force acquiring",