Adds shuffle/scramble to list_utils.
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 """File-based locking helper."""
4
5 from __future__ import annotations
6 import contextlib
7 import datetime
8 import json
9 import logging
10 import os
11 import signal
12 import sys
13 import warnings
14 from dataclasses import dataclass
15 from typing import Literal, Optional
16
17 import config
18 import datetime_utils
19 import decorator_utils
20
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register this module's commandline flag group with the project config
# module.  (Presumably an argparse-style argument group -- confirm against
# config.add_commandline_args.)
cfg = config.add_commandline_args(
    f'Lockfile ({__file__})',
    'Args related to lockfiles',
)

# Threshold, in seconds, that LockFile.__exit__ compares against how long
# the lock was held before deciding whether to emit a warning.
cfg.add_argument(
    '--lockfile_held_duration_warning_threshold_sec',
    help='If a lock is held for longer than this threshold we log a warning',
    metavar='SECONDS',
    default=60.0,
    type=float,
)
30
31
class LockFileException(Exception):
    """Error type for lock file problems.

    Raised by LockFile.__enter__ when the lockfile could not be acquired.
    """
36
37
@dataclass
class LockFileContents:
    """Record describing the holder of a lock.

    LockFile serializes one of these (as a JSON object built from its
    __dict__) into every lockfile it creates, and rebuilds it from that
    JSON when checking whether somebody else's lock is stale.
    """

    # Process id of the lock holder.
    pid: int

    # Command line of the lock holder (or the caller-supplied override).
    commandline: str

    # Seconds-since-epoch timestamp after which the lock may be force
    # acquired by others; None means the lock never expires.
    expiration_timestamp: Optional[float]
