Cleanup more contextlib.AbstractContextManagers and Literal[False]s.
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 """File-based locking helper."""
4
5 from __future__ import annotations
6
7 import contextlib
8 import datetime
9 import json
10 import logging
11 import os
12 import signal
13 import sys
14 import warnings
15 from dataclasses import dataclass
16 from typing import Literal, Optional
17
18 import config
19 import datetime_utils
20 import decorator_utils
21
# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)

# Register this module's command-line flag group via config.add_commandline_args.
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)

# Threshold (seconds) consulted by LockFile.__exit__ when deciding whether to
# warn about a lock that was held for a long time.
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    default=60.0,
    type=float,
    metavar='SECONDS',
    help='If a lock is held for longer than this threshold we log a warning',
)
31
32
class LockFileException(Exception):
    """Error type for lock file failures (e.g. a lock that could not be acquired)."""
37
38
@dataclass
class LockFileContents:
    """Record describing a lock holder; this is what gets written into a lock file."""

    # Process id of the lock holder.
    pid: int

    # Command line (or caller-supplied override) identifying the holder.
    commandline: str

    # Timestamp (seconds, as returned by datetime.timestamp()) after which the
    # lock may be treated as expired; None means the lock never expires.
    expiration_timestamp: Optional[float]
46
47
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

    with LockFile('./foo.lock'):
        # do a bunch of stuff... if the process receives SIGINT/SIGTERM
        # a signal handler removes the lockfile.  Other code (in this
        # process or another) that tries to take the same lockfile will
        # retry a few times and then give up.  There is also some logic
        # for detecting stale locks.

    The lock is represented by the existence of a file at the given
    path; the file contains a JSON description of the holder (pid,
    command line and optional expiration timestamp).
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """Prepare (but do not acquire) a lock.

        Args:
            lockfile_path: path of the file whose existence represents the lock.
            do_signal_cleanup: when True, install SIGINT and SIGTERM handlers
                that release the lock if it is held.
            expiration_timestamp: optional timestamp (seconds) written into the
                lockfile; other LockFile instances may force-release the lock
                once this time has passed.
            override_command: optional string recorded as the holder's command
                line instead of the joined sys.argv.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        # Set by __enter__ to datetime.now().timestamp(), which is a float.
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        self.expiration_timestamp = expiration_timestamp
        if do_signal_cleanup:
            # NOTE(review): signal.signal replaces whatever handler was
            # installed before (including one installed by another LockFile
            # instance) and may only be called from the main thread.
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)

    def locked(self) -> bool:
        """Return True if this instance currently believes it holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Return True if no file exists at the lockfile path right now."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single attempt to create the lockfile.

        Returns:
            True if the lockfile was created and written by this call,
            False if an OSError occurred (e.g. the file already exists).
        """
        logger.debug("Trying to acquire %s.", self.lockfile)
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
            logger.debug('Success; I own %s.', self.lockfile)
            self.is_locked = True
            return True
        except OSError:
            pass
        logger.warning('Couldn\'t acquire %s.', self.lockfile)
        return False

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts=5,
    ) -> bool:
        """Try to acquire the lock, checking for a stale lockfile between attempts.

        Args:
            initial_delay: passed to decorator_utils.retry_if_false as delay_sec.
            backoff_factor: passed to decorator_utils.retry_if_false as backoff.
            max_attempts: passed to decorator_utils.retry_if_false as tries.

        Returns:
            Whatever the retry-wrapped attempt returns; presumably True once
            an attempt succeeds and False after the attempts are exhausted
            (depends on decorator_utils.retry_if_false -- confirm there).
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                # Someone else holds it; see whether their lock is stale so
                # that the next attempt has a chance of succeeding.
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Delete the lockfile and mark this instance as not holding the lock.

        Failures to delete the file are logged, never raised.
        """
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self) -> 'LockFile':
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Release the lock, warning first if it was held for too long.

        Always returns False so that exceptions raised inside the with
        block propagate to the caller.
        """
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                # Note: describe duration briefly only does 1s granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        self.release()
        return False

    def __del__(self):
        """Release the lock if this instance still holds it when collected."""
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock if this instance holds it.

        NOTE(review): the handler returns normally, so once installed the
        process is no longer terminated by SIGINT/SIGTERM (and SIGINT no
        longer raises KeyboardInterrupt) -- confirm that this is intended.
        """
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Return the JSON string describing this process as the lock holder."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Inspect the existing lockfile and delete it if its holder is gone.

        The lockfile is removed when either the recorded pid no longer
        exists or the recorded expiration timestamp has passed.  Any problem
        reading or parsing the lockfile is ignored (best effort).
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                # Not the single-line JSON record we write; leave it alone.
                return

            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug('Blocking lock contents="%s"', contents)

            # Does the PID exist still?  Signal 0 performs the existence and
            # permission checks without actually delivering a signal.
            try:
                os.kill(contents.pid, 0)
            except PermissionError:
                # EPERM: the process exists but belongs to another user, so
                # the lock is NOT stale on account of its pid.
                pass
            except OSError:
                logger.warning(
                    'Lockfile %s\'s pid (%d) is stale; force acquiring...',
                    self.lockfile,
                    contents.pid,
                )
                self.release()
                # The file is gone now; do not fall through and unlink a
                # second time (that could remove somebody else's new lock).
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.warning(
                        'Lockfile %s\'s expiration time has passed; force acquiring',
                        self.lockfile,
                    )
                    self.release()
        except Exception as e:
            # If the lockfile doesn't exist or disappears, good.  Anything
            # else (unparseable contents, unreadable file) is also ignored,
            # but leave a trace for debugging.
            logger.debug('Could not inspect lockfile %s: %s', self.lockfile, e)