Fix a bug around the computation of zookeeper lockfile expirations.
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel.
8
9 For local operations, when one job is running this code keeps a file
10 on disk to indicate a lock is held.  Other copies will fail to start
11 if they detect this lock until the lock is released.  There are
12 provisions in the code for timing out locks, cleaning up a lock when a
13 signal is received, gracefully retrying lock acquisition on failure,
14 etc...
15
16 Also allows for Zookeeper-based locks when lockfile path is prefixed
17 with 'zk:' in order to synchronize processes across different
18 machines.
19
20 """
21
22 from __future__ import annotations
23
24 import contextlib
25 import datetime
26 import json
27 import logging
28 import os
29 import platform
30 import signal
31 import sys
32 import warnings
33 from dataclasses import dataclass
34 from typing import Literal, Optional
35
36 import kazoo
37
38 from pyutils import argparse_utils, config, decorator_utils, zookeeper
39 from pyutils.datetimes import datetime_utils
40
# Register this module's commandline flags with the pyutils config system.
cfg = config.add_commandline_args(
    f"Lockfile ({__file__})",
    "Args related to lockfiles",
)
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    metavar="DURATION",
    type=argparse_utils.valid_duration,
    # NOTE(review): timedelta's first positional argument is *days*, so
    # this default is sixty days.  It is spelled out here to make that
    # visible; confirm that sixty seconds was not what was intended.
    default=datetime.timedelta(days=60.0),
    help="If a lock is held for longer than this threshold we log a warning",
)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
50
51
class LockFileException(Exception):
    """Signals a lockfile-related failure, e.g. a lock that could not
    be acquired."""
56
57
@dataclass
class LocalLockFileContents:
    """Record describing the holder of a local lockfile; this is the
    payload stored inside the lockfile itself."""

    pid: int
    """Process id of the lock holder."""

    commandline: str
    """Commandline of the lock holder."""

    expiration_timestamp: Optional[float]
    """Expiration time of the lock as seconds since the Epoch."""
70
71
class LockFile(contextlib.AbstractContextManager):
    """A locking mechanism with context-manager support so you can use
    it in a with statement.  e.g.::

        with LockFile('./foo.lock'):
            # Do a bunch of stuff...  If the process receives SIGINT or
            # SIGTERM a signal handler releases the lock before the
            # signal takes effect.  Other code (in this process or
            # another) that tries to take the same lockfile retries a
            # few times and then gets a LockFileException.  There is
            # also some logic for detecting stale local locks.

    The lock is a file on the local filesystem unless the path begins
    with ``zk:``, in which case it is a lease held in Zookeeper.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """C'tor.

        Args:
            lockfile_path: path of the lockfile to acquire; may begin
                with zk: to indicate a path in zookeeper rather than
                on the local filesystem.  Note that zookeeper-based
                locks require an expiration_timestamp as the stale
                detection semantics are skipped for non-local locks.
            do_signal_cleanup: handle SIGINT and SIGTERM events by
                releasing the lock and then letting the signal take
                its previously configured effect.
            expiration_timestamp: when our lease on the lock should
                expire (as seconds since the Epoch).  None means the
                lock will not expire until we explicitly release it.
                Note that this is required for zookeeper based locks.
            override_command: don't use argv to determine our commandline
                rather use this instead if provided.

        Raises:
            LockFileException: a zk: path was given without an
                expiration_timestamp.
        """
        # Populate every attribute up front so that __del__ and _signal
        # are safe even if construction fails part way through.
        self.is_locked: bool = False
        self.lockfile: str = ""
        self.zk_client: Optional[kazoo.client.KazooClient] = None
        self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        self.expiration_timestamp = expiration_timestamp
        # signum -> handler that was installed before ours, for chaining.
        self._previous_signal_handlers: dict = {}

        if lockfile_path.startswith("zk:"):
            logger.debug("Lockfile is on Zookeeper.")
            if expiration_timestamp is None:
                raise LockFileException(
                    "Zookeeper locks require an expiration timestamp"
                )
            self.lockfile = lockfile_path[3:]
            if not self.lockfile.startswith("/leases"):
                self.lockfile = "/leases" + self.lockfile
            self.zk_client = zookeeper.get_started_zk_client()
        else:
            logger.debug("Lockfile is local.")
            self.lockfile = lockfile_path

        if do_signal_cleanup:
            for signum in (signal.SIGINT, signal.SIGTERM):
                # signal.signal returns the handler it replaced; keep it
                # so _signal can hand the signal on after releasing.
                self._previous_signal_handlers[signum] = signal.signal(
                    signum, self._signal
                )

    def locked(self) -> bool:
        """Is it locked currently?"""
        return self.is_locked

    def _try_acquire_local_filesystem_lock(self) -> bool:
        """Attempt to create the lockfile.  The flags passed to os.open
        cause it to raise an OSError if the file already exists.

        Returns:
            True if the file was created and populated, False on any
            OSError (including "already exists").
        """
        try:
            logger.debug("Trying to acquire local lock %s.", self.lockfile)
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, "a") as f:
                contents = self._construct_local_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
            return True
        except OSError:
            logger.warning("Couldn't acquire local lock %s.", self.lockfile)
            return False

    def _try_acquire_zk_lock(self) -> bool:
        """Attempt to obtain a Zookeeper lease that lasts until
        self.expiration_timestamp.

        Returns:
            The truthiness of the lease object that was created.

        Raises:
            LockFileException: no expiration timestamp is set.
        """
        if self.expiration_timestamp is None:
            raise LockFileException("Zookeeper locks require an expiration timestamp")
        identifier = f"Lockfile for pid={os.getpid()} on machine {platform.node()}"
        if self.override_command:
            identifier += f" running {self.override_command}"
        # The lease wants a duration, not an absolute time.
        expiration_delta_seconds_from_now = (
            self.expiration_timestamp - datetime.datetime.now().timestamp()
        )
        self.zk_lease = zookeeper.RenewableReleasableLease(
            self.zk_client,
            self.lockfile,
            datetime.timedelta(seconds=expiration_delta_seconds_from_now),
            identifier,
        )
        # Callers (and our annotation) expect a real bool, not the lease.
        return bool(self.zk_lease)

    def try_acquire_lock_once(self) -> bool:
        """Attempt to acquire the lock with no blocking.

        Returns:
            True if the lock was acquired and False otherwise.
        """
        if self.zk_client:
            success = self._try_acquire_zk_lock()
        else:
            success = self._try_acquire_local_filesystem_lock()

        if success:
            self.locktime = datetime.datetime.now().timestamp()
            logger.debug("Success; I own %s.", self.lockfile)
            self.is_locked = True
        return success

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Attempt to acquire the lock repeatedly with retries and backoffs.

        Args:
            initial_delay: how long to wait before retrying the first time
            backoff_factor: a float >= 1.0 that multiplies the current retry
                delay each subsequent time we attempt to acquire and fail
                to do so.
            max_attempts: maximum number of times to try before giving up
                and failing.

        Returns:
            True if the lock was acquired and False otherwise.
        """

        def _local_lockfile_exists() -> bool:
            # Stale detection only applies to local lockfiles.
            return not self.zk_client and os.path.exists(self.lockfile)

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and _local_lockfile_exists():
                self._detect_stale_lockfile()
            return success

        if _local_lockfile_exists():
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Release the lock.

        For local locks this unlinks the lockfile; for Zookeeper locks
        it releases the lease (if any) and stops the client.
        """
        if self.zk_client:
            if self.zk_lease:
                self.zk_lease.release()
            self.zk_client.stop()
        else:
            self._unlink_lockfile()
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            return self

        msg = "Couldn't acquire lockfile; giving up."
        if not self.zk_client:
            raw_contents = self._read_lockfile()
            if raw_contents:
                contents = self._parse_lockfile_contents(raw_contents)
                if contents is not None:
                    msg = f"Couldn't acquire {self.lockfile} after several attempts.  It's held by pid={contents.pid} ({contents.commandline}).  Giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Warn if the lock was held for a long time, then release it.

        Returns:
            False, so exceptions raised in the with-body propagate.
        """
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            warning_threshold = config.config[
                "lockfile_held_duration_warning_threshold"
            ]
            assert warning_threshold
            if duration >= warning_threshold.total_seconds():
                # Note: describe duration briefly only does second-level granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f"Held {self.lockfile} for {str_duration}"
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        # If the lock was already released (by a signal or by an explicit
        # release() call) the lockfile may now belong to someone else;
        # don't remove it out from under them.
        if self.is_locked:
            self.release()
        return False

    def __del__(self):
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock, then let the signal have
        the effect it would have had without us.

        Args:
            args: (signum, frame) as supplied by the signal machinery.
        """
        if self.is_locked:
            self.release()
        if not args:
            return
        signum = args[0]
        previous = self._previous_signal_handlers.get(signum)
        if callable(previous):
            # e.g. signal.default_int_handler, or another LockFile.
            previous(*args)
        elif previous == signal.SIG_DFL:
            # Restore the default disposition and re-deliver so that the
            # process actually terminates rather than ignoring the signal.
            signal.signal(signum, signal.SIG_DFL)
            signal.raise_signal(signum)
        # SIG_IGN / None: nothing to chain to.

    def _construct_local_lockfile_contents(self) -> str:
        """Build the JSON payload written into a local lockfile.

        Raises:
            LockFileException: called for a Zookeeper-based lock.
        """
        if self.zk_client:
            raise LockFileException("Non-local lockfiles should not call this?!")
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = " ".join(sys.argv)
        contents = LocalLockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _read_lockfile(self) -> Optional[str]:
        """Read the first line of a local lockfile.

        Returns:
            The first line, or None if this is a Zookeeper lock, the
            file is empty, or it could not be read.
        """
        if self.zk_client:
            return None
        try:
            with open(self.lockfile, "r") as rf:
                first_line = rf.readline()
        except (OSError, ValueError):
            logger.exception("Failed to read from path %s; giving up.", self.lockfile)
            return None
        # An empty file (e.g. created but not yet written) has no contents.
        return first_line or None

    def _parse_lockfile_contents(
        self, raw_contents: str
    ) -> "Optional[LocalLockFileContents]":
        """Decode the JSON payload of a local lockfile.

        Returns:
            The decoded contents, or None if the payload is not a JSON
            object with the expected fields.
        """
        try:
            contents = LocalLockFileContents(**json.loads(raw_contents))
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError; TypeError covers a
            # non-object payload or unexpected/missing field names.
            logger.warning(
                "Couldn't parse the contents of lockfile %s: %r",
                self.lockfile,
                raw_contents,
            )
            return None
        if not isinstance(contents.pid, int):
            logger.warning(
                "Lockfile %s has a non-integer pid: %r", self.lockfile, contents.pid
            )
            return None
        return contents

    def _unlink_lockfile(self) -> None:
        """Remove the local lockfile; failures are logged, not raised."""
        try:
            os.unlink(self.lockfile)
        except Exception:
            logger.exception("Failed to unlink path %s; giving up.", self.lockfile)

    def _detect_stale_lockfile(self) -> None:
        """Remove a local lockfile whose holder no longer exists or whose
        expiration time has passed.  Does nothing for Zookeeper locks or
        when the lockfile can't be read or parsed.
        """
        if self.zk_client:
            return
        raw_contents = self._read_lockfile()
        if not raw_contents:
            return
        contents = self._parse_lockfile_contents(raw_contents)
        if contents is None:
            return
        logger.debug('Blocking lock contents="%s"', contents)

        # Does the PID exist still?  Signal 0 performs only the existence
        # and permission checks without delivering anything.
        try:
            os.kill(contents.pid, 0)
        except ProcessLookupError:
            logger.warning(
                "Lockfile %s's pid (%d) is stale; force acquiring...",
                self.lockfile,
                contents.pid,
            )
            self._unlink_lockfile()
            return
        except OSError:
            # e.g. PermissionError: the process exists but belongs to
            # someone else, so the lock is not stale on this account.
            logger.debug(
                "Couldn't signal pid %d holding %s; assuming it is alive.",
                contents.pid,
                self.lockfile,
            )

        # Has the lock expiration expired?
        if contents.expiration_timestamp is not None:
            now = datetime.datetime.now().timestamp()
            if now > contents.expiration_timestamp:
                logger.warning(
                    "Lockfile %s's expiration time has passed; force acquiring",
                    self.lockfile,
                )
                self._unlink_lockfile()