3 from dataclasses import dataclass
10 from typing import Optional
12 import decorator_utils
15 logger = logging.getLogger(__name__)
18 class LockFileException(Exception):
23 class LockFileContents:
26 expiration_timestamp: float
29 class LockFile(object):
30 """A file locking mechanism that has context-manager support so you
31 can use it in a with statement. e.g.
33 with LockFile('./foo.lock'):
34 # do a bunch of stuff... if the process dies we have a signal
35 # handler to do cleanup. Other code (in this process or another)
36 # that tries to take the same lockfile will block. There is also
37 # some logic for detecting stale locks.
45 do_signal_cleanup: bool = True,
46 expiration_timestamp: Optional[float] = None,
47 override_command: Optional[str] = None,
49 self.is_locked = False
50 self.lockfile = lockfile_path
51 self.override_command = override_command
53 signal.signal(signal.SIGINT, self._signal)
54 signal.signal(signal.SIGTERM, self._signal)
55 self.expiration_timestamp = expiration_timestamp
61 return not os.path.exists(self.lockfile)
63 def try_acquire_lock_once(self) -> bool:
64 logger.debug(f"Trying to acquire {self.lockfile}.")
66 # Attempt to create the lockfile. These flags cause
67 # os.open to raise an OSError if the file already
69 fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
70 with os.fdopen(fd, "a") as f:
71 contents = self._get_lockfile_contents()
72 logger.debug(contents)
74 logger.debug(f'Success; I own {self.lockfile}.')
79 logger.debug(f'Failed; I could not acquire {self.lockfile}.')
82 def acquire_with_retries(
85 initial_delay: float = 1.0,
86 backoff_factor: float = 2.0,
90 @decorator_utils.retry_if_false(tries = max_attempts,
91 delay_sec = initial_delay,
92 backoff = backoff_factor)
93 def _try_acquire_lock_with_retries() -> bool:
94 success = self.try_acquire_lock_once()
95 if not success and os.path.exists(self.lockfile):
96 self._detect_stale_lockfile()
99 if os.path.exists(self.lockfile):
100 self._detect_stale_lockfile()
101 return _try_acquire_lock_with_retries()
105 os.unlink(self.lockfile)
106 except Exception as e:
108 self.is_locked = False
111 if self.acquire_with_retries():
113 msg = f"Couldn't acquire {self.lockfile}; giving up."
115 raise LockFileException(msg)
117 def __exit__(self, type, value, traceback):
124 def _signal(self, *args):
128 def _get_lockfile_contents(self) -> str:
129 if self.override_command:
130 cmd = self.override_command
132 cmd = ' '.join(sys.argv)
134 contents = LockFileContents(
137 expiration_timestamp = self.expiration_timestamp,
139 return json.dumps(contents.__dict__)
141 def _detect_stale_lockfile(self) -> None:
143 with open(self.lockfile, 'r') as rf:
144 lines = rf.readlines()
147 line_dict = json.loads(line)
148 contents = LockFileContents(**line_dict)
149 logger.debug(f'Blocking lock contents="{contents}"')
151 # Does the PID exist still?
153 os.kill(contents.pid, 0)
155 logger.debug('The pid seems stale; killing the lock.')
158 # Has the lock expiration expired?
159 if contents.expiration_timestamp is not None:
160 now = datetime.datetime.now().timestamp()
161 if now > contents.expiration_datetime:
162 logger.debug('The expiration time has passed; ' +