Migration from old pyutilz package name (which, in turn, came from
[pyutils.git] / src / pyutils / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import os
13 import platform
14 import sys
15 import threading
16 from typing import Any, Callable, Optional
17
18 from kazoo.client import KazooClient
19 from kazoo.exceptions import CancelledError
20 from kazoo.protocol.states import KazooState
21 from kazoo.recipe.lease import NonBlockingLease
22
23 from pyutils import argparse_utils, config
24 from pyutils.files import file_utils
25
logger = logging.getLogger(__name__)

# Register this module's command line flags with the shared pyutils config
# machinery.  Their values are read back later via
# config.config['zookeeper_nodes'] etc. (see get_started_zk_client).
cfg = config.add_commandline_args(
    f'Zookeeper ({__file__})',
    'Args related to python-zookeeper interactions',
)
cfg.add_argument(
    '--zookeeper_nodes',
    type=str,
    default=None,
    help='Comma separated host:port or ip:port address(es)',
)
cfg.add_argument(
    '--zookeeper_client_cert_path',
    type=argparse_utils.valid_filename,
    default=None,
    metavar='FILENAME',
    help='Path to file containing client certificate.',
)
# NOTE(review): a passphrase supplied on the command line is typically
# visible to other local users (e.g. via ps); consider reading it from a
# file or the environment instead.
cfg.add_argument(
    '--zookeeper_client_passphrase',
    type=str,
    default=None,
    metavar='PASSPHRASE',
    help='Pass phrase for unlocking the client certificate.',
)


# On module load, grab what we presume to be our process' program name.
# This is used, by default, to construct internal zookeeper paths (e.g.
# to identify a lease or election).
PROGRAM_NAME: str = os.path.basename(sys.argv[0])
58
59
def get_started_zk_client() -> KazooClient:
    """Create a KazooClient from this module's flags and start it.

    The client connects over SSL to the hosts named by --zookeeper_nodes.
    The file named by --zookeeper_client_cert_path is used as both the
    key file and the certificate file, and --zookeeper_client_passphrase
    unlocks it.

    Returns:
        A started KazooClient.  The caller is responsible for stopping it
        when done.
    """
    settings = config.config
    cert_path = settings['zookeeper_client_cert_path']
    client = KazooClient(
        hosts=settings['zookeeper_nodes'],
        use_ssl=True,
        # NOTE(review): server certificate verification is disabled.
        verify_certs=False,
        keyfile=cert_path,
        keyfile_password=settings['zookeeper_client_passphrase'],
        certfile=cert_path,
    )
    client.start()
    logger.debug('We have an active zookeeper connection.')
    return client
72
73
class RenewableReleasableLease(NonBlockingLease):
    """A kazoo NonBlockingLease that can also be renewed and released.

    This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
    that adds some behaviors:

        + try_renew(): reobtain a lease this object already holds,
          without constructing a new lease object.

        + release(): give up a lease that is held and not yet expired.
          The lease holder node is rewritten so that the lease ends at
          the moment of release.

    It is also pickier than the base class about when it evaluates to
    "True" (indicating that the lease is held).  It evaluates to "False"
    as soon as the lease has expired, even if you used to hold it.  So
    client code must be prepared for the lease to disappear (expire)
    while held.  It also means that, once obtained, evaluating the lease
    (i.e. ``if lease:``) costs a round trip to zookeeper every time.

    Calling release() again after a successful release just returns
    False, because the lease has already been ended.  A released lease
    cannot be renewed (try_renew() returns False); create a new lease
    object at that point.  Finally, the lease evaluates to False while
    try_renew() is reobtaining it.
    """

    def __init__(
        self,
        client: KazooClient,
        path: str,
        duration: datetime.timedelta,
        identifier: Optional[str] = None,
        utcnow: Callable[[], datetime.datetime] = datetime.datetime.utcnow,
    ):
        """Try to obtain the lease at path (via the base class).

        Args:
            client: a started KazooClient used for all lease operations
            path: zookeeper path of the lease
            duration: how long the lease lasts once obtained
            identifier: who is contending for the lease.  When omitted
                or empty, "FQDN-PID" of this process is used (intended
                to mirror kazoo's own default -- TODO confirm against
                the kazoo version in use).
            utcnow: clock used to compute and check expiration times
        """
        # Resolve the default identifier here instead of letting the base
        # class do it.  release(), try_renew() and __bool__() all use
        # self.identifier as the lock id and compare it to the stored
        # lease holder, so it must be the exact (non-None) value that was
        # used to obtain the lease.
        if not identifier:
            import socket

            identifier = f'{socket.getfqdn()}-{os.getpid()}'
        super().__init__(client, path, duration, identifier, utcnow)
        self.client = client
        self.path = path
        self.identifier = identifier
        self.utcnow = utcnow

    def release(self) -> bool:
        """Release the lease, if it's presently being held.

        While holding the lease's zookeeper Lock, this rewrites the
        lease_holder node with an end time of "now", so the lease is no
        longer considered held.

        Returns:
            True if the lease was successfully released.  False otherwise,
            including when we do not currently hold an unexpired lease
            and when zookeeper cancelled the lock acquisition.
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                if not self._is_lease_held_pre_locked():
                    logger.debug("Can't release lease; I don't have it!")
                    return False

                now = self.utcnow()
                if self.client.exists(holder_path):
                    self.client.delete(holder_path)
                end_lease = now.strftime(self._date_format)

                # Release by moving end to now.
                data = {
                    'version': self._version,
                    'holder': self.identifier,
                    'end': end_lease,
                }
                self.client.create(holder_path, self._encode(data))
                self.obtained = False
                logger.debug('Successfully released lease')
                return True

        except CancelledError as e:
            logger.debug('Exception %s in zookeeper?', e)
        return False

    def try_renew(self, duration: datetime.timedelta) -> bool:
        """Attempt to renew a lease that this object obtained.

        self.obtained is cleared first, so the lease evaluates to False
        until the renewal succeeds.  This only checks that this object
        obtained the lease and has not released it.  It does not first
        verify that the lease is still unexpired.

        Args:
            duration: lease length to request.  It is passed straight to
                the base class' _attempt_obtaining.  NOTE(review): it is
                presumably measured from the current time rather than
                added to the old expiration -- confirm against kazoo.

        Returns:
            True if the lease was successfully renewed,
            False otherwise.
        """
        if not self.obtained:
            return False
        self.obtained = False
        self._attempt_obtaining(
            self.client, self.path, duration, self.identifier, self.utcnow
        )
        return self.obtained

    def _is_lease_held_pre_locked(self) -> bool:
        """Return True iff zookeeper says we hold the lease, unexpired.

        The caller must already hold the lease's zookeeper Lock (hence
        "pre_locked"); this method does not take it.
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        now = self.utcnow()
        if self.client.exists(holder_path):
            raw, _ = self.client.get(holder_path)
            data = self._decode(raw)
            # A holder record in a different format version is never ours.
            if data["version"] != self._version:
                return False
            current_end = datetime.datetime.strptime(data['end'], self._date_format)
            if data['holder'] == self.identifier and now <= current_end:
                logger.debug('Yes, we hold the lease and it isn\'t expired.')
                return True
        return False

    def __bool__(self):
        """Return whether we currently hold an unexpired lease.

        Unlike the base class, this probes zookeeper (under the lease's
        Lock) to make sure the lease has not yet expired, so it is more
        expensive.  It returns False without any zookeeper traffic if the
        lease was never obtained or has been released.  It also returns
        False if zookeeper cancels the lock acquisition.
        """
        if not self.obtained:
            return False
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                ret = self._is_lease_held_pre_locked()
        except CancelledError:
            return False
        return ret
