a1966dcd58258a46a3b12cb4192a09f5aba127bb
[python_utils.git] / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import os
13 import platform
14 import sys
15 import threading
16 from typing import Any, Callable, Optional
17
18 from kazoo.client import KazooClient
19 from kazoo.exceptions import CancelledError
20 from kazoo.protocol.states import KazooState
21 from kazoo.recipe.lease import NonBlockingLease
22
23 import file_utils
24 import scott_secrets
25
logger = logging.getLogger(__name__)


# Captured once, at import time: the final path component of argv[0],
# which we take to be the name of the running program.  It is the
# default name used when building internal zookeeper paths (e.g. the
# path identifying a lease or an election).
PROGRAM_NAME: str = os.path.split(sys.argv[0])[1]
33
34
class RenewableReleasableLease(NonBlockingLease):
    """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
    that adds some behaviors:

        + Ability to renew the lease if it's already held without
          going through the effort of reobtaining the same lease.

        + Ability to release the lease if it's held and not yet
          expired.

    It also is more picky than the base class in terms of when it
    evaluates to "True" (indicating that the lease is held); it will
    begin to evaluate to "False" as soon as the lease has expired
    even if you used to hold it.  This means client code should be
    aware that the lease can disappear and it also means that the
    performance of evaluating the lease requires a round trip to
    zookeeper every time.

    Note that it is not valid to release the lease more than once
    (since you no longer have it the second time).  The code ignores
    this.  It's also not possible to reobtain a released lease by
    calling try_renew.  Go create a new lease object at that point.

    """

    def __init__(
        self,
        client: KazooClient,
        path: str,
        duration: datetime.timedelta,
        identifier: Optional[str] = None,
        utcnow=datetime.datetime.utcnow,
    ):
        """Attempt to obtain the lease (done by the base class constructor).

        Args:
            client: a started KazooClient used for all zookeeper access
            path: zookeeper path identifying the lease
            duration: how long the lease should last, if obtained
            identifier: name of this lease holder; when omitted (or
                empty) the local hostname is used
            utcnow: callable returning the current UTC time as a naive
                datetime
        """
        if not identifier:
            # The kazoo base class silently falls back to the hostname
            # when no identifier is given.  Resolve that default here so
            # that the identifier we remember is the same one written
            # into the lease node; otherwise the holder comparisons in
            # this class could never match.
            import socket

            identifier = socket.gethostname()
        super().__init__(client, path, duration, identifier, utcnow)
        self.client = client
        self.path = path
        self.identifier = identifier
        self.utcnow = utcnow

    def release(self) -> bool:
        """Release the lease, if it's presently being held.

        Returns:
            True if the lease was held by us and has been released;
            False if we did not hold it or if acquiring the zookeeper
            lock was cancelled.
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                if not self._is_lease_held_pre_locked():
                    logger.debug("Can't release lease; I don't have it!")
                    return False

                now = self.utcnow()
                if self.client.exists(holder_path):
                    self.client.delete(holder_path)
                end_lease = now.strftime(self._date_format)

                # Release by moving end to now.
                data = {'version': self._version, 'holder': self.identifier, 'end': end_lease}
                self.client.create(holder_path, self._encode(data))
                self.obtained = False
                logger.debug('Successfully released lease')
                return True

        except CancelledError as e:
            logger.debug('Exception %s in zookeeper?', e)
        return False

    def try_renew(self, duration: datetime.timedelta) -> bool:
        """Try to extend a lease we obtained earlier.

        Only the local "obtained" flag is consulted before trying, so a
        lease that was never obtained, or that was released via
        release(), is not renewed.  The actual work is delegated to the
        base class' obtain logic.

        Args:
            duration: the new lease duration to request

        Returns:
            True if, after the attempt, the lease is marked as obtained;
            False otherwise.
        """
        if not self.obtained:
            return False
        # Reset the flag so that a failed attempt is reported as False;
        # the base class only ever sets it to True on success.
        self.obtained = False
        self._attempt_obtaining(self.client, self.path, duration, self.identifier, self.utcnow)
        return self.obtained

    def _is_lease_held_pre_locked(self) -> bool:
        """Check zookeeper to see if we hold an unexpired lease.

        Callers in this class invoke this while holding the zookeeper
        lock on self.path.

        Returns:
            True only if the lease node exists, has the expected
            version, names us as the holder and has not yet expired.
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        now = self.utcnow()
        if self.client.exists(holder_path):
            raw, _ = self.client.get(holder_path)
            data = self._decode(raw)
            if data["version"] != self._version:
                return False
            current_end = datetime.datetime.strptime(data['end'], self._date_format)
            if data['holder'] == self.identifier and now <= current_end:
                logger.debug('Yes, we hold the lease and it isn\'t expired.')
                return True
        return False

    def __bool__(self) -> bool:
        """Return True iff we currently hold the (unexpired) lease.

        Unless the lease is already known locally to be not obtained,
        this takes the zookeeper lock and reads the lease node, so each
        evaluation talks to zookeeper.
        """
        if not self.obtained:
            return False
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                ret = self._is_lease_held_pre_locked()
        except CancelledError:
            return False
        return ret
