Make the processed cmdline available to callers.
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel.  When one job is running this code keeps a file on disk to
8 indicate a lock is held.  Other copies will fail to start if they
9 detect this lock until the lock is released.  There are provisions in
10 the code for timing out locks, cleaning up a lock when a signal is
11 received, gracefully retrying lock acquisition on failure, etc...
12 """
13
14 from __future__ import annotations
15
16 import contextlib
17 import datetime
18 import json
19 import logging
20 import os
21 import signal
22 import sys
23 import warnings
24 from dataclasses import dataclass
25 from typing import Literal, Optional
26
27 from pyutils import argparse_utils, config, decorator_utils
28 from pyutils.datetimes import datetime_utils
29
cfg = config.add_commandline_args(f"Lockfile ({__file__})", "Args related to lockfiles")
cfg.add_argument(
    "--lockfile_held_duration_warning_threshold",
    type=argparse_utils.valid_duration,
    # NOTE: timedelta's first positional parameter is *days*, so the old
    # timedelta(60.0) meant 60 days.  Spell the unit out: one minute.
    default=datetime.timedelta(seconds=60.0),
    metavar="DURATION",
    help="If a lock is held for longer than this threshold we log a warning",
)
# Module-wide logger used by LockFile for acquisition/release diagnostics.
logger = logging.getLogger(__name__)
39
40
class LockFileException(Exception):
    """Raised for lockfile failures, e.g. when a LockFile gives up acquiring its lock."""
45
46
@dataclass
class LockFileContents:
    """Record describing the current holder of a lockfile.

    Fields:
        pid: process id of the lock holder.
        commandline: command line the lock holder was started with.
        expiration_timestamp: seconds since the Epoch after which the
            lock may be treated as expired, or None if it never expires.
    """

    pid: int  # process id of the holder
    commandline: str  # how the holder was invoked
    expiration_timestamp: Optional[float]  # lease end (Epoch seconds) or None
59
60
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.::

        with LockFile('./foo.lock'):
            # do a bunch of stuff... if the process receives SIGINT or
            # SIGTERM a signal handler releases the lock first.  Other
            # code (in this process or another) that tries to take the
            # same lockfile retries with backoff and, if it still can't
            # get it, raises LockFileException.  There is also some logic
            # for detecting stale locks.

    The lock is held for as long as the lockfile exists on disk.  It is
    created atomically (``O_CREAT | O_EXCL``) and contains a JSON object
    describing the holder (pid, commandline, expiration timestamp).
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """C'tor.

        Args:
            lockfile_path: path of the lockfile to acquire
            do_signal_cleanup: handle SIGINT and SIGTERM events by
                releasing the lock and then invoking whatever handler
                was installed before this one, so the process still
                exits the way it otherwise would (e.g. Python's default
                SIGINT handler raises KeyboardInterrupt).  Note that
                signal.signal may only be called from the main thread.
            expiration_timestamp: when our lease on the lock should
                expire (as seconds since the Epoch).  None means the
                lock will not expire until we explicitly release it.
            override_command: don't use argv to determine our commandline
                rather use this instead if provided.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        self.expiration_timestamp = expiration_timestamp
        # Handlers that were in place before ours, keyed by signal number,
        # so that _signal can chain to them after releasing the lock.
        self._prev_signal_handlers: dict = {}
        if do_signal_cleanup:
            for signum in (signal.SIGINT, signal.SIGTERM):
                self._prev_signal_handlers[signum] = signal.signal(
                    signum, self._signal
                )

    def locked(self) -> bool:
        """Is it locked currently?"""
        return self.is_locked

    def available(self) -> bool:
        """Is it available currently?"""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Attempt to acquire the lock with no blocking.

        Returns:
            True if the lock was acquired and False otherwise.
        """
        logger.debug("Trying to acquire %s.", self.lockfile)
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists, making creation an atomic test-and-set.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        try:
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
        except OSError:
            # We created the lockfile but couldn't populate it.  Remove it:
            # an empty lockfile can't be attributed to any pid, so stale
            # detection would never clear it and it would block everyone.
            logger.exception("Failed to write contents of %s.", self.lockfile)
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        self.locktime = datetime.datetime.now().timestamp()
        logger.debug("Success; I own %s.", self.lockfile)
        self.is_locked = True
        return True

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Attempt to acquire the lock repeatedly with retries and backoffs.

        Before each failed attempt is retried, the existing lockfile is
        checked for staleness (dead holder pid or expired lease) and
        forcibly removed if stale.

        Args:
            initial_delay: how long to wait before retrying the first time
            backoff_factor: a float >= 1.0 the multiples the current retry
                delay each subsequent time we attempt to acquire and fail
                to do so.
            max_attempts: maximum number of times to try before giving up
                and failing.

        Returns:
            True if the lock was acquired and False otherwise.
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Release the lock by deleting the lockfile.

        This unlinks the lockfile regardless of who created it;
        _detect_stale_lockfile relies on that to clear stale locks.
        """
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        if self.acquire_with_retries():
            return self
        contents = self._read_lockfile_contents()
        if contents is not None:
            msg = f"Couldn't acquire {self.lockfile} after several attempts.  It's held by pid={contents.pid} ({contents.commandline}).  Giving up."
        else:
            msg = "Couldn't acquire lockfile; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Warn if the lock was held too long, release it, never suppress."""
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if (
                duration
                >= config.config[
                    "lockfile_held_duration_warning_threshold"
                ].total_seconds()
            ):
                # Note: describe duration briefly only does 1s granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f"Held {self.lockfile} for {str_duration}"
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        # Only release if we still hold the lock.  A signal handler or the
        # caller may already have released it, and by now another process
        # may own a fresh lockfile at this path that we must not delete.
        if self.is_locked:
            self.release()
        return False

    def __del__(self):
        # Best effort: don't leave our lockfile behind if collected while held.
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """SIGINT/SIGTERM handler: release the lock, then defer to the
        handler that was installed before ours.

        Without chaining, this handler would swallow the signal and the
        process would keep running -- without its lock.
        """
        if self.is_locked:
            self.release()
        if not args:
            return
        signum = args[0]
        frame = args[1] if len(args) > 1 else None
        prev = self._prev_signal_handlers.get(signum)
        if callable(prev):
            # e.g. signal.default_int_handler (raises KeyboardInterrupt) or
            # another LockFile's _signal, which releases its lock in turn.
            prev(signum, frame)
        elif prev == signal.SIG_DFL:
            # Restore the default disposition and re-deliver the signal so
            # the process terminates exactly as it would have without us.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)
        # SIG_IGN, or a handler not installed from Python (None): done.

    def _get_lockfile_contents(self) -> str:
        """Serialize this process's LockFileContents as a JSON string."""
        cmd = self.override_command or " ".join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _read_lockfile(self) -> Optional[str]:
        """Return the first line of the lockfile, or None if unavailable.

        A missing file (released between our check and the read) or an
        empty one (its creator hasn't written contents yet) are expected
        races and are not reported as errors.
        """
        try:
            with open(self.lockfile, "r") as rf:
                first_line = rf.readline()
        except FileNotFoundError:
            logger.debug("%s disappeared before it could be read.", self.lockfile)
            return None
        except Exception as e:
            logger.exception(e)
            return None
        return first_line or None

    def _read_lockfile_contents(self) -> Optional["LockFileContents"]:
        """Read and parse the lockfile; None if missing, empty or malformed."""
        raw_contents = self._read_lockfile()
        if not raw_contents:
            return None
        try:
            return LockFileContents(**json.loads(raw_contents))
        except (ValueError, TypeError) as e:
            # ValueError covers json.JSONDecodeError; TypeError covers JSON
            # that isn't an object or has unexpected/missing keys.
            logger.warning(
                "Can't parse contents of %s (%s): %r", self.lockfile, e, raw_contents
            )
            return None

    def _detect_stale_lockfile(self) -> None:
        """Forcibly remove the lockfile if its holder is gone or its lease expired."""
        contents = self._read_lockfile_contents()
        if contents is None:
            return
        logger.debug('Blocking lock contents="%s"', contents)

        # Does the PID exist still?  Signal 0 only performs error checking.
        try:
            os.kill(contents.pid, 0)
        except PermissionError:
            # EPERM: the process exists but belongs to another user, so the
            # lock is still live -- do not steal it.
            pass
        except OSError:
            # Typically ProcessLookupError (ESRCH): no such process.
            logger.warning(
                "Lockfile %s's pid (%d) is stale; force acquiring...",
                self.lockfile,
                contents.pid,
            )
            self.release()
            return

        # Has the lock expiration expired?
        if contents.expiration_timestamp is not None:
            now = datetime.datetime.now().timestamp()
            if now > contents.expiration_timestamp:
                logger.warning(
                    "Lockfile %s's expiration time has passed; force acquiring",
                    self.lockfile,
                )
                self.release()