203
204
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    Usable bare (``@obtain_lease``) or with keyword arguments
    (``@obtain_lease(lease_id=...)``).  Each call of the decorated
    function does three things:

    - starts its own zookeeper client and tries to obtain the lease;
    - if the lease was obtained, runs the user function and then
      releases the lease;
    - in all cases (including when the user function raises), stops and
      closes the zookeeper client.

    Note that we use a hacky "RenewableReleasableLease" and not the
    kazoo NonBlockingLease because the former allows us to release the
    lease when the user code returns whereas the latter does not.

    Args:
        lease_id: string identifying the lease to obtain.  It is placed
            under /leases/ unless it already starts with that prefix.
            Defaults to this program's name.
        contender_id: string identifying who's attempting to obtain the
            lease.  Defaults to platform.node(), evaluated once at import
            time.  NOTE(review): contenders are told apart only by this
            id, so two processes on the same host using the default may
            not exclude each other -- confirm that is intended.
        duration: how long should the lease be held, if obtained?
        also_pass_lease: append the lease to the user function's
            positional args (after the zk client, if that is passed too)
        also_pass_zk_client: append our zk client to the user function's
            positional args

    Returns:
        The user function's return value, or None if the lease could not
        be obtained (in which case the user function is not invoked).
        Exceptions from the user function propagate after the lease has
        been released and the zookeeper client shut down.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs) -> Optional[Any]:
            zk = get_started_zk_client()
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = RenewableReleasableLease(
                    zk,
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, shutting down.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                try:
                    return func(*args, **kwargs)
                finally:
                    # We don't care if this release operation succeeds;
                    # there are legitimate cases where it will fail such
                    # as when the user code has already voluntarily
                    # released the lease.
                    lease.release()
            finally:
                # Always tear down the per-call client, even if lease
                # handling or the user function raised.
                logger.debug('Shutting down zookeeper client.')
                zk.stop()
                zk.close()

        return wrapper2

    if f is None:
        return wrapper
    return wrapper(f)
295
296
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function receives its original positional arguments,
    then the zookeeper client (only if also_pass_zk_client), then the
    stop event as its last positional argument.  Keyword arguments are
    passed through unchanged.

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, whatever it
    returns is dropped; the decorated function always returns None.

    The zookeeper client is created and started when the decorator is
    applied, not when the decorated function is called, and it is stopped
    when the first call finishes.  NOTE(review): repeated calls of the
    same decorated function therefore presumably fail -- confirm before
    relying on them.

    Args:
        election_id: global string identifier for the election.  It is
            placed under /elections/ unless it already starts with that
            prefix.  Defaults to this program's name.
        contender_id: string identifying who is running for leader;
            defaults to platform.node(), evaluated once at import time
        also_pass_zk_client: pass the zk client into the user code

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Helper wrapper class: the callable that replaces the user's function."""

        def __init__(self, func: Callable) -> None:
            functools.update_wrapper(self, func)
            self.func = func
            self.zk = get_started_zk_client()
            # Set (by zk_listener or runit) to ask the user code to stop;
            # a new Event starts out cleared.
            self.stop_event = threading.Event()

        def zk_listener(self, state: KazooState) -> None:
            """Kazoo state listener: set stop_event on any non-CONNECTED state."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.debug(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Run the user function on a worker thread and watch over it.

            Returns when the user thread exits on its own.  Alternatively,
            if the zookeeper connection goes bad, it sets stop_event and
            returns once the user thread has exited.
            """
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            logger.debug(
                'Invoking user code on separate thread: %s',
                thread.name,
            )
            thread.start()

            # Periodically poll the zookeeper state (fail safe for
            # listener) and the state of the child thread.  The join
            # timeout bounds how long we wait between state checks.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    logger.debug('User thread exited after our notification.')
                    return

                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info('User thread exited on its own.')
                    return

        def __call__(self, *args, **kwargs):
            """Run for election; once elected, run the user code via runit."""
            election = self.zk.Election(election_id, contender_id)
            self.zk.add_listener(self.zk_listener)
            try:
                election.run(self.runit, *args, **kwargs)
            finally:
                # Unregister and shut down even if the election raised.
                self.zk.remove_listener(self.zk_listener)
                self.zk.stop()

    if f is None:
        return wrapper
    return wrapper(f)
419
420
if __name__ == '__main__':
    # Executing this file directly runs its embedded doctests.
    # NOTE(review): those examples start real zookeeper clients via
    # get_started_zk_client, so a reachable zookeeper is presumably needed.
    from doctest import testmod

    testmod()