More cleanup, yay!
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 """File-based locking helper."""
4
5 import datetime
6 import json
7 import logging
8 import os
9 import signal
10 import sys
11 import warnings
12 from dataclasses import dataclass
13 from typing import Optional
14
15 import config
16 import datetime_utils
17 import decorator_utils
18
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Register the lockfile-related command line flags through the project's
# config module.  NOTE(review): the returned object is presumably an
# argparse argument group -- confirm against config.add_commandline_args.
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)

# How long (in seconds) a lock may be held before a warning is emitted
# when the lock is released via the context manager.
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    default=60.0,
    type=float,
    metavar='SECONDS',
    help='If a lock is held for longer than this threshold we log a warning',
)
28
29
class LockFileException(Exception):
    """Error type raised for lock file related failures."""
34
35
@dataclass
class LockFileContents:
    """Record describing the holder of a lock; this is the payload that
    gets stored inside each lock file."""

    # Process id of the process that holds the lock.
    pid: int

    # Command line (or caller-supplied label) identifying the holder.
    commandline: str

    # Timestamp after which the lock is considered expired, or None
    # when the lock has no expiration.
    expiration_timestamp: Optional[float]
43
44
class LockFile(object):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

        with LockFile('./foo.lock'):
            # do a bunch of stuff...

    The lock is represented by the existence of a file that is created
    exclusively (O_CREAT | O_EXCL).  The file holds a JSON document with
    the holder's pid, its command line and an optional expiration
    timestamp.  Entering the context retries acquisition a bounded
    number of times and raises LockFileException if the lock cannot be
    taken.  While retrying, an existing lock file is inspected and is
    removed if its pid no longer exists or its expiration has passed.

    Unless disabled, SIGINT / SIGTERM handlers are installed that
    release the lock and then hand the signal on to whatever handler was
    installed before, so the process still reacts to the signal.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """Set up (but do not acquire) a lock.

        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: if True, install SIGINT and SIGTERM
                handlers that release the lock.  Python only allows
                installing signal handlers from the main thread, so pass
                False when constructing from another thread.
            expiration_timestamp: optional timestamp (seconds, compared
                against datetime.now().timestamp()) after which other
                processes may treat this lock as stale.
            override_command: text to record in the lock file instead of
                this process's command line.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        # Wall-clock timestamp at which __enter__ acquired the lock.
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        self.expiration_timestamp: Optional[float] = expiration_timestamp
        # signum -> handler that was installed before ours, so _signal
        # can chain to it instead of swallowing the signal.
        self._previous_handlers: dict = {}
        if do_signal_cleanup:
            for signum in (signal.SIGINT, signal.SIGTERM):
                self._previous_handlers[signum] = signal.signal(signum, self._signal)

    def locked(self):
        """Return True if this object currently holds the lock."""
        return self.is_locked

    def available(self):
        """Return True if the lock file does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single, non-blocking attempt to take the lock.

        Returns:
            True if the lock file was created by this call, else False.
        """
        logger.debug("Trying to acquire %s.", self.lockfile)
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
            logger.debug('Success; I own %s.', self.lockfile)
            self.is_locked = True
            return True
        except FileExistsError:
            # The expected outcome when somebody else holds the lock.
            pass
        except OSError as e:
            # Anything else (missing directory, permissions, ...) is not
            # ordinary contention; say why so it can be diagnosed.
            logger.warning('Unexpected error while creating %s: %s', self.lockfile, e)
        logger.warning('Couldn\'t acquire %s.', self.lockfile)
        return False

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Try to take the lock several times, checking for a stale lock
        file before the first attempt and after every failed attempt.

        The retry policy is delegated to decorator_utils.retry_if_false;
        the arguments below are passed straight through to it.

        Args:
            initial_delay: passed as delay_sec.
            backoff_factor: passed as backoff.
            max_attempts: passed as tries.

        Returns:
            True if the lock was acquired, False otherwise.
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self):
        """Remove the lock file and mark this object as not holding the
        lock.  Failures to remove the file are logged, not raised."""
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) and remember when we got it.

        Raises:
            LockFileException: if the lock could not be acquired.
        """
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback):
        """Release the lock; warn if it was held for longer than the
        configured lockfile_held_duration_warning_threshold_sec."""
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                str_duration = datetime_utils.describe_duration_briefly(duration)
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        self.release()

    def __del__(self):
        """Release the lock if this object still holds it when it is
        destroyed."""
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock, then pass the signal on.

        Called by the interpreter as _signal(signum, frame).  After the
        cleanup the handler that was installed before ours is invoked so
        the signal keeps its normal effect (e.g. KeyboardInterrupt for
        SIGINT, termination for SIGTERM) instead of being swallowed.
        """
        if self.is_locked:
            self.release()
        if not args:
            return
        signum = args[0]
        frame = args[1] if len(args) > 1 else None
        previous = self._previous_handlers.get(signum)
        if callable(previous):
            previous(signum, frame)
        elif previous == signal.SIG_DFL:
            # The default action can't be called directly; restore it and
            # re-deliver the signal to ourselves.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)
        # SIG_IGN / None: the signal was being ignored before; keep
        # ignoring it.

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text to store in the lock file."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Inspect the existing lock file and remove it if it is stale.

        A lock is considered stale when the pid recorded in it no longer
        exists or when its recorded expiration timestamp has passed.
        This is best-effort: any problem reading or parsing the file is
        logged at debug level and otherwise ignored.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            # Only a single-line file is something we know how to parse.
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug('Blocking lock contents="%s"', contents)

            # Does the PID exist still?
            try:
                os.kill(contents.pid, 0)
            except PermissionError:
                # The process exists but belongs to someone else; the
                # lock is still live as far as the pid check goes.
                pass
            except OSError:
                msg = f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; force acquiring'
                logger.warning(msg)
                self.release()
                # Already removed; don't try to remove it a second time.
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'
                    logger.warning(msg)
                    self.release()
        except Exception:
            logger.debug(
                'Unable to inspect %s for staleness.', self.lockfile, exc_info=True
            )