134
135
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    Note that we use a hacky "RenewableReleasableLease" and not the
    kazoo NonBlockingLease because the former allows us to release the
    lease when the user code returns whereas the latter does not.

    The lease is released and the zookeeper client is shut down when
    the user function finishes, whether it returns or raises.

    Args:
        f: the function to wrap when used as a bare decorator; leave
            as None when the decorator is invoked with arguments
        lease_id: string identifying the lease to obtain
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function
        also_pass_zk_client: pass our zk client into the user function

    Returns:
        The decorated function (or a decorator, if f is None).  The
        decorated function returns whatever the user function returns,
        or None if the lease could not be obtained.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs) -> Optional[Any]:
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            try:
                logger.debug('We have an active zookeeper connection.')

                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = RenewableReleasableLease(
                    zk,
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, shutting down.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                # Optional extras are appended after the caller's own
                # positional arguments: zk client first, then the lease.
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                try:
                    return func(*args, **kwargs)
                finally:
                    # We don't care if this release operation succeeds;
                    # there are legitimate cases where it will fail such
                    # as when the user code has already voluntarily
                    # released the lease.  Doing it in a finally means
                    # the lease is also given up if the user code raises.
                    lease.release()
            finally:
                logger.debug('Shutting down zookeeper client.')
                zk.stop()
                # Free the resources held by the stopped client.
                zk.close()

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)
236
237
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    The zookeeper connection is established when the decorated
    function is called (not when it is decorated) and is torn down
    again before the call returns, so the decorated function may be
    called more than once.

    Args:
        f: the function to wrap when used as a bare decorator; leave
            as None when the decorator is invoked with arguments
        election_id: global string identifier for the election
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Callable that contends in the election and, once leader,
        runs the wrapped function on its own thread."""

        def __init__(self, func: Callable) -> None:
            functools.update_wrapper(self, func)
            self.func = func
            # The client is created lazily in __call__ so that merely
            # decorating a function does not open a connection.
            self.zk: Optional[KazooClient] = None
            self.stop_event = threading.Event()

        def zk_listener(self, state: KazooState) -> None:
            """Connection state callback: anything other than CONNECTED
            tells the user code to stop."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.debug(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Run the user function on a new thread and babysit it
            until it exits or the zookeeper connection goes bad."""
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            logger.debug(
                'Invoking user code on separate thread: %s',
                thread.name,
            )
            thread.start()

            # Periodically poll the zookeeper state (fail safe for
            # listener) and the state of the child thread.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    logger.debug('User thread exited after our notification.')
                    return

                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info('User thread exited on its own.')
                    return

        def __call__(self, *args, **kwargs):
            """Connect to zookeeper, contend in the election and block
            until the user function (run if we win) has finished."""
            # Start every call with an unset event; the listener sets
            # the previous one when the previous client is stopped.
            self.stop_event = threading.Event()
            self.zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            self.zk.start()
            try:
                logger.debug('We have an active zookeeper connection.')
                election = self.zk.Election(election_id, contender_id)
                self.zk.add_listener(self.zk_listener)
                election.run(self.runit, *args, **kwargs)
            finally:
                # Always drop the connection, even if the election or
                # the user code raised.
                self.zk.stop()
                self.zk.close()

    if f is None:
        return wrapper
    else:
        return wrapper(f)
369
370
if __name__ == '__main__':
    # Run this module's doctests.  The examples construct and start a
    # KazooClient, so they need the configured zookeeper nodes to be
    # reachable.
    import doctest

    doctest.testmod()