3 from dataclasses import dataclass
10 from typing import Optional
13 import decorator_utils
16 cfg = config.add_commandline_args(
17 f'Lockfile ({__file__})',
18 'Args related to lockfiles')
20 '--lockfile_held_duration_warning_threshold_sec',
24 help='If a lock is held for longer than this threshold we log a warning'
26 logger = logging.getLogger(__name__)
29 class LockFileException(Exception):
34 class LockFileContents:
37 expiration_timestamp: float
40 class LockFile(object):
41 """A file locking mechanism that has context-manager support so you
42 can use it in a with statement. e.g.
44 with LockFile('./foo.lock'):
45 # do a bunch of stuff... if the process dies we have a signal
46 # handler to do cleanup. Other code (in this process or another)
47 # that tries to take the same lockfile will block. There is also
48 # some logic for detecting stale locks.
55 do_signal_cleanup: bool = True,
56 expiration_timestamp: Optional[float] = None,
57 override_command: Optional[str] = None,
59 self.is_locked = False
60 self.lockfile = lockfile_path
61 self.override_command = override_command
63 signal.signal(signal.SIGINT, self._signal)
64 signal.signal(signal.SIGTERM, self._signal)
65 self.expiration_timestamp = expiration_timestamp
71 return not os.path.exists(self.lockfile)
73 def try_acquire_lock_once(self) -> bool:
74 logger.debug(f"Trying to acquire {self.lockfile}.")
76 # Attempt to create the lockfile. These flags cause
77 # os.open to raise an OSError if the file already
79 fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
80 with os.fdopen(fd, "a") as f:
81 contents = self._get_lockfile_contents()
82 logger.debug(contents)
84 logger.debug(f'Success; I own {self.lockfile}.')
89 logger.warning(f'Could not acquire {self.lockfile}.')
92 def acquire_with_retries(
95 initial_delay: float = 1.0,
96 backoff_factor: float = 2.0,
100 @decorator_utils.retry_if_false(tries = max_attempts,
101 delay_sec = initial_delay,
102 backoff = backoff_factor)
103 def _try_acquire_lock_with_retries() -> bool:
104 success = self.try_acquire_lock_once()
105 if not success and os.path.exists(self.lockfile):
106 self._detect_stale_lockfile()
109 if os.path.exists(self.lockfile):
110 self._detect_stale_lockfile()
111 return _try_acquire_lock_with_retries()
115 os.unlink(self.lockfile)
116 except Exception as e:
118 self.is_locked = False
121 if self.acquire_with_retries():
122 self.locktime = datetime.datetime.now().timestamp()
124 msg = f"Couldn't acquire {self.lockfile}; giving up."
126 raise LockFileException(msg)
128 def __exit__(self, type, value, traceback):
130 ts = datetime.datetime.now().timestamp()
131 duration = ts - self.locktime
132 if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
133 str_duration = datetime_utils.describe_duration_briefly(duration)
134 logger.warning(f'Held {self.lockfile} for {str_duration}')
141 def _signal(self, *args):
145 def _get_lockfile_contents(self) -> str:
146 if self.override_command:
147 cmd = self.override_command
149 cmd = ' '.join(sys.argv)
151 contents = LockFileContents(
154 expiration_timestamp = self.expiration_timestamp,
156 return json.dumps(contents.__dict__)
158 def _detect_stale_lockfile(self) -> None:
160 with open(self.lockfile, 'r') as rf:
161 lines = rf.readlines()
164 line_dict = json.loads(line)
165 contents = LockFileContents(**line_dict)
166 logger.debug(f'Blocking lock contents="{contents}"')
168 # Does the PID exist still?
170 os.kill(contents.pid, 0)
172 logger.warning(f'Lockfile {self.lockfile}\'s pid ({contents.pid}) is stale; ' +
176 # Has the lock expiration expired?
177 if contents.expiration_timestamp is not None:
178 now = datetime.datetime.now().timestamp()
179 if now > contents.expiration_datetime:
180 logger.warning(f'Lockfile {self.lockfile} expiration time has passed; ' +