ee7346bf1042e0a0b85e36273c0ef8182c05f7b3
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """File-based locking helper."""
6
7 from __future__ import annotations
8
9 import contextlib
10 import datetime
11 import json
12 import logging
13 import os
14 import signal
15 import sys
16 import warnings
17 from dataclasses import dataclass
18 from typing import Literal, Optional
19
20 from pyutils import argparse_utils, config, decorator_utils
21 from pyutils.datetimez import datetime_utils
22
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register this module's commandline flags with the shared config parser.
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold',
    help='If a lock is held for longer than this threshold we log a warning',
    metavar='DURATION',
    type=argparse_utils.valid_duration,
    # NOTE(review): timedelta's first positional argument is *days*, so this
    # default is 60 days.  Confirm that 60 seconds was not what was intended.
    default=datetime.timedelta(days=60.0),
)
32
33
class LockFileException(Exception):
    """Signals a failure involving a lock file (e.g. it could not be acquired)."""
38
39
@dataclass
class LockFileContents:
    """The record stored inside each lock file.

    Attributes:
        pid: the pid of the process that holds the lock.
        commandline: the commandline of the process that holds the lock.
        expiration_timestamp: when this lock will expire, as seconds
            since the Epoch; None if no expiration was given.
    """

    pid: int
    commandline: str
    expiration_timestamp: Optional[float]
52
53
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.::

        with LockFile('./foo.lock'):
            # do a bunch of stuff... if the process dies we have a signal
            # handler to do cleanup.  Other code (in this process or another)
            # that tries to take the same lockfile will block.  There is also
            # some logic for detecting stale locks.

    The lock is represented by the existence of the file at the given
    path; the file holds a one-line JSON record describing the holder
    (pid, commandline and optional expiration timestamp).
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """C'tor.

        Args:
            lockfile_path: path of the lockfile to acquire
            do_signal_cleanup: handle SIGINT and SIGTERM events by
                releasing the lock and then handing the signal on to
                whatever handler was installed before ours, so the
                process still exits (or does whatever it did before).
            expiration_timestamp: when our lease on the lock should
                expire (as seconds since the Epoch).  None means the
                lock will not expire until we explicitly release it.
            override_command: don't use argv to determine our commandline
                rather use this instead if provided.

        Raises:
            ValueError: from signal.signal if do_signal_cleanup is True
                and we are not running on the main thread.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        self.expiration_timestamp: Optional[float] = expiration_timestamp

        # signum -> the handler that was active before we installed ours.
        self._previous_handlers: dict = {}
        if do_signal_cleanup:
            for signum in (signal.SIGINT, signal.SIGTERM):
                previous = signal.signal(signum, self._signal)
                # If the handler we replaced belonged to another LockFile,
                # inherit *its* predecessor rather than pointing at it:
                # this keeps the chain at depth one no matter how many
                # LockFile objects the process creates over its lifetime.
                owner = getattr(previous, '__self__', None)
                if isinstance(owner, LockFile):
                    previous = getattr(owner, '_previous_handlers', {}).get(signum)
                self._previous_handlers[signum] = previous

    def locked(self) -> bool:
        """Is it locked (by this object) currently?"""
        return self.is_locked

    def available(self) -> bool:
        """Is it available currently?  i.e. does the lockfile not exist?"""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Attempt to acquire the lock with no blocking.

        Returns:
            True if the lock was acquired and False otherwise.
        """
        logger.debug("Trying to acquire %s.", self.lockfile)
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        # We created the file.  If we now fail to describe ourselves in it
        # we must remove it again: otherwise it would linger as a lock that
        # nobody holds and that never looks stale.
        try:
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
        except OSError:
            logger.exception('Created %s but failed to populate it.', self.lockfile)
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False
        except Exception:
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            raise

        logger.debug('Success; I own %s.', self.lockfile)
        self.is_locked = True
        return True

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Attempt to acquire the lock repeatedly with retries and backoffs.

        Args:
            initial_delay: how long to wait before retrying the first time
            backoff_factor: a float >= 1.0 the multiples the current retry
                delay each subsequent time we attempt to acquire and fail
                to do so.
            max_attempts: maximum number of times to try before giving up
                and failing.

        Returns:
            True if the lock was acquired and False otherwise.
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        # Clear out a stale lock (if any) before the first attempt.
        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Release the lock by removing the lockfile.

        Note that this removes the file unconditionally (the stale lock
        logic relies on that) and never raises; failures are logged.
        """
        try:
            os.unlink(self.lockfile)
        except FileNotFoundError:
            logger.debug('%s was already gone at release time.', self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Release the lock, warning if it was held for too long.

        Returns:
            False, so exceptions from the with body are never suppressed.
        """
        if self.locktime is not None:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if (
                duration
                >= config.config[
                    'lockfile_held_duration_warning_threshold'
                ].total_seconds()
            ):
                # Note: describe duration briefly only does 1s granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
            self.locktime = None

        # Only remove the file if we still hold the lock.  If it was already
        # released (explicitly or by the signal handler) the file on disk,
        # if any, may by now belong to somebody else.
        if self.is_locked:
            self.release()
        return False

    def __del__(self):
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock, then defer to the handler
        that was installed before ours so the signal is not swallowed.

        Args:
            args: (signum, frame) as passed by the signal machinery.
        """
        if self.is_locked:
            self.release()
        if not args:
            return

        signum = args[0]
        previous = self._previous_handlers.get(signum)
        if callable(previous):
            # e.g. signal.default_int_handler, which raises KeyboardInterrupt.
            previous(*args)
        elif previous == signal.SIG_DFL:
            # Put the default disposition back and re-deliver the signal so
            # the process reacts the way it would have without us.
            signal.signal(signum, signal.SIG_DFL)
            signal.raise_signal(signum)
        # Otherwise the signal was being ignored (SIG_IGN) or was handled
        # outside of Python (None); leave that as we found it.

    def _get_lockfile_contents(self) -> str:
        """Returns the JSON record describing us as the lock holder."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    @staticmethod
    def _pid_exists(pid: int) -> bool:
        """Returns False only if we are sure no process has this pid."""
        try:
            os.kill(pid, 0)  # Signal 0: error checking only, nothing is sent.
        except ProcessLookupError:
            return False
        except PermissionError:
            # The process exists but belongs to someone else.
            return True
        return True

    def _detect_stale_lockfile(self) -> None:
        """Inspect the existing lockfile and remove it if its holder is
        gone or its expiration time has passed.  Best effort: a lockfile
        that is missing or cannot be understood is left alone.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug('Blocking lock contents="%s"', contents)

            # Does the PID exist still?
            if not self._pid_exists(contents.pid):
                logger.warning(
                    'Lockfile %s\'s pid (%d) is stale; force acquiring...',
                    self.lockfile,
                    contents.pid,
                )
                self.release()
                return  # Already removed; don't try to remove it twice.

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.warning(
                        'Lockfile %s\'s expiration time has passed; force acquiring',
                        self.lockfile,
                    )
                    self.release()
        except (OSError, ValueError, TypeError, OverflowError) as e:
            # If the lockfile doesn't exist or disappears, good.  If it is
            # malformed we can't judge it, so we leave it alone.
            logger.debug('Could not check %s for staleness: %s', self.lockfile, e)