Ahem. Still running black?
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 from dataclasses import dataclass
4 import datetime
5 import json
6 import logging
7 import os
8 import signal
9 import sys
10 from typing import Optional
11 import warnings
12
13 import config
14 import datetime_utils
15 import decorator_utils
16
17
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register this module's commandline flag with the project's config
# module.  The resulting value is read back at lock-release time via
# config.config['lockfile_held_duration_warning_threshold_sec'].
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    metavar='SECONDS',
    type=float,
    default=10.0,
    help='If a lock is held for longer than this threshold we log a warning',
)
27
28
class LockFileException(Exception):
    """Raised by LockFile.__enter__ when the lockfile could not be
    acquired."""
31
32
@dataclass
class LockFileContents:
    """The record that a lock holder writes into its lockfile.  It is
    serialized as a single line of JSON (a dump of the instance's
    __dict__) and read back by other would-be lockers in order to
    detect stale locks."""

    # Process id of the lock holder (os.getpid() at acquisition time).
    pid: int

    # Command line of the lock holder: either an explicit override or
    # the holder's sys.argv joined with spaces.
    commandline: str

    # Time (seconds since the epoch, comparable with
    # datetime.datetime.now().timestamp()) after which the lock may be
    # considered stale, or None if the lock never expires.
    expiration_timestamp: Optional[float]
38
39
class LockFile(object):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

    with LockFile('./foo.lock'):
        # do a bunch of stuff... if the process receives SIGINT or
        # SIGTERM we have a signal handler to do cleanup.  Other code
        # (in this process or another) that tries to take the same
        # lockfile will retry and, if it still cannot get the lock,
        # get a LockFileException.  There is also some logic for
        # detecting stale locks.

    The lock is held for as long as the lockfile exists on disk.  The
    file contains one line of JSON describing the holder (see
    LockFileContents).
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """Prepare (but do not acquire) a lock backed by lockfile_path.

        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: if True, install SIGINT and SIGTERM
                handlers that release the lock if this instance holds it.
            expiration_timestamp: optional time, in seconds since the
                epoch, after which other lockers may treat this lock as
                stale and remove it.  None means the lock never expires.
            override_command: optional string to record in the lockfile
                instead of this process' sys.argv.
        """
        self.is_locked = False
        self.lockfile = lockfile_path
        self.override_command = override_command
        self.expiration_timestamp = expiration_timestamp

        # Timestamp at which __enter__ acquired the lock.  Initialized
        # here so that __exit__ can always read it.
        self.locktime: Optional[float] = None

        if do_signal_cleanup:
            # NOTE(review): these replace any handlers already installed
            # for the process (including those of a previously created
            # LockFile), and _signal only releases the lock; it does not
            # re-raise or terminate.  Confirm this is what callers want.
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)

    def locked(self) -> bool:
        """Return True if this instance currently holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Return True if the lockfile does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single attempt to create the lockfile.

        Returns:
            True if the lockfile was created (and this instance now
            holds the lock), False otherwise.
        """
        logger.debug(f"Trying to acquire {self.lockfile}.")
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
            logger.debug(f'Success; I own {self.lockfile}.')
            self.is_locked = True
            return True
        except FileExistsError:
            # The expected failure: somebody else holds the lock.
            pass
        except OSError as e:
            # Still a failed attempt, but the cause is something other
            # than contention so record what it was.
            logger.warning(f'Unexpected error creating {self.lockfile}: {e}')
        msg = f'Could not acquire {self.lockfile}.'
        logger.warning(msg)
        return False

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Try to acquire the lock, checking for a stale lockfile before
        the first attempt and again after every failed attempt.

        Args:
            initial_delay: passed to decorator_utils.retry_if_false as
                delay_sec.
            backoff_factor: passed to decorator_utils.retry_if_false as
                backoff.
            max_attempts: passed to decorator_utils.retry_if_false as
                tries.

        Returns:
            Whatever the retry wrapper returns; presumably True once an
            attempt succeeds and False if all attempts fail (confirm
            against decorator_utils.retry_if_false).
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Remove the lockfile and mark this instance as not holding the
        lock.  Failure to remove the file is logged, not raised."""
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) and return self.

        Raises:
            LockFileException: if the lock could not be acquired.
        """
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, type, value, traceback):
        """Release the lock, first warning if it was held for at least
        --lockfile_held_duration_warning_threshold_sec seconds.  Returns
        None, so an exception raised in the with body propagates."""
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if (
                duration
                >= config.config['lockfile_held_duration_warning_threshold_sec']
            ):
                str_duration = datetime_utils.describe_duration_briefly(duration)
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        self.release()

    def __del__(self):
        """Release the lock if this instance still holds it when it is
        garbage collected."""
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock if this instance holds it."""
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text to write into the lockfile: this
        process' pid, its command line (or override_command, if one was
        given) and the lock's expiration timestamp."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Inspect the existing lockfile and remove it if it is stale.

        A lockfile is considered stale when the pid recorded in it no
        longer exists or when its expiration timestamp has passed.  This
        is best effort: a lockfile that cannot be read or parsed is left
        alone and the problem is logged.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            line_dict = json.loads(lines[0])
            contents = LockFileContents(**line_dict)
            logger.debug(f'Blocking lock contents="{contents}"')

            # Does the PID exist still?  Signal 0 performs the existence
            # and permission checks without delivering a signal.
            # NOTE(review): this is the POSIX idiom; os.kill behaves
            # differently on Windows.
            try:
                os.kill(contents.pid, 0)
            except PermissionError:
                # The process exists but we are not allowed to signal
                # it, so the lock holder is alive, not stale.
                pass
            except OSError:
                msg = f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; force acquiring'
                logger.warning(msg)
                self.release()
                # The file is gone now; nothing more to check.
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'
                    logger.warning(msg)
                    self.release()
        except FileNotFoundError:
            # The holder released the lock between the caller's
            # existence check and our open; there is nothing to detect.
            logger.debug(f'{self.lockfile} disappeared before it could be examined.')
        except Exception as e:
            # Best effort only: never let a malformed or unreadable
            # lockfile abort the acquisition loop, but do not hide it.
            logger.warning(f'Unable to check {self.lockfile} for staleness: {e}')