Improve error message for logs.
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel.  When one job is running this code keeps a file on disk to
8 indicate a lock is held.  Other copies will fail to start if they
9 detect this lock until the lock is released.  There are provisions in
10 the code for timing out locks, cleaning up a lock when a signal is
11 received, gracefully retrying lock acquisition on failure, etc...
12 """
13
14 from __future__ import annotations
15
16 import contextlib
17 import datetime
18 import json
19 import logging
20 import os
21 import signal
22 import sys
23 import warnings
24 from dataclasses import dataclass
25 from typing import Literal, Optional
26
27 from pyutils import argparse_utils, config, decorator_utils
28 from pyutils.datetimes import datetime_utils
29
# Register this module's commandline flags with the pyutils config system.
cfg = config.add_commandline_args(
    f"Lockfile ({__file__})",
    "Args related to lockfiles",
)
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    metavar="DURATION",
    type=argparse_utils.valid_duration,
    # NOTE(review): timedelta's first positional argument is *days*, so
    # this default is 60 days.  Confirm that 60 seconds was not intended.
    default=datetime.timedelta(days=60.0),
    help="If a lock is held for longer than this threshold we log a warning",
)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
39
40
class LockFileException(Exception):
    """Raised when a lockfile operation fails, e.g. when a lock can
    not be acquired."""
45
46
@dataclass
class LockFileContents:
    """The record serialized into every lock file; it describes the
    process that currently holds the lock."""

    pid: int
    """Process id of the lock holder"""

    commandline: str
    """Commandline of the lock holder"""

    expiration_timestamp: Optional[float]
    """Moment the lock expires, in seconds since the Epoch (None when
    the lock has no expiration)"""
59
60
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.::

        with LockFile('./foo.lock'):
            # do a bunch of stuff... if the process dies we have a signal
            # handler to do cleanup.  Other code (in this process or another)
            # that tries to take the same lockfile will retry a few times
            # and then fail with a LockFileException.  There is also
            # some logic for detecting stale locks.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """C'tor.

        Args:
            lockfile_path: path of the lockfile to acquire
            do_signal_cleanup: install SIGINT and SIGTERM handlers that
                release the lock when one of those signals arrives
            expiration_timestamp: when our lease on the lock should
                expire (as seconds since the Epoch).  None means the
                lock will not expire until we explicitly release it.
            override_command: don't use argv to determine our commandline
                rather use this instead if provided.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        if do_signal_cleanup:
            # NOTE(review): these replace whatever handlers were installed
            # before, and self._signal only releases the lock; it does not
            # terminate the process.  Confirm that is the intent.
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)
        self.expiration_timestamp = expiration_timestamp

    def locked(self) -> bool:
        """Is it locked currently?"""
        return self.is_locked

    def available(self) -> bool:
        """Is it available currently?"""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Attempt to acquire the lock with no blocking.

        Returns:
            True if the lock was acquired and False otherwise.
        """
        logger.debug("Trying to acquire %s.", self.lockfile)

        # Build the payload first so that a failure here can not leave
        # a half-created lockfile behind.
        contents = self._get_lockfile_contents()
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        try:
            with os.fdopen(fd, "a") as f:
                logger.debug(contents)
                f.write(contents)
        except OSError:
            # We created the file but could not record who owns it.  An
            # unparseable lockfile is never detected as stale, so remove
            # it rather than leave it on disk.
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        self.locktime = datetime.datetime.now().timestamp()
        logger.debug("Success; I own %s.", self.lockfile)
        self.is_locked = True
        return True

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Attempt to acquire the lock repeatedly with retries and backoffs.
        Before the first attempt and after every failed one, an existing
        lockfile is checked for staleness and removed if it is stale.

        Args:
            initial_delay: how long to wait before retrying the first time
            backoff_factor: a float >= 1.0 that multiplies the current retry
                delay each subsequent time we attempt to acquire and fail
                to do so.
            max_attempts: maximum number of times to try before giving up
                and failing.

        Returns:
            True if the lock was acquired and False otherwise.
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Release the lock by removing the lockfile.  Never raises; a
        lockfile that is already gone is treated as released."""
        try:
            os.unlink(self.lockfile)
        except FileNotFoundError:
            # Already removed (e.g. by the signal handler or by an
            # earlier release); nothing left to do.
            pass
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self) -> "LockFile":
        """Acquire the lock (with retries) on entry to a with block.

        Raises:
            LockFileException: if the lock could not be acquired.
        """
        if self.acquire_with_retries():
            return self

        # Report who holds the lock by reading the file on disk; our own
        # would-be contents say nothing about the holder.
        holder = self._read_lockfile_contents()
        if holder is not None:
            msg = f"Couldn't acquire {self.lockfile} after several attempts.  It's held by pid={holder.pid} ({holder.commandline}).  Giving up."
        else:
            msg = f"Couldn't acquire {self.lockfile} after several attempts.  Giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Release the lock on exit from a with block, warning first if
        it was held longer than the configured threshold.  Returns False
        so that exceptions from the with body propagate."""
        if self.locktime is not None:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if (
                duration
                >= config.config[
                    "lockfile_held_duration_warning_threshold"
                ].total_seconds()
            ):
                # Note: describe duration briefly only does 1s granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f"Held {self.lockfile} for {str_duration}"
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        self.release()
        return False

    def __del__(self):
        """Release the lock if this object is destroyed while holding it."""
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock if we currently hold it."""
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text describing *this* process that is written
        into the lockfile when we acquire it."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = " ".join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _read_lockfile_contents(self) -> "Optional[LockFileContents]":
        """Parse the lockfile currently on disk.

        Returns:
            The holder's LockFileContents, or None if the file is
            missing, unreadable, not exactly one line long, or does not
            parse into a LockFileContents.
        """
        try:
            with open(self.lockfile, "r") as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return None
            return LockFileContents(**json.loads(lines[0]))
        except (OSError, ValueError, TypeError):
            # If the lockfile doesn't exist or disappears, good; if it is
            # malformed we simply can't say anything about its holder.
            return None

    @staticmethod
    def _pid_is_alive(pid: int) -> bool:
        """Return True if a process with the given pid exists."""
        try:
            os.kill(pid, 0)  # Signal 0: existence check, nothing is sent.
        except PermissionError:
            # The process exists but we may not signal it (e.g. it is
            # owned by another user), so the lock is not stale.
            return True
        except OSError:
            return False
        return True

    def _detect_stale_lockfile(self) -> None:
        """Remove the lockfile on disk if its holder is gone or its lease
        has expired.  Best effort: unreadable or malformed lockfiles are
        left untouched."""
        contents = self._read_lockfile_contents()
        if contents is None:
            return
        logger.debug('Blocking lock contents="%s"', contents)

        try:
            # Does the PID exist still?
            if not self._pid_is_alive(contents.pid):
                logger.warning(
                    "Lockfile %s's pid (%d) is stale; force acquiring...",
                    self.lockfile,
                    contents.pid,
                )
                self.release()
                return  # Already removed; no need to check expiration too.

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.warning(
                        "Lockfile %s's expiration time has passed; force acquiring",
                        self.lockfile,
                    )
                    self.release()
        except (TypeError, ValueError, OverflowError):
            # Fields of the wrong type (e.g. a non-integer pid); leave the
            # lockfile alone rather than guess.
            logger.debug(
                "Ignoring malformed contents in lockfile %s.",
                self.lockfile,
                exc_info=True,
            )