Fix an edge condition around the Nth weekday of the month when
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """File-based locking helper."""
6
7 from __future__ import annotations
8 import contextlib
9 import datetime
10 import json
11 import logging
12 import os
13 import signal
14 import sys
15 import warnings
16 from dataclasses import dataclass
17 from typing import Literal, Optional
18
19 import config
20 import datetime_utils
21 import decorator_utils
22
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Register this module's commandline flag with the project's config module.
# The parsed value is read back through config.config[...] when a lock is
# released by the context manager, to decide whether to warn about a
# long-held lock.
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    default=60.0,
    type=float,
    metavar='SECONDS',
    help='If a lock is held for longer than this threshold we log a warning',
)
32
33
class LockFileException(Exception):
    """Error type for lock file problems, e.g. a lock that could not be
    acquired."""
38
39
@dataclass
class LockFileContents:
    """Record describing the owner of a lock; this is what gets
    serialized (as JSON) into each lock file."""

    # Process id of the lock holder.
    pid: int

    # Command line of the lock holder (or a caller-supplied override).
    commandline: str

    # Epoch timestamp (seconds) after which the lock may be considered
    # expired, or None if the lock never expires.
    expiration_timestamp: Optional[float]
47
48
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

    with LockFile('./foo.lock'):
        # do a bunch of stuff... if the process is interrupted or
        # terminated we have a signal handler to do cleanup.  Other
        # code (in this process or another) that tries to take the
        # same lockfile will retry a few times and then give up with a
        # LockFileException.  There is also some logic for detecting
        # stale locks.

    The lock is represented by the existence of the file at
    lockfile_path; the file's contents are a one-line JSON rendering of
    LockFileContents describing the holder.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """Set up (but do not acquire) a lock.

        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: if True, install SIGINT / SIGTERM handlers
                that release the lock and then defer to whatever handler
                was previously installed.  Python only allows signal
                handlers to be installed from the main thread; pass
                False when constructing from another thread.
            expiration_timestamp: optional epoch timestamp (seconds)
                written into the lockfile; once it has passed, other
                acquirers may treat the lock as stale.
            override_command: optional string recorded in the lockfile
                instead of this process' commandline.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        # Epoch timestamp (a float) of when __enter__ acquired the lock.
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        self.expiration_timestamp = expiration_timestamp
        # signum -> handler that was installed before ours, so that
        # _signal can chain to it instead of swallowing the signal.
        self._previous_signal_handlers: dict = {}
        if do_signal_cleanup:
            for signum in (signal.SIGINT, signal.SIGTERM):
                self._previous_signal_handlers[signum] = signal.signal(signum, self._signal)

    def locked(self) -> bool:
        """Return True if this instance currently holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Return True if the lockfile does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single, non-blocking attempt to take the lock.

        Returns:
            True if the lockfile was created by this call (we now own
            the lock), False otherwise.
        """
        logger.debug("Trying to acquire %s.", self.lockfile)
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        try:
            with os.fdopen(fd, "a") as f:
                contents = self._get_lockfile_contents()
                logger.debug(contents)
                f.write(contents)
        except OSError:
            # We created the file but could not record who owns it.  Do
            # not leave an empty lockfile behind: stale detection skips
            # files it cannot parse so it would block everyone forever.
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            logger.warning("Couldn't acquire %s.", self.lockfile)
            return False

        logger.debug('Success; I own %s.', self.lockfile)
        self.is_locked = True
        return True

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Try to take the lock, checking for (and clearing) a stale
        lockfile before the first attempt and after every failed one.

        The retry policy is delegated to decorator_utils.retry_if_false;
        presumably it re-invokes the attempt up to max_attempts times,
        sleeping initial_delay seconds and multiplying the delay by
        backoff_factor between tries -- confirm against that helper.

        Returns:
            True if the lock was acquired, False otherwise.
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self):
        """Remove the lockfile (unconditionally) and mark this instance
        as not holding the lock.  Never raises; failures are logged."""
        try:
            os.unlink(self.lockfile)
        except FileNotFoundError:
            # Already gone (e.g. removed by a signal handler or by
            # another process breaking a stale lock); nothing to do.
            logger.debug('%s was already removed.', self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Warn if the lock was held for too long, then release it.

        Always returns False so exceptions raised inside the with block
        propagate.
        """
        if self.locktime is not None:
            duration = datetime.datetime.now().timestamp() - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                # Note: describe duration briefly only does 1s granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        # Only unlink the file if we still own it.  If it was already
        # released (explicitly or by the signal handler) the file at
        # this path may now belong to somebody else.
        if self.is_locked:
            self.release()
        return False

    def __del__(self):
        # getattr: the instance may be only partially constructed.
        if getattr(self, 'is_locked', False):
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock, then defer to whichever
        handler was installed before this instance replaced it so the
        signal keeps its normal effect (e.g. KeyboardInterrupt for
        SIGINT, process termination for SIGTERM)."""
        if self.is_locked:
            self.release()
        if not args:
            return
        signum = args[0]
        previous = getattr(self, '_previous_signal_handlers', {}).get(signum)
        if callable(previous):
            # A Python-level handler (possibly another LockFile's).
            previous(*args)
        elif previous == signal.SIG_DFL:
            # Restore the default disposition and re-deliver the signal
            # to ourselves so the default action takes place.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)
        # SIG_IGN or an unknown (non-Python) handler: nothing to chain to.

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text to write into the lockfile."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    @staticmethod
    def _pid_is_alive(pid: int) -> bool:
        """Return True if a process with the given pid appears to exist."""
        try:
            # Signal 0 performs error checking only; nothing is delivered.
            os.kill(pid, 0)
        except ProcessLookupError:
            return False
        except PermissionError:
            # The process exists but belongs to somebody else.
            return True
        except OSError:
            return False
        return True

    def _detect_stale_lockfile(self) -> None:
        """Best-effort check of an existing lockfile; remove it if its
        owning pid no longer exists or its expiration time has passed.
        Never raises."""
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                # Empty or unexpected contents (possibly mid-write by
                # its owner); leave it alone.
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug('Blocking lock contents="%s"', contents)

            # Does the PID exist still?
            if not self._pid_is_alive(contents.pid):
                logger.warning(
                    "Lockfile %s's pid (%d) is stale; force acquiring...",
                    self.lockfile,
                    contents.pid,
                )
                self.release()
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.warning(
                        "Lockfile %s's expiration time has passed; force acquiring",
                        self.lockfile,
                    )
                    self.release()
        except FileNotFoundError:
            pass  # If the lockfile doesn't exist or disappears, good.
        except Exception:
            # Unreadable or malformed lockfile: stay best-effort, but
            # leave a trace for debugging.
            logger.debug(
                'Unable to inspect %s for staleness; ignoring.', self.lockfile, exc_info=True
            )