2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
7 This is a module for making it easier to deal with Zookeeper / Kazoo.
8 Apache Zookeeper (https://zookeeper.apache.org/) is a consistent centralized
9 datastore. :mod:`pyutils.config` optionally uses it to save/read program
10 configuration. But it's also very useful for things like distributed
11 master election, locking, etc...
22 from typing import Any, Callable, Optional
24 from kazoo.client import KazooClient
25 from kazoo.exceptions import CancelledError
26 from kazoo.protocol.states import KazooState
27 from kazoo.recipe.lease import NonBlockingLease
29 from pyutils import argparse_utils, config
30 from pyutils.files import file_utils
# NOTE(review): lossy excerpt — the leading numbers are original line numbers
# and the gaps show that lines are missing; comments describe visible lines only.
# Module-level logger; log calls in this module use lazy %-style arguments.
32 logger = logging.getLogger(__name__)
# Register this module's commandline-argument group with pyutils.config.
34 cfg = config.add_commandline_args(
35 f'Zookeeper ({__file__})',
# NOTE(review): this user-visible description probably wants "related to
# python-zookeeper"; it is a runtime string so it is left unchanged here.
36 'Args related python-zookeeper interactions',
# Zookeeper server list; read back as config.config['zookeeper_nodes'] by
# get_started_zk_client (flag name presumably --zookeeper_nodes — confirm).
42 help='Comma separated host:port or ip:port address(es)',
45 '--zookeeper_client_cert_path',
46 type=argparse_utils.valid_filename,
49 help='Path to file containing client certificate.',
52 '--zookeeper_client_passphrase',
56 help='Pass phrase for unlocking the client certificate.',
60 # On module load, grab what we presume to be our process' program name.
61 # This is used, by default, to construct internal zookeeper paths (e.g.
62 # to identify a lease or election).
# Default value of lease_id and election_id in the decorators of this module.
63 PROGRAM_NAME: str = os.path.basename(sys.argv[0])
# NOTE(review): lossy excerpt — some lines of this function (presumably the
# client construction, the start call and the return) are not visible here.
# Builds a zookeeper client from the commandline config values read below.
66 def get_started_zk_client() -> KazooClient:
69 A zk client library reference that has been connected and started
70 using the commandline provided address, certificates and passphrase.
73 hosts=config.config['zookeeper_nodes'],
# The same --zookeeper_client_cert_path file is passed as both keyfile and
# certfile, so it is assumed to hold both the private key and the client
# certificate (e.g. a combined PEM) — TODO confirm.
76 keyfile=config.config['zookeeper_client_cert_path'],
77 keyfile_password=config.config['zookeeper_client_passphrase'],
78 certfile=config.config['zookeeper_client_cert_path'],
81 logger.debug('We have an active zookeeper connection.')
# NOTE(review): lossy excerpt — several lines of this class (some signature
# lines, return statements, `with lock:` / `try:` lines and the header of the
# truthiness method) are not visible; comments below hedge accordingly.
# NOTE(review): this subclass uses underscore-prefixed NonBlockingLease members
# (_attempt_obtaining, _version, _date_format, _encode, _decode); those are
# private kazoo details and may change between kazoo releases.
85 class RenewableReleasableLease(NonBlockingLease):
86 """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
87 (see https://kazoo.readthedocs.io/en/latest/api/recipe/lease.html#kazoo.recipe.lease.NonBlockingLease) that adds some behaviors:
89 + Ability to renew the lease if it's already held without
90 going through the effort of reobtaining the same lease
93 + Ability to release the lease if it's held and not yet
96 It also is more picky than the base class in terms of when it
97 evaluates to "True" (indicating that the lease is held); it will
98 begin to evaluate to "False" as soon as the lease has expired even
99 if you used to hold it. This means client code should be aware
100 that the lease can disappear (expire) while held and it also means
101 that the performance of evaluating the lease (i.e. if lease:)
102 requires a round trip to zookeeper every time.
104 Note that it is not valid to release the lease more than once
105 (since you no longer have it the second time). The code ignores
106 the 2nd..nth attempt. It's also not possible to reobtain an
107 expired or released lease by calling renew. Go create a new lease
108 object at that point. Finally, note that when you renew the lease
109 it will evaluate to False briefly as it is reobtained.
116 duration: datetime.timedelta,
# NOTE(review): if identifier is None, the kazoo base class presumably picks
# its own default identity, while self.identifier (stored below) stays None and
# is what release()/try_renew() use — confirm.
117 identifier: Optional[str] = None,
# NOTE(review): datetime.datetime.utcnow is deprecated since Python 3.12; this
# clock is also forwarded to the kazoo base class via super().__init__.
118 utcnow=datetime.datetime.utcnow,
120 """Construct the RenewableReleasableLease.
123 client: a KazooClient that is connected and started
124 path: the path to the lease in zookeeper
125 duration: duration during which the lease is reserved
126 identifier: unique name to use for this lease holder.
127 Reuse in order to renew the lease.
128 utcnow: clock function, by default returning
129 :meth:`datetime.datetime.utcnow`. Used for testing.
132 super().__init__(client, path, duration, identifier, utcnow)
# Kept so release(), try_renew() and the truthiness check can present the same
# holder identity (lock owner and lease_holder "holder" field) to zookeeper.
135 self.identifier = identifier
138 def release(self) -> bool:
139 """Release the lease, if it's presently being held.
142 True if the lease was successfully released,
# The holder record lives at <path>/lease_holder; updates go through a kazoo
# Lock on the lease path, owned under our identifier.
145 self.client.ensure_path(self.path)
146 holder_path = self.path + "/lease_holder"
147 lock = self.client.Lock(self.path, self.identifier)
# The "_pre_locked" helper is presumably called while `lock` is held (the
# `with lock:` line is not visible in this excerpt — confirm).
150 if not self._is_lease_held_pre_locked():
151 logger.debug("Can't release lease; I don't have it!")
# Releasing deletes and re-creates the holder node with end == now, so the
# lease reads as already expired to other contenders. `now` is presumably
# self.utcnow() from a line not visible here.
155 if self.client.exists(holder_path):
156 self.client.delete(holder_path)
157 end_lease = now.strftime(self._date_format)
159 # Release by moving end to now.
161 'version': self._version,
162 'holder': self.identifier,
165 self.client.create(holder_path, self._encode(data))
166 self.obtained = False
167 logger.debug('Successfully released lease')
# A cancelled zookeeper operation is logged at debug level (the rest of this
# handler is not visible).
170 except CancelledError as e:
171 logger.debug('Exception %s in zookeeper?', e)
174 def try_renew(self, duration: datetime.timedelta) -> bool:
175 """Attempt to renew a lease that is currently held. Note that
176 this will cause self to evaluate to False briefly as the lease
180 duration: the amount of additional time to add to the
181 current lease expiration.
184 True if the lease was successfully renewed,
# Renewal is only attempted for a lease this object believes it holds.
188 if not self.obtained:
# Reset first; the base-class helper presumably sets obtained back to True
# when the re-acquire succeeds.
# NOTE(review): kazoo's NonBlockingLease._attempt_obtaining appears to compute
# the new end as now + duration, not "current expiration + duration" as the
# docstring above says — confirm against the installed kazoo version.
190 self.obtained = False
191 self._attempt_obtaining(
192 self.client, self.path, duration, self.identifier, self.utcnow
# Checks whether the holder node names our identifier with an end time that
# has not passed yet (return statements are not visible here). The name
# suggests the caller must already hold the zk Lock — confirm.
196 def _is_lease_held_pre_locked(self) -> bool:
197 self.client.ensure_path(self.path)
198 holder_path = self.path + "/lease_holder"
200 if self.client.exists(holder_path):
201 raw, _ = self.client.get(holder_path)
202 data = self._decode(raw)
# A storage-format version mismatch is presumably treated as "not held" (the
# branch body is not visible).
203 if data["version"] != self._version:
205 current_end = datetime.datetime.strptime(data['end'], self._date_format)
206 if data['holder'] == self.identifier and now <= current_end:
207 logger.debug('Yes, we hold the lease and it isn\'t expired.')
215 This implementation differs from that of the base class in
216 that it probes zookeeper to ensure that the lease is not yet
217 expired and is therefore more expensive.
# Presumably the body of the truthiness method (its def line is not visible):
# a cheap local check first, then a locked zookeeper probe.
220 if not self.obtained:
222 lock = self.client.Lock(self.path, self.identifier)
225 ret = self._is_lease_held_pre_locked()
# A cancelled zookeeper operation is caught here (handler body not visible).
226 except CancelledError:
# NOTE(review): lossy excerpt — the def line (presumably `def run_with_lease(`)
# and several other lines of this decorator are not visible here.
232 f: Optional[Callable] = None,
234 lease_id: str = PROGRAM_NAME,
# platform.node() is evaluated once, when the def statement runs (Python
# default-argument semantics), not on every call.
235 contender_id: str = platform.node(),
236 duration: datetime.timedelta = datetime.timedelta(minutes=5),
237 also_pass_lease: bool = False,
238 also_pass_zk_client: bool = False,
240 """Obtain an exclusive lease identified by the lease_id name
241 before invoking a function or skip invoking the function if the
242 lease cannot be obtained.
244 Note that we use a hacky "RenewableReleasableLease" and not the
245 kazoo NonBlockingLease because the former allows us to release the
246 lease when the user code returns whereas the latter does not.
249 lease_id: string identifying the lease to obtain
250 contender_id: string identifying who's attempting to obtain
251 duration: how long should the lease be held, if obtained?
252 also_pass_lease: pass the lease into the user function
253 also_pass_zk_client: pass our zk client into the user function
256 ... lease_id='zookeeper_doctest',
257 ... duration=datetime.timedelta(seconds=5),
259 ... def f(name: str) -> int:
260 ... print(f'Hello, {name}')
# Namespace the lease under /leases/ and collapse any doubled slashes.
268 if not lease_id.startswith('/leases/'):
269 lease_id = f'/leases/{lease_id}'
270 lease_id = file_utils.fix_multiple_slashes(lease_id)
272 def wrapper(func: Callable) -> Callable:
273 @functools.wraps(func)
274 def wrapper2(*args, **kwargs) -> Optional[Any]:
# A fresh zookeeper connection is started on every call of the wrapped function.
275 zk = get_started_zk_client()
277 'Trying to obtain %s for contender %s now...',
281 lease = RenewableReleasableLease(
289 'Successfully obtained %s for contender %s; invoking user function.',
# Extra positional args go after the caller's: presumably the zk client when
# requested, then the lease (the `args = (*args, zk)` and
# `if also_pass_lease:` lines are not visible).
293 if also_pass_zk_client:
296 args = (*args, lease)
# BUG(review): `*kwargs` unpacks the keyword-argument NAMES as extra positional
# arguments and drops their values; this should be `func(*args, **kwargs)`.
297 ret = func(*args, *kwargs)
299 # We don't care if this release operation succeeds;
300 # there are legitimate cases where it will fail such
301 # as when the user code has already voluntarily
302 # released the lease.
306 'Failed to obtain %s for contender %s, shutting down.',
311 logger.debug('Shutting down zookeeper client.')
# NOTE(review): lossy excerpt — several lines of this decorator (including the
# helper class header and parts of the polling loop) are not visible here.
323 def run_for_election(
324 f: Optional[Callable] = None,
326 election_id: str = PROGRAM_NAME,
# platform.node() is evaluated once, when the def statement runs.
327 contender_id: str = platform.node(),
328 also_pass_zk_client: bool = False,
330 """Run as a contender for a leader election. If/when we become
331 the leader, invoke the user's function.
333 The user's function will be executed on a new thread and must
334 accept a "stop processing" event that it must check regularly.
335 This event will be set automatically by the wrapper in the event
336 that we lose connection to zookeeper (and hence are no longer
337 confident that we are still the leader).
339 The user's function may return at any time which will cause
340 the wrapper to also return and effectively cede leadership.
342 Because the user's code is run in a separate thread, it may
343 not return anything / whatever it returns will be dropped.
346 election_id: global string identifier for the election
347 contender_id: string identifying who is running for leader
348 also_pass_zk_client: pass the zk client into the user code
350 >>> @run_for_election(
351 ... election_id='zookeeper_doctest',
352 ... also_pass_zk_client=True
354 ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
358 ... print(f"Hello, {name}, I'm the leader.")
359 ... if stop_now.is_set():
360 ... print("Oops, not anymore?!")
365 ... print("I'm sick of being leader.")
369 Hello, Scott, I'm the leader.
370 Hello, Scott, I'm the leader.
371 Hello, Scott, I'm the leader.
372 I'm sick of being leader.
# Namespace the election under /elections/ and collapse any doubled slashes.
375 if not election_id.startswith('/elections/'):
376 election_id = f'/elections/{election_id}'
377 election_id = file_utils.fix_multiple_slashes(election_id)
380 """Helper wrapper class."""
382 def __init__(self, func: Callable) -> None:
383 functools.update_wrapper(self, func)
# The zookeeper connection is started when this wrapper object is constructed
# (presumably when the decorator is applied), not on each call.
385 self.zk = get_started_zk_client()
386 self.stop_event = threading.Event()
# A freshly constructed Event is already clear; this call is redundant.
387 self.stop_event.clear()
# Connection-state listener: any state other than CONNECTED sets stop_event,
# which the user's function is expected to check.
389 def zk_listener(self, state: KazooState) -> None:
390 logger.debug('Listener received state %s.', state)
391 if state != KazooState.CONNECTED:
393 'Bad connection to zookeeper (state=%s); bailing out.',
396 self.stop_event.set()
398 def runit(self, *args, **kwargs) -> None:
399 # Possibly augment args if requested; always pass stop_event
400 if also_pass_zk_client:
401 args = (*args, self.zk)
402 args = (*args, self.stop_event)
404 thread = threading.Thread(
410 'Invoking user code on separate thread: %s',
415 # Periodically poll the zookeeper state (fail safe for
416 # listener) and the state of the child thread.
418 state = self.zk.client_state
419 if state != KazooState.CONNECTED:
421 'Bad connection to zookeeper (state=%s); bailing out.',
424 self.stop_event.set()
425 logger.debug('Waiting for user thread to tear down...')
427 logger.debug('User thread exited after our notification.')
# Wait up to 5 seconds for the user thread before re-checking the connection
# state (the enclosing loop line is not visible).
430 thread.join(timeout=5.0)
431 if not thread.is_alive():
432 logger.info('User thread exited on its own.')
# Joins the kazoo Election; election.run invokes runit with the caller's args
# if/when this contender becomes leader.
435 def __call__(self, *args, **kwargs):
436 election = self.zk.Election(election_id, contender_id)
437 self.zk.add_listener(self.zk_listener)
438 election.run(self.runit, *args, **kwargs)
# Script entry point; its body is not visible in this excerpt.
447 if __name__ == '__main__':