290710759dbc702fff757ae2561fe5c0af3051a6
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 import datetime
4 import json
5 import logging
6 import os
7 import signal
8 import sys
9 import warnings
10 from dataclasses import dataclass
11 from typing import Optional
12
13 import config
14 import datetime_utils
15 import decorator_utils
16
# Register this module's command-line flag(s) with the project's config
# module (presumably as an argparse argument group -- see
# config.add_commandline_args).
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)

# Hold-duration threshold, in seconds, read back by LockFile.__exit__.
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    type=float,
    default=60.0,
    metavar='SECONDS',
    help='If a lock is held for longer than this threshold we log a warning',
)

logger = logging.getLogger(__name__)
26
27
class LockFileException(Exception):
    """Raised by LockFile when it gives up trying to acquire its lockfile."""
30
31
@dataclass
class LockFileContents:
    """Identity of a lock holder, serialized as JSON into the lockfile."""

    # Process id of the holder; would-be owners probe it with os.kill(pid, 0).
    pid: int
    # The holder's command line (sys.argv joined by spaces, or an override).
    commandline: str
    # Optional deadline in epoch seconds (compared against
    # datetime.now().timestamp()); after it the lock may be force-released.
    expiration_timestamp: Optional[float]
37
38
class LockFile(object):
    """A lockfile-based mutual-exclusion mechanism with context-manager
    support so you can use it in a with statement, e.g.::

        with LockFile('./foo.lock'):
            # do a bunch of stuff... if the process receives SIGINT or
            # SIGTERM a signal handler removes the lockfile.  Other code
            # (in this process or another) that tries to take the same
            # lockfile will retry and eventually raise LockFileException.
            # There is also some logic for detecting stale locks.
            ...

    The lock belongs to whoever creates the lockfile; creation uses
    O_CREAT | O_EXCL so at most one creator succeeds.  The holder writes a
    JSON-encoded LockFileContents (pid, command line, optional expiration
    timestamp) into the file.  A would-be owner that finds the file already
    present reads it back and force-removes it if the recorded pid no longer
    exists or the expiration timestamp has passed.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """
        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: if True, install SIGINT/SIGTERM handlers that
                release the lock and then let the signal take effect.
                signal.signal only works from the main thread, so construct
                the object there when this is True.
            expiration_timestamp: optional epoch-seconds deadline recorded in
                the lockfile; once it passes, other would-be owners may
                force-release the lock.
            override_command: recorded in the lockfile instead of sys.argv.
        """
        self.is_locked = False
        self.lockfile = lockfile_path
        self.override_command = override_command
        self.expiration_timestamp = expiration_timestamp
        # Set by __enter__ once the lock is held; read by __exit__.
        self.locktime: Optional[float] = None
        # Handler that was in place before ours, per signal number, so that
        # _signal can defer to it instead of swallowing the signal.
        self._previous_handlers = {}
        if do_signal_cleanup:
            for signum in (signal.SIGINT, signal.SIGTERM):
                previous = signal.signal(signum, self._signal)
                # If the handler we replaced belongs to another LockFile,
                # inherit *its* predecessor instead of chaining through it.
                # This keeps the chain one level deep (no unbounded growth,
                # no keeping old LockFile objects alive); only the most
                # recently constructed LockFile receives signal cleanup.
                owner = getattr(previous, '__self__', None)
                if isinstance(owner, LockFile):
                    previous = owner._previous_handlers.get(signum)
                self._previous_handlers[signum] = previous

    def locked(self):
        """Return True if this object believes it currently holds the lock."""
        return self.is_locked

    def available(self):
        """Return True if the lockfile does not currently exist.

        Advisory only: another process may create the file right after this
        check returns.
        """
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single attempt to create the lockfile.

        Returns:
            True if this call created the lockfile (we now hold the lock),
            False otherwise.
        """
        logger.debug(f"Trying to acquire {self.lockfile}.")
        try:
            # Attempt to create the lockfile.  These flags cause os.open to
            # raise an OSError if the file already exists, which makes the
            # existence check and the creation a single atomic step.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            msg = f'Could not acquire {self.lockfile}.'
            logger.warning(msg)
            return False

        try:
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
        except OSError:
            # We created the file but could not record our identity in it.
            # Remove it: an empty/partial lockfile cannot be parsed by
            # _detect_stale_lockfile and so would never be cleared.
            logger.exception(f'Failed to write {self.lockfile}; removing it.')
            try:
                os.unlink(self.lockfile)
            except OSError:
                pass
            return False

        logger.debug(f'Success; I own {self.lockfile}.')
        self.is_locked = True
        return True

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Try to take the lock, retrying on failure.

        A pre-existing lockfile is checked for staleness first, and again
        after every failed attempt.  Retry cadence is delegated to
        decorator_utils.retry_if_false (presumably up to max_attempts tries,
        starting at initial_delay seconds and multiplying the delay by
        backoff_factor -- confirm there).

        Returns:
            True if the lock was acquired.
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self):
        """Delete the lockfile and mark this object as not holding the lock.

        The unlink is unconditional (it is also how _detect_stale_lockfile
        removes someone else's stale lock); failures are logged, not raised.
        """
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock via acquire_with_retries.

        Raises:
            LockFileException: if the lock could not be acquired.
        """
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Warn if the lock was held too long, then release it.

        "Too long" is --lockfile_held_duration_warning_threshold_sec.
        Exceptions from the with-body are never suppressed.
        """
        if self.locktime is not None:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                str_duration = datetime_utils.describe_duration_briefly(duration)
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
            self.locktime = None
        # The lock may already be gone (e.g. released by the signal
        # handler).  Unlinking again could delete a lockfile that another
        # process has created since, so only release what we still hold.
        if self.is_locked:
            self.release()

    def __del__(self):
        """Best-effort release if we are collected while holding the lock."""
        if self.is_locked:
            self.release()

    def _signal(self, signum=None, frame=None):
        """SIGINT/SIGTERM handler: release the lock, then let the signal act.

        Merely returning would swallow the signal, leaving the process
        running its critical section without the lock while another process
        is free to take it.  So after releasing we defer to the handler that
        was installed before ours: a callable is invoked (for SIGINT this is
        normally Python's default handler, which raises KeyboardInterrupt);
        for SIG_DFL the default disposition is restored and the signal is
        re-sent to this process; for SIG_IGN / unknown nothing more happens.
        """
        if self.is_locked:
            self.release()
        previous = self._previous_handlers.get(signum)
        if callable(previous):
            previous(signum, frame)
        elif previous == signal.SIG_DFL:
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text written into the lockfile on acquisition."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Force-release the lockfile if its recorded holder looks stale.

        Stale means the recorded pid no longer exists, or the recorded
        expiration_timestamp has passed.  This is best-effort: if the file
        cannot be read or parsed (vanished, empty, not one JSON line, wrong
        fields), the problem is logged at debug level and the lock is left
        alone.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug(f'Blocking lock contents="{contents}"')

            # Does the PID exist still?  Signal 0 only performs the
            # existence/permission check.  ESRCH (ProcessLookupError) means
            # the process is gone; EPERM (PermissionError) means it exists
            # but belongs to someone else, so the lock is NOT stale.
            try:
                os.kill(contents.pid, 0)
            except ProcessLookupError:
                msg = f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; force acquiring'
                logger.warning(msg)
                self.release()
                # Already removed; don't release (unlink) a second time.
                return
            except PermissionError:
                pass

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'
                    logger.warning(msg)
                    self.release()
        except Exception as e:
            logger.debug(f'Could not check {self.lockfile} for staleness: {e}')