Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import os
13 import platform
14 import sys
15 import threading
16 from typing import Any, Callable, Optional
17
18 from kazoo.client import KazooClient
19 from kazoo.exceptions import CancelledError
20 from kazoo.protocol.states import KazooState
21 from kazoo.recipe.lease import NonBlockingLease
22
23 import file_utils
24 import scott_secrets
25
logger = logging.getLogger(__name__)


# Computed a single time, when this module is first imported: the last
# path component of sys.argv[0], i.e. (presumably) the name of the running
# program.  obtain_lease and run_for_election use it as their default
# lease / election name.
PROGRAM_NAME: str = os.path.split(sys.argv[0])[1]
33
34
class RenewableReleasableLease(NonBlockingLease):
    """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
    that adds some behaviors:

        + Ability to renew the lease if it's already held without
          going through the effort of reobtaining the same lease
          name.

        + Ability to release the lease if it's held and not yet
          expired.

    It also is more picky than the base class in terms of when it
    evaluates to "True" (indicating that the lease is held); it will
    begin to evaluate to "False" as soon as the lease has expired even
    if you used to hold it.  This means client code should be aware
    that the lease can disappear (expire) while held and it also means
    that evaluating the lease (i.e. if lease:) requires a round trip
    to zookeeper every time.

    Note that it is not valid to release the lease more than once
    (since you no longer have it the second time).  The code ignores
    the 2nd..nth attempt.  It's also not possible to reobtain an
    expired or released lease by calling renew.  Go create a new lease
    object at that point.  Renewal verifies ownership and extends the
    expiration while holding a single zookeeper lock, so the lease is
    never handed back and reobtained in the process.
    """

    def __init__(
        self,
        client: KazooClient,
        path: str,
        duration: datetime.timedelta,
        identifier: Optional[str] = None,
        utcnow: Callable[[], datetime.datetime] = datetime.datetime.utcnow,
    ) -> None:
        """Attempt to obtain the lease at path (via the base class).

        Args:
            client: a started KazooClient
            path: zookeeper path identifying the lease
            duration: how long the lease should last if obtained
            identifier: string identifying this contender; when None
                        (or empty) NonBlockingLease generates one
            utcnow: clock function; passed through to the base class
                    and used by this class for expiry checks
        """
        super().__init__(client, path, duration, identifier, utcnow)
        self.client = client
        self.path = path
        # NonBlockingLease substitutes a generated identifier when none is
        # given and records it in self.identifier.  Only override it when
        # the caller supplied one: overwriting it with None would make us
        # unable to recognize (and hence check, renew or release) the very
        # lease the base class just obtained for us.
        if identifier:
            self.identifier = identifier
        self.utcnow = utcnow

    @property
    def _holder_path(self) -> str:
        # Child znode recording the current holder and the lease's end time.
        return self.path + "/lease_holder"

    def _write_holder_pre_locked(self, end: datetime.datetime) -> None:
        """Record self.identifier as the holder with the given end time.
        The caller must already hold the zookeeper lock on self.path."""
        holder_path = self._holder_path
        if self.client.exists(holder_path):
            self.client.delete(holder_path)
        data = {
            'version': self._version,
            'holder': self.identifier,
            'end': end.strftime(self._date_format),
        }
        self.client.create(holder_path, self._encode(data))

    def release(self) -> bool:
        """Release the lease, if it's presently being held.

        Returns:
            True if the lease was successfully released,
            False otherwise.
        """
        self.client.ensure_path(self.path)
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                if not self._is_lease_held_pre_locked():
                    logger.debug("Can't release lease; I don't have it!")
                    return False

                # Release by moving the end of the lease to now.
                self._write_holder_pre_locked(self.utcnow())
                self.obtained = False
                logger.debug('Successfully released lease')
                return True

        except CancelledError as e:
            logger.debug('Exception %s in zookeeper?', e)
        return False

    def try_renew(self, duration: datetime.timedelta) -> bool:
        """Attempt to extend a lease that is currently held.

        The ownership check and the extension happen under the same
        zookeeper lock, so an expired or released lease (which someone
        else may have held in the meantime) is never silently reobtained.

        Args:
            duration: how long the lease should last measured from now;
                      the new expiration is now + duration (it is not
                      added to the previous expiration).

        Returns:
            True if the lease was successfully renewed,
            False otherwise.  On failure the lease is marked as no
            longer obtained.
        """
        if not self.obtained:
            return False

        self.client.ensure_path(self.path)
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                if not self._is_lease_held_pre_locked():
                    logger.debug("Can't renew lease; I don't have it!")
                    self.obtained = False
                    return False

                self._write_holder_pre_locked(self.utcnow() + duration)
                logger.debug('Successfully renewed lease')
                return True

        except CancelledError as e:
            logger.debug('Exception %s in zookeeper?', e)
        self.obtained = False
        return False

    def _is_lease_held_pre_locked(self) -> bool:
        """Return True iff the holder node names us and has not expired.
        The caller must already hold the zookeeper lock on self.path."""
        self.client.ensure_path(self.path)
        holder_path = self._holder_path
        now = self.utcnow()
        if self.client.exists(holder_path):
            raw, _ = self.client.get(holder_path)
            data = self._decode(raw)
            if data["version"] != self._version:
                return False
            current_end = datetime.datetime.strptime(data['end'], self._date_format)
            if data['holder'] == self.identifier and now <= current_end:
                logger.debug('Yes, we hold the lease and it isn\'t expired.')
                return True
        return False

    def __bool__(self) -> bool:
        """Note that this implementation differs from that of the base
        class in that it probes zookeeper to ensure that the lease is
        not yet expired and is therefore more expensive.
        """
        if not self.obtained:
            return False
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                return self._is_lease_held_pre_locked()
        except CancelledError:
            return False
