480ce1a041497abfaedd6f6a159060b57ec484da
[python_utils.git] / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import os
13 import platform
14 import sys
15 import threading
16 from typing import Any, Callable, Optional
17
18 from kazoo.client import KazooClient
19 from kazoo.protocol.states import KazooState
20
21 import file_utils
22 import scott_secrets
23
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Snapshot, taken once when this module is loaded, of what we presume
# to be this process's program name: the final path component of
# argv[0].  It is the default used to construct internal zookeeper
# paths (e.g. to identify a lease or an election).
PROGRAM_NAME: str = os.path.split(sys.argv[0])[1]
31
32
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    Usable both as a bare decorator (``@obtain_lease``) and with
    keyword arguments (``@obtain_lease(lease_id=...)``).

    There is no method of releasing a lease manually provided on
    the Kazoo public lease API.  Therefore, letting the lease expire
    is the only mechanism by which it becomes re-acquirable at this
    time.  Thus, due consideration is in order when choosing the
    initial duration.

    According to Kazoo docs, "The client may renew the lease without
    losing it by obtaining a new lease with the same path (lease_id)
    and same identity (contender_id)."

    A new zookeeper client is created and started on every call of the
    decorated function and is always stopped again before the call
    returns, including when the user function raises.

    Args:
        f: the function to wrap when used as a bare decorator; None
            when the decorator is invoked with keyword arguments
        lease_id: string identifying the lease to obtain; it is
            prefixed with '/leases/' unless it already starts with it
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function as an
            extra trailing positional argument
        also_pass_zk_client: pass our zk client into the user function
            as an extra trailing positional argument (before the lease,
            if both are requested)

    Returns:
        The wrapped function (or a decorator producing it).  Calling
        the wrapped function returns whatever the user function returns
        when the lease was obtained, or None when it was not.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs) -> Optional[Any]:
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            logger.debug('We have an active zookeeper connection.')

            # From here on the client is live; make sure it is stopped no
            # matter how we leave (normal return or exception).
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = zk.NonBlockingLease(
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, doing nothing more.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)

                # Keyword arguments must be forwarded with ** so that they
                # arrive as keywords rather than as stray positional keys.
                # The lease is not released afterwards: it simply expires.
                return func(*args, **kwargs)
            finally:
                logger.debug('Shutting down zookeeper client.')
                zk.stop()

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)
133
134
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    Usable both as a bare decorator (``@run_for_election``) and with
    keyword arguments (``@run_for_election(election_id=...)``).

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).  The event is always
    passed as the last positional argument, after the zk client if
    that was requested.

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    Note that the zookeeper client is created and started when the
    decorator is applied to the function, not when the decorated
    function is called, and that it is stopped at the end of a call
    (also when the election run raises).

    Args:
        f: the function to wrap when used as a bare decorator; None
            when the decorator is invoked with keyword arguments
        election_id: global string identifier for the election; it is
            prefixed with '/elections/' unless it already starts with it
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Callable that stands in for the user's function and runs it
        only while this process holds leadership of the election."""

        def __init__(self, func: Callable) -> None:
            """Remember the user function and connect to zookeeper."""
            functools.update_wrapper(self, func)
            self.func = func
            self.zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            self.zk.start()
            logger.debug('We have an active zookeeper connection.')
            # A freshly created Event starts out unset.
            self.stop_event = threading.Event()

        def zk_listener(self, state: KazooState) -> None:
            """Connection-state callback: tell the user code to stop as
            soon as the client reports anything but CONNECTED."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.debug(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Run the user function on its own thread and babysit it
            until it exits or the zookeeper connection goes bad."""
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            logger.debug('Invoking user code on separate thread.')
            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            thread.start()

            # Watch the state (fail safe for listener) and the thread.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    return

                # Wake up periodically so the connection state above is
                # re-checked even if the listener never fires.
                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info('Child thread exited, I\'m exiting too.')
                    return

        def __call__(self, *args, **kwargs):
            """Contend for leadership, run the user code while leader,
            then shut the zookeeper client down."""
            try:
                election = self.zk.Election(election_id, contender_id)
                self.zk.add_listener(self.zk_listener)
                election.run(self.runit, *args, **kwargs)
            finally:
                # Always drop the connection, even if the election run
                # raised, so the client is not leaked.
                self.zk.stop()

    if f is None:
        return wrapper
    else:
        return wrapper(f)
261
262
if __name__ == '__main__':
    # When executed directly (rather than imported), run the doctests
    # embedded in this module's docstrings.
    from doctest import testmod

    testmod()