Reduce the doctest lease duration...
[python_utils.git] / zookeeper.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2022, Scott Gasch
5
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
7
8
9 import datetime
10 import functools
11 import logging
12 import platform
13 import threading
14 import time
15 from typing import Callable, Optional
16
17 from kazoo.client import KazooClient
18 from kazoo.protocol.states import KazooState
19
20 import config
21 import file_utils
22 import scott_secrets
23
24 logger = logging.getLogger(__name__)
25
26
27 def obtain_lease(
28     f: Optional[Callable] = None,
29     *,
30     lease_id: str = config.PROGRAM_NAME,
31     contender_id: str = platform.node(),
32     initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
33     also_pass_lease: bool = False,
34     also_pass_zk_client: bool = False,
35 ):
36     """Obtain the named lease before invoking a function and skip
37     invoking the function if the lease cannot be obtained.
38
39     >>> @obtain_lease(
40     ...         lease_id='zookeeper_doctest',
41     ...         initial_duration=datetime.timedelta(seconds=10),
42     ... )
43     ... def f(name: str) -> int:
44     ...     print(f'Hello, {name}')
45     ...     return 123
46
47     >>> f('Scott')
48     Hello, Scott
49     123
50
51     """
52     if not lease_id.startswith('/leases/'):
53         lease_id = f'/leases/{lease_id}'
54         lease_id = file_utils.fix_multiple_slashes(lease_id)
55     zk = KazooClient(
56         hosts=scott_secrets.ZOOKEEPER_NODES,
57         use_ssl=True,
58         verify_certs=False,
59         keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
60         keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
61         certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
62     )
63     zk.start()
64     logger.debug('We have an active zookeeper connection.')
65
66     def wrapper(func: Callable) -> Callable:
67         @functools.wraps(func)
68         def wrapper2(*args, **kwargs):
69             logger.debug(
70                 'Trying to obtain %s for contender %s now...',
71                 lease_id,
72                 contender_id,
73             )
74             lease = zk.NonBlockingLease(
75                 lease_id,
76                 initial_duration,
77                 contender_id,
78             )
79             if lease:
80                 logger.debug(
81                     'Successfully obtained %s for contender %s; invoking user function.',
82                     lease_id,
83                     contender_id,
84                 )
85                 if also_pass_zk_client:
86                     args = (*args, zk)
87                 if also_pass_lease:
88                     args = (*args, lease)
89                 ret = func(*args, *kwargs)
90             else:
91                 logger.debug(
92                     'Failed to obtain %s for contender %s, doing nothing more.',
93                     lease_id,
94                     contender_id,
95                 )
96                 ret = None
97             logger.debug('Shutting down zookeeper client.')
98             zk.stop()
99             return ret
100
101         return wrapper2
102
103     if f is None:
104         return wrapper
105     else:
106         return wrapper(f)
107
108
109 def run_for_election(
110     f: Optional[Callable] = None,
111     *,
112     election_id: str = config.PROGRAM_NAME,
113     contender_id: str = platform.node(),
114     also_pass_zk_client: bool = False,
115 ):
116     """Run as a contender for a leader election.  If/when we become
117     the leader, invoke the user's function.
118
119     The user's function will be executed on a new thread and must
120     accept a "stop processing" event that it must check regularly.
121     This event will be set automatically by the wrapper in the event
122     that we lose connection to zookeeper (and hence are no longer
123     confident that we are still the leader).
124
125     The user's function may return at any time which will cause
126     the wrapper to also return and effectively cede leadership.
127
128     Because the user's code is run in a separate thread, it may
129     not return anything.
130
131     >>> @run_for_election(
132     ...         election_id='zookeeper_doctest',
133     ...         also_pass_zk_client=True
134     ... )
135     ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
136     ...     import time
137     ...     count = 0
138     ...     while True:
139     ...         print(f"Hello, {name}, I'm the leader.")
140     ...         if stop_now.is_set():
141     ...             print("Oops, not anymore?!")
142     ...             return
143     ...         time.sleep(0.1)
144     ...         count += 1
145     ...         if count >= 3:
146     ...             print("I'm sick of being leader.")
147     ...             return
148
149     >>> g("Scott")
150     Hello, Scott, I'm the leader.
151     Hello, Scott, I'm the leader.
152     Hello, Scott, I'm the leader.
153     I'm sick of being leader.
154
155     """
156     if not election_id.startswith('/elections/'):
157         election_id = f'/elections/{election_id}'
158         election_id = file_utils.fix_multiple_slashes(election_id)
159     zk = KazooClient(
160         hosts=scott_secrets.ZOOKEEPER_NODES,
161         use_ssl=True,
162         verify_certs=False,
163         keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
164         keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
165         certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
166     )
167     zk.start()
168     logger.debug('We have an active zookeeper connection.')
169
170     def wrapper(func: Callable) -> Callable:
171         @functools.wraps(func)
172         def runit(func, *args, **kwargs):
173             stop_event = threading.Event()
174             stop_event.clear()
175             if also_pass_zk_client:
176                 args = (*args, zk)
177             args = (*args, stop_event)
178             logger.debug('Invoking user code on separate thread.')
179             thread = threading.Thread(
180                 target=func,
181                 args=args,
182                 kwargs=kwargs,
183             )
184             thread.start()
185
186             while True:
187                 state = zk.client_state
188                 if state != KazooState.CONNECTED:
189                     logger.error(
190                         'Bad connection to zookeeper (state=%s); bailing out.',
191                         state,
192                     )
193                     stop_event.set()
194                     thread.join()
195
196                 if not thread.is_alive():
197                     logger.info('Child thread exited, I\'m exiting too.')
198                     return
199                 time.sleep(5.0)
200
201         @functools.wraps(runit)
202         def wrapper2(*args, **kwargs):
203             election = zk.Election(election_id, contender_id)
204             election.run(runit, func, *args, **kwargs)
205             zk.stop()
206
207         return wrapper2
208
209     if f is None:
210         return wrapper
211     else:
212         return wrapper(f)
213
214
215 if __name__ == '__main__':
216     import doctest
217
218     doctest.testmod()