2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
16 from typing import Any, Callable, Optional
18 from kazoo.client import KazooClient
19 from kazoo.exceptions import CancelledError
20 from kazoo.protocol.states import KazooState
21 from kazoo.recipe.lease import NonBlockingLease
26 logger = logging.getLogger(__name__)
29 # On module load, grab what we presume to be our process' program name.
30 # This is used, by default, to construct internal zookeeper paths (e.g.
31 # to identify a lease or election).
32 PROGRAM_NAME: str = os.path.basename(sys.argv[0])
35 class RenewableReleasableLease(NonBlockingLease):
36 """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
37 that adds some behaviors:
39 + Ability to renew the lease if it's already held without
40 going through the effort of reobtaining the same lease.
42 + Ability to release the lease if it's held and not yet expired.
45 It also is more picky than the base class in terms of when it
46 evaluates to "True" (indicating that the lease is held); it will
47 begin to evaluate to "False" as soon as the lease has expired
48 even if you used to hold it. This means client code should be
49 aware that the lease can disappear; it also means that
50 evaluating the lease requires a round trip to zookeeper.
53 Note that it is not valid to release the lease more than once
54 (since you no longer have it the second time). The code ignores
55 this. It's also not possible to reobtain an expired or released
56 lease by calling try_renew. Go create a new lease object at that point.
65 duration: datetime.timedelta,
66 identifier: str = None,
67 utcnow=datetime.datetime.utcnow,
69 super().__init__(client, path, duration, identifier, utcnow)
72 self.identifier = identifier
75 def release(self) -> bool:
76 """Release the lease, if it's presently being held."""
77 self.client.ensure_path(self.path)
78 holder_path = self.path + "/lease_holder"
79 lock = self.client.Lock(self.path, self.identifier)
82 if not self._is_lease_held_pre_locked():
83 logger.debug("Can't release lease; I don't have it!")
87 if self.client.exists(holder_path):
88 self.client.delete(holder_path)
89 end_lease = now.strftime(self._date_format)
91 # Release by moving end to now.
92 data = {'version': self._version, 'holder': self.identifier, 'end': end_lease}
93 self.client.create(holder_path, self._encode(data))
95 logger.debug('Successfully released lease')
98 except CancelledError as e:
99 logger.debug('Exception %s in zookeeper?', e)
102 def try_renew(self, duration: datetime.timedelta) -> bool:
103 if not self.obtained:
105 self.obtained = False
106 self._attempt_obtaining(self.client, self.path, duration, self.identifier, self.utcnow)
109 def _is_lease_held_pre_locked(self) -> bool:
110 self.client.ensure_path(self.path)
111 holder_path = self.path + "/lease_holder"
113 if self.client.exists(holder_path):
114 raw, _ = self.client.get(holder_path)
115 data = self._decode(raw)
116 if data["version"] != self._version:
118 current_end = datetime.datetime.strptime(data['end'], self._date_format)
119 if data['holder'] == self.identifier and now <= current_end:
120 logger.debug('Yes, we hold the lease and it isn\'t expired.')
125 if not self.obtained:
127 lock = self.client.Lock(self.path, self.identifier)
130 ret = self._is_lease_held_pre_locked()
131 except CancelledError:
137 f: Optional[Callable] = None,
139 lease_id: str = PROGRAM_NAME,
140 contender_id: str = platform.node(),
141 duration: datetime.timedelta = datetime.timedelta(minutes=5),
142 also_pass_lease: bool = False,
143 also_pass_zk_client: bool = False,
145 """Obtain an exclusive lease identified by the lease_id name
146 before invoking a function or skip invoking the function if the
147 lease cannot be obtained.
149 Note that we use a hacky "RenewableReleasableLease" and not the
150 kazoo NonBlockingLease because the former allows us to release the
151 lease when the user code returns whereas the latter does not.
154 lease_id: string identifying the lease to obtain
155 contender_id: string identifying who's attempting to obtain
156 duration: how long should the lease be held, if obtained?
157 also_pass_lease: pass the lease into the user function
158 also_pass_zk_client: pass our zk client into the user function
161 ... lease_id='zookeeper_doctest',
162 ... duration=datetime.timedelta(seconds=5),
164 ... def f(name: str) -> int:
165 ... print(f'Hello, {name}')
173 if not lease_id.startswith('/leases/'):
174 lease_id = f'/leases/{lease_id}'
175 lease_id = file_utils.fix_multiple_slashes(lease_id)
177 def wrapper(func: Callable) -> Callable:
178 @functools.wraps(func)
179 def wrapper2(*args, **kwargs) -> Optional[Any]:
181 hosts=scott_secrets.ZOOKEEPER_NODES,
184 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
185 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
186 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
189 logger.debug('We have an active zookeeper connection.')
192 'Trying to obtain %s for contender %s now...',
196 lease = RenewableReleasableLease(
204 'Successfully obtained %s for contender %s; invoking user function.',
208 if also_pass_zk_client:
211 args = (*args, lease)
212 ret = func(*args, *kwargs)
214 # We don't care if this release operation succeeds;
215 # there are legitimate cases where it will fail such
216 # as when the user code has already voluntarily
217 # released the lease.
221 'Failed to obtain %s for contender %s, shutting down.',
226 logger.debug('Shutting down zookeeper client.')
238 def run_for_election(
239 f: Optional[Callable] = None,
241 election_id: str = PROGRAM_NAME,
242 contender_id: str = platform.node(),
243 also_pass_zk_client: bool = False,
245 """Run as a contender for a leader election. If/when we become
246 the leader, invoke the user's function.
248 The user's function will be executed on a new thread and must
249 accept a "stop processing" event that it must check regularly.
250 This event will be set automatically by the wrapper in the event
251 that we lose connection to zookeeper (and hence are no longer
252 confident that we are still the leader).
254 The user's function may return at any time which will cause
255 the wrapper to also return and effectively cede leadership.
257 Because the user's code is run in a separate thread, it may
258 not return anything / whatever it returns will be dropped.
261 election_id: global string identifier for the election
262 contender_id: string identifying who is running for leader
263 also_pass_zk_client: pass the zk client into the user code
265 >>> @run_for_election(
266 ... election_id='zookeeper_doctest',
267 ... also_pass_zk_client=True
269 ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
273 ... print(f"Hello, {name}, I'm the leader.")
274 ... if stop_now.is_set():
275 ... print("Oops, not anymore?!")
280 ... print("I'm sick of being leader.")
284 Hello, Scott, I'm the leader.
285 Hello, Scott, I'm the leader.
286 Hello, Scott, I'm the leader.
287 I'm sick of being leader.
290 if not election_id.startswith('/elections/'):
291 election_id = f'/elections/{election_id}'
292 election_id = file_utils.fix_multiple_slashes(election_id)
295 """Helper wrapper class."""
297 def __init__(self, func: Callable) -> None:
298 functools.update_wrapper(self, func)
300 self.zk = KazooClient(
301 hosts=scott_secrets.ZOOKEEPER_NODES,
304 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
305 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
306 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
309 logger.debug('We have an active zookeeper connection.')
310 self.stop_event = threading.Event()
311 self.stop_event.clear()
313 def zk_listener(self, state: KazooState) -> None:
314 logger.debug('Listener received state %s.', state)
315 if state != KazooState.CONNECTED:
317 'Bad connection to zookeeper (state=%s); bailing out.',
320 self.stop_event.set()
322 def runit(self, *args, **kwargs) -> None:
323 # Possibly augment args if requested; always pass stop_event
324 if also_pass_zk_client:
325 args = (*args, self.zk)
326 args = (*args, self.stop_event)
328 thread = threading.Thread(
334 'Invoking user code on separate thread: %s',
339 # Periodically poll the zookeeper state (fail safe for
340 # listener) and the state of the child thread.
342 state = self.zk.client_state
343 if state != KazooState.CONNECTED:
345 'Bad connection to zookeeper (state=%s); bailing out.',
348 self.stop_event.set()
349 logger.debug('Waiting for user thread to tear down...')
351 logger.debug('User thread exited after our notification.')
354 thread.join(timeout=5.0)
355 if not thread.is_alive():
356 logger.info('User thread exited on its own.')
359 def __call__(self, *args, **kwargs):
360 election = self.zk.Election(election_id, contender_id)
361 self.zk.add_listener(self.zk_listener)
362 election.run(self.runit, *args, **kwargs)
371 if __name__ == '__main__':