3 from dataclasses import dataclass
10 from typing import Optional
15 import decorator_utils
18 cfg = config.add_commandline_args(
19 f'Lockfile ({__file__})', 'Args related to lockfiles'
22 '--lockfile_held_duration_warning_threshold_sec',
26 help='If a lock is held for longer than this threshold we log a warning',
28 logger = logging.getLogger(__name__)
31 class LockFileException(Exception):
36 class LockFileContents:
# POSIX timestamp after which the lock is considered expired; None skips the expiration check.
expiration_timestamp: Optional[float]
42 class LockFile(object):
43 """A file locking mechanism that has context-manager support so you
44 can use it in a with statement. e.g.
46 with LockFile('./foo.lock'):
47 # do a bunch of stuff... if the process dies we have a signal
48 # handler to do cleanup. Other code (in this process or another)
49 # that tries to take the same lockfile will block. There is also
50 # some logic for detecting stale locks.
58 do_signal_cleanup: bool = True,
59 expiration_timestamp: Optional[float] = None,
60 override_command: Optional[str] = None,
62 self.is_locked = False
63 self.lockfile = lockfile_path
64 self.override_command = override_command
66 signal.signal(signal.SIGINT, self._signal)
67 signal.signal(signal.SIGTERM, self._signal)
68 self.expiration_timestamp = expiration_timestamp
74 return not os.path.exists(self.lockfile)
76 def try_acquire_lock_once(self) -> bool:
77 logger.debug(f"Trying to acquire {self.lockfile}.")
79 # Attempt to create the lockfile. These flags cause
80 # os.open to raise an OSError if the file already
82 fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
83 with os.fdopen(fd, "a") as f:
84 contents = self._get_lockfile_contents()
85 logger.debug(contents)
87 logger.debug(f'Success; I own {self.lockfile}.')
92 msg = f'Could not acquire {self.lockfile}.'
96 def acquire_with_retries(
99 initial_delay: float = 1.0,
100 backoff_factor: float = 2.0,
103 @decorator_utils.retry_if_false(
104 tries=max_attempts, delay_sec=initial_delay, backoff=backoff_factor
106 def _try_acquire_lock_with_retries() -> bool:
107 success = self.try_acquire_lock_once()
108 if not success and os.path.exists(self.lockfile):
109 self._detect_stale_lockfile()
112 if os.path.exists(self.lockfile):
113 self._detect_stale_lockfile()
114 return _try_acquire_lock_with_retries()
118 os.unlink(self.lockfile)
119 except Exception as e:
121 self.is_locked = False
124 if self.acquire_with_retries():
125 self.locktime = datetime.datetime.now().timestamp()
127 msg = f"Couldn't acquire {self.lockfile}; giving up."
129 raise LockFileException(msg)
131 def __exit__(self, type, value, traceback):
133 ts = datetime.datetime.now().timestamp()
134 duration = ts - self.locktime
137 >= config.config['lockfile_held_duration_warning_threshold_sec']
139 str_duration = datetime_utils.describe_duration_briefly(
142 msg = f'Held {self.lockfile} for {str_duration}'
144 warnings.warn(msg, stacklevel=2)
151 def _signal(self, *args):
155 def _get_lockfile_contents(self) -> str:
156 if self.override_command:
157 cmd = self.override_command
159 cmd = ' '.join(sys.argv)
160 contents = LockFileContents(
163 expiration_timestamp=self.expiration_timestamp,
165 return json.dumps(contents.__dict__)
167 def _detect_stale_lockfile(self) -> None:
169 with open(self.lockfile, 'r') as rf:
170 lines = rf.readlines()
173 line_dict = json.loads(line)
174 contents = LockFileContents(**line_dict)
175 logger.debug(f'Blocking lock contents="{contents}"')
177 # Does the PID exist still?
179 os.kill(contents.pid, 0)
181 msg = f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; force acquiring'
185 # Has the lock expiration expired?
186 if contents.expiration_timestamp is not None:
187 now = datetime.datetime.now().timestamp()
188 if now > contents.expiration_datetime:
189 msg = f'Lockfile {self.lockfile} expiration time has passed; force acquiring'