Break a dependency from zookeeper to config.
[python_utils.git] / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import os
13 import platform
14 import sys
15 import threading
16 import time
17 from typing import Callable, Optional
18
19 from kazoo.client import KazooClient
20 from kazoo.protocol.states import KazooState
21
22 import file_utils
23 import scott_secrets
24
logger = logging.getLogger(__name__)


# Captured once, at import time: the final path component of the script
# that launched this process.  Unless a caller overrides it, this is the
# name used for leases and elections (and therefore appears inside the
# zookeeper paths built for them).
PROGRAM_NAME: str = os.path.split(sys.argv[0])[1]
32
33
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain the named lease before invoking a function and skip
    invoking the function if the lease cannot be obtained.

    May be used bare (``@obtain_lease``) or with keyword arguments
    (``@obtain_lease(lease_id=...)``).

    Args:
        f: the function to decorate; None when the decorator is used
            with keyword arguments, in which case a decorator is returned.
        lease_id: name of the lease; '/leases/' is prepended unless the
            name already starts with it.
        contender_id: identifier passed to the lease as the contender.
        initial_duration: duration passed to the lease.
        also_pass_lease: if True, append the lease object to the wrapped
            function's positional arguments.
        also_pass_zk_client: if True, append the zookeeper client to the
            wrapped function's positional arguments (before the lease,
            when both are requested).

    Returns:
        The decorated function (or a decorator when f is None).  Calling
        the decorated function returns the wrapped function's result, or
        None when the lease could not be obtained.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         initial_duration=datetime.timedelta(seconds=10),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs):
            # Connect per call rather than once at decoration time: the
            # client is stopped at the end of every call, so a client
            # shared across calls would already be stopped on the second
            # invocation.
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            logger.debug('We have an active zookeeper connection.')
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = zk.NonBlockingLease(
                    lease_id,
                    initial_duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, doing nothing more.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                # Keyword arguments must be expanded with ** so they reach
                # the user function by name, not as extra positionals.
                return func(*args, **kwargs)
            finally:
                # Always release the connection, even if func raised.
                logger.debug('Shutting down zookeeper client.')
                zk.stop()

        return wrapper2

    if f is None:
        return wrapper
    return wrapper(f)
114
115
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything.

    Args:
        f: the function to decorate; None when the decorator is used
            with keyword arguments, in which case a decorator is returned.
        election_id: name of the election; '/elections/' is prepended
            unless the name already starts with it.
        contender_id: identifier passed to the election as the contender.
        also_pass_zk_client: if True, append the zookeeper client to the
            user function's positional arguments (before the stop event,
            which is always appended last).

    Returns:
        The decorated function (or a decorator when f is None).  Calling
        the decorated function always returns None.

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs):
            # Connect per call rather than once at decoration time: the
            # client is stopped at the end of every call, so a client
            # shared across calls would already be stopped on the second
            # invocation.
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            zk.start()
            logger.debug('We have an active zookeeper connection.')

            def runit(*user_args, **user_kwargs):
                """Invoked by the election once we are the leader: run
                the user's code on a thread and watch the connection."""
                stop_event = threading.Event()
                if also_pass_zk_client:
                    user_args = (*user_args, zk)
                user_args = (*user_args, stop_event)
                logger.debug('Invoking user code on separate thread.')
                thread = threading.Thread(
                    target=func,
                    args=user_args,
                    kwargs=user_kwargs,
                )
                thread.start()

                while True:
                    # NOTE(review): client_state is compared against a
                    # KazooState constant, as in the original code; confirm
                    # this is the intended state property.
                    state = zk.client_state
                    if state != KazooState.CONNECTED:
                        logger.error(
                            'Bad connection to zookeeper (state=%s); bailing out.',
                            state,
                        )
                        stop_event.set()
                        thread.join()

                    if not thread.is_alive():
                        logger.info("Child thread exited, I'm exiting too.")
                        return
                    # Wait up to five seconds before re-checking the
                    # connection, but wake immediately if the user's thread
                    # finishes so we do not linger after it is done.
                    thread.join(timeout=5.0)

            try:
                election = zk.Election(election_id, contender_id)
                election.run(runit, *args, **kwargs)
            finally:
                # Always release the connection, even on error.
                zk.stop()

        return wrapper2

    if f is None:
        return wrapper
    return wrapper(f)
220
221
if __name__ == '__main__':
    # Executed as a script: run the doctests embedded in this module.
    from doctest import testmod

    testmod()