3 # © Copyright 2021-2022, Scott Gasch
5 """File-based locking helper."""
7 from __future__ import annotations
16 from dataclasses import dataclass
17 from typing import Literal, Optional
21 import decorator_utils
23 cfg = config.add_commandline_args(f'Lockfile ({__file__})', 'Args related to lockfiles')
25 '--lockfile_held_duration_warning_threshold_sec',
29 help='If a lock is held for longer than this threshold we log a warning',
31 logger = logging.getLogger(__name__)
34 class LockFileException(Exception):
35 """An exception related to lock files."""
41 class LockFileContents:
42 """The contents we'll write to each lock file."""
45 """The pid of the process that holds the lock"""
48 """The commandline of the process that holds the lock"""
50 expiration_timestamp: Optional[float]
51 """When this lock will expire as seconds since Epoch"""
54 class LockFile(contextlib.AbstractContextManager):
55 """A file locking mechanism that has context-manager support so you
56 can use it in a with statement. e.g.::
58 with LockFile('./foo.lock'):
59 # do a bunch of stuff... if the process dies we have a signal
60 # handler to do cleanup. Other code (in this process or another)
61 # that tries to take the same lockfile will block. There is also
62 # some logic for detecting stale locks.
69 do_signal_cleanup: bool = True,
70 expiration_timestamp: Optional[float] = None,
71 override_command: Optional[str] = None,
76 lockfile_path: path of the lockfile to acquire
77 do_signal_cleanup: handle SIGINT and SIGTERM events by
78 releasing the lock before exiting
79 expiration_timestamp: when our lease on the lock should
80 expire (as seconds since the Epoch). None means the
81 lock will not expire until we explicitly release it.
82 override_command: don't use argv to determine our commandline
83 rather use this instead if provided.
85 self.is_locked: bool = False
86 self.lockfile: str = lockfile_path
87 self.locktime: Optional[int] = None
88 self.override_command: Optional[str] = override_command
90 signal.signal(signal.SIGINT, self._signal)
91 signal.signal(signal.SIGTERM, self._signal)
92 self.expiration_timestamp = expiration_timestamp
95 """Is it locked currently?"""
99 """Is it available currently?"""
100 return not os.path.exists(self.lockfile)
102 def try_acquire_lock_once(self) -> bool:
103 """Attempt to acquire the lock with no blocking.
106 True if the lock was acquired and False otherwise.
108 logger.debug("Trying to acquire %s.", self.lockfile)
110 # Attempt to create the lockfile. These flags cause
111 # os.open to raise an OSError if the file already
113 fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
114 with os.fdopen(fd, "a") as f:
115 contents = self._get_lockfile_contents()
116 logger.debug(contents)
118 logger.debug('Success; I own %s.', self.lockfile)
119 self.is_locked = True
123 logger.warning('Couldn\'t acquire %s.', self.lockfile)
126 def acquire_with_retries(
129 initial_delay: float = 1.0,
130 backoff_factor: float = 2.0,
133 """Attempt to acquire the lock repeatedly with retries and backoffs.
136 initial_delay: how long to wait before retrying the first time
137 backoff_factor: a float >= 1.0 that multiplies the current retry
138 delay each subsequent time we attempt to acquire and fail
140 max_attempts: maximum number of times to try before giving up
144 True if the lock was acquired and False otherwise.
147 @decorator_utils.retry_if_false(
148 tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
150 def _try_acquire_lock_with_retries() -> bool:
151 success = self.try_acquire_lock_once()
152 if not success and os.path.exists(self.lockfile):
153 self._detect_stale_lockfile()
156 if os.path.exists(self.lockfile):
157 self._detect_stale_lockfile()
158 return _try_acquire_lock_with_retries()
161 """Release the lock"""
163 os.unlink(self.lockfile)
164 except Exception as e:
166 self.is_locked = False
169 if self.acquire_with_retries():
170 self.locktime = datetime.datetime.now().timestamp()
172 msg = f"Couldn't acquire {self.lockfile}; giving up."
174 raise LockFileException(msg)
176 def __exit__(self, _, value, traceback) -> Literal[False]:
178 ts = datetime.datetime.now().timestamp()
179 duration = ts - self.locktime
180 if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
181 # Note: describe duration briefly only does 1s granularity...
182 str_duration = datetime_utils.describe_duration_briefly(int(duration))
183 msg = f'Held {self.lockfile} for {str_duration}'
185 warnings.warn(msg, stacklevel=2)
193 def _signal(self, *args):
197 def _get_lockfile_contents(self) -> str:
198 if self.override_command:
199 cmd = self.override_command
201 cmd = ' '.join(sys.argv)
202 contents = LockFileContents(
205 expiration_timestamp=self.expiration_timestamp,
207 return json.dumps(contents.__dict__)
209 def _detect_stale_lockfile(self) -> None:
211 with open(self.lockfile, 'r') as rf:
212 lines = rf.readlines()
215 line_dict = json.loads(line)
216 contents = LockFileContents(**line_dict)
217 logger.debug('Blocking lock contents="%s"', contents)
219 # Does the PID exist still?
221 os.kill(contents.pid, 0)
224 'Lockfile %s\'s pid (%d) is stale; force acquiring...',
230 # Has the lock expiration expired?
231 if contents.expiration_timestamp is not None:
232 now = datetime.datetime.now().timestamp()
233 if now > contents.expiration_timestamp:
235 'Lockfile %s\'s expiration time has passed; force acquiring',
240 pass # If the lockfile doesn't exist or disappears, good.