Migration from old pyutilz package name (which, in turn, came from
[pyutils.git] / src / pyutils / files / lockfile.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """File-based locking helper."""
6
7 from __future__ import annotations
8
9 import contextlib
10 import datetime
11 import json
12 import logging
13 import os
14 import signal
15 import sys
16 import warnings
17 from dataclasses import dataclass
18 from typing import Literal, Optional
19
20 from pyutils import config, decorator_utils
21 from pyutils.datetimez import datetime_utils
22
23 cfg = config.add_commandline_args(f'Lockfile ({__file__})', 'Args related to lockfiles')
24 cfg.add_argument(
25     '--lockfile_held_duration_warning_threshold_sec',
26     type=float,
27     default=60.0,
28     metavar='SECONDS',
29     help='If a lock is held for longer than this threshold we log a warning',
30 )
31 logger = logging.getLogger(__name__)
32
33
34 class LockFileException(Exception):
35     """An exception related to lock files."""
36
37     pass
38
39
40 @dataclass
41 class LockFileContents:
42     """The contents we'll write to each lock file."""
43
44     pid: int
45     """The pid of the process that holds the lock"""
46
47     commandline: str
48     """The commandline of the process that holds the lock"""
49
50     expiration_timestamp: Optional[float]
51     """When this lock will expire as seconds since Epoch"""
52
53
54 class LockFile(contextlib.AbstractContextManager):
55     """A file locking mechanism that has context-manager support so you
56     can use it in a with statement.  e.g.::
57
58         with LockFile('./foo.lock'):
59             # do a bunch of stuff... if the process dies we have a signal
60             # handler to do cleanup.  Other code (in this process or another)
61             # that tries to take the same lockfile will block.  There is also
62             # some logic for detecting stale locks.
63     """
64
65     def __init__(
66         self,
67         lockfile_path: str,
68         *,
69         do_signal_cleanup: bool = True,
70         expiration_timestamp: Optional[float] = None,
71         override_command: Optional[str] = None,
72     ) -> None:
73         """C'tor.
74
75         Args:
76             lockfile_path: path of the lockfile to acquire
77             do_signal_cleanup: handle SIGINT and SIGTERM events by
78                 releasing the lock before exiting
79             expiration_timestamp: when our lease on the lock should
80                 expire (as seconds since the Epoch).  None means the
81                 lock will not expire until we explicltly release it.
82             override_command: don't use argv to determine our commandline
83                 rather use this instead if provided.
84         """
85         self.is_locked: bool = False
86         self.lockfile: str = lockfile_path
87         self.locktime: Optional[int] = None
88         self.override_command: Optional[str] = override_command
89         if do_signal_cleanup:
90             signal.signal(signal.SIGINT, self._signal)
91             signal.signal(signal.SIGTERM, self._signal)
92         self.expiration_timestamp = expiration_timestamp
93
94     def locked(self):
95         """Is it locked currently?"""
96         return self.is_locked
97
98     def available(self):
99         """Is it available currently?"""
100         return not os.path.exists(self.lockfile)
101
102     def try_acquire_lock_once(self) -> bool:
103         """Attempt to acquire the lock with no blocking.
104
105         Returns:
106             True if the lock was acquired and False otherwise.
107         """
108         logger.debug("Trying to acquire %s.", self.lockfile)
109         try:
110             # Attempt to create the lockfile.  These flags cause
111             # os.open to raise an OSError if the file already
112             # exists.
113             fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
114             with os.fdopen(fd, "a") as f:
115                 contents = self._get_lockfile_contents()
116                 logger.debug(contents)
117                 f.write(contents)
118             logger.debug('Success; I own %s.', self.lockfile)
119             self.is_locked = True
120             return True
121         except OSError:
122             pass
123         logger.warning('Couldn\'t acquire %s.', self.lockfile)
124         return False
125
126     def acquire_with_retries(
127         self,
128         *,
129         initial_delay: float = 1.0,
130         backoff_factor: float = 2.0,
131         max_attempts=5,
132     ) -> bool:
133         """Attempt to acquire the lock repeatedly with retries and backoffs.
134
135         Args:
136             initial_delay: how long to wait before retrying the first time
137             backoff_factor: a float >= 1.0 the multiples the current retry
138                 delay each subsequent time we attempt to acquire and fail
139                 to do so.
140             max_attempts: maximum number of times to try before giving up
141                 and failing.
142
143         Returns:
144             True if the lock was acquired and False otherwise.
145         """
146
147         @decorator_utils.retry_if_false(
148             tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
149         )
150         def _try_acquire_lock_with_retries() -> bool:
151             success = self.try_acquire_lock_once()
152             if not success and os.path.exists(self.lockfile):
153                 self._detect_stale_lockfile()
154             return success
155
156         if os.path.exists(self.lockfile):
157             self._detect_stale_lockfile()
158         return _try_acquire_lock_with_retries()
159
160     def release(self):
161         """Release the lock"""
162         try:
163             os.unlink(self.lockfile)
164         except Exception as e:
165             logger.exception(e)
166         self.is_locked = False
167
168     def __enter__(self):
169         if self.acquire_with_retries():
170             self.locktime = datetime.datetime.now().timestamp()
171             return self
172         msg = f"Couldn't acquire {self.lockfile}; giving up."
173         logger.warning(msg)
174         raise LockFileException(msg)
175
176     def __exit__(self, _, value, traceback) -> Literal[False]:
177         if self.locktime:
178             ts = datetime.datetime.now().timestamp()
179             duration = ts - self.locktime
180             if (
181                 duration
182                 >= config.config['lockfile_held_duration_warning_threshold_sec']
183             ):
184                 # Note: describe duration briefly only does 1s granularity...
185                 str_duration = datetime_utils.describe_duration_briefly(int(duration))
186                 msg = f'Held {self.lockfile} for {str_duration}'
187                 logger.warning(msg)
188                 warnings.warn(msg, stacklevel=2)
189         self.release()
190         return False
191
192     def __del__(self):
193         if self.is_locked:
194             self.release()
195
196     def _signal(self, *args):
197         if self.is_locked:
198             self.release()
199
200     def _get_lockfile_contents(self) -> str:
201         if self.override_command:
202             cmd = self.override_command
203         else:
204             cmd = ' '.join(sys.argv)
205         contents = LockFileContents(
206             pid=os.getpid(),
207             commandline=cmd,
208             expiration_timestamp=self.expiration_timestamp,
209         )
210         return json.dumps(contents.__dict__)
211
212     def _detect_stale_lockfile(self) -> None:
213         try:
214             with open(self.lockfile, 'r') as rf:
215                 lines = rf.readlines()
216                 if len(lines) == 1:
217                     line = lines[0]
218                     line_dict = json.loads(line)
219                     contents = LockFileContents(**line_dict)
220                     logger.debug('Blocking lock contents="%s"', contents)
221
222                     # Does the PID exist still?
223                     try:
224                         os.kill(contents.pid, 0)
225                     except OSError:
226                         logger.warning(
227                             'Lockfile %s\'s pid (%d) is stale; force acquiring...',
228                             self.lockfile,
229                             contents.pid,
230                         )
231                         self.release()
232
233                     # Has the lock expiration expired?
234                     if contents.expiration_timestamp is not None:
235                         now = datetime.datetime.now().timestamp()
236                         if now > contents.expiration_timestamp:
237                             logger.warning(
238                                 'Lockfile %s\'s expiration time has passed; force acquiring',
239                                 self.lockfile,
240                             )
241                             self.release()
242         except Exception:
243             pass  # If the lockfile doesn't exist or disappears, good.