More spring cleaning.
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel.
8
9 For local operations, when one job is running this code keeps a file
10 on disk to indicate a lock is held.  Other copies will fail to start
11 if they detect this lock until the lock is released.  There are
12 provisions in the code for timing out locks, cleaning up a lock when a
13 signal is received, gracefully retrying lock acquisition on failure,
14 etc...
15
16 Also allows for Zookeeper-based locks when lockfile path is prefixed
17 with 'zk:' in order to synchronize processes across different
18 machines.
19
20 """
21
22 from __future__ import annotations
23
24 import contextlib
25 import datetime
26 import json
27 import logging
28 import os
29 import platform
30 import signal
31 import sys
32 import warnings
33 from dataclasses import dataclass
34 from typing import Literal, Optional
35
36 import kazoo
37
38 from pyutils import argparse_utils, config, decorator_utils, zookeeper
39 from pyutils.datetimes import datetime_utils
40
# Command-line flags owned by this module, added through the project's
# config.add_commandline_args() helper.
cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    metavar="DURATION",
    type=argparse_utils.valid_duration,
    # NOTE(review): timedelta's first positional argument is *days*, so the
    # historical default of timedelta(60.0) is 60 days, not 60 seconds.  It
    # is spelled out explicitly here; confirm that 60 days is really intended.
    default=datetime.timedelta(days=60.0),
    help="If a lock is held for longer than this threshold we log a warning",
)

# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
50
51
class LockFileException(Exception):
    """Error type for lock file problems, e.g. a lock that could not be
    acquired."""
56
57
@dataclass
class LocalLockFileContents:
    """The record that gets written into each local lock file.

    Attributes:
        pid: the pid of the process that holds the lock.
        commandline: the commandline of the process that holds the lock.
        expiration_timestamp: when this lock will expire, as seconds
            since the Epoch, or None if it has no expiration.
    """

    pid: int
    commandline: str
    expiration_timestamp: Optional[float]
70
71
72 class LockFile(contextlib.AbstractContextManager):
73     """A file locking mechanism that has context-manager support so you
74     can use it in a with statement.  e.g.::
75
76         with LockFile('./foo.lock'):
77             # do a bunch of stuff... if the process dies we have a signal
78             # handler to do cleanup.  Other code (in this process or another)
79             # that tries to take the same lockfile will block.  There is also
80             # some logic for detecting stale locks.
81     """
82
83     def __init__(
84         self,
85         lockfile_path: str,
86         *,
87         do_signal_cleanup: bool = True,
88         expiration_timestamp: Optional[float] = None,
89         override_command: Optional[str] = None,
90     ) -> None:
91         """C'tor.
92
93         Args:
94             lockfile_path: path of the lockfile to acquire; may begin
95                 with zk: to indicate a path in zookeeper rather than
96                 on the local filesystem.  Note that zookeeper-based
97                 locks require an expiration_timestamp as the stale
98                 detection semantics are skipped for non-local locks.
99             do_signal_cleanup: handle SIGINT and SIGTERM events by
100                 releasing the lock before exiting
101             expiration_timestamp: when our lease on the lock should
102                 expire (as seconds since the Epoch).  None means the
103                 lock will not expire until we explicltly release it.
104                 Note that this is required for zookeeper based locks.
105             override_command: don't use argv to determine our commandline
106                 rather use this instead if provided.
107         """
108         self.is_locked: bool = False
109         self.lockfile: str = ""
110         self.zk_client: Optional[kazoo.client.KazooClient] = None
111         self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None
112
113         if lockfile_path.startswith("zk:"):
114             logger.debug("Lockfile is on Zookeeper.")
115             if expiration_timestamp is None:
116                 raise Exception("Zookeeper locks require an expiration timestamp")
117             self.lockfile = lockfile_path[3:]
118             if not self.lockfile.startswith("/leases"):
119                 self.lockfile = "/leases" + self.lockfile
120             self.zk_client = zookeeper.get_started_zk_client()
121         else:
122             logger.debug("Lockfile is local.")
123             self.lockfile = lockfile_path
124         self.locktime: Optional[float] = None
125         self.override_command: Optional[str] = override_command
126         if do_signal_cleanup:
127             signal.signal(signal.SIGINT, self._signal)
128             signal.signal(signal.SIGTERM, self._signal)
129         self.expiration_timestamp = expiration_timestamp
130
131     def locked(self) -> bool:
132         """Is it locked currently?"""
133         return self.is_locked
134
135     def _try_acquire_local_filesystem_lock(self) -> bool:
136         """Attempt to create the lockfile.  These flags cause os.open
137         to raise an OSError if the file already exists.
138         """
139         try:
140             logger.debug("Trying to acquire local lock %s.", self.lockfile)
141             fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
142             with os.fdopen(fd, "a") as f:
143                 contents = self._construct_local_lockfile_contents()
144                 logger.debug(contents)
145                 f.write(contents)
146             return True
147         except OSError:
148             logger.warning("Couldn't acquire local lock %s.", self.lockfile)
149             return False
150
151     def _try_acquire_zk_lock(self) -> bool:
152         assert self.expiration_timestamp
153         identifier = f"Lockfile for pid={os.getpid()} on machine {platform.node()}"
154         if self.override_command:
155             identifier += f" running {self.override_command}"
156         self.zk_lease = zookeeper.RenewableReleasableLease(
157             self.zk_client,
158             self.lockfile,
159             datetime.timedelta(seconds=self.expiration_timestamp),
160             identifier,
161         )
162         return self.zk_lease
163
164     def try_acquire_lock_once(self) -> bool:
165         """Attempt to acquire the lock with no blocking.
166
167         Returns:
168             True if the lock was acquired and False otherwise.
169         """
170         success = False
171         if self.zk_client:
172             if self._try_acquire_zk_lock():
173                 success = True
174         else:
175             success = self._try_acquire_local_filesystem_lock()
176
177         if success:
178             self.locktime = datetime.datetime.now().timestamp()
179             logger.debug("Success; I own %s.", self.lockfile)
180             self.is_locked = True
181         return success
182
183     def acquire_with_retries(
184         self,
185         *,
186         initial_delay: float = 1.0,
187         backoff_factor: float = 2.0,
188         max_attempts: int = 5,
189     ) -> bool:
190         """Attempt to acquire the lock repeatedly with retries and backoffs.
191
192         Args:
193             initial_delay: how long to wait before retrying the first time
194             backoff_factor: a float >= 1.0 the multiples the current retry
195                 delay each subsequent time we attempt to acquire and fail
196                 to do so.
197             max_attempts: maximum number of times to try before giving up
198                 and failing.
199
200         Returns:
201             True if the lock was acquired and False otherwise.
202         """
203
204         @decorator_utils.retry_if_false(
205             tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
206         )
207         def _try_acquire_lock_with_retries() -> bool:
208             success = self.try_acquire_lock_once()
209             if not success and os.path.exists(self.lockfile):
210                 self._detect_stale_lockfile()
211             return success
212
213         if os.path.exists(self.lockfile):
214             self._detect_stale_lockfile()
215         return _try_acquire_lock_with_retries()
216
217     def release(self) -> None:
218         """Release the lock"""
219
220         if not self.zk_client:
221             try:
222                 os.unlink(self.lockfile)
223             except Exception:
224                 logger.exception("Failed to unlink path %s; giving up.", self.lockfile)
225         else:
226             if self.zk_lease:
227                 self.zk_lease.release()
228             self.zk_client.stop()
229         self.is_locked = False
230
231     def __enter__(self):
232         if self.acquire_with_retries():
233             return self
234
235         msg = "Couldn't acquire lockfile; giving up."
236         if not self.zk_client:
237             raw_contents = self._read_lockfile()
238             if raw_contents:
239                 contents = LocalLockFileContents(**json.loads(raw_contents))
240                 msg = f"Couldn't acquire {self.lockfile} after several attempts.  It's held by pid={contents.pid} ({contents.commandline}).  Giving up."
241         logger.warning(msg)
242         raise LockFileException(msg)
243
244     def __exit__(self, _, value, traceback) -> Literal[False]:
245         if self.locktime:
246             ts = datetime.datetime.now().timestamp()
247             duration = ts - self.locktime
248             warning_threshold = config.config[
249                 "lockfile_held_duration_warning_threshold"
250             ]
251             assert warning_threshold
252             if duration >= warning_threshold.total_seconds():
253                 # Note: describe duration briefly only does second-level granularity...
254                 str_duration = datetime_utils.describe_duration_briefly(int(duration))
255                 msg = f"Held {self.lockfile} for {str_duration}"
256                 logger.warning(msg)
257                 warnings.warn(msg, stacklevel=2)
258         self.release()
259         return False
260
261     def __del__(self):
262         if self.is_locked:
263             self.release()
264
265     def _signal(self, *unused_args):
266         if self.is_locked:
267             self.release()
268
269     def _construct_local_lockfile_contents(self) -> str:
270         if not self.zk_client:
271             if self.override_command:
272                 cmd = self.override_command
273             else:
274                 cmd = " ".join(sys.argv)
275             contents = LocalLockFileContents(
276                 pid=os.getpid(),
277                 commandline=cmd,
278                 expiration_timestamp=self.expiration_timestamp,
279             )
280             return json.dumps(contents.__dict__)
281         raise Exception("Non-local lockfiles should not call this?!")
282
283     def _read_lockfile(self) -> Optional[str]:
284         if not self.zk_client:
285             try:
286                 with open(self.lockfile, "r") as rf:
287                     lines = rf.readlines()
288                     return lines[0]
289             except Exception:
290                 logger.exception(
291                     "Failed to read from path %s; giving up.", self.lockfile
292                 )
293         return None
294
295     def _detect_stale_lockfile(self) -> None:
296         if not self.zk_client:
297             raw_contents = self._read_lockfile()
298             if not raw_contents:
299                 return
300             contents = LocalLockFileContents(**json.loads(raw_contents))
301             logger.debug('Blocking lock contents="%s"', contents)
302
303             # Does the PID exist still?
304             try:
305                 os.kill(contents.pid, 0)
306             except OSError:
307                 logger.warning(
308                     "Lockfile %s's pid (%d) is stale; force acquiring...",
309                     self.lockfile,
310                     contents.pid,
311                 )
312                 self.release()
313
314             # Has the lock expiration expired?
315             if contents.expiration_timestamp is not None:
316                 now = datetime.datetime.now().timestamp()
317                 if now > contents.expiration_timestamp:
318                     logger.warning(
319                         "Lockfile %s's expiration time has passed; force acquiring",
320                         self.lockfile,
321                     )
322                     self.release()