Improve identifier for zookeeper based lockfiles.
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel.
8
9 For local operations, when one job is running this code keeps a file
10 on disk to indicate a lock is held.  Other copies will fail to start
11 if they detect this lock until the lock is released.  There are
12 provisions in the code for timing out locks, cleaning up a lock when a
13 signal is received, gracefully retrying lock acquisition on failure,
14 etc...
15
16 Also allows for Zookeeper-based locks when lockfile path is prefixed
17 with 'zk:' in order to synchronize processes across different
18 machines.
19
20 """
21
22 from __future__ import annotations
23
24 import contextlib
25 import datetime
26 import json
27 import logging
28 import os
29 import platform
30 import signal
31 import sys
32 import warnings
33 from dataclasses import dataclass
34 from typing import Literal, Optional
35
36 import kazoo
37
38 from pyutils import argparse_utils, config, decorator_utils, zookeeper
39 from pyutils.datetimes import datetime_utils
40
# Register this module's command line flag with the shared pyutils config
# machinery.  The resulting value is read back in LockFile.__exit__ via
# config.config["lockfile_held_duration_warning_threshold"].
cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    type=argparse_utils.valid_duration,
    # NOTE(review): the first positional argument of datetime.timedelta is
    # *days*, so this default is 60 days rather than 60 seconds -- confirm
    # that this is the intended threshold.
    default=datetime.timedelta(60.0),
    metavar="DURATION",
    help="If a lock is held for longer than this threshold we log a warning",
)
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
50
51
class LockFileException(Exception):
    """Signals a lockfile-related failure, e.g. :class:`LockFile` giving
    up on acquiring its lock when used as a context manager."""
56
57
@dataclass
class LocalLockFileContents:
    """Record describing the holder of a local (on-disk) lock; it is
    serialized as JSON into the lockfile itself."""

    pid: int
    """Process id of the lock holder"""

    commandline: str
    """Command line of the lock holder"""

    expiration_timestamp: Optional[float]
    """Moment the lock expires, in seconds since the Epoch; None when the
    lock has no expiration"""
70
71
72 class LockFile(contextlib.AbstractContextManager):
73     """A file locking mechanism that has context-manager support so you
74     can use it in a with statement.  e.g.::
75
76         with LockFile('./foo.lock'):
77             # do a bunch of stuff... if the process dies we have a signal
78             # handler to do cleanup.  Other code (in this process or another)
79             # that tries to take the same lockfile will block.  There is also
80             # some logic for detecting stale locks.
81     """
82
83     def __init__(
84         self,
85         lockfile_path: str,
86         *,
87         do_signal_cleanup: bool = True,
88         expiration_timestamp: Optional[float] = None,
89         override_command: Optional[str] = None,
90     ) -> None:
91         """C'tor.
92
93         Args:
94             lockfile_path: path of the lockfile to acquire; may begin
95                 with zk: to indicate a path in zookeeper rather than
96                 on the local filesystem.  Note that zookeeper-based
97                 locks require an expiration_timestamp as the stale
98                 detection semantics are skipped for non-local locks.
99             do_signal_cleanup: handle SIGINT and SIGTERM events by
100                 releasing the lock before exiting
101             expiration_timestamp: when our lease on the lock should
102                 expire (as seconds since the Epoch).  None means the
103                 lock will not expire until we explicltly release it.
104                 Note that this is required for zookeeper based locks.
105             override_command: don't use argv to determine our commandline
106                 rather use this instead if provided.
107         """
108         self.is_locked: bool = False
109         self.lockfile: str = ""
110         self.zk_client: Optional[kazoo.client.KazooClient] = None
111         self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None
112
113         if lockfile_path.startswith("zk:"):
114             logger.debug("Lockfile is on Zookeeper.")
115             if expiration_timestamp is None:
116                 raise Exception("Zookeeper locks require an expiration timestamp")
117             self.lockfile = lockfile_path[3:]
118             if not self.lockfile.startswith("/leases"):
119                 self.lockfile = "/leases" + self.lockfile
120             self.zk_client = zookeeper.get_started_zk_client()
121         else:
122             logger.debug("Lockfile is local.")
123             self.lockfile = lockfile_path
124         self.locktime: Optional[float] = None
125         self.override_command: Optional[str] = override_command
126         if do_signal_cleanup:
127             signal.signal(signal.SIGINT, self._signal)
128             signal.signal(signal.SIGTERM, self._signal)
129         self.expiration_timestamp = expiration_timestamp
130
131     def locked(self) -> bool:
132         """Is it locked currently?"""
133         return self.is_locked
134
135     def _try_acquire_local_filesystem_lock(self) -> bool:
136         """Attempt to create the lockfile.  These flags cause os.open
137         to raise an OSError if the file already exists.
138         """
139         try:
140             logger.debug("Trying to acquire local lock %s.", self.lockfile)
141             fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
142             with os.fdopen(fd, "a") as f:
143                 contents = self._construct_local_lockfile_contents()
144                 logger.debug(contents)
145                 f.write(contents)
146             return True
147         except OSError:
148             logger.warning("Couldn't acquire local lock %s.", self.lockfile)
149             return False
150
151     def _try_acquire_zk_lock(self) -> bool:
152         assert self.expiration_timestamp
153         identifier = f"Lockfile for pid={os.getpid()} on machine {platform.node()}"
154         if self.override_command:
155             identifier += f" running {self.override_command}"
156         self.zk_lease = zookeeper.RenewableReleasableLease(
157             self.zk_client,
158             self.lockfile,
159             datetime.timedelta(seconds=self.expiration_timestamp),
160             identifier,
161         )
162         return self.zk_lease
163
164     def try_acquire_lock_once(self) -> bool:
165         """Attempt to acquire the lock with no blocking.
166
167         Returns:
168             True if the lock was acquired and False otherwise.
169         """
170         success = False
171         if self.zk_client:
172             if self._try_acquire_zk_lock():
173                 success = True
174         else:
175             success = self._try_acquire_local_filesystem_lock()
176
177         if success:
178             self.locktime = datetime.datetime.now().timestamp()
179             logger.debug("Success; I own %s.", self.lockfile)
180             self.is_locked = True
181         return success
182
183     def acquire_with_retries(
184         self,
185         *,
186         initial_delay: float = 1.0,
187         backoff_factor: float = 2.0,
188         max_attempts=5,
189     ) -> bool:
190         """Attempt to acquire the lock repeatedly with retries and backoffs.
191
192         Args:
193             initial_delay: how long to wait before retrying the first time
194             backoff_factor: a float >= 1.0 the multiples the current retry
195                 delay each subsequent time we attempt to acquire and fail
196                 to do so.
197             max_attempts: maximum number of times to try before giving up
198                 and failing.
199
200         Returns:
201             True if the lock was acquired and False otherwise.
202         """
203
204         @decorator_utils.retry_if_false(
205             tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
206         )
207         def _try_acquire_lock_with_retries() -> bool:
208             success = self.try_acquire_lock_once()
209             if not success and os.path.exists(self.lockfile):
210                 self._detect_stale_lockfile()
211             return success
212
213         if os.path.exists(self.lockfile):
214             self._detect_stale_lockfile()
215         return _try_acquire_lock_with_retries()
216
217     def release(self) -> None:
218         """Release the lock"""
219
220         if not self.zk_client:
221             try:
222                 os.unlink(self.lockfile)
223             except Exception as e:
224                 logger.exception(e)
225         else:
226             if self.zk_lease:
227                 self.zk_lease.release()
228             self.zk_client.stop()
229         self.is_locked = False
230
231     def __enter__(self):
232         if self.acquire_with_retries():
233             return self
234
235         msg = "Couldn't acquire lockfile; giving up."
236         if not self.zk_client:
237             raw_contents = self._read_lockfile()
238             if raw_contents:
239                 contents = LocalLockFileContents(**json.loads(raw_contents))
240                 msg = f"Couldn't acquire {self.lockfile} after several attempts.  It's held by pid={contents.pid} ({contents.commandline}).  Giving up."
241         logger.warning(msg)
242         raise LockFileException(msg)
243
244     def __exit__(self, _, value, traceback) -> Literal[False]:
245         if self.locktime:
246             ts = datetime.datetime.now().timestamp()
247             duration = ts - self.locktime
248             if (
249                 duration
250                 >= config.config[
251                     "lockfile_held_duration_warning_threshold"
252                 ].total_seconds()
253             ):
254                 # Note: describe duration briefly only does 1s granularity...
255                 str_duration = datetime_utils.describe_duration_briefly(int(duration))
256                 msg = f"Held {self.lockfile} for {str_duration}"
257                 logger.warning(msg)
258                 warnings.warn(msg, stacklevel=2)
259         self.release()
260         return False
261
262     def __del__(self):
263         if self.is_locked:
264             self.release()
265
266     def _signal(self, *args):
267         if self.is_locked:
268             self.release()
269
270     def _construct_local_lockfile_contents(self) -> str:
271         if not self.zk_client:
272             if self.override_command:
273                 cmd = self.override_command
274             else:
275                 cmd = " ".join(sys.argv)
276             contents = LocalLockFileContents(
277                 pid=os.getpid(),
278                 commandline=cmd,
279                 expiration_timestamp=self.expiration_timestamp,
280             )
281             return json.dumps(contents.__dict__)
282         raise Exception("Non-local lockfiles should not call this?!")
283
284     def _read_lockfile(self) -> Optional[str]:
285         if not self.zk_client:
286             try:
287                 with open(self.lockfile, "r") as rf:
288                     lines = rf.readlines()
289                     return lines[0]
290             except Exception as e:
291                 logger.exception(e)
292         return None
293
294     def _detect_stale_lockfile(self) -> None:
295         if not self.zk_client:
296             raw_contents = self._read_lockfile()
297             if not raw_contents:
298                 return
299             contents = LocalLockFileContents(**json.loads(raw_contents))
300             logger.debug('Blocking lock contents="%s"', contents)
301
302             # Does the PID exist still?
303             try:
304                 os.kill(contents.pid, 0)
305             except OSError:
306                 logger.warning(
307                     "Lockfile %s's pid (%d) is stale; force acquiring...",
308                     self.lockfile,
309                     contents.pid,
310                 )
311                 self.release()
312
313             # Has the lock expiration expired?
314             if contents.expiration_timestamp is not None:
315                 now = datetime.datetime.now().timestamp()
316                 if now > contents.expiration_timestamp:
317                     logger.warning(
318                         "Lockfile %s's expiration time has passed; force acquiring",
319                         self.lockfile,
320                     )
321                     self.release()