Adds a releasable and renewable lease subclass.
[python_utils.git] / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import os
13 import platform
14 import sys
15 import threading
16 from typing import Any, Callable, Optional
17
18 from kazoo.client import KazooClient
19 from kazoo.exceptions import CancelledError
20 from kazoo.protocol.states import KazooState
21 from kazoo.recipe.lease import NonBlockingLease
22
23 import file_utils
24 import scott_secrets
25
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)


# Captured once at import time: the final path component of argv[0],
# which we take to be this process' program name.  It is the default
# used when building internal zookeeper paths (e.g. the name of a
# lease or an election).
PROGRAM_NAME: str = os.path.split(sys.argv[0])[1]
33
34
class RenewableReleasableLease(NonBlockingLease):
    """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
    that adds some behaviors:

        + Ability to renew the lease if it's already held without
          going through the effort of reobtaining the same lease.

        + Ability to release the lease if it's held and not yet
          expired.

    It also is more picky than the base class in terms of when it
    evaluates to "True" (indicating that the lease is held); it will
    begin to evaluate to "False" as soon as the lease has expired
    even if you used to hold it.  This means client code should be
    aware that the lease can disappear and it also means that
    evaluating the lease requires a round trip to zookeeper every
    time.

    Note that it is not valid to release the lease more than once
    (since you no longer have it the second time).  The code ignores
    this.  After a release, try_renew returns False without talking
    to zookeeper; go create a new lease object at that point.

    NOTE(review): the original documentation also said an *expired*
    lease cannot be reobtained via renew, but try_renew only checks
    the local ``obtained`` flag before delegating to the base class'
    obtain logic -- confirm the intended behavior.

    """

    def __init__(
        self,
        client: KazooClient,
        path: str,
        duration: datetime.timedelta,
        identifier: Optional[str] = None,
        utcnow: Callable[[], datetime.datetime] = datetime.datetime.utcnow,
    ):
        """Try to obtain the lease (done by the base class constructor)
        and remember what is needed to check, renew and release it.

        Args:
            client: the zookeeper client to use
            path: zookeeper path identifying the lease
            duration: how long the lease should last, if obtained
            identifier: name of this lease holder; when omitted we use
                socket.gethostname(), the same fallback kazoo documents
                for NonBlockingLease
            utcnow: callable returning the current (naive, UTC) time
        """
        import socket  # local import; only needed to default the identifier

        # Resolve the default here so that the holder name recorded in
        # zookeeper by the base class is the very same value we compare
        # against later.  Storing a raw None would make every "do I hold
        # this?" check fail and make release() a no-op.
        identifier = identifier or socket.gethostname()
        super().__init__(client, path, duration, identifier, utcnow)
        self.client = client
        self.path = path
        self.identifier = identifier
        self.utcnow = utcnow

    def _holder_path(self) -> str:
        """Return the path of the znode that records the lease holder."""
        return self.path + "/lease_holder"

    def release(self):
        """Release the lease, if it's presently being held.

        If the lease is not held by us (or has expired) this logs a
        debug message and does nothing.  A CancelledError raised while
        working under the lock is swallowed.
        """
        self.client.ensure_path(self.path)
        holder_path = self._holder_path()
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                if not self._is_lease_held_pre_locked():
                    logger.debug("Can't release lease; I don't have it!")
                    return

                now = self.utcnow()
                if self.client.exists(holder_path):
                    self.client.delete(holder_path)
                end_lease = now.strftime(self._date_format)

                # Release by moving end to now.
                data = {'version': self._version, 'holder': self.identifier, 'end': end_lease}
                self.client.create(holder_path, self._encode(data))
                self.obtained = False

        except CancelledError:
            pass

    def try_renew(self, duration: datetime.timedelta) -> bool:
        """Attempt to extend a lease this object obtained earlier.

        Args:
            duration: the new duration to request

        Returns:
            False immediately if the local ``obtained`` flag is not set
            (never obtained, or already released).  Otherwise the flag
            is cleared, the base class' obtain logic is rerun with the
            new duration, and the resulting flag value is returned.
        """
        if not self.obtained:
            return False
        # Clear first so that the flag is only True again if the base
        # class sets it during this attempt.
        self.obtained = False
        self._attempt_obtaining(self.client, self.path, duration, self.identifier, self.utcnow)
        return self.obtained

    def _is_lease_held_pre_locked(self) -> bool:
        """Check zookeeper to see whether we hold an unexpired lease.

        As the name says, callers are expected to already hold the
        lock on the lease path.

        Returns:
            True only if the holder znode exists, has our data version,
            names us as holder and has an end time that has not passed.
        """
        self.client.ensure_path(self.path)
        holder_path = self._holder_path()
        now = self.utcnow()
        if not self.client.exists(holder_path):
            return False

        raw, _ = self.client.get(holder_path)
        data = self._decode(raw)
        if data["version"] != self._version:
            return False
        current_end = datetime.datetime.strptime(data['end'], self._date_format)
        return data['holder'] == self.identifier and now <= current_end

    def __bool__(self):
        """Evaluate to True only while the lease is actually held.

        Unlike the base class this does not just trust the local flag;
        if the flag is set it takes the lock and rechecks the holder
        record in zookeeper.  A CancelledError while doing so counts
        as "not held".
        """
        if not self.obtained:
            return False
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                return self._is_lease_held_pre_locked()
        except CancelledError:
            return False
