3 # © Copyright 2021-2023, Scott Gasch
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
9 For local operations, when one job is running this code keeps a file
10 on disk to indicate a lock is held. Other copies will fail to start
11 if they detect this lock until the lock is released. There are
12 provisions in the code for timing out locks, cleaning up a lock when a
13 signal is received, gracefully retrying lock acquisition on failure,
16 Also allows for Zookeeper-based locks when lockfile path is prefixed
17 with 'zk:' in order to synchronize processes across different
22 from __future__ import annotations
33 from dataclasses import dataclass
34 from typing import Literal, Optional
38 from pyutils import argparse_utils, config, decorator_utils, zookeeper
39 from pyutils.datetimes import datetime_utils
cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    type=argparse_utils.valid_duration,
    # timedelta's first positional parameter is days, so the unit is spelled
    # out explicitly: the default threshold is 60 seconds.
    default=datetime.timedelta(seconds=60.0),
    help="If a lock is held for longer than this threshold we log a warning",
)
logger = logging.getLogger(__name__)
class LockFileException(Exception):
    """Error type for lock file problems, e.g. a lock that could not be acquired."""

    pass
@dataclass
class LocalLockFileContents:
    """The contents we'll write to each lock file.

    Instances are serialized with ``json.dumps(contents.__dict__)`` and
    rebuilt with ``LocalLockFileContents(**json.loads(raw))``, so every
    field must hold a JSON-serializable value.
    """

    pid: int
    """The pid of the process that holds the lock"""

    commandline: str
    """The commandline of the process that holds the lock"""

    expiration_timestamp: Optional[float]
    """When this lock will expire as seconds since Epoch"""
