2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
16 from typing import Any, Callable, Optional
18 from kazoo.client import KazooClient
19 from kazoo.exceptions import CancelledError
20 from kazoo.protocol.states import KazooState
21 from kazoo.recipe.lease import NonBlockingLease
logger = logging.getLogger(__name__)


# Captured once, at import time: the basename of argv[0], i.e. what we
# presume to be the name of the running program.  It is the default
# used to build internal zookeeper paths (e.g. the name of a lease or
# of an election) when the caller does not supply one.
PROGRAM_NAME: str = os.path.basename(sys.argv[0])
class RenewableReleasableLease(NonBlockingLease):
    """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
    that adds some behaviors:

        + Ability to renew the lease if it's already held without
          going through the effort of reobtaining the same lease
          name.

        + Ability to release the lease if it's held and not yet
          expired.

    It also is more picky than the base class in terms of when it
    evaluates to "True" (indicating that the lease is held); it will
    begin to evaluate to "False" as soon as the lease has expired even
    if you used to hold it.  This means client code should be aware
    that the lease can disappear (expire) while held and it also means
    that evaluating the lease (i.e. if lease:) requires a round trip
    to zookeeper every time.

    Note that it is not valid to release the lease more than once
    (since you no longer have it the second time).  The code ignores
    the 2nd..nth attempt.  It's also not possible to reobtain an
    expired or released lease by calling renew.  Go create a new lease
    object at that point.  Finally, note that when you renew the lease
    it will evaluate to False briefly as it is reobtained.
    """

    def __init__(
        self,
        client: KazooClient,
        path: str,
        duration: datetime.timedelta,
        identifier: Optional[str] = None,
        utcnow: Callable[[], datetime.datetime] = datetime.datetime.utcnow,
    ) -> None:
        """Try to obtain the lease (done by the base class constructor)
        and remember the arguments needed to later check, renew or
        release it.

        Args:
            client: the zookeeper client to use
            path: the zookeeper path identifying the lease
            duration: how long the lease should be held, if obtained
            identifier: name of the lease holder; when omitted the
                local hostname is used
            utcnow: clock function returning the current (naive) UTC
                time; the default is kept identical to the base
                class' default so that timestamps remain comparable
        """
        import socket

        # The kazoo base class records the holder as
        # `identifier or socket.gethostname()`.  Resolve the identifier
        # the same way here; otherwise a lease created without an
        # explicit identifier is written under the hostname but later
        # compared against None, so it could never be recognized as
        # held, renewed or released.
        ident = identifier or socket.gethostname()
        super().__init__(client, path, duration, ident, utcnow)
        self.client = client
        self.path = path
        self.identifier = ident
        self.utcnow = utcnow

    def release(self) -> bool:
        """Release the lease, if it's presently being held.

        Returns:
            True if the lease was successfully released,
            False otherwise.
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                if not self._is_lease_held_pre_locked():
                    logger.debug("Can't release lease; I don't have it!")
                    return False

                now = self.utcnow()
                if self.client.exists(holder_path):
                    self.client.delete(holder_path)
                end_lease = now.strftime(self._date_format)

                # Release by moving end to now.
                data = {'version': self._version, 'holder': self.identifier, 'end': end_lease}
                self.client.create(holder_path, self._encode(data))
                self.obtained = False
                logger.debug('Successfully released lease')
                return True

        except CancelledError as e:
            logger.debug('Exception %s in zookeeper?', e)
        return False

    def try_renew(self, duration: datetime.timedelta) -> bool:
        """Attempt to renew a lease that is currently held.  Note that
        this will cause self to evaluate to False briefly as the lease
        is reobtained.

        Args:
            duration: the duration handed to the base class when the
                lease is reobtained.  NOTE(review): kazoo's
                _attempt_obtaining appears to set the new expiration
                to now + duration rather than extending the current
                expiration; confirm against the kazoo source.

        Returns:
            True if the lease was successfully renewed,
            False otherwise.
        """
        if not self.obtained:
            return False

        # The base class helper only ever sets obtained to True, so
        # reset it first to be able to tell whether the renewal worked.
        self.obtained = False
        self._attempt_obtaining(self.client, self.path, duration, self.identifier, self.utcnow)
        return self.obtained

    def _is_lease_held_pre_locked(self) -> bool:
        """Check the lease record stored in zookeeper.  Callers in this
        class invoke this while holding the lock on self.path.

        Returns:
            True if the stored record has the expected version, names
            us as the holder and has not yet expired; False otherwise
            (including when there is no record at all).
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        now = self.utcnow()
        if not self.client.exists(holder_path):
            return False

        raw, _ = self.client.get(holder_path)
        data = self._decode(raw)
        if data["version"] != self._version:
            return False
        current_end = datetime.datetime.strptime(data['end'], self._date_format)
        if data['holder'] == self.identifier and now <= current_end:
            logger.debug("Yes, we hold the lease and it isn't expired.")
            return True
        return False

    def __bool__(self) -> bool:
        """Note that this implementation differs from that of the base
        class in that it probes zookeeper to ensure that the lease is
        not yet expired and is therefore more expensive.
        """
        if not self.obtained:
            return False
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                ret = self._is_lease_held_pre_locked()
        except CancelledError:
            return False
        return ret
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    Note that we use a hacky "RenewableReleasableLease" and not the
    kazoo NonBlockingLease because the former allows us to release the
    lease when the user code returns whereas the latter does not.

    Usable both bare (@obtain_lease) and with keyword arguments
    (@obtain_lease(...)).

    Args:
        f: the function being decorated, when used without arguments
        lease_id: string identifying the lease to obtain; it is
            prefixed with '/leases/' unless it already starts that way
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function
        also_pass_zk_client: pass our zk client into the user function

    The extra arguments, when requested, are appended to the positional
    arguments: first the zk client, then the lease.

    Returns:
        The decorated function (or a decorator, when invoked with
        keyword arguments only).  Calling the decorated function
        returns whatever the user function returned, or None if the
        lease could not be obtained.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs) -> Optional[Any]:
            # NOTE(review): use_ssl / verify_certs were not visible in
            # the reviewed listing and were restored from context;
            # confirm them against the deployed configuration (and
            # consider whether certificate verification should really
            # be disabled).
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            logger.debug('We have an active zookeeper connection.')

            # From here on the client must be stopped no matter how we
            # leave, including when the user function raises.
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = RenewableReleasableLease(
                    zk,
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, shutting down.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                try:
                    # Keyword arguments must be forwarded as keywords;
                    # a single star would pass the dict's keys as extra
                    # positional arguments.
                    return func(*args, **kwargs)
                finally:
                    # We don't care if this release operation succeeds;
                    # there are legitimate cases where it will fail such
                    # as when the user code has already voluntarily
                    # released the lease.
                    lease.release()
            finally:
                logger.debug('Shutting down zookeeper client.')
                zk.stop()

        return wrapper2

    if f is None:
        return wrapper
    return wrapper(f)
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    Note that the zookeeper connection is opened when the function is
    decorated (in the wrapper's constructor) and stopped when a call
    to the decorated function finishes.

    Args:
        f: the function being decorated, when used without arguments
        election_id: global string identifier for the election; it is
            prefixed with '/elections/' unless it already starts that
            way
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code

    The stop event is always appended as the last positional argument;
    the zk client, when requested, is appended just before it.

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Helper wrapper class."""

        def __init__(self, func: Callable) -> None:
            functools.update_wrapper(self, func)
            self.func = func
            # NOTE(review): use_ssl / verify_certs were not visible in
            # the reviewed listing and were restored from context;
            # confirm them against the deployed configuration.
            self.zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            self.zk.start()
            logger.debug('We have an active zookeeper connection.')
            # A new Event starts out unset, which is what we want.
            self.stop_event = threading.Event()

        def zk_listener(self, state: KazooState) -> None:
            """Connection state callback: any state other than
            CONNECTED asks the user code to stop."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.error(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Invoked once we are the leader: run the user code on its
            own thread and supervise it until it exits."""
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            logger.debug(
                'Invoking user code on separate thread: %s',
                thread.name,
            )
            thread.start()

            # Periodically poll the zookeeper state (fail safe for
            # listener) and the state of the child thread.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    logger.debug('User thread exited after our notification.')
                    return

                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info('User thread exited on its own.')
                    return

        def __call__(self, *args, **kwargs):
            election = self.zk.Election(election_id, contender_id)
            self.zk.add_listener(self.zk_listener)
            try:
                election.run(self.runit, *args, **kwargs)
            finally:
                # Stop the client even when the election or the
                # supervising code raises.
                self.zk.stop()

    if f is None:
        return wrapper
    else:
        return wrapper(f)
395 if __name__ == '__main__':