Ran black code formatter on everything.
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 from dataclasses import dataclass
4 import datetime
5 import json
6 import logging
7 import os
8 import signal
9 import sys
10 from typing import Optional
11 import warnings
12
13 import config
14 import datetime_utils
15 import decorator_utils
16
17
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register the lockfile-related command-line flag with the project's
# config module.  The parsed value is later read back via
# config.config['lockfile_held_duration_warning_threshold_sec'].
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    metavar='SECONDS',
    type=float,
    default=10.0,
    help='If a lock is held for longer than this threshold we log a warning',
)
29
30
class LockFileException(Exception):
    """Error raised when a lockfile could not be acquired."""
33
34
@dataclass
class LockFileContents:
    """The record that is serialized (as one line of JSON) into a lockfile."""

    # Process id of the lock holder.
    pid: int
    # Command line (or caller-supplied override) identifying the holder.
    commandline: str
    # Epoch timestamp (seconds) after which the lock may be force-acquired
    # by others, or None when the lock never expires.
    expiration_timestamp: Optional[float]
40
41
class LockFile(object):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

    with LockFile('./foo.lock'):
        # do a bunch of stuff... if the process dies we have a signal
        # handler to do cleanup.  Other code (in this process or another)
        # that tries to take the same lockfile will retry a bounded
        # number of times and then get a LockFileException.  There is
        # also some logic for detecting stale locks.

    The lock is represented by the existence of the file at
    lockfile_path; its contents are one line of JSON describing the
    holder (pid, command line, optional expiration timestamp).
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """
        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: when True, install SIGINT / SIGTERM
                handlers that release the lock if it is held.
            expiration_timestamp: epoch seconds after which other
                lockers may treat this lock as stale, or None for a
                lock that never expires.
            override_command: text to record as the holder's command
                line instead of the joined sys.argv.
        """
        self.is_locked = False
        self.lockfile = lockfile_path
        self.override_command = override_command
        self.expiration_timestamp = expiration_timestamp
        # Set by __enter__; initialized here so __exit__ can always
        # read it without risking an AttributeError.
        self.locktime: Optional[float] = None
        if do_signal_cleanup:
            # NOTE(review): this replaces whatever handlers were
            # installed before, and _signal only releases the lock; it
            # does not chain to the old handler or terminate the process.
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)

    def locked(self) -> bool:
        """Return True if this object currently holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Return True if the lockfile does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single, non-blocking attempt to take the lock.

        Returns:
            True if the lockfile was created by this call, False if it
            could not be created (e.g. it already exists).
        """
        logger.debug(f"Trying to acquire {self.lockfile}.")
        # Serialize before touching the filesystem so a failure here can
        # never leave an empty, undetectably-stale lockfile behind.
        contents = self._get_lockfile_contents()
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, "a") as f:
                logger.debug(contents)
                f.write(contents)
            logger.debug(f'Success; I own {self.lockfile}.')
            self.is_locked = True
            return True
        except OSError as e:
            logger.debug(f'Lockfile creation failed: {e!r}')
        msg = f'Could not acquire {self.lockfile}.'
        logger.warning(msg)
        return False

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Try to take the lock, retrying on failure.

        Before the first attempt, and after every failed attempt, an
        existing lockfile is checked for staleness and removed if stale.

        Args:
            initial_delay: passed to the retry decorator as delay_sec.
            backoff_factor: passed to the retry decorator as backoff.
            max_attempts: passed to the retry decorator as tries.

        Returns:
            True if the lock was acquired, False otherwise (presumably
            after the retry decorator exhausts its attempts -- see
            decorator_utils.retry_if_false).
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Remove the lockfile and mark this object as not holding it.

        Failure to remove the file is logged, never raised.
        """
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with default retry settings) or raise.

        Raises:
            LockFileException: if the lock could not be acquired.
        """
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, type, value, traceback):
        """Release the lock, warning first if it was held for too long.

        Returns None, so an exception raised in the with-body propagates.
        """
        try:
            if self.locktime:
                ts = datetime.datetime.now().timestamp()
                duration = ts - self.locktime
                if (
                    duration
                    >= config.config['lockfile_held_duration_warning_threshold_sec']
                ):
                    str_duration = datetime_utils.describe_duration_briefly(
                        duration
                    )
                    msg = f'Held {self.lockfile} for {str_duration}'
                    logger.warning(msg)
                    warnings.warn(msg, stacklevel=2)
        finally:
            # Always give the lock back, even if the bookkeeping above
            # raised (e.g. warnings configured as errors).
            self.release()

    def __del__(self):
        # getattr guards against instances whose __init__ never ran.
        if getattr(self, 'is_locked', False):
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock if this object holds it."""
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text describing this holder for the lockfile."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    @staticmethod
    def _pid_is_alive(pid: int) -> bool:
        """Return True if a process with the given pid appears to exist."""
        try:
            # Signal 0 delivers nothing; it only performs the existence
            # and permission checks.
            os.kill(pid, 0)
        except PermissionError:
            # The process exists but belongs to someone else: not stale.
            return True
        except OSError:
            return False
        return True

    def _detect_stale_lockfile(self) -> None:
        """Remove the existing lockfile if its holder is gone or expired.

        This is best effort: an unreadable or malformed lockfile is left
        alone and the problem is logged rather than raised.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug(f'Blocking lock contents="{contents}"')

            # Does the PID exist still?
            if not self._pid_is_alive(contents.pid):
                msg = f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; force acquiring'
                logger.warning(msg)
                self.release()
                # The file is gone; a second release would only log a
                # spurious unlink failure.
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'
                    logger.warning(msg)
                    self.release()
        except Exception as e:
            # Deliberately broad: staleness detection must never break
            # lock acquisition, but the reason is no longer hidden.
            logger.debug(
                f'Unable to check {self.lockfile} for staleness: {e!r}'
            )