42771c91287d7c079c0bb0ec2f303ddfa9ce4f91
[python_utils.git] / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import platform
13 import threading
14 import time
15 from typing import Callable, Optional
16
17 from kazoo.client import KazooClient
18 from kazoo.protocol.states import KazooState
19
20 import config
21 import file_utils
22 import scott_secrets
23
24 logger = logging.getLogger(__name__)
25
26
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = config.PROGRAM_NAME,
    contender_id: str = platform.node(),
    initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain the named lease before invoking a function and skip
    invoking the function if the lease cannot be obtained.

    This decorator may be used bare (``@obtain_lease``) or with keyword
    arguments (``@obtain_lease(lease_id=...)``).  A zookeeper client is
    created, started and shut down around every invocation of the
    wrapped function, so the wrapped function may safely be called
    more than once and nothing is connected at decoration time.

    Args:
        f: the function to wrap when used as a bare decorator; None
            when the decorator is invoked with keyword arguments.
        lease_id: the name of the lease; '/leases/' is prepended if
            the name does not already start with it.
        contender_id: the identifier under which we try to hold the
            lease; defaults to this machine's node name.
        initial_duration: the duration requested for the lease.
        also_pass_lease: if True, the lease object is appended to the
            wrapped function's positional arguments.
        also_pass_zk_client: if True, the zookeeper client is appended
            to the wrapped function's positional arguments (before the
            lease, when both are requested).

    Returns:
        A wrapped function (or a decorator, if f is None).  The wrapped
        function returns whatever the user function returns, or None
        if the lease could not be obtained.

    >>> @obtain_lease(lease_id='zookeeper_doctest')
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs):
            # The client is created per call: it is stopped when the call
            # ends and a stopped client must not be reused by a later call.
            # NOTE(review): verify_certs=False presumably disables server
            # certificate verification -- confirm this is intended.
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            try:
                zk.start()
                logger.debug('We have an active zookeeper connection.')
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = zk.NonBlockingLease(
                    lease_id,
                    initial_duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, doing nothing more.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                # Keyword arguments must be forwarded as keywords (**), not
                # unpacked into extra positional arguments.
                return func(*args, **kwargs)
            finally:
                # Always release the connection, even if the user function
                # (or the connection attempt itself) raised.
                logger.debug('Shutting down zookeeper client.')
                zk.stop()
                zk.close()

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)
104
105
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = config.PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything.

    This decorator may be used bare (``@run_for_election``) or with
    keyword arguments.  A zookeeper client is created, started and
    shut down around every invocation of the wrapped function, so
    nothing is connected at decoration time.

    Args:
        f: the function to wrap when used as a bare decorator; None
            when the decorator is invoked with keyword arguments.
        election_id: the name of the election; '/elections/' is
            prepended if the name does not already start with it.
        contender_id: the identifier under which we contend; defaults
            to this machine's node name.
        also_pass_zk_client: if True, the zookeeper client is appended
            to the user function's positional arguments, immediately
            before the stop event (which is always appended last).

    Returns:
        A wrapped function (or a decorator, if f is None).  The wrapped
        function always returns None.

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    def wrapper(func: Callable) -> Callable:
        def runit(zk, *args, **kwargs):
            """Run the user function on a worker thread while we are the
            leader, watching the zookeeper connection until it exits."""
            stop_event = threading.Event()
            if also_pass_zk_client:
                args = (*args, zk)
            args = (*args, stop_event)
            logger.debug('Invoking user code on separate thread.')
            thread = threading.Thread(
                target=func,
                args=args,
                kwargs=kwargs,
            )
            thread.start()

            while thread.is_alive():
                # NOTE(review): client_state is compared against a
                # KazooState constant, as in the original code; confirm
                # this is the intended state property of the client.
                state = zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    # Tell the user code to stop and wait for it to comply.
                    stop_event.set()
                    thread.join()
                    break
                # Wait up to five seconds before re-checking the connection,
                # but wake up immediately if the user code finishes.
                thread.join(timeout=5.0)
            logger.info('Child thread exited, I\'m exiting too.')

        @functools.wraps(func)
        def wrapper2(*args, **kwargs):
            # The client is created per call: it is stopped when the call
            # ends and a stopped client must not be reused by a later call.
            # NOTE(review): verify_certs=False presumably disables server
            # certificate verification -- confirm this is intended.
            zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                use_ssl=True,
                verify_certs=False,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            try:
                zk.start()
                logger.debug('We have an active zookeeper connection.')
                election = zk.Election(election_id, contender_id)
                # Blocks until we win the election and runit returns.
                election.run(runit, zk, *args, **kwargs)
            finally:
                # Always release the connection, even on error.
                zk.stop()
                zk.close()

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)
210
211
if __name__ == '__main__':
    # Executed as a script: run every doctest embedded in this module.
    from doctest import testmod

    testmod()