72 class LockFile(contextlib.AbstractContextManager):
73 """A file locking mechanism that has context-manager support so you
74 can use it in a with statement. e.g.::
76 with LockFile('./foo.lock'):
77 # do a bunch of stuff... if the process dies we have a signal
78 # handler to do cleanup. Other code (in this process or another)
79 # that tries to take the same lockfile will block. There is also
80 # some logic for detecting stale locks.
87 do_signal_cleanup: bool = True,
88 expiration_timestamp: Optional[float] = None,
89 override_command: Optional[str] = None,
94 lockfile_path: path of the lockfile to acquire; may begin
95 with zk: to indicate a path in zookeeper rather than
96 on the local filesystem. Note that zookeeper-based
97 locks require an expiration_timestamp as the stale
98 detection semantics are skipped for non-local locks.
99 do_signal_cleanup: handle SIGINT and SIGTERM events by
100 releasing the lock before exiting
101 expiration_timestamp: when our lease on the lock should
102 expire (as seconds since the Epoch). None means the
                lock will not expire until we explicitly release it.
104 Note that this is required for zookeeper based locks.
105 override_command: don't use argv to determine our commandline
106 rather use this instead if provided.
108 self.is_locked: bool = False
109 self.lockfile: str = ""
110 self.zk_client: Optional[kazoo.client.KazooClient] = None
111 self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None
113 if lockfile_path.startswith("zk:"):
114 logger.debug("Lockfile is on Zookeeper.")
115 if expiration_timestamp is None:
116 raise Exception("Zookeeper locks require an expiration timestamp")
117 self.lockfile = lockfile_path[3:]
118 if not self.lockfile.startswith("/leases"):
119 self.lockfile = "/leases" + self.lockfile
120 self.zk_client = zookeeper.get_started_zk_client()
122 logger.debug("Lockfile is local.")
123 self.lockfile = lockfile_path
124 self.locktime: Optional[float] = None
125 self.override_command: Optional[str] = override_command
126 if do_signal_cleanup:
127 signal.signal(signal.SIGINT, self._signal)
128 signal.signal(signal.SIGTERM, self._signal)
129 self.expiration_timestamp = expiration_timestamp
131 def locked(self) -> bool:
132 """Is it locked currently?"""
133 return self.is_locked
135 def _try_acquire_local_filesystem_lock(self) -> bool:
136 """Attempt to create the lockfile. These flags cause os.open
137 to raise an OSError if the file already exists.
140 logger.debug("Trying to acquire local lock %s.", self.lockfile)
141 fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
142 with os.fdopen(fd, "a") as f:
143 contents = self._construct_local_lockfile_contents()
144 logger.debug(contents)
148 logger.warning("Couldn't acquire local lock %s.", self.lockfile)
151 def _try_acquire_zk_lock(self) -> bool:
152 assert self.expiration_timestamp
153 identifier = f"Lockfile for pid={os.getpid()} on machine {platform.node()}"
154 if self.override_command:
155 identifier += f" running {self.override_command}"
156 expiration_delta_seconds_from_now = (
157 self.expiration_timestamp - datetime.datetime.now().timestamp()
159 self.zk_lease = zookeeper.RenewableReleasableLease(
162 datetime.timedelta(seconds=expiration_delta_seconds_from_now),
167 def try_acquire_lock_once(self) -> bool:
168 """Attempt to acquire the lock with no blocking.
171 True if the lock was acquired and False otherwise.
175 if self._try_acquire_zk_lock():
178 success = self._try_acquire_local_filesystem_lock()
181 self.locktime = datetime.datetime.now().timestamp()
182 logger.debug("Success; I own %s.", self.lockfile)
183 self.is_locked = True
186 def acquire_with_retries(
189 initial_delay: float = 1.0,
190 backoff_factor: float = 2.0,
191 max_attempts: int = 5,
193 """Attempt to acquire the lock repeatedly with retries and backoffs.
196 initial_delay: how long to wait before retrying the first time
            backoff_factor: a float >= 1.0 that multiplies the current retry
198 delay each subsequent time we attempt to acquire and fail
200 max_attempts: maximum number of times to try before giving up
204 True if the lock was acquired and False otherwise.
207 @decorator_utils.retry_if_false(
208 tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
210 def _try_acquire_lock_with_retries() -> bool:
211 success = self.try_acquire_lock_once()
212 if not success and os.path.exists(self.lockfile):
213 self._detect_stale_lockfile()
216 if os.path.exists(self.lockfile):
217 self._detect_stale_lockfile()
218 return _try_acquire_lock_with_retries()
220 def release(self) -> None:
221 """Release the lock"""
223 if not self.zk_client:
225 os.unlink(self.lockfile)
227 logger.exception("Failed to unlink path %s; giving up.", self.lockfile)
230 self.zk_lease.release()
231 self.zk_client.stop()
232 self.is_locked = False
235 if self.acquire_with_retries():
238 msg = "Couldn't acquire lockfile; giving up."
239 if not self.zk_client:
240 raw_contents = self._read_lockfile()
242 contents = LocalLockFileContents(**json.loads(raw_contents))
243 msg = f"Couldn't acquire {self.lockfile} after several attempts. It's held by pid={contents.pid} ({contents.commandline}). Giving up."
245 raise LockFileException(msg)
247 def __exit__(self, _, value, traceback) -> Literal[False]:
249 ts = datetime.datetime.now().timestamp()
250 duration = ts - self.locktime
251 warning_threshold = config.config[
252 "lockfile_held_duration_warning_threshold"
254 assert warning_threshold
255 if duration >= warning_threshold.total_seconds():
256 # Note: describe duration briefly only does second-level granularity...
257 str_duration = datetime_utils.describe_duration_briefly(int(duration))
258 msg = f"Held {self.lockfile} for {str_duration}"
260 warnings.warn(msg, stacklevel=2)
268 def _signal(self, *unused_args):
272 def _construct_local_lockfile_contents(self) -> str:
273 if not self.zk_client:
274 if self.override_command:
275 cmd = self.override_command
277 cmd = " ".join(sys.argv)
278 contents = LocalLockFileContents(
281 expiration_timestamp=self.expiration_timestamp,
283 return json.dumps(contents.__dict__)
284 raise Exception("Non-local lockfiles should not call this?!")
286 def _read_lockfile(self) -> Optional[str]:
287 if not self.zk_client:
289 with open(self.lockfile, "r") as rf:
290 lines = rf.readlines()
294 "Failed to read from path %s; giving up.", self.lockfile
298 def _detect_stale_lockfile(self) -> None:
299 if not self.zk_client:
300 raw_contents = self._read_lockfile()
303 contents = LocalLockFileContents(**json.loads(raw_contents))
304 logger.debug('Blocking lock contents="%s"', contents)
306 # Does the PID exist still?
308 os.kill(contents.pid, 0)
311 "Lockfile %s's pid (%d) is stale; force acquiring...",
317 # Has the lock expiration expired?
318 if contents.expiration_timestamp is not None:
319 now = datetime.datetime.now().timestamp()
320 if now > contents.expiration_timestamp:
322 "Lockfile %s's expiration time has passed; force acquiring",