3 # © Copyright 2021-2022, Scott Gasch
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
9 For local operations, when one job is running this code keeps a file
10 on disk to indicate a lock is held. Other copies will fail to start
11 if they detect this lock until the lock is released. There are
12 provisions in the code for timing out locks, cleaning up a lock when a
13 signal is received, gracefully retrying lock acquisition on failure,
16 Also allows for Zookeeper-based locks when lockfile path is prefixed
17 with 'zk:' in order to synchronize processes across different
22 from __future__ import annotations
32 from dataclasses import dataclass
33 from typing import Literal, Optional
37 from pyutils import argparse_utils, config, decorator_utils, zookeeper
38 from pyutils.datetimes import datetime_utils
# Command-line flags owned by this module.
cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    # NOTE(review): timedelta's first positional argument is *days*, so this
    # default is 60 days rather than 60 seconds -- confirm that is intended.
    default=datetime.timedelta(days=60.0),
    type=argparse_utils.valid_duration,
    metavar="DURATION",
    help="If a lock is held for longer than this threshold we log a warning",
)
logger = logging.getLogger(__name__)
class LockFileException(Exception):
    """Error raised by this module's lock files, e.g. when a lock cannot be acquired."""

    pass
@dataclass
class LocalLockFileContents:
    """Record identifying the holder of a local lock.

    It is stored in the lockfile as a JSON object built from the
    instance's attribute dict.
    """

    pid: int
    """Process id of the lock holder."""

    commandline: str
    """Command line of the lock holder."""

    expiration_timestamp: Optional[float]
    """Seconds since the Epoch after which the lock is considered expired, or None for no expiration."""
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.::

        with LockFile('./foo.lock'):
            # do a bunch of stuff... if the process dies we have a signal
            # handler to do cleanup.  Other code (in this process or another)
            # that tries to take the same lockfile will block.  There is also
            # some logic for detecting stale locks.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """Create a lock object; the lock itself is not acquired here.

        Args:
            lockfile_path: path of the lockfile to acquire; may begin
                with zk: to indicate a path in zookeeper rather than
                on the local filesystem.  Zookeeper paths are placed
                under /leases (zk:foo and zk:/foo both become
                /leases/foo).  Note that zookeeper-based locks require
                an expiration_timestamp as the stale detection semantics
                are skipped for non-local locks.
            do_signal_cleanup: handle SIGINT and SIGTERM events by
                releasing the lock before exiting
            expiration_timestamp: when our lease on the lock should
                expire (as seconds since the Epoch).  None means the
                lock will not expire until we explicitly release it.
                Note that this is required for zookeeper based locks.
            override_command: don't use argv to determine our commandline
                rather use this instead if provided.

        Raises:
            LockFileException: a zk: path was given without an
                expiration_timestamp.
        """
        self.is_locked: bool = False
        self.lockfile: str = ""
        self.zk_client: Optional[kazoo.client.KazooClient] = None
        self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None

        if lockfile_path.startswith("zk:"):
            logger.debug("Lockfile is on Zookeeper.")
            if expiration_timestamp is None:
                raise LockFileException(
                    "Zookeeper locks require an expiration timestamp"
                )
            path = lockfile_path[3:]
            # Guarantee a separator so "zk:foo" becomes /leases/foo and not
            # the sibling node /leasesfoo.
            if not path.startswith("/"):
                path = "/" + path
            if not path.startswith("/leases"):
                path = "/leases" + path
            self.lockfile = path
            self.zk_client = zookeeper.get_started_zk_client()
        else:
            logger.debug("Lockfile is local.")
            self.lockfile = lockfile_path
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        if do_signal_cleanup:
            # NOTE: these replace any handlers previously installed for
            # SIGINT/SIGTERM.
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)
        self.expiration_timestamp = expiration_timestamp
130 def locked(self) -> bool:
131 """Is it locked currently?"""
132 return self.is_locked
134 def _try_acquire_local_filesystem_lock(self) -> bool:
135 """Attempt to create the lockfile. These flags cause os.open
136 to raise an OSError if the file already exists.
139 logger.debug("Trying to acquire local lock %s.", self.lockfile)
140 fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
141 with os.fdopen(fd, "a") as f:
142 contents = self._construct_local_lockfile_contents()
143 logger.debug(contents)
147 logger.warning("Couldn't acquire local lock %s.", self.lockfile)
150 def _try_acquire_zk_lock(self) -> bool:
151 assert self.expiration_timestamp
152 self.zk_lease = zookeeper.RenewableReleasableLease(
155 datetime.timedelta(seconds=self.expiration_timestamp),
156 f"Pyutils lockfile pid={os.getpid()}",
160 def try_acquire_lock_once(self) -> bool:
161 """Attempt to acquire the lock with no blocking.
164 True if the lock was acquired and False otherwise.
168 if self._try_acquire_zk_lock():
171 success = self._try_acquire_local_filesystem_lock()
174 self.locktime = datetime.datetime.now().timestamp()
175 logger.debug("Success; I own %s.", self.lockfile)
176 self.is_locked = True
179 def acquire_with_retries(
182 initial_delay: float = 1.0,
183 backoff_factor: float = 2.0,
186 """Attempt to acquire the lock repeatedly with retries and backoffs.
189 initial_delay: how long to wait before retrying the first time
190 backoff_factor: a float >= 1.0 the multiples the current retry
191 delay each subsequent time we attempt to acquire and fail
193 max_attempts: maximum number of times to try before giving up
197 True if the lock was acquired and False otherwise.
200 @decorator_utils.retry_if_false(
201 tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
203 def _try_acquire_lock_with_retries() -> bool:
204 success = self.try_acquire_lock_once()
205 if not success and os.path.exists(self.lockfile):
206 self._detect_stale_lockfile()
209 if os.path.exists(self.lockfile):
210 self._detect_stale_lockfile()
211 return _try_acquire_lock_with_retries()
213 def release(self) -> None:
214 """Release the lock"""
216 if not self.zk_client:
218 os.unlink(self.lockfile)
219 except Exception as e:
223 self.zk_lease.release()
224 self.zk_client.stop()
225 self.is_locked = False
228 if self.acquire_with_retries():
231 msg = "Couldn't acquire lockfile; giving up."
232 if not self.zk_client:
233 raw_contents = self._read_lockfile()
235 contents = LocalLockFileContents(**json.loads(raw_contents))
236 msg = f"Couldn't acquire {self.lockfile} after several attempts. It's held by pid={contents.pid} ({contents.commandline}). Giving up."
238 raise LockFileException(msg)
240 def __exit__(self, _, value, traceback) -> Literal[False]:
242 ts = datetime.datetime.now().timestamp()
243 duration = ts - self.locktime
247 "lockfile_held_duration_warning_threshold"
250 # Note: describe duration briefly only does 1s granularity...
251 str_duration = datetime_utils.describe_duration_briefly(int(duration))
252 msg = f"Held {self.lockfile} for {str_duration}"
254 warnings.warn(msg, stacklevel=2)
262 def _signal(self, *args):
266 def _construct_local_lockfile_contents(self) -> str:
267 if not self.zk_client:
268 if self.override_command:
269 cmd = self.override_command
271 cmd = " ".join(sys.argv)
272 contents = LocalLockFileContents(
275 expiration_timestamp=self.expiration_timestamp,
277 return json.dumps(contents.__dict__)
278 raise Exception("Non-local lockfiles should not call this?!")
280 def _read_lockfile(self) -> Optional[str]:
281 if not self.zk_client:
283 with open(self.lockfile, "r") as rf:
284 lines = rf.readlines()
286 except Exception as e:
290 def _detect_stale_lockfile(self) -> None:
291 if not self.zk_client:
292 raw_contents = self._read_lockfile()
295 contents = LocalLockFileContents(**json.loads(raw_contents))
296 logger.debug('Blocking lock contents="%s"', contents)
298 # Does the PID exist still?
300 os.kill(contents.pid, 0)
303 "Lockfile %s's pid (%d) is stale; force acquiring...",
309 # Has the lock expiration expired?
310 if contents.expiration_timestamp is not None:
311 now = datetime.datetime.now().timestamp()
312 if now > contents.expiration_timestamp:
314 "Lockfile %s's expiration time has passed; force acquiring",