Adds a __repr__ to graph.
[pyutils.git] / src / pyutils / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """
7 This is a module for making it easier to deal with Zookeeper / Kazoo.
8 Apache Zookeeper (https://zookeeper.apache.org/) is a consistent centralized
9 datastore.  :mod:`pyutils.config` optionally uses it to save/read program
10 configuration.  But it's also very useful for things like distributed
11 master election, locking, etc...
12 """
13
14
15 import datetime
16 import functools
17 import logging
18 import os
19 import platform
20 import sys
21 import threading
22 from typing import Any, Callable, Optional
23
24 from kazoo.client import KazooClient
25 from kazoo.exceptions import CancelledError
26 from kazoo.protocol.states import KazooState
27 from kazoo.recipe.lease import NonBlockingLease
28
29 from pyutils import argparse_utils, config
30 from pyutils.files import file_utils
31
logger = logging.getLogger(__name__)

# Commandline flags describing where the zookeeper ensemble lives and how
# this client authenticates to it (read by get_started_zk_client).
cfg = config.add_commandline_args(
    f'Zookeeper ({__file__})',
    'Args related python-zookeeper interactions',
)
cfg.add_argument(
    '--zookeeper_nodes',
    default=None,
    type=str,
    help='Comma separated host:port or ip:port address(es)',
)
cfg.add_argument(
    '--zookeeper_client_cert_path',
    default=None,
    type=argparse_utils.valid_filename,
    metavar='FILENAME',
    help='Path to file containing client certificate.',
)
cfg.add_argument(
    '--zookeeper_client_passphrase',
    default=None,
    type=str,
    metavar='PASSPHRASE',
    help='Pass phrase for unlocking the client certificate.',
)


# The basename of argv[0], captured once when this module is imported.  It
# is the default lease_id / election_id, so by default it becomes part of
# the zookeeper paths used by obtain_lease and run_for_election.
PROGRAM_NAME: str = os.path.split(sys.argv[0])[1]
64
65
def get_started_zk_client() -> KazooClient:
    """Create a KazooClient from the commandline settings and start it.

    The ensemble address comes from --zookeeper_nodes.  The single file
    named by --zookeeper_client_cert_path is used as both the client
    certificate and its private key, unlocked with
    --zookeeper_client_passphrase.

    Returns:
        A zk client library reference that has already been started.
    """
    hosts = config.config['zookeeper_nodes']
    cert_path = config.config['zookeeper_client_cert_path']
    passphrase = config.config['zookeeper_client_passphrase']

    # NOTE(review): SSL is always on, but verify_certs=False means the
    # server's certificate is not verified.
    client = KazooClient(
        hosts=hosts,
        use_ssl=True,
        verify_certs=False,
        keyfile=cert_path,
        keyfile_password=passphrase,
        certfile=cert_path,
    )
    client.start()
    logger.debug('We have an active zookeeper connection.')
    return client
83
84
class RenewableReleasableLease(NonBlockingLease):
    """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
    (see https://kazoo.readthedocs.io/en/latest/api/recipe/lease.html#kazoo.recipe.lease.NonBlockingLease) that adds some behaviors:

        + Ability to renew the lease if it's already held without
          going through the effort of reobtaining the same lease
          name.

        + Ability to release the lease if it's held and not yet
          expired.

    It also is more picky than the base class in terms of when it
    evaluates to "True" (indicating that the lease is held); it will
    begin to evaluate to "False" as soon as the lease has expired even
    if you used to hold it.  This means client code should be aware
    that the lease can disappear (expire) while held and it also means
    that the performance of evaluating the lease (i.e. if lease:)
    requires a round trip to zookeeper every time.

    Note that it is not valid to release the lease more than once
    (since you no longer have it the second time).  The code ignores
    the 2nd..nth attempt.  It's also not possible to reobtain an
    expired or released lease by calling renew.  Go create a new lease
    object at that point.  Finally, note that when you renew the lease
    it will evaluate to False briefly as it is reobtained.
    """

    def __init__(
        self,
        client: KazooClient,
        path: str,
        duration: datetime.timedelta,
        identifier: Optional[str] = None,
        utcnow: Callable[[], datetime.datetime] = datetime.datetime.utcnow,
    ):
        """Construct the RenewableReleasableLease.

        The base class constructor immediately attempts to obtain the
        lease; check the truthiness of the new object to see if that
        worked.

        Args:
            client: a KazooClient that is connected and started
            path: the path to the lease in zookeeper
            duration: duration during which the lease is reserved
            identifier: unique name to use for this lease holder.
                Reuse in order to renew the lease.  If None, the base
                class picks a default name.
            utcnow: clock function, by default returning
                :meth:`datetime.datetime.utcnow`. Used for testing.

        """
        super().__init__(client, path, duration, identifier, utcnow)
        # release(), try_renew() and __bool__ all talk to zookeeper
        # through this client, so keep a reference to it.
        self.client = client
        self.path = path
        self.utcnow = utcnow
        # Deliberately leave self.identifier as the base class set it.
        # When ``identifier`` is None, the base class substitutes its own
        # default holder name and records the lease under that name.
        # Overwriting it with None would make release(), try_renew() and
        # __bool__ use the wrong holder/lock identity.

    def release(self) -> bool:
        """Release the lease, if it's presently being held.

        The lease is released by rewriting the lease_holder node so that
        its end time is "now".

        Returns:
            True if the lease was successfully released,
            False otherwise (not held, already expired/released, or the
            zookeeper lock operation was cancelled).
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                if not self._is_lease_held_pre_locked():
                    logger.debug("Can't release lease; I don't have it!")
                    return False

                now = self.utcnow()
                if self.client.exists(holder_path):
                    self.client.delete(holder_path)
                end_lease = now.strftime(self._date_format)

                # Release by moving end to now.
                data = {
                    'version': self._version,
                    'holder': self.identifier,
                    'end': end_lease,
                }
                self.client.create(holder_path, self._encode(data))
                self.obtained = False
                logger.debug('Successfully released lease')
                return True

        except CancelledError as e:
            logger.debug('Exception %s in zookeeper?', e)
        return False

    def try_renew(self, duration: datetime.timedelta) -> bool:
        """Attempt to renew a lease that is currently held.  Note that
        this will cause self to evaluate to False briefly as the lease
        is renewed.

        Args:
            duration: the new lease duration.  NOTE(review): the renewal
                is delegated to the base class's _attempt_obtaining, which
                appears to set the new end to now + duration rather than
                adding to the current expiration — confirm against the
                installed kazoo version.

        Returns:
            True if the lease was successfully renewed,
            False otherwise (including when it was never obtained).
        """
        if not self.obtained:
            return False
        # _attempt_obtaining sets self.obtained back to True on success.
        self.obtained = False
        self._attempt_obtaining(
            self.client, self.path, duration, self.identifier, self.utcnow
        )
        return self.obtained

    def _is_lease_held_pre_locked(self) -> bool:
        """Return True iff zookeeper says we hold the lease and it has not expired.

        The caller must already hold the zookeeper Lock on self.path.
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        now = self.utcnow()
        if self.client.exists(holder_path):
            raw, _ = self.client.get(holder_path)
            data = self._decode(raw)
            if data["version"] != self._version:
                return False
            current_end = datetime.datetime.strptime(data['end'], self._date_format)
            if data['holder'] == self.identifier and now <= current_end:
                logger.debug('Yes, we hold the lease and it isn\'t expired.')
                return True
        return False

    def __bool__(self):
        """True iff the lease was obtained and zookeeper confirms it is
        still ours and unexpired.

        .. note::

            This implementation differs from that of the base class in
            that it probes zookeeper to ensure that the lease is not yet
            expired and is therefore more expensive.

        """
        if not self.obtained:
            return False
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                ret = self._is_lease_held_pre_locked()
        except CancelledError:
            return False
        return ret
