9decfef82a0961fbda0c5f1f0689b639bebe276d
[python_utils.git] / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import os
13 import platform
14 import sys
15 import threading
16 from typing import Callable, Optional
17
18 from kazoo.client import KazooClient
19 from kazoo.protocol.states import KazooState
20
21 import file_utils
22 import scott_secrets
23
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)


# Captured once, at import time: the final path component of sys.argv[0],
# which we take to be the name of the running program.  The decorators
# below use it as the default identifier when building internal zookeeper
# paths (e.g. the name of a lease or of an election).
PROGRAM_NAME: str = os.path.split(sys.argv[0])[1]
31
32
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    This is a decorator.  It may be applied bare (``@obtain_lease``) or
    with keyword arguments (``@obtain_lease(lease_id=...)``).  Every
    call of the decorated function opens a new zookeeper connection,
    attempts to take the lease, and stops the connection again before
    returning -- including when the user function raises.

    Args:
        f: the function being decorated; None when the decorator is
            invoked with keyword arguments only.
        lease_id: string identifying the lease to obtain.  It is
            prefixed with '/leases/' unless it already starts with that.
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function as an
            extra trailing positional argument
        also_pass_zk_client: pass our zk client into the user function
            as an extra trailing positional argument (placed before the
            lease when both are requested)

    Returns:
        The decorated function (or, when f is None, a decorator that
        produces it).  Calling the decorated function returns whatever
        the user function returns, or None if the lease could not be
        obtained and the user function was therefore not invoked.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs):
            # NOTE(review): verify_certs=False turns off certificate
            # verification for the SSL connection; confirm this is
            # intentional for the deployment.
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            logger.debug('We have an active zookeeper connection.')

            # From here on the client is running, so make sure it gets
            # stopped no matter how we leave (normal return or exception
            # from the lease recipe / user function).
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = zk.NonBlockingLease(
                    lease_id,
                    duration,
                    contender_id,
                )
                # The lease object's truthiness says whether we hold it.
                if lease:
                    logger.debug(
                        'Successfully obtained %s for contender %s; invoking user function.',
                        lease_id,
                        contender_id,
                    )
                    if also_pass_zk_client:
                        args = (*args, zk)
                    if also_pass_lease:
                        args = (*args, lease)
                    # Keyword arguments must be forwarded with ** so the
                    # user function receives them by name.
                    ret = func(*args, **kwargs)
                else:
                    logger.debug(
                        'Failed to obtain %s for contender %s, doing nothing more.',
                        lease_id,
                        contender_id,
                    )
                    ret = None
            finally:
                logger.debug('Shutting down zookeeper client.')
                zk.stop()
            return ret

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)
122
123
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).  The event is always
    passed as the last positional argument; when also_pass_zk_client
    is set, the zk client is passed immediately before it.

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    Note that the zookeeper connection is opened when the decorator
    is applied to the function (not when the decorated function is
    called) and is stopped when a call to the decorated function
    finishes, whether normally or via an exception.  Nothing here
    restarts the client afterwards.

    Args:
        f: the function being decorated; None when the decorator is
            invoked with keyword arguments only.
        election_id: global string identifier for the election.  It is
            prefixed with '/elections/' unless it already starts with
            that.
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code

    Returns:
        A callable wrapper object around the user function (or, when f
        is None, the wrapper class to be applied to the function).
        Calling the wrapper returns None.

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Helper wrapper class."""

        def __init__(self, func: Callable) -> None:
            """Remember the user function and connect to zookeeper."""
            functools.update_wrapper(self, func)
            self.func = func
            # NOTE(review): verify_certs=False turns off certificate
            # verification for the SSL connection; confirm this is
            # intentional for the deployment.
            self.zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            self.zk.start()
            logger.debug('We have an active zookeeper connection.')
            # A newly created Event starts out unset, so no clear() needed.
            self.stop_event = threading.Event()

        def zk_listener(self, state: KazooState) -> None:
            """Connection-state callback: tell the user code to stop on
            any state other than CONNECTED."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.debug(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Run the user function on its own thread and block until
            it exits; invoked by the election once we are the leader."""
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            logger.debug('Invoking user code on separate thread.')
            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            thread.start()

            # Watch the state (fail safe for listener) and the thread.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    return

                # Wait up to five seconds for the user thread to finish
                # before re-checking the connection state.
                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info('Child thread exited, I\'m exiting too.')
                    return

        def __call__(self, *args, **kwargs):
            """Contend in the election and run the user code if elected;
            always stop the zk client afterwards."""
            try:
                election = self.zk.Election(election_id, contender_id)
                self.zk.add_listener(self.zk_listener)
                election.run(self.runit, *args, **kwargs)
            finally:
                # Stop the client even if the election or the leader
                # callback raised, so the connection is not leaked.
                self.zk.stop()

    if f is None:
        return wrapper
    else:
        return wrapper(f)
250
251
if __name__ == '__main__':
    # Executed as a script: run the doctest examples embedded in this
    # module's docstrings.
    import doctest as _doctest

    _doctest.testmod()