Make smart futures avoid polling.
[python_utils.git] / lockfile.py
1 #!/usr/bin/env python3
2
3 from dataclasses import dataclass
4 import datetime
5 import json
6 import logging
7 import os
8 import signal
9 import sys
10 from typing import Optional
11
12 import decorator_utils
13
14
# Module-level logger, named after this module so its output can be
# configured and filtered independently of other loggers.
logger = logging.getLogger(__name__)
16
17
class LockFileException(Exception):
    """Raised when a lockfile could not be acquired."""
20
21
@dataclass
class LockFileContents:
    """The record that is serialized (as JSON) into a lockfile.

    It identifies the owner of the lock so that another would-be owner
    can decide whether the lock has gone stale.
    """

    # Process id of the process that created the lockfile.
    pid: int

    # Command line of the owning process, or the override string the
    # owner supplied instead.
    commandline: str

    # Unix timestamp (seconds) after which the lock may be considered
    # expired, or None when the lock never expires.
    expiration_timestamp: Optional[float]
27
28
class LockFile(object):
    """A file locking mechanism that has context-manager support so you
    can use it in a with statement.  e.g.

    with LockFile('./foo.lock'):
        # do a bunch of stuff... if the process is sent SIGINT or
        # SIGTERM a signal handler removes the lockfile.  Other code
        # (in this process or another) that tries to take the same
        # lockfile retries a bounded number of times and then gets a
        # LockFileException.  There is also some logic for detecting
        # stale locks.

    The lock is represented by the existence of the file at the given
    path; the file holds a one-line JSON record describing its owner.

    """

    def __init__(
            self,
            lockfile_path: str,
            *,
            do_signal_cleanup: bool = True,
            expiration_timestamp: Optional[float] = None,
            override_command: Optional[str] = None,
    ) -> None:
        """Prepare (but do not acquire) a lock on lockfile_path.

        Args:
            lockfile_path: path of the file whose existence is the lock.
            do_signal_cleanup: when True, install SIGINT and SIGTERM
                handlers that release the lock if this object holds it.
                This replaces whatever handlers were installed before,
                and signal.signal may only be called from the main
                thread.
            expiration_timestamp: Unix timestamp recorded in the
                lockfile after which others may treat the lock as
                stale; None means the lock never expires.
            override_command: string to record in the lockfile instead
                of this process' command line.
        """
        self.is_locked = False
        self.lockfile = lockfile_path
        self.override_command = override_command
        if do_signal_cleanup:
            signal.signal(signal.SIGINT, self._signal)
            signal.signal(signal.SIGTERM, self._signal)
        self.expiration_timestamp = expiration_timestamp

    def locked(self) -> bool:
        """Return True if this object currently holds the lock."""
        return self.is_locked

    def available(self) -> bool:
        """Return True if the lockfile does not currently exist."""
        return not os.path.exists(self.lockfile)

    def try_acquire_lock_once(self) -> bool:
        """Make a single, non-blocking attempt to take the lock.

        Returns:
            True if the lockfile was created by this call (and this
            object now holds the lock), False otherwise.
        """
        logger.debug(f"Trying to acquire {self.lockfile}.")

        # Serialize up front so that a serialization problem cannot
        # leave an empty, unparseable lockfile behind.
        contents = self._get_lockfile_contents()
        try:
            # Attempt to create the lockfile.  These flags cause
            # os.open to raise an OSError if the file already
            # exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except FileExistsError:
            # Ordinary contention: somebody else holds the lock.
            logger.debug(f'Failed; I could not acquire {self.lockfile}.')
            return False
        except OSError as e:
            # Not contention (bad directory, permissions, ...); still a
            # failed attempt, but say why so it can be diagnosed.
            logger.warning(f'Unable to create {self.lockfile}: {e}')
            logger.debug(f'Failed; I could not acquire {self.lockfile}.')
            return False

        try:
            with os.fdopen(fd, "a") as f:
                logger.debug(contents)
                f.write(contents)
        except OSError as e:
            # We created the file (O_EXCL) but couldn't fill it in; remove
            # it rather than leave a lock nobody owns.
            logger.warning(f'Unable to write {self.lockfile}: {e}')
            try:
                os.unlink(self.lockfile)
            except OSError:
                pass
            logger.debug(f'Failed; I could not acquire {self.lockfile}.')
            return False

        logger.debug(f'Success; I own {self.lockfile}.')
        self.is_locked = True
        return True

    def acquire_with_retries(
            self,
            *,
            initial_delay: float = 1.0,
            backoff_factor: float = 2.0,
            max_attempts: int = 5
    ) -> bool:
        """Try to take the lock, retrying on failure.

        Retrying is delegated to decorator_utils.retry_if_false, which
        receives max_attempts as tries, initial_delay as delay_sec and
        backoff_factor as backoff.  A stale-lock check runs once before
        the first attempt and again after every failed attempt while
        the lockfile still exists.

        Returns:
            Whatever the retrying wrapper returns; presumably True once
            an attempt succeeds and False when attempts are exhausted
            (verify against decorator_utils.retry_if_false).
        """

        @decorator_utils.retry_if_false(tries = max_attempts,
                                        delay_sec = initial_delay,
                                        backoff = backoff_factor)
        def _try_acquire_lock_with_retries() -> bool:
            success = self.try_acquire_lock_once()
            if not success and os.path.exists(self.lockfile):
                self._detect_stale_lockfile()
            return success

        if os.path.exists(self.lockfile):
            self._detect_stale_lockfile()
        return _try_acquire_lock_with_retries()

    def release(self) -> None:
        """Delete the lockfile and mark this object as not holding it.

        Failure to delete the file is logged, not raised.
        """
        try:
            os.unlink(self.lockfile)
        except Exception as e:
            logger.exception(e)
        self.is_locked = False

    def __enter__(self):
        """Acquire the lock (with retries) or raise LockFileException."""
        if self.acquire_with_retries():
            return self
        msg = f"Couldn't acquire {self.lockfile}; giving up."
        logger.warning(msg)
        raise LockFileException(msg)

    def __exit__(self, type, value, traceback):
        """Release the lock; exceptions from the with-body propagate."""
        self.release()

    def __del__(self):
        """Release the lock if this object still holds it."""
        if self.is_locked:
            self.release()

    def _signal(self, *args):
        """SIGINT/SIGTERM handler: release the lock if we hold it.

        NOTE(review): the handler returns normally after releasing, so
        the signal itself does not terminate the process -- confirm
        that this is intended.
        """
        if self.is_locked:
            self.release()

    def _get_lockfile_contents(self) -> str:
        """Return the JSON text that describes this lock's owner."""
        if self.override_command:
            cmd = self.override_command
        else:
            cmd = ' '.join(sys.argv)
        contents = LockFileContents(
            pid = os.getpid(),
            commandline = cmd,
            expiration_timestamp = self.expiration_timestamp,
        )
        return json.dumps(contents.__dict__)

    def _detect_stale_lockfile(self) -> None:
        """Remove the existing lockfile if its owner is gone or it expired.

        This is best effort: a lockfile that can't be read or parsed is
        left alone and the reason is logged at debug level.
        """
        try:
            with open(self.lockfile, 'r') as rf:
                lines = rf.readlines()
            if len(lines) != 1:
                return
            contents = LockFileContents(**json.loads(lines[0]))
            logger.debug(f'Blocking lock contents="{contents}"')

            # Does the PID exist still?  Signal 0 performs the existence
            # and permission checks without delivering anything.
            try:
                os.kill(contents.pid, 0)
            except PermissionError:
                # The process exists but belongs to someone else; the
                # lock is not stale on this account.
                pass
            except OSError:
                logger.debug('The pid seems stale; killing the lock.')
                self.release()
                return

            # Has the lock expiration expired?
            if contents.expiration_timestamp is not None:
                now = datetime.datetime.now().timestamp()
                if now > contents.expiration_timestamp:
                    logger.debug(
                        'The expiration time has passed; killing the lock'
                    )
                    self.release()
        except Exception as e:
            logger.debug(
                f'Unable to check {self.lockfile} for staleness: {e}'
            )