3 # © Copyright 2021-2023, Scott Gasch
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
9 For local operations, when one job is running this code keeps a file
10 on disk to indicate a lock is held. Other copies will fail to start
11 if they detect this lock until the lock is released. There are
12 provisions in the code for timing out locks, cleaning up a lock when a
13 signal is received, gracefully retrying lock acquisition on failure,
16 Also allows for Zookeeper-based locks when lockfile path is prefixed
17 with 'zk:' in order to synchronize processes across different
22 from __future__ import annotations
33 from dataclasses import dataclass
34 from typing import Literal, Optional
38 from pyutils import argparse_utils, config, decorator_utils, zookeeper
39 from pyutils.datetimes import datetime_utils
41 cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
43 "--lockfile_held_duration_warning_threshold",
44 type=argparse_utils.valid_duration,
45 default=datetime.timedelta(60.0),
47 help="If a lock is held for longer than this threshold we log a warning",
49 logger = logging.getLogger(__name__)
class LockFileException(Exception):
    """Error type for lock file problems, e.g. a lock that could not be acquired."""
59 class LocalLockFileContents:
60 """The contents we'll write to each lock file."""
63 """The pid of the process that holds the lock"""
66 """The commandline of the process that holds the lock"""
68 expiration_timestamp: Optional[float]
69 """When this lock will expire as seconds since Epoch"""
72 class LockFile(contextlib.AbstractContextManager):
73 """A file locking mechanism that has context-manager support so you
74 can use it in a with statement. e.g.::
76 with LockFile('./foo.lock'):
77 # do a bunch of stuff... if the process dies we have a signal
78 # handler to do cleanup. Other code (in this process or another)
79 # that tries to take the same lockfile will block. There is also
80 # some logic for detecting stale locks.
87 do_signal_cleanup: bool = True,
88 expiration_timestamp: Optional[float] = None,
89 override_command: Optional[str] = None,
94 lockfile_path: path of the lockfile to acquire; may begin
95 with zk: to indicate a path in zookeeper rather than
96 on the local filesystem. Note that zookeeper-based
97 locks require an expiration_timestamp as the stale
98 detection semantics are skipped for non-local locks.
99 do_signal_cleanup: handle SIGINT and SIGTERM events by
100 releasing the lock before exiting
101 expiration_timestamp: when our lease on the lock should
102 expire (as seconds since the Epoch). None means the
103 lock will not expire until we explicitly release it.
104 Note that this is required for zookeeper based locks.
105 override_command: don't use argv to determine our commandline
106 rather use this instead if provided.
108 self.is_locked: bool = False
109 self.lockfile: str = ""
110 self.zk_client: Optional[kazoo.client.KazooClient] = None
111 self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None
113 if lockfile_path.startswith("zk:"):
114 logger.debug("Lockfile is on Zookeeper.")
115 if expiration_timestamp is None:
116 raise Exception("Zookeeper locks require an expiration timestamp")
117 self.lockfile = lockfile_path[3:]
118 if not self.lockfile.startswith("/leases"):
119 self.lockfile = "/leases" + self.lockfile
120 self.zk_client = zookeeper.get_started_zk_client()
122 logger.debug("Lockfile is local.")
123 self.lockfile = lockfile_path
124 self.locktime: Optional[float] = None
125 self.override_command: Optional[str] = override_command
126 if do_signal_cleanup:
127 signal.signal(signal.SIGINT, self._signal)
128 signal.signal(signal.SIGTERM, self._signal)
129 self.expiration_timestamp = expiration_timestamp
def locked(self) -> bool:
    """Report whether this object currently considers the lock held.

    Returns:
        The current value of ``self.is_locked``.
    """
    currently_held = self.is_locked
    return currently_held
135 def _try_acquire_local_filesystem_lock(self) -> bool:
136 """Attempt to create the lockfile. These flags cause os.open
137 to raise an OSError if the file already exists.
140 logger.debug("Trying to acquire local lock %s.", self.lockfile)
141 fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
142 with os.fdopen(fd, "a") as f:
143 contents = self._construct_local_lockfile_contents()
144 logger.debug(contents)
148 logger.warning("Couldn't acquire local lock %s.", self.lockfile)
151 def _try_acquire_zk_lock(self) -> bool:
152 assert self.expiration_timestamp
153 identifier = f"Lockfile for pid={os.getpid()} on machine {platform.node()}"
154 if self.override_command:
155 identifier += f" running {self.override_command}"
156 self.zk_lease = zookeeper.RenewableReleasableLease(
159 datetime.timedelta(seconds=self.expiration_timestamp),
164 def try_acquire_lock_once(self) -> bool:
165 """Attempt to acquire the lock with no blocking.
168 True if the lock was acquired and False otherwise.
172 if self._try_acquire_zk_lock():
175 success = self._try_acquire_local_filesystem_lock()
178 self.locktime = datetime.datetime.now().timestamp()
179 logger.debug("Success; I own %s.", self.lockfile)
180 self.is_locked = True
183 def acquire_with_retries(
186 initial_delay: float = 1.0,
187 backoff_factor: float = 2.0,
188 max_attempts: int = 5,
190 """Attempt to acquire the lock repeatedly with retries and backoffs.
193 initial_delay: how long to wait before retrying the first time
194 backoff_factor: a float >= 1.0 that multiplies the current retry
195 delay each subsequent time we attempt to acquire and fail
197 max_attempts: maximum number of times to try before giving up
201 True if the lock was acquired and False otherwise.
204 @decorator_utils.retry_if_false(
205 tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
207 def _try_acquire_lock_with_retries() -> bool:
208 success = self.try_acquire_lock_once()
209 if not success and os.path.exists(self.lockfile):
210 self._detect_stale_lockfile()
213 if os.path.exists(self.lockfile):
214 self._detect_stale_lockfile()
215 return _try_acquire_lock_with_retries()
217 def release(self) -> None:
218 """Release the lock"""
220 if not self.zk_client:
222 os.unlink(self.lockfile)
224 logger.exception("Failed to unlink path %s; giving up.", self.lockfile)
227 self.zk_lease.release()
228 self.zk_client.stop()
229 self.is_locked = False
232 if self.acquire_with_retries():
235 msg = "Couldn't acquire lockfile; giving up."
236 if not self.zk_client:
237 raw_contents = self._read_lockfile()
239 contents = LocalLockFileContents(**json.loads(raw_contents))
240 msg = f"Couldn't acquire {self.lockfile} after several attempts. It's held by pid={contents.pid} ({contents.commandline}). Giving up."
242 raise LockFileException(msg)
244 def __exit__(self, _, value, traceback) -> Literal[False]:
246 ts = datetime.datetime.now().timestamp()
247 duration = ts - self.locktime
248 warning_threshold = config.config[
249 "lockfile_held_duration_warning_threshold"
251 assert warning_threshold
252 if duration >= warning_threshold.total_seconds():
253 # Note: describe duration briefly only does second-level granularity...
254 str_duration = datetime_utils.describe_duration_briefly(int(duration))
255 msg = f"Held {self.lockfile} for {str_duration}"
257 warnings.warn(msg, stacklevel=2)
265 def _signal(self, *unused_args):
269 def _construct_local_lockfile_contents(self) -> str:
270 if not self.zk_client:
271 if self.override_command:
272 cmd = self.override_command
274 cmd = " ".join(sys.argv)
275 contents = LocalLockFileContents(
278 expiration_timestamp=self.expiration_timestamp,
280 return json.dumps(contents.__dict__)
281 raise Exception("Non-local lockfiles should not call this?!")
283 def _read_lockfile(self) -> Optional[str]:
284 if not self.zk_client:
286 with open(self.lockfile, "r") as rf:
287 lines = rf.readlines()
291 "Failed to read from path %s; giving up.", self.lockfile
295 def _detect_stale_lockfile(self) -> None:
296 if not self.zk_client:
297 raw_contents = self._read_lockfile()
300 contents = LocalLockFileContents(**json.loads(raw_contents))
301 logger.debug('Blocking lock contents="%s"', contents)
303 # Does the PID exist still?
305 os.kill(contents.pid, 0)
308 "Lockfile %s's pid (%d) is stale; force acquiring...",
314 # Has the lock expiration expired?
315 if contents.expiration_timestamp is not None:
316 now = datetime.datetime.now().timestamp()
317 if now > contents.expiration_timestamp:
319 "Lockfile %s's expiration time has passed; force acquiring",