Start using warnings from stdlib.
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 from dataclasses import dataclass
4 import datetime
5 import json
6 import logging
7 import os
8 import signal
9 import sys
10 from typing import Optional
11 import warnings
12
13 import config
14 import datetime_utils
15 import decorator_utils
16
17
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register this module's commandline flags with the project's config
# module; the parsed value is read back via config.config[...] when a
# lock is released.
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})', 'Args related to lockfiles'
)
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    metavar='SECONDS',
    type=float,
    default=10.0,
    help='If a lock is held for longer than this threshold we log a warning',
)
29
30
class LockFileException(Exception):
    """Raised when a lockfile could not be acquired."""
33
34
@dataclass
class LockFileContents:
    """The record stored (as one line of JSON) inside a lockfile."""

    # PID of the process that created the lockfile.
    pid: int
    # Commandline of the lock holder (or the caller-supplied override).
    commandline: str
    # Time (seconds since the epoch, compared against
    # datetime.now().timestamp()) after which the lock may be force
    # acquired by someone else; None means the lock never expires.
    expiration_timestamp: Optional[float]
40
41
class LockFile(object):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

        with LockFile('./foo.lock'):
            # do a bunch of stuff...

    The lock is held while the file at lockfile_path exists.  The file
    is created with O_CREAT | O_EXCL and contains a single line of JSON
    describing the holder (pid, commandline, optional expiration
    timestamp).  Entering the with statement retries acquisition with
    backoff and raises LockFileException if every attempt fails.
    Before and between attempts an existing lockfile is checked for
    staleness (holder pid no longer exists, or expiration timestamp has
    passed) and removed if stale.

    """

    def __init__(
            self,
            lockfile_path: str,
            *,
            do_signal_cleanup: bool = True,
            expiration_timestamp: Optional[float] = None,
            override_command: Optional[str] = None,
    ) -> None:
        """Prepare (but do not acquire) a lock on lockfile_path.

        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: if True, install SIGINT and SIGTERM
                handlers that release the lock if it is held.
                NOTE(review): this replaces any handlers already
                installed for those signals, and signal.signal is
                documented to work only from the main thread -- confirm
                callers are ok with both.
            expiration_timestamp: optional time (seconds since the
                epoch) after which another locker may force acquire
                this lock; None means no expiration.
            override_command: if given, recorded in the lockfile in
                place of this process' commandline.
        """
        self.is_locked = False
        self.lockfile = lockfile_path
        self.override_command = override_command
        self.expiration_timestamp = expiration_timestamp
        # Set by __enter__ on success; read by __exit__ to compute how
        # long the lock was held.
        self.locktime: Optional[float] = None
        if do_signal_cleanup:
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)

    def locked(self) -> bool:
        """Return True if this object currently holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Return True if the lockfile does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make one attempt to create the lockfile.

        Returns:
            True if the lockfile was created and is now owned by this
            object; False (after logging and issuing a warning) if it
            could not be created.
        """
        logger.debug(f"Trying to acquire {self.lockfile}.")
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
            logger.debug(f'Success; I own {self.lockfile}.')
            self.is_locked = True
            return True
        except OSError:
            pass
        msg = f'Could not acquire {self.lockfile}.'
        logger.warning(msg)
        warnings.warn(msg)
        return False

    def acquire_with_retries(
            self,
            *,
            initial_delay: float = 1.0,
            backoff_factor: float = 2.0,
            max_attempts: int = 5
    ) -> bool:
        """Try to acquire the lock repeatedly, checking for a stale
        lockfile before the first attempt and after each failure.

        The three arguments are handed to decorator_utils.retry_if_false
        as delay_sec, backoff and tries respectively.

        Returns:
            Whatever the retry-wrapped attempt returns; True means the
            lock was acquired.
        """

        @decorator_utils.retry_if_false(tries=max_attempts,
                                        delay_sec=initial_delay,
                                        backoff=backoff_factor)
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                # Give the next attempt a chance by clearing a stale lock.
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Remove the lockfile and mark this object as unlocked.

        A failure to remove the file is logged, not raised.
        """
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        warnings.warn(msg)
        raise LockFileException(msg)

    def __exit__(self, type, value, traceback):
        """Release the lock, warning first if it was held for at least
        the --lockfile_held_duration_warning_threshold_sec threshold.
        """
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                str_duration = datetime_utils.describe_duration_briefly(duration)
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                # stacklevel=2 attributes the warning to the code that
                # called __exit__ rather than to this method.
                warnings.warn(msg, stacklevel=2)
        self.release()

    def __del__(self):
        """Release the lock if this object still holds it."""
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock if this object holds it.

        NOTE(review): the handler only releases the lock; it does not
        re-raise the signal or exit the process.
        """
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text to write into the lockfile."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Remove the existing lockfile if it is stale.

        The lockfile is considered stale if the pid recorded in it no
        longer exists or if its expiration timestamp has passed.  This
        is best effort: a lockfile that cannot be read or parsed is
        left alone and the problem is logged at debug level.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug(f'Blocking lock contents="{contents}"')

            # Does the PID exist still?  Signal 0 only probes for the
            # process.  ProcessLookupError means it is gone;
            # PermissionError means it exists but we are not allowed to
            # signal it, so the lock is not stale on that account.
            try:
                os.kill(contents.pid, 0)
            except ProcessLookupError:
                msg = f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; force acquiring'
                logger.warning(msg)
                warnings.warn(msg)
                self.release()
                return
            except PermissionError:
                pass

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'
                    logger.warning(msg)
                    warnings.warn(msg)
                    self.release()
        except Exception as e:
            # e.g. the lockfile vanished or holds something unexpected.
            logger.debug(
                f'Unable to check {self.lockfile} for staleness: {e!r}'
            )