132
133
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    Note that we use a hacky "RenewableReleasableLease" and not the
    kazoo NonBlockingLease because the former allows us to release the
    lease when the user code returns whereas the latter does not.

    The lease is released and the zookeeper client is stopped even if
    the user function raises; the exception then propagates to the
    caller.

    Args:
        f: the function to wrap when used as a bare decorator
        lease_id: string identifying the lease to obtain; '/leases/'
            is prepended unless it is already there
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function as an
            extra trailing positional argument
        also_pass_zk_client: pass our zk client into the user function
            as an extra trailing positional argument (before the lease
            if both are requested)

    Returns:
        The decorated function (or a decorator, when called with only
        keyword arguments).  The decorated function returns whatever
        the user function returns, or None if the lease could not be
        obtained.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs) -> Optional[Any]:
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            # From here on the client must be stopped on every exit path.
            try:
                logger.debug('We have an active zookeeper connection.')

                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = RenewableReleasableLease(
                    zk,
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, doing nothing more.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                try:
                    # Keyword arguments must be forwarded as keywords;
                    # unpacking the dict with a single star would pass
                    # its keys as extra positional arguments.
                    return func(*args, **kwargs)
                finally:
                    # Give the lease back even if the user code raised.
                    lease.release()
            finally:
                logger.debug('Shutting down zookeeper client.')
                zk.stop()

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)
229
230
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    Note that the zookeeper connection is opened when the function is
    decorated (not when it is called) and is stopped when a call to
    the decorated function finishes, whether normally or by raising.

    Args:
        f: the function to wrap when used as a bare decorator
        election_id: global string identifier for the election;
            '/elections/' is prepended unless it is already there
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code
            as an extra positional argument just before the stop event

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Helper wrapper class."""

        def __init__(self, func: Callable) -> None:
            """Wrap func, connect to zookeeper and create the stop event."""
            functools.update_wrapper(self, func)
            self.func = func
            self.zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            self.zk.start()
            logger.debug('We have an active zookeeper connection.')
            # A new Event starts out unset, so no explicit clear() is needed.
            self.stop_event = threading.Event()

        def zk_listener(self, state: KazooState) -> None:
            """Connection-state callback: tell the user code to stop
            whenever the state is anything other than CONNECTED."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.debug(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Run the user function on its own thread and babysit it
            until it exits or the zookeeper connection goes bad."""
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            logger.debug('Invoking user code on separate thread.')
            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            thread.start()

            # Watch the state (fail safe for listener) and the thread.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    return

                # Wake up at least every 5s to recheck the connection.
                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info("Child thread exited, I'm exiting too.")
                    return

        def __call__(self, *args, **kwargs):
            """Contend in the election, running the user code via runit
            if/when elected, then stop the zookeeper client."""
            election = self.zk.Election(election_id, contender_id)
            self.zk.add_listener(self.zk_listener)
            try:
                election.run(self.runit, *args, **kwargs)
            finally:
                # Don't leave the connection open if the election or
                # the user code blows up.
                self.zk.stop()

    if f is None:
        return wrapper
    else:
        return wrapper(f)
357
358
if __name__ == '__main__':
    # When executed as a script, run the doctests embedded in this
    # module's docstrings.
    from doctest import testmod

    testmod()