156             return False
157         return ret
158
159
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    Note that we use a hacky "RenewableReleasableLease" and not the
    kazoo NonBlockingLease because the former allows us to release the
    lease when the user code returns whereas the latter does not.

    Every call of the wrapped function opens its own zookeeper
    connection, which is stopped and closed before the call returns,
    even when the user function raises.  An attempt to release the
    lease is likewise made whether the user function returns or raises.

    Args:
        lease_id: string identifying the lease to obtain; it is placed
                  under /leases/ unless it already starts with that prefix
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function
        also_pass_zk_client: pass our zk client into the user function

    When requested, the zk client and then the lease are appended (in
    that order) after the caller's positional arguments.

    Returns:
        The user function's return value, or None if the lease could
        not be obtained (in which case the function is not invoked).

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs) -> Optional[Any]:
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            logger.debug('We have an active zookeeper connection.')
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = RenewableReleasableLease(
                    zk,
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, shutting down.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                try:
                    return func(*args, **kwargs)
                finally:
                    # We don't care if this release operation succeeds;
                    # there are legitimate cases where it will fail such
                    # as when the user code has already voluntarily
                    # released the lease.
                    lease.release()
            finally:
                # Always tear the connection down, even if the lease
                # constructor or the user function raised.
                logger.debug('Shutting down zookeeper client.')
                zk.stop()
                zk.close()

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)
260
261
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    Each call of the wrapped function opens its own zookeeper
    connection and creates a fresh stop event; the connection is
    stopped and closed before the call returns.  Decorating a function
    therefore does not touch the network, and the decorated function
    may be called more than once.

    Args:
        election_id: global string identifier for the election; it is
                     placed under /elections/ unless it already starts
                     with that prefix
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code

    When also_pass_zk_client is set, the zk client is appended after the
    caller's positional arguments; the stop event is always appended last.

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Callable that stands in for the user's function."""

        def __init__(self, func: Callable) -> None:
            functools.update_wrapper(self, func)
            self.func = func

        def _lead(
            self,
            zk: KazooClient,
            stop_event: threading.Event,
            args: tuple,
            kwargs: dict,
        ) -> None:
            """Invoked by the election once we are leader: run the user's
            function on a helper thread and return (ceding leadership)
            when it exits or the zookeeper connection looks unhealthy."""
            # Possibly augment args if requested; always pass stop_event.
            if also_pass_zk_client:
                args = (*args, zk)
            args = (*args, stop_event)

            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            logger.debug(
                'Invoking user code on separate thread: %s',
                thread.name,
            )
            thread.start()

            # Periodically poll the zookeeper state (fail safe for
            # listener) and the state of the child thread.
            while True:
                state = zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    stop_event.set()
                    # Relies on the user code honoring stop_event.
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    logger.debug('User thread exited after our notification.')
                    return

                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info('User thread exited on its own.')
                    return

        def __call__(self, *args, **kwargs) -> None:
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            logger.debug('We have an active zookeeper connection.')
            stop_event = threading.Event()

            def zk_listener(state: KazooState) -> None:
                logger.debug('Listener received state %s.', state)
                if state != KazooState.CONNECTED:
                    logger.debug(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    stop_event.set()

            zk.add_listener(zk_listener)
            try:
                election = zk.Election(election_id, contender_id)
                # Bind our own arguments with partial so the user's kwargs
                # can never collide with Election.run's parameters.
                election.run(functools.partial(self._lead, zk, stop_event, args, kwargs))
            finally:
                zk.remove_listener(zk_listener)
                zk.stop()
                zk.close()

    if f is None:
        return wrapper
    else:
        return wrapper(f)
393
394
if __name__ == '__main__':
    # Execute this module's doctests.  The obtain_lease and
    # run_for_election examples connect to the zookeeper ensemble named
    # in scott_secrets, so that ensemble must be reachable.
    from doctest import testmod

    testmod()