Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """File-based locking helper."""
6
7 from __future__ import annotations
8 import contextlib
9 import datetime
10 import json
11 import logging
12 import os
13 import signal
14 import sys
15 import warnings
16 from dataclasses import dataclass
17 from typing import Literal, Optional
18
19 import config
20 import datetime_utils
21 import decorator_utils
22
# Register this module's commandline flag via the project's config module.
# NOTE(review): add_commandline_args presumably returns an argparse-style
# argument group -- confirm against config.py.
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    metavar='SECONDS',
    default=60.0,
    type=float,
    help='If a lock is held for longer than this threshold we log a warning',
)

# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
32
33
class LockFileException(Exception):
    """Raised when a lock file cannot be acquired."""
38
39
@dataclass
class LockFileContents:
    """Record describing the holder of a lock.

    It is serialized as a single JSON object into the lock file and
    parsed back when checking whether an existing lock is stale.
    """

    pid: int
    """Process id of the lock holder."""

    commandline: str
    """Commandline of the lock holder."""

    expiration_timestamp: Optional[float]
    """Expiry of the lock in seconds since the Epoch; None means it never expires."""
51     """When this lock will expire as seconds since Epoch"""
52
53
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.::

        with LockFile('./foo.lock'):
            # do a bunch of stuff... if the process is interrupted we have
            # a signal handler to do cleanup.  Other code (in this process
            # or another) that tries to take the same lockfile will retry
            # with backoff and eventually raise LockFileException.  There
            # is also some logic for detecting stale locks.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """C'tor.

        Args:
            lockfile_path: path of the lockfile to acquire
            do_signal_cleanup: handle SIGINT and SIGTERM events by
                releasing the lock and then deferring to whatever
                handler was installed before ours (so the process still
                exits / raises KeyboardInterrupt as it normally would).
                Python only allows signal handlers to be installed from
                the main thread.
            expiration_timestamp: when our lease on the lock should
                expire (as seconds since the Epoch).  None means the
                lock will not expire until we explicitly release it.
            override_command: don't use argv to determine our commandline
                rather use this instead if provided.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        # Wall-clock time (seconds since the Epoch) at which __enter__
        # acquired the lock; used to report long hold times.
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        self.expiration_timestamp: Optional[float] = expiration_timestamp

        # Handlers that were in place before we installed ours, keyed by
        # signal number, so that _signal can chain to them.
        self._previous_handlers: dict = {}
        if do_signal_cleanup:
            for signum in (signal.SIGINT, signal.SIGTERM):
                self._previous_handlers[signum] = signal.signal(signum, self._signal)

    def locked(self) -> bool:
        """Is it locked currently (by this object)?"""
        return self.is_locked

    def available(self) -> bool:
        """Is it available currently (i.e. no lockfile exists on disk)?"""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Attempt to acquire the lock with no blocking.

        Returns:
            True if the lock was acquired and False otherwise.
        """
        logger.debug("Trying to acquire %s.", self.lockfile)

        # Build the payload before touching the filesystem so that a
        # failure here cannot leave an empty lockfile behind.
        contents = self._get_lockfile_contents()

        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        try:
            with os.fdopen(fd, "a") as f:
                logger.debug(contents)
                f.write(contents)
        except OSError:
            # We created the file but could not record who owns it.  An
            # empty lockfile is never recognized as stale, so remove it
            # rather than wedge every future acquirer.
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        logger.debug('Success; I own %s.', self.lockfile)
        self.is_locked = True
        return True

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Attempt to acquire the lock repeatedly with retries and backoffs.

        Args:
            initial_delay: how long to wait before retrying the first time
            backoff_factor: a float >= 1.0 that multiplies the current retry
                delay each subsequent time we attempt to acquire and fail
                to do so.
            max_attempts: maximum number of times to try before giving up
                and failing.

        Returns:
            True if the lock was acquired and False otherwise.
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            # After a failed attempt, clear out a stale lock (if any) so
            # that the next retry has a chance to succeed.
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Release the lock by removing the lockfile.

        Never raises; problems removing the file are logged.
        """
        try:
            os.unlink(self.lockfile)
        except FileNotFoundError:
            # Nothing left to remove; not worth a traceback.
            logger.debug('Lockfile %s was already gone at release time.', self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self) -> 'LockFile':
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Release the lock, warning if it was held for a long time.

        Returns False so exceptions raised in the with-body propagate.
        """
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                # Note: describe duration briefly only does 1s granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        self.release()
        return False

    def __del__(self):
        # getattr: tolerate an instance whose __init__ never ran to completion.
        if getattr(self, 'is_locked', False):
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock, then defer to the prior handler.

        Args:
            *args: the (signum, frame) pair supplied by the signal module.
        """
        if self.is_locked:
            self.release()
        if not args:
            return

        # Installing our handler replaced whatever was there before; if we
        # just returned, the signal would be swallowed and the process
        # would carry on without holding the lock.
        signum = args[0]
        previous = self._previous_handlers.get(signum)
        if callable(previous):
            # e.g. signal.default_int_handler (raises KeyboardInterrupt) or
            # a handler installed by the application / another LockFile.
            previous(*args)
        elif previous == signal.SIG_DFL:
            # Restore the default disposition and re-deliver the signal so
            # that the default action (normally termination) happens.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)
        # SIG_IGN / None: the signal was being ignored before; keep ignoring.

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text identifying this process as the lock holder."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    @staticmethod
    def _pid_is_gone(pid: int) -> bool:
        """Return True only if no process with the given pid exists."""
        # NOTE: signal 0 is a POSIX existence probe.  On Windows os.kill
        # with this signal value terminates the target instead.
        try:
            os.kill(pid, 0)
        except ProcessLookupError:
            return True
        except PermissionError:
            # The process exists but belongs to someone else: still alive.
            return False
        return False

    def _detect_stale_lockfile(self) -> None:
        """Remove the lockfile if its holder is dead or its lease expired.

        This is best-effort: any problem reading or parsing the lockfile
        leaves it in place.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug('Blocking lock contents="%s"', contents)

            # Does the PID exist still?
            if self._pid_is_gone(contents.pid):
                logger.warning(
                    'Lockfile %s\'s pid (%d) is stale; force acquiring...',
                    self.lockfile,
                    contents.pid,
                )
                self.release()
                # Return now: releasing a second time could remove a lock
                # that another process acquired in the meantime.
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.warning(
                        'Lockfile %s\'s expiration time has passed; force acquiring',
                        self.lockfile,
                    )
                    self.release()
        except FileNotFoundError:
            pass  # If the lockfile doesn't exist or disappears, good.
        except Exception:
            logger.debug(
                'Unable to check lockfile %s for staleness; leaving it alone.',
                self.lockfile,
                exc_info=True,
            )