Scale back warnings.warn and add stacklevels= where appropriate.
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 from dataclasses import dataclass
4 import datetime
5 import json
6 import logging
7 import os
8 import signal
9 import sys
10 from typing import Optional
11 import warnings
12
13 import config
14 import datetime_utils
15 import decorator_utils
16
17
# Register this module's command-line flags with the shared config
# parser and create the module-level logger.
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})', 'Args related to lockfiles'
)
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    default=10.0,
    type=float,
    metavar='SECONDS',
    help='If a lock is held for longer than this threshold we log a warning',
)
logger = logging.getLogger(__name__)
29
30
class LockFileException(Exception):
    """Raised when a lockfile could not be acquired."""
33
34
@dataclass
class LockFileContents:
    """The record that a lock holder serializes (as JSON) into its lockfile."""

    # Process id of the process that created the lockfile.
    pid: int
    # Command line (or caller-supplied override) of the lock holder.
    commandline: str
    # POSIX timestamp after which the lock may be treated as expired;
    # None means the lock carries no expiration.
    expiration_timestamp: Optional[float] = None
40
41
class LockFile(object):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

    with LockFile('./foo.lock'):
        # do a bunch of stuff... if the process gets a SIGINT / SIGTERM
        # we have a signal handler to do cleanup.  Other code (in this
        # process or another) that tries to take the same lockfile will
        # retry a few times and then raise LockFileException.  There is
        # also some logic for detecting stale locks.

    """
    def __init__(
            self,
            lockfile_path: str,
            *,
            do_signal_cleanup: bool = True,
            expiration_timestamp: Optional[float] = None,
            override_command: Optional[str] = None,
    ) -> None:
        """Prepare (but do not acquire) a lock on lockfile_path.

        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: if True, install SIGINT and SIGTERM
                handlers that release the lock.
            expiration_timestamp: optional POSIX timestamp written into
                the lockfile; once it has passed, other contenders may
                treat the lock as stale.
            override_command: text to record in the lockfile in place of
                this process' command line.
        """
        self.is_locked = False
        self.lockfile = lockfile_path
        self.override_command = override_command
        self.expiration_timestamp = expiration_timestamp
        # Set by __enter__; initialized here so __exit__ never hits a
        # missing attribute.
        self.locktime: Optional[float] = None
        if do_signal_cleanup:
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)

    def locked(self) -> bool:
        """Return True if this object currently holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Return True if the lockfile does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single attempt to create the lockfile.

        Returns:
            True if the lockfile was created (we now own the lock),
            False if creating or writing it raised an OSError.
        """
        logger.debug(f"Trying to acquire {self.lockfile}.")
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
            logger.debug(f'Success; I own {self.lockfile}.')
            self.is_locked = True
            return True
        except OSError:
            pass
        msg = f'Could not acquire {self.lockfile}.'
        logger.warning(msg)
        return False

    def acquire_with_retries(
            self,
            *,
            initial_delay: float = 1.0,
            backoff_factor: float = 2.0,
            max_attempts: int = 5
    ) -> bool:
        """Try to acquire the lock, checking for a stale lockfile before
        the first attempt and after every failed attempt.

        The three arguments are passed to decorator_utils.retry_if_false
        as delay_sec, backoff and tries respectively (presumably: retry
        while the attempt returns False -- see that helper for the exact
        semantics).

        Returns:
            True if the lock was acquired, False otherwise.
        """

        @decorator_utils.retry_if_false(tries = max_attempts,
                                        delay_sec = initial_delay,
                                        backoff = backoff_factor)
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Delete the lockfile (logging, not raising, on failure) and
        mark this object as not holding the lock."""
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) and record the acquisition time.

        Raises:
            LockFileException: if the lock could not be acquired.
        """
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, type, value, traceback):
        """Release the lock, warning first if it was held for at least
        the configured threshold."""
        if self.locktime is not None:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                str_duration = datetime_utils.describe_duration_briefly(duration)
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        self.release()

    def __del__(self):
        # getattr: __init__ may not have run to completion.
        if getattr(self, 'is_locked', False):
            self.release()

    def _signal(self, *args):
        """SIGINT / SIGTERM handler: release the lock if we hold it."""
        # NOTE(review): this handler returns after cleanup, so it
        # replaces the previous SIGINT / SIGTERM handling rather than
        # chaining to it -- confirm that is intended.
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text to write into the lockfile: our pid, our
        command line (or the override) and the expiration timestamp."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid = os.getpid(),
            commandline = cmd,
            expiration_timestamp = self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Remove the existing lockfile if its owner is gone or its
        expiration has passed.

        This is best effort: a lockfile that cannot be read or parsed is
        left alone and the problem is only logged at debug level.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug(f'Blocking lock contents="{contents}"')

            # Does the PID exist still?
            try:
                os.kill(contents.pid, 0)
            except PermissionError:
                # The process exists but we may not signal it, so the
                # lock holder is alive.
                pass
            except OSError:
                msg = f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; force acquiring'
                logger.warning(msg)
                self.release()
                return  # already removed; don't unlink twice below

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'
                    logger.warning(msg)
                    self.release()
        except Exception as e:
            # e.g. the file vanished between the existence check and the
            # open, or it does not contain the JSON we expect.
            logger.debug(f'Could not check {self.lockfile} for staleness: {e}')