45
46
class LockFile(contextlib.AbstractContextManager):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

    with LockFile('./foo.lock'):
        # do a bunch of stuff...

    The lock is held by whoever manages to create the lockfile with
    O_CREAT | O_EXCL.  The file contains a single line of JSON holding
    the owner's pid, commandline and optional expiration timestamp; other
    would-be owners use that to detect and break stale locks (dead owner
    pid or expired timestamp).

    Entering the context retries acquisition a bounded number of times
    and raises LockFileException if the lock still cannot be taken.

    NOTE: unless do_signal_cleanup=False is passed, the constructor
    installs this instance's handler for SIGINT and SIGTERM.  That
    handler releases the lock and then returns; it does not re-raise the
    signal and does not chain to any handler that was installed before.
    """

    def __init__(
        self,
        lockfile_path: str,
        *,
        do_signal_cleanup: bool = True,
        expiration_timestamp: Optional[float] = None,
        override_command: Optional[str] = None,
    ) -> None:
        """Prepare (but do not acquire) a lock.

        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: install SIGINT / SIGTERM handlers that
                release the lock.
            expiration_timestamp: seconds-since-epoch time after which
                others may force-acquire this lock; None for never.
            override_command: text to record in the lockfile instead of
                this process' commandline.
        """
        self.is_locked: bool = False
        self.lockfile: str = lockfile_path
        # Timestamp (float seconds since epoch) at which __enter__ got
        # the lock; used to compute how long it was held.
        self.locktime: Optional[float] = None
        self.override_command: Optional[str] = override_command
        self.expiration_timestamp: Optional[float] = expiration_timestamp
        if do_signal_cleanup:
            try:
                signal.signal(signal.SIGINT, self._signal)
                signal.signal(signal.SIGTERM, self._signal)
            except ValueError:
                # signal.signal() may only be called from the main thread
                # of the main interpreter; don't make the lock unusable
                # elsewhere just because cleanup handlers can't be set.
                logger.warning(
                    'Unable to install signal cleanup handlers for %s '
                    '(not in the main thread?)',
                    self.lockfile,
                )

    def locked(self) -> bool:
        """Returns True if this instance currently holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Returns True if the lockfile does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make one attempt to take the lock.

        Returns:
            True if the lockfile was created and populated by us, False
            otherwise.
        """
        logger.debug("Trying to acquire %s.", self.lockfile)

        # Build the contents before touching the filesystem so that a
        # serialization failure can't leave an empty lockfile behind.
        contents = self._get_lockfile_contents()
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            logger.warning('Couldn\'t acquire %s.', self.lockfile)
            return False

        try:
            with os.fdopen(fd, "a") as f:
                logger.debug(contents)
                f.write(contents)
        except OSError:
            # We created the file (O_EXCL) but couldn't populate it.  An
            # empty lockfile can never be judged stale, so remove it
            # rather than leaving it to block everyone forever.
            with contextlib.suppress(OSError):
                os.unlink(self.lockfile)
            logger.warning('Couldn\'t acquire %s.', self.lockfile)
            return False

        logger.debug('Success; I own %s.', self.lockfile)
        self.is_locked = True
        return True

    def acquire_with_retries(
        self,
        *,
        initial_delay: float = 1.0,
        backoff_factor: float = 2.0,
        max_attempts: int = 5,
    ) -> bool:
        """Try to take the lock, retrying on failure.

        Before the first attempt, and after each failed one, an existing
        lockfile is checked for staleness and removed if stale.

        Args:
            initial_delay: passed to the retry decorator as delay_sec.
            backoff_factor: passed to the retry decorator as backoff.
            max_attempts: passed to the retry decorator as tries.

        Returns:
            The result of the retried attempt function; presumably False
            once attempts are exhausted (confirm against
            decorator_utils.retry_if_false).
        """

        @decorator_utils.retry_if_false(
            tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
        )
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Remove the lockfile (logging, not raising, on failure) and
        mark this instance as not holding the lock."""
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock or raise LockFileException."""
        if self.acquire_with_retries():
            self.locktime = datetime.datetime.now().timestamp()
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, _, value, traceback) -> Literal[False]:
        """Release the lock, warning first if it was held for longer
        than the configured threshold.  Never suppresses exceptions."""
        if self.locktime:
            ts = datetime.datetime.now().timestamp()
            duration = ts - self.locktime
            if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
                # Note: describe duration briefly only does 1s granularity...
                str_duration = datetime_utils.describe_duration_briefly(int(duration))
                msg = f'Held {self.lockfile} for {str_duration}'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
        self.release()
        return False

    def __del__(self):
        """Release the lock if it is still held at finalization."""
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """Signal handler: release the lock if held.  The signal itself
        is not propagated any further."""
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Returns the JSON text to write into the lockfile."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid=os.getpid(),
            commandline=cmd,
            expiration_timestamp=self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    @staticmethod
    def _pid_is_alive(pid: int) -> bool:
        """Returns True if a process with the given pid appears to exist."""
        try:
            # Signal 0 performs existence / permission checks only.
            os.kill(pid, 0)
        except PermissionError:
            # The process exists but belongs to somebody else; that is
            # a live owner, not a stale one.
            return True
        except OSError:
            return False
        return True

    def _detect_stale_lockfile(self) -> None:
        """Remove the existing lockfile if its owner looks dead or its
        expiration time has passed.

        Best effort: any problem reading or interpreting the lockfile
        leaves it untouched.  Only files containing exactly one line are
        considered.
        """
        try:
            # Read and close the file before deciding anything so that we
            # never unlink a file we still hold open.
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug('Blocking lock contents="%s"', contents)

            # Does the PID exist still?
            if not self._pid_is_alive(contents.pid):
                logger.warning(
                    'Lockfile %s\'s pid (%d) is stale; force acquiring...',
                    self.lockfile,
                    contents.pid,
                )
                self.release()
                # Already removed; a second release would only log a
                # spurious unlink failure.
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.warning(
                        'Lockfile %s\'s expiration time has passed; force acquiring',
                        self.lockfile,
                    )
                    self.release()
        except Exception:
            # If the lockfile doesn't exist or disappears, good.  Anything
            # else (e.g. unparseable contents) is not ours to fix here.
            logger.debug(
                'Could not evaluate %s for staleness.', self.lockfile, exc_info=True
            )