Adds a __repr__ to graph.
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel.
8
9 For local operations, when one job is running this code keeps a file
10 on disk to indicate a lock is held.  Other copies will fail to start
11 if they detect this lock until the lock is released.  There are
12 provisions in the code for timing out locks, cleaning up a lock when a
13 signal is received, gracefully retrying lock acquisition on failure,
14 etc...
15
16 Also allows for Zookeeper-based locks when lockfile path is prefixed
17 with 'zk:' in order to synchronize processes across different
18 machines.
19
20 """
21
22 from __future__ import annotations
23
24 import contextlib
25 import datetime
26 import json
27 import logging
28 import os
29 import platform
30 import signal
31 import sys
32 import warnings
33 from dataclasses import dataclass
34 from typing import Literal, Optional
35
36 import kazoo
37
38 from pyutils import argparse_utils, config, decorator_utils, zookeeper
39 from pyutils.datetimes import datetime_utils
40
41 cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
42 cfg.add_argument(
43     "--lockfile_held_duration_warning_threshold",
44     type=argparse_utils.valid_duration,
45     default=datetime.timedelta(60.0),
46     metavar="DURATION",
47     help="If a lock is held for longer than this threshold we log a warning",
48 )
49 logger = logging.getLogger(__name__)
50
51
52 class LockFileException(Exception):
53     """An exception related to lock files."""
54
55     pass
56
57
58 @dataclass
59 class LocalLockFileContents:
60     """The contents we'll write to each lock file."""
61
62     pid: int
63     """The pid of the process that holds the lock"""
64
65     commandline: str
66     """The commandline of the process that holds the lock"""
67
68     expiration_timestamp: Optional[float]
69     """When this lock will expire as seconds since Epoch"""
70
71
72 class LockFile(contextlib.AbstractContextManager):
73     """A file locking mechanism that has context-manager support so you
74     can use it in a with statement.  e.g.::
75
76         with LockFile('./foo.lock'):
77             # do a bunch of stuff... if the process dies we have a signal
78             # handler to do cleanup.  Other code (in this process or another)
79             # that tries to take the same lockfile will block.  There is also
80             # some logic for detecting stale locks.
81     """
82
83     def __init__(
84         self,
85         lockfile_path: str,
86         *,
87         do_signal_cleanup: bool = True,
88         expiration_timestamp: Optional[float] = None,
89         override_command: Optional[str] = None,
90     ) -> None:
91         """C'tor.
92
93         Args:
94             lockfile_path: path of the lockfile to acquire; may begin
95                 with zk: to indicate a path in zookeeper rather than
96                 on the local filesystem.  Note that zookeeper-based
97                 locks require an expiration_timestamp as the stale
98                 detection semantics are skipped for non-local locks.
99             do_signal_cleanup: handle SIGINT and SIGTERM events by
100                 releasing the lock before exiting
101             expiration_timestamp: when our lease on the lock should
102                 expire (as seconds since the Epoch).  None means the
103                 lock will not expire until we explicltly release it.
104                 Note that this is required for zookeeper based locks.
105             override_command: don't use argv to determine our commandline
106                 rather use this instead if provided.
107
108         Raises:
109             Exception: Zookeeper lock path without an expiration timestamp
110         """
111         self.is_locked: bool = False
112         self.lockfile: str = ""
113         self.zk_client: Optional[kazoo.client.KazooClient] = None
114         self.zk_lease: Optional[zookeeper.RenewableReleasableLease] = None
115
116         if lockfile_path.startswith("zk:"):
117             logger.debug("Lockfile is on Zookeeper.")
118             if expiration_timestamp is None:
119                 raise Exception("Zookeeper locks require an expiration timestamp")
120             self.lockfile = lockfile_path[3:]
121             if not self.lockfile.startswith("/leases"):
122                 self.lockfile = "/leases" + self.lockfile
123             self.zk_client = zookeeper.get_started_zk_client()
124         else:
125             logger.debug("Lockfile is local.")
126             self.lockfile = lockfile_path
127         self.locktime: Optional[float] = None
128         self.override_command: Optional[str] = override_command
129         if do_signal_cleanup:
130             signal.signal(signal.SIGINT, self._signal)
131             signal.signal(signal.SIGTERM, self._signal)
132         self.expiration_timestamp = expiration_timestamp
133
134     def locked(self) -> bool:
135         """Is it locked currently?"""
136         return self.is_locked
137
138     def _try_acquire_local_filesystem_lock(self) -> bool:
139         """Attempt to create the lockfile.  These flags cause os.open
140         to raise an OSError if the file already exists.
141         """
142         try:
143             logger.debug("Trying to acquire local lock %s.", self.lockfile)
144             fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
145             with os.fdopen(fd, "a") as f:
146                 contents = self._construct_local_lockfile_contents()
147                 logger.debug(contents)
148                 f.write(contents)
149             return True
150         except OSError:
151             logger.warning("Couldn't acquire local lock %s.", self.lockfile)
152             return False
153
154     def _try_acquire_zk_lock(self) -> bool:
155         assert self.expiration_timestamp
156         identifier = f"Lockfile for pid={os.getpid()} on machine {platform.node()}"
157         if self.override_command:
158             identifier += f" running {self.override_command}"
159         expiration_delta_seconds_from_now = (
160             self.expiration_timestamp - datetime.datetime.now().timestamp()
161         )
162         self.zk_lease = zookeeper.RenewableReleasableLease(
163             self.zk_client,
164             self.lockfile,
165             datetime.timedelta(seconds=expiration_delta_seconds_from_now),
166             identifier,
167         )
168         return self.zk_lease
169
170     def try_acquire_lock_once(self) -> bool:
171         """Attempt to acquire the lock with no blocking.
172
173         Returns:
174             True if the lock was acquired and False otherwise.
175         """
176         success = False
177         if self.zk_client:
178             if self._try_acquire_zk_lock():
179                 success = True
180         else:
181             success = self._try_acquire_local_filesystem_lock()
182
183         if success:
184             self.locktime = datetime.datetime.now().timestamp()
185             logger.debug("Success; I own %s.", self.lockfile)
186             self.is_locked = True
187         return success
188
189     def acquire_with_retries(
190         self,
191         *,
192         initial_delay: float = 1.0,
193         backoff_factor: float = 2.0,
194         max_attempts: int = 5,
195     ) -> bool:
196         """Attempt to acquire the lock repeatedly with retries and backoffs.
197
198         Args:
199             initial_delay: how long to wait before retrying the first time
200             backoff_factor: a float >= 1.0 the multiples the current retry
201                 delay each subsequent time we attempt to acquire and fail
202                 to do so.
203             max_attempts: maximum number of times to try before giving up
204                 and failing.
205
206         Returns:
207             True if the lock was acquired and False otherwise.
208         """
209
210         @decorator_utils.retry_if_false(
211             tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
212         )
213         def _try_acquire_lock_with_retries() -> bool:
214             success = self.try_acquire_lock_once()
215             if not success and os.path.exists(self.lockfile):
216                 self._detect_stale_lockfile()
217             return success
218
219         if os.path.exists(self.lockfile):
220             self._detect_stale_lockfile()
221         return _try_acquire_lock_with_retries()
222
223     def release(self) -> None:
224         """Release the lock"""
225
226         if not self.zk_client:
227             try:
228                 os.unlink(self.lockfile)
229             except Exception:
230                 logger.exception("Failed to unlink path %s; giving up.", self.lockfile)
231         else:
232             if self.zk_lease:
233                 self.zk_lease.release()
234             self.zk_client.stop()
235         self.is_locked = False
236
237     def __enter__(self):
238         if self.acquire_with_retries():
239             return self
240
241         msg = "Couldn't acquire lockfile; giving up."
242         if not self.zk_client:
243             raw_contents = self._read_lockfile()
244             if raw_contents:
245                 contents = LocalLockFileContents(**json.loads(raw_contents))
246                 msg = f"Couldn't acquire {self.lockfile} after several attempts.  It's held by pid={contents.pid} ({contents.commandline}).  Giving up."
247         logger.warning(msg)
248         raise LockFileException(msg)
249
250     def __exit__(self, _, value, traceback) -> Literal[False]:
251         if self.locktime:
252             ts = datetime.datetime.now().timestamp()
253             duration = ts - self.locktime
254             warning_threshold = config.config[
255                 "lockfile_held_duration_warning_threshold"
256             ]
257             assert warning_threshold
258             if duration >= warning_threshold.total_seconds():
259                 # Note: describe duration briefly only does second-level granularity...
260                 str_duration = datetime_utils.describe_duration_briefly(int(duration))
261                 msg = f"Held {self.lockfile} for {str_duration}"
262                 logger.warning(msg)
263                 warnings.warn(msg, stacklevel=2)
264         self.release()
265         return False
266
267     def __del__(self):
268         if self.is_locked:
269             self.release()
270
271     def _signal(self, *unused_args):
272         if self.is_locked:
273             self.release()
274
275     def _construct_local_lockfile_contents(self) -> str:
276         if not self.zk_client:
277             if self.override_command:
278                 cmd = self.override_command
279             else:
280                 cmd = " ".join(sys.argv)
281             contents = LocalLockFileContents(
282                 pid=os.getpid(),
283                 commandline=cmd,
284                 expiration_timestamp=self.expiration_timestamp,
285             )
286             return json.dumps(contents.__dict__)
287         raise Exception("Non-local lockfiles should not call this?!")
288
289     def _read_lockfile(self) -> Optional[str]:
290         if not self.zk_client:
291             try:
292                 with open(self.lockfile, "r") as rf:
293                     lines = rf.readlines()
294                     return lines[0]
295             except Exception:
296                 logger.exception(
297                     "Failed to read from path %s; giving up.", self.lockfile
298                 )
299         return None
300
301     def _detect_stale_lockfile(self) -> None:
302         if not self.zk_client:
303             raw_contents = self._read_lockfile()
304             if not raw_contents:
305                 return
306             contents = LocalLockFileContents(**json.loads(raw_contents))
307             logger.debug('Blocking lock contents="%s"', contents)
308
309             # Does the PID exist still?
310             try:
311                 os.kill(contents.pid, 0)
312             except OSError:
313                 logger.warning(
314                     "Lockfile %s's pid (%d) is stale; force acquiring...",
315                     self.lockfile,
316                     contents.pid,
317                 )
318                 self.release()
319
320             # Has the lock expiration expired?
321             if contents.expiration_timestamp is not None:
322                 now = datetime.datetime.now().timestamp()
323                 if now > contents.expiration_timestamp:
324                     logger.warning(
325                         "Lockfile %s's expiration time has passed; force acquiring",
326                         self.lockfile,
327                     )
328                     self.release()