229
230
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    Note that we use a hacky "RenewableReleasableLease" and not the
    kazoo NonBlockingLease because the former allows us to release the
    lease when the user code returns whereas the latter does not.

    Can be used bare (``@obtain_lease``) or with keyword arguments
    (``@obtain_lease(lease_id=...)``).  A fresh zookeeper client is
    started for every call of the decorated function and is stopped
    and closed before the call returns.  This happens even if the user
    function raises.  The lease is likewise released after the user
    function returns or raises.

    Args:
        lease_id: string identifying the lease to obtain; it is placed
            under /leases/ in zookeeper unless it already starts with
            that prefix.
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function
        also_pass_zk_client: pass our zk client into the user function
            (when both are requested, the zk client is appended to the
            positional args before the lease)

    Returns:
        The decorated function's return value, or None if the lease
        could not be obtained (in which case the function is not called).

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs) -> Optional[Any]:
            zk = get_started_zk_client()
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = RenewableReleasableLease(
                    zk,
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, shutting down.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                try:
                    return func(*args, **kwargs)
                finally:
                    # We don't care if this release operation succeeds;
                    # there are legitimate cases where it will fail such
                    # as when the user code has already voluntarily
                    # released the lease.
                    lease.release()
            finally:
                logger.debug('Shutting down zookeeper client.')
                zk.stop()
                # This client is per-call and is discarded now; close() frees
                # the resources that stop() alone leaves behind.
                zk.close()

        return wrapper2

    if f is None:
        return wrapper
    return wrapper(f)
321
322
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    Note that the zookeeper client is created and started when the
    function is decorated, not when it is called.  It is stopped when
    the first call returns (or raises).

    Args:
        election_id: global string identifier for the election; it is
            placed under /elections/ in zookeeper unless it already
            starts with that prefix.
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code
            (appended to the positional args before the stop event)

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Callable stand-in for the user function: runs for election
        and, once elected, runs the user function on a worker thread."""

        def __init__(self, func: Callable) -> None:
            functools.update_wrapper(self, func)
            self.func = func
            # NOTE(review): this connects at decoration time, and __call__
            # stops the client, so the decorated function is effectively
            # single-use.
            self.zk = get_started_zk_client()
            # Passed to the user function; set when we may no longer be leader.
            self.stop_event = threading.Event()

        def zk_listener(self, state: KazooState) -> None:
            """Connection state listener: ask the user code to stop on
            any state other than CONNECTED."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.debug(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Run the user function on a worker thread while we lead.

            This is the callback handed to election.run.  It returns,
            ceding leadership, when the user thread exits or when the
            zookeeper connection is no longer CONNECTED.
            """
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            logger.debug(
                'Invoking user code on separate thread: %s',
                thread.name,
            )
            thread.start()

            # Periodically poll the zookeeper state (fail safe for
            # listener) and the state of the child thread.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    logger.debug('User thread exited after our notification.')
                    return

                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info('User thread exited on its own.')
                    return

        def __call__(self, *args, **kwargs):
            try:
                election = self.zk.Election(election_id, contender_id)
                self.zk.add_listener(self.zk_listener)
                election.run(self.runit, *args, **kwargs)
            finally:
                # Stop the session even if the election machinery raises,
                # so we don't leave a live zookeeper connection behind.
                self.zk.stop()

    if f is None:
        return wrapper
    return wrapper(f)
445
446
if __name__ == '__main__':
    # Running this file directly executes the doctests in the
    # docstrings above.
    from doctest import testmod

    testmod()