More work to improve documentation generated by sphinx. Also fixes
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a lockfile implementation I created for use with cronjobs
6 on my machine to prevent multiple copies of a job from running in
7 parallel.  When one job is running this code keeps a file on disk to
8 indicate a lock is held.  Other copies will fail to start if they
9 detect this lock until the lock is released.  There are provisions in
10 the code for timing out locks, cleaning up a lock when a signal is
11 received, gracefully retrying lock acquisition on failure, etc...
12 """
13
14 from __future__ import annotations
15
16 import contextlib
17 import datetime
18 import json
19 import logging
20 import os
21 import signal
22 import sys
23 import warnings
24 from dataclasses import dataclass
25 from typing import Literal, Optional
26
27 from pyutils import argparse_utils, config, decorator_utils
28 from pyutils.datetimez import datetime_utils
29
cfg = config.add_commandline_args(f'Lockfile ({__file__})', 'Args related to lockfiles')
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold',
    type=argparse_utils.valid_duration,
    # NOTE: timedelta's first positional argument is *days*; spell out the
    # unit so the default is really 60 seconds (not 60 days).
    default=datetime.timedelta(seconds=60.0),
    metavar='DURATION',
    help='If a lock is held for longer than this threshold we log a warning',
)
logger = logging.getLogger(__name__)
39
40
class LockFileException(Exception):
    """Raised for lock file failures, e.g. when a lock could not be
    acquired even after exhausting all retries."""
45
46
@dataclass
class LockFileContents:
    """Ownership record that is serialized as JSON into each lock file."""

    pid: int
    """Process id of whoever currently holds the lock."""

    commandline: str
    """Command line of the process holding the lock."""

    expiration_timestamp: Optional[float]
    """Seconds since the Epoch at which the lock lapses; None means never."""
59
60
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.::

        with LockFile('./foo.lock'):
            # do a bunch of stuff... if the process is signaled we have a
            # signal handler to do cleanup.  Other code (in this process or
            # another) that tries to take the same lockfile will retry with
            # backoff and eventually give up.  There is also some logic for
            # detecting stale locks.

    The lock is held while ``lockfile_path`` exists.  It is created with
    ``O_CREAT | O_EXCL`` (so creation fails if it already exists) and
    contains a JSON-serialized :class:`LockFileContents` record describing
    the owner.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """C'tor.

        Args:
            lockfile_path: path of the lockfile to acquire
            do_signal_cleanup: handle SIGINT and SIGTERM events by
                releasing the lock and then deferring to whatever handler
                was installed for that signal before us (so, e.g., the
                process still terminates or raises KeyboardInterrupt).
            expiration_timestamp: when our lease on the lock should
                expire (as seconds since the Epoch).  None means the
                lock will not expire until we explicitly release it.
            override_command: don't use argv to determine our commandline
                rather use this instead if provided.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        # Set to a float epoch timestamp by __enter__ on success.
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        # Handlers that were in place before ours, keyed by signal number;
        # _signal chains to them after releasing the lock.
        self._previous_signal_handlers: dict = {}
        if do_signal_cleanup:
            for signum in (signal.SIGINT, signal.SIGTERM):
                self._previous_signal_handlers[signum] = signal.signal(
                    signum, self._signal
                )
        self.expiration_timestamp = expiration_timestamp

    def locked(self) -> bool:
        """Is it locked (by this instance) currently?"""
        return self.is_locked

    def available(self) -> bool:
        """Is it available currently (i.e. does the lockfile not exist)?

        This is only a snapshot: another process may take the lock right
        after this returns True.
        """
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Attempt to acquire the lock with no blocking.

        Returns:
            True if the lock was acquired and False otherwise.
        """
        logger.debug("Trying to acquire %s.", self.lockfile)
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            logger.warning('Couldn\'t acquire %s.', self.lockfile)
            return False

        try:
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
        except OSError:
            # We created the file but couldn't record ownership in it; don't
            # leave an ownerless lockfile behind to block everyone else.
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            logger.warning('Couldn\'t acquire %s.', self.lockfile)
            return False
        except BaseException:
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            raise
        logger.debug('Success; I own %s.', self.lockfile)
        self.is_locked = True
        return True

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Attempt to acquire the lock repeatedly with retries and backoffs.

        Args:
            initial_delay: how long to wait before retrying the first time
            backoff_factor: a float >= 1.0 that multiplies the current retry
                delay each subsequent time we attempt to acquire and fail
                to do so.
            max_attempts: maximum number of times to try before giving up
                and failing.

        Returns:
            True if the lock was acquired and False otherwise.
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                # Maybe the current holder died or its lease expired.
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Release the lock by deleting the lockfile.

        This removes the lockfile regardless of who owns it (it is also
        used to clear stale locks).  Failures, e.g. the file is already
        gone, are logged rather than raised.
        """
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self) -> "LockFile":
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Release the lock if still held; warn if it was held too long.

        Returns:
            False, so exceptions raised inside the with block propagate.
        """
        if self.locktime is not None:
            duration = datetime.datetime.now().timestamp() - self.locktime
            threshold = config.config['lockfile_held_duration_warning_threshold']
            if duration >= threshold.total_seconds():
                # Note: describe duration briefly only does 1s granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
            self.locktime = None

        # Only delete the lockfile if we still own it: a signal handler may
        # already have released it, and another process may have since
        # acquired a lock at the same path.
        if self.is_locked:
            self.release()
        return False

    def __del__(self):
        """Best-effort release if collected while still holding the lock."""
        if self.is_locked:
            self.release()

    def _signal(self, *args) -> None:
        """Signal handler: release our lock, then chain to the prior handler.

        Args:
            args: ``(signum, frame)`` as passed by the signal module.
        """
        if self.is_locked:
            self.release()
        if not args:
            return
        signum = args[0]
        frame = args[1] if len(args) > 1 else None
        previous = getattr(self, '_previous_signal_handlers', {}).get(signum)
        if callable(previous):
            # e.g. signal.default_int_handler, which raises KeyboardInterrupt.
            previous(signum, frame)
        elif previous == signal.SIG_DFL:
            # Restore the default disposition and re-deliver the signal so
            # the process dies the way it would have without our handler.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)
        # SIG_IGN, or None (a handler not installed from Python): nothing more.

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text describing us as the owner of the lock."""
        cmd = self.override_command if self.override_command else ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Remove the lockfile if its owner process is gone or its lease expired.

        Best effort: problems reading or parsing the lockfile (it vanished,
        its owner hasn't finished writing it, it's malformed...) are logged
        at debug level and otherwise ignored.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug('Blocking lock contents="%s"', contents)

            # Does the PID exist still?  Signal 0 only checks for the
            # process's existence; nothing is delivered.
            try:
                os.kill(contents.pid, 0)
            except ProcessLookupError:
                logger.warning(
                    'Lockfile %s\'s pid (%d) is stale; force acquiring...',
                    self.lockfile,
                    contents.pid,
                )
                self.release()
                return
            except PermissionError:
                # The process exists but belongs to another user, so the
                # lock is NOT stale.
                pass

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.warning(
                        'Lockfile %s\'s expiration time has passed; force acquiring',
                        self.lockfile,
                    )
                    self.release()
        except Exception as e:
            # If the lockfile doesn't exist or disappears, good.
            logger.debug('Could not inspect lockfile %s: %s', self.lockfile, e)
251         except Exception:
252             pass  # If the lockfile doesn't exist or disappears, good.