2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
16 from typing import Any, Callable, Optional
18 from kazoo.client import KazooClient
19 from kazoo.exceptions import CancelledError
20 from kazoo.protocol.states import KazooState
21 from kazoo.recipe.lease import NonBlockingLease
23 from pyutils import argparse_utils, config
24 from pyutils.files import file_utils
26 logger = logging.getLogger(__name__)
28 cfg = config.add_commandline_args(
29 f'Zookeeper ({__file__})',
30 'Args related python-zookeeper interactions',
36 help='Comma separated host:port or ip:port address(es)',
39 '--zookeeper_client_cert_path',
40 type=argparse_utils.valid_filename,
43 help='Path to file containing client certificate.',
46 '--zookeeper_client_passphrase',
50 help='Pass phrase for unlocking the client certificate.',
54 # On module load, grab what we presume to be our process' program name.
55 # This is used, by default, to construct internal zookeeper paths (e.g.
56 # to identify a lease or election).
57 PROGRAM_NAME: str = os.path.basename(sys.argv[0])
60 def get_started_zk_client() -> KazooClient:
62 hosts=config.config['zookeeper_nodes'],
65 keyfile=config.config['zookeeper_client_cert_path'],
66 keyfile_password=config.config['zookeeper_client_passphrase'],
67 certfile=config.config['zookeeper_client_cert_path'],
70 logger.debug('We have an active zookeeper connection.')
74 class RenewableReleasableLease(NonBlockingLease):
75 """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
76 that adds some behaviors:
78 + Ability to renew the lease if it's already held without
79 going through the effort of reobtaining the same lease
82 + Ability to release the lease if it's held and not yet
85 It also is more picky than the base class in terms of when it
86 evaluates to "True" (indicating that the lease is held); it will
87 begin to evaluate to "False" as soon as the lease has expired even
88 if you used to hold it. This means client code should be aware
89 that the lease can disappear (expire) while held and it also means
90 that the performance of evaluating the lease (i.e. if lease:)
91 requires a round trip to zookeeper every time.
93 Note that it is not valid to release the lease more than once
94 (since you no longer have it the second time). The code ignores
95 the 2nd..nth attempt. It's also not possible to reobtain an
96 expired or released lease by calling renew. Go create a new lease
97 object at that point. Finally, note that when you renew the lease
98 it will evaluate to False briefly as it is reobtained.
105 duration: datetime.timedelta,
106 identifier: str = None,
107 utcnow=datetime.datetime.utcnow,
109 super().__init__(client, path, duration, identifier, utcnow)
112 self.identifier = identifier
115 def release(self) -> bool:
116 """Release the lease, if it's presently being held.
119 True if the lease was successfully released,
122 self.client.ensure_path(self.path)
123 holder_path = self.path + "/lease_holder"
124 lock = self.client.Lock(self.path, self.identifier)
127 if not self._is_lease_held_pre_locked():
128 logger.debug("Can't release lease; I don't have it!")
132 if self.client.exists(holder_path):
133 self.client.delete(holder_path)
134 end_lease = now.strftime(self._date_format)
136 # Release by moving end to now.
138 'version': self._version,
139 'holder': self.identifier,
142 self.client.create(holder_path, self._encode(data))
143 self.obtained = False
144 logger.debug('Successfully released lease')
147 except CancelledError as e:
148 logger.debug('Exception %s in zookeeper?', e)
151 def try_renew(self, duration: datetime.timedelta) -> bool:
152 """Attempt to renew a lease that is currently held. Note that
153 this will cause self to evaluate to False briefly as the lease
157 duration: the amount of additional time to add to the
158 current lease expiration.
161 True if the lease was successfully renewed,
165 if not self.obtained:
167 self.obtained = False
168 self._attempt_obtaining(
169 self.client, self.path, duration, self.identifier, self.utcnow
173 def _is_lease_held_pre_locked(self) -> bool:
174 self.client.ensure_path(self.path)
175 holder_path = self.path + "/lease_holder"
177 if self.client.exists(holder_path):
178 raw, _ = self.client.get(holder_path)
179 data = self._decode(raw)
180 if data["version"] != self._version:
182 current_end = datetime.datetime.strptime(data['end'], self._date_format)
183 if data['holder'] == self.identifier and now <= current_end:
184 logger.debug('Yes, we hold the lease and it isn\'t expired.')
189 """Note that this implementation differs from that of the base
190 class in that it probes zookeeper to ensure that the lease is
191 not yet expired and is therefore more expensive.
194 if not self.obtained:
196 lock = self.client.Lock(self.path, self.identifier)
199 ret = self._is_lease_held_pre_locked()
200 except CancelledError:
206 f: Optional[Callable] = None,
208 lease_id: str = PROGRAM_NAME,
209 contender_id: str = platform.node(),
210 duration: datetime.timedelta = datetime.timedelta(minutes=5),
211 also_pass_lease: bool = False,
212 also_pass_zk_client: bool = False,
214 """Obtain an exclusive lease identified by the lease_id name
215 before invoking a function or skip invoking the function if the
216 lease cannot be obtained.
218 Note that we use a hacky "RenewableReleasableLease" and not the
219 kazoo NonBlockingLease because the former allows us to release the
220 lease when the user code returns whereas the latter does not.
223 lease_id: string identifying the lease to obtain
224 contender_id: string identifying who's attempting to obtain
225 duration: how long should the lease be held, if obtained?
226 also_pass_lease: pass the lease into the user function
227 also_pass_zk_client: pass our zk client into the user function
230 ... lease_id='zookeeper_doctest',
231 ... duration=datetime.timedelta(seconds=5),
233 ... def f(name: str) -> int:
234 ... print(f'Hello, {name}')
242 if not lease_id.startswith('/leases/'):
243 lease_id = f'/leases/{lease_id}'
244 lease_id = file_utils.fix_multiple_slashes(lease_id)
246 def wrapper(func: Callable) -> Callable:
247 @functools.wraps(func)
248 def wrapper2(*args, **kwargs) -> Optional[Any]:
249 zk = get_started_zk_client()
251 'Trying to obtain %s for contender %s now...',
255 lease = RenewableReleasableLease(
263 'Successfully obtained %s for contender %s; invoking user function.',
267 if also_pass_zk_client:
270 args = (*args, lease)
271 ret = func(*args, *kwargs)
273 # We don't care if this release operation succeeds;
274 # there are legitimate cases where it will fail such
275 # as when the user code has already voluntarily
276 # released the lease.
280 'Failed to obtain %s for contender %s, shutting down.',
285 logger.debug('Shutting down zookeeper client.')
297 def run_for_election(
298 f: Optional[Callable] = None,
300 election_id: str = PROGRAM_NAME,
301 contender_id: str = platform.node(),
302 also_pass_zk_client: bool = False,
304 """Run as a contender for a leader election. If/when we become
305 the leader, invoke the user's function.
307 The user's function will be executed on a new thread and must
308 accept a "stop processing" event that it must check regularly.
309 This event will be set automatically by the wrapper in the event
310 that we lose connection to zookeeper (and hence are no longer
311 confident that we are still the leader).
313 The user's function may return at any time which will cause
314 the wrapper to also return and effectively cede leadership.
316 Because the user's code is run in a separate thread, it may
317 not return anything / whatever it returns will be dropped.
320 election_id: global string identifier for the election
321 contender_id: string identifying who is running for leader
322 also_pass_zk_client: pass the zk client into the user code
324 >>> @run_for_election(
325 ... election_id='zookeeper_doctest',
326 ... also_pass_zk_client=True
328 ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
332 ... print(f"Hello, {name}, I'm the leader.")
333 ... if stop_now.is_set():
334 ... print("Oops, not anymore?!")
339 ... print("I'm sick of being leader.")
343 Hello, Scott, I'm the leader.
344 Hello, Scott, I'm the leader.
345 Hello, Scott, I'm the leader.
346 I'm sick of being leader.
349 if not election_id.startswith('/elections/'):
350 election_id = f'/elections/{election_id}'
351 election_id = file_utils.fix_multiple_slashes(election_id)
354 """Helper wrapper class."""
356 def __init__(self, func: Callable) -> None:
357 functools.update_wrapper(self, func)
359 self.zk = get_started_zk_client()
360 self.stop_event = threading.Event()
361 self.stop_event.clear()
363 def zk_listener(self, state: KazooState) -> None:
364 logger.debug('Listener received state %s.', state)
365 if state != KazooState.CONNECTED:
367 'Bad connection to zookeeper (state=%s); bailing out.',
370 self.stop_event.set()
372 def runit(self, *args, **kwargs) -> None:
373 # Possibly augment args if requested; always pass stop_event
374 if also_pass_zk_client:
375 args = (*args, self.zk)
376 args = (*args, self.stop_event)
378 thread = threading.Thread(
384 'Invoking user code on separate thread: %s',
389 # Periodically poll the zookeeper state (fail safe for
390 # listener) and the state of the child thread.
392 state = self.zk.client_state
393 if state != KazooState.CONNECTED:
395 'Bad connection to zookeeper (state=%s); bailing out.',
398 self.stop_event.set()
399 logger.debug('Waiting for user thread to tear down...')
401 logger.debug('User thread exited after our notification.')
404 thread.join(timeout=5.0)
405 if not thread.is_alive():
406 logger.info('User thread exited on its own.')
409 def __call__(self, *args, **kwargs):
410 election = self.zk.Election(election_id, contender_id)
411 self.zk.add_listener(self.zk_listener)
412 election.run(self.runit, *args, **kwargs)
421 if __name__ == '__main__':