Adds support for zookeeper-based lockfiles.
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel.
8
9 For local operations, when one job is running this code keeps a file
10 on disk to indicate a lock is held.  Other copies will fail to start
11 if they detect this lock until the lock is released.  There are
12 provisions in the code for timing out locks, cleaning up a lock when a
13 signal is received, gracefully retrying lock acquisition on failure,
14 etc...
15
16 Also allows for Zookeeper-based locks when lockfile path is prefixed
17 with 'zk:' in order to synchronize processes across different
18 machines.
19
20 """
21
22 from __future__ import annotations
23
24 import contextlib
25 import datetime
26 import json
27 import logging
28 import os
29 import signal
30 import sys
31 import warnings
32 from dataclasses import dataclass
33 from typing import Literal, Optional
34
35 import kazoo
36
37 from pyutils import argparse_utils, config, decorator_utils, zookeeper
38 from pyutils.datetimes import datetime_utils
39
# Declare the command-line arguments that belong to this module.
cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    type=argparse_utils.valid_duration,
    # NOTE(review): the first positional argument of datetime.timedelta is
    # *days*, so this default is 60 days rather than 60 seconds -- confirm
    # which one was intended.
    default=datetime.timedelta(60.0),
    metavar="DURATION",
    help="If a lock is held for longer than this threshold we log a warning",
)
# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
49
50
class LockFileException(Exception):
    """Error type used by this module for lock file failures."""
55
56
@dataclass
class LocalLockFileContents:
    """Record describing the holder of a local lock file.

    An instance's fields are what gets serialized into the lock file.
    """

    #: The pid of the process that holds the lock.
    pid: int

    #: The commandline of the process that holds the lock.
    commandline: str

    #: When this lock will expire, as seconds since the Epoch (or None).
    expiration_timestamp: Optional[float]
68     """When this lock will expire as seconds since Epoch"""
69
70
71 class LockFile(contextlib.AbstractContextManager):
72     """A file locking mechanism that has context-manager support so you
73     can use it in a with statement.  e.g.::
74
75         with LockFile('./foo.lock'):
76             # do a bunch of stuff... if the process dies we have a signal
77             # handler to do cleanup.  Other code (in this process or another)
78             # that tries to take the same lockfile will block.  There is also
79             # some logic for detecting stale locks.
80     """
81
82     def __init__(
83         self,
84         lockfile_path: str,
85         *,
86         do_signal_cleanup: bool = True,
87         expiration_timestamp: Optional[float] = None,
88         override_command: Optional[str] = None,
89     ) -> None:
90         """C'tor.
91
92         Args:
93             lockfile_path: path of the lockfile to acquire; may begin
94                 with zk: to indicate a path in zookeeper rather than
95                 on the local filesystem.  Note that zookeeper-based
96                 locks require an expiration_timestamp as the stale
97                 detection semantics are skipped for non-local locks.
98             do_signal_cleanup: handle SIGINT and SIGTERM events by
99                 releasing the lock before exiting
100             expiration_timestamp: when our lease on the lock should
101                 expire (as seconds since the Epoch).  None means the
102                 lock will not expire until we explicltly release it.
103                 Note that this is required for zookeeper based locks.
104             override_command: don't use argv to determine our commandline
105                 rather use this instead if provided.
106         """
107         self.is_locked: bool = False
108         self.lockfile: str = ""
109         self.zk_client: Optional[kazoo.client.KazooClient] = None
110         self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None
111
112         if lockfile_path.startswith("zk:"):
113             logger.debug("Lockfile is on Zookeeper.")
114             if expiration_timestamp is None:
115                 raise Exception("Zookeeper locks require an expiration timestamp")
116             self.lockfile = lockfile_path[3:]
117             if not self.lockfile.startswith("/leases"):
118                 self.lockfile = "/leases" + self.lockfile
119             self.zk_client = zookeeper.get_started_zk_client()
120         else:
121             logger.debug("Lockfile is local.")
122             self.lockfile = lockfile_path
123         self.locktime: Optional[float] = None
124         self.override_command: Optional[str] = override_command
125         if do_signal_cleanup:
126             signal.signal(signal.SIGINT, self._signal)
127             signal.signal(signal.SIGTERM, self._signal)
128         self.expiration_timestamp = expiration_timestamp
129
130     def locked(self) -> bool:
131         """Is it locked currently?"""
132         return self.is_locked
133
134     def _try_acquire_local_filesystem_lock(self) -> bool:
135         """Attempt to create the lockfile.  These flags cause os.open
136         to raise an OSError if the file already exists.
137         """
138         try:
139             logger.debug("Trying to acquire local lock %s.", self.lockfile)
140             fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
141             with os.fdopen(fd, "a") as f:
142                 contents = self._construct_local_lockfile_contents()
143                 logger.debug(contents)
144                 f.write(contents)
145             return True
146         except OSError:
147             logger.warning("Couldn't acquire local lock %s.", self.lockfile)
148             return False
149
150     def _try_acquire_zk_lock(self) -> bool:
151         assert self.expiration_timestamp
152         self.zk_lease = zookeeper.RenewableReleasableLease(
153             self.zk_client,
154             self.lockfile,
155             datetime.timedelta(seconds=self.expiration_timestamp),
156             f"Pyutils lockfile pid={os.getpid()}",
157         )
158         return self.zk_lease
159
160     def try_acquire_lock_once(self) -> bool:
161         """Attempt to acquire the lock with no blocking.
162
163         Returns:
164             True if the lock was acquired and False otherwise.
165         """
166         success = False
167         if self.zk_client:
168             if self._try_acquire_zk_lock():
169                 success = True
170         else:
171             success = self._try_acquire_local_filesystem_lock()
172
173         if success:
174             self.locktime = datetime.datetime.now().timestamp()
175             logger.debug("Success; I own %s.", self.lockfile)
176             self.is_locked = True
177         return success
178
179     def acquire_with_retries(
180         self,
181         *,
182         initial_delay: float = 1.0,
183         backoff_factor: float = 2.0,
184         max_attempts=5,
185     ) -> bool:
186         """Attempt to acquire the lock repeatedly with retries and backoffs.
187
188         Args:
189             initial_delay: how long to wait before retrying the first time
190             backoff_factor: a float >= 1.0 the multiples the current retry
191                 delay each subsequent time we attempt to acquire and fail
192                 to do so.
193             max_attempts: maximum number of times to try before giving up
194                 and failing.
195
196         Returns:
197             True if the lock was acquired and False otherwise.
198         """
199
200         @decorator_utils.retry_if_false(
201             tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
202         )
203         def _try_acquire_lock_with_retries() -> bool:
204             success = self.try_acquire_lock_once()
205             if not success and os.path.exists(self.lockfile):
206                 self._detect_stale_lockfile()
207             return success
208
209         if os.path.exists(self.lockfile):
210             self._detect_stale_lockfile()
211         return _try_acquire_lock_with_retries()
212
213     def release(self) -> None:
214         """Release the lock"""
215
216         if not self.zk_client:
217             try:
218                 os.unlink(self.lockfile)
219             except Exception as e:
220                 logger.exception(e)
221         else:
222             if self.zk_lease:
223                 self.zk_lease.release()
224             self.zk_client.stop()
225         self.is_locked = False
226
227     def __enter__(self):
228         if self.acquire_with_retries():
229             return self
230
231         msg = "Couldn't acquire lockfile; giving up."
232         if not self.zk_client:
233             raw_contents = self._read_lockfile()
234             if raw_contents:
235                 contents = LocalLockFileContents(**json.loads(raw_contents))
236                 msg = f"Couldn't acquire {self.lockfile} after several attempts.  It's held by pid={contents.pid} ({contents.commandline}).  Giving up."
237         logger.warning(msg)
238         raise LockFileException(msg)
239
240     def __exit__(self, _, value, traceback) -> Literal[False]:
241         if self.locktime:
242             ts = datetime.datetime.now().timestamp()
243             duration = ts - self.locktime
244             if (
245                 duration
246                 >= config.config[
247                     "lockfile_held_duration_warning_threshold"
248                 ].total_seconds()
249             ):
250                 # Note: describe duration briefly only does 1s granularity...
251                 str_duration = datetime_utils.describe_duration_briefly(int(duration))
252                 msg = f"Held {self.lockfile} for {str_duration}"
253                 logger.warning(msg)
254                 warnings.warn(msg, stacklevel=2)
255         self.release()
256         return False
257
258     def __del__(self):
259         if self.is_locked:
260             self.release()
261
262     def _signal(self, *args):
263         if self.is_locked:
264             self.release()
265
266     def _construct_local_lockfile_contents(self) -> str:
267         if not self.zk_client:
268             if self.override_command:
269                 cmd = self.override_command
270             else:
271                 cmd = " ".join(sys.argv)
272             contents = LocalLockFileContents(
273                 pid=os.getpid(),
274                 commandline=cmd,
275                 expiration_timestamp=self.expiration_timestamp,
276             )
277             return json.dumps(contents.__dict__)
278         raise Exception("Non-local lockfiles should not call this?!")
279
280     def _read_lockfile(self) -> Optional[str]:
281         if not self.zk_client:
282             try:
283                 with open(self.lockfile, "r") as rf:
284                     lines = rf.readlines()
285                     return lines[0]
286             except Exception as e:
287                 logger.exception(e)
288         return None
289
290     def _detect_stale_lockfile(self) -> None:
291         if not self.zk_client:
292             raw_contents = self._read_lockfile()
293             if not raw_contents:
294                 return
295             contents = LocalLockFileContents(**json.loads(raw_contents))
296             logger.debug('Blocking lock contents="%s"', contents)
297
298             # Does the PID exist still?
299             try:
300                 os.kill(contents.pid, 0)
301             except OSError:
302                 logger.warning(
303                     "Lockfile %s's pid (%d) is stale; force acquiring...",
304                     self.lockfile,
305                     contents.pid,
306                 )
307                 self.release()
308
309             # Has the lock expiration expired?
310             if contents.expiration_timestamp is not None:
311                 now = datetime.datetime.now().timestamp()
312                 if now > contents.expiration_timestamp:
313                     logger.warning(
314                         "Lockfile %s's expiration time has passed; force acquiring",
315                         self.lockfile,
316                     )
317                     self.release()