2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
16 from typing import Any, Callable, Optional
18 from kazoo.client import KazooClient
19 from kazoo.protocol.states import KazooState
24 logger = logging.getLogger(__name__)
27 # On module load, grab what we presume to be our process' program name.
28 # This is used, by default, to construct internal zookeeper paths (e.g.
29 # to identify a lease or election).
30 PROGRAM_NAME: str = os.path.basename(sys.argv[0])
34 f: Optional[Callable] = None,
36 lease_id: str = PROGRAM_NAME,
37 contender_id: str = platform.node(),
38 duration: datetime.timedelta = datetime.timedelta(minutes=5),
39 also_pass_lease: bool = False,
40 also_pass_zk_client: bool = False,
42 """Obtain an exclusive lease identified by the lease_id name
43 before invoking a function or skip invoking the function if the
44 lease cannot be obtained.
46 There is no method of releasing a lease manually provided on
47 the Kazoo public lease API. Therefore, letting the lease expire
48 is the only mechanism by which it becomes re-acquirable at this
49 time. Thus, due consideration is in order when choosing the
52 According to Kazoo docs, "The client may renew the lease without
53 losing it by obtaining a new lease with the same path (lease_id)
54 and same identity (contender_id)."
57 lease_id: string identifying the lease to obtain
58 contender_id: string identifying who's attempting to obtain
59 duration: how long should the lease be held, if obtained?
60 also_pass_lease: pass the lease into the user function
61 also_pass_zk_client: pass our zk client into the user function
64 ... lease_id='zookeeper_doctest',
65 ... duration=datetime.timedelta(seconds=5),
67 ... def f(name: str) -> int:
68 ... print(f'Hello, {name}')
76 if not lease_id.startswith('/leases/'):
77 lease_id = f'/leases/{lease_id}'
78 lease_id = file_utils.fix_multiple_slashes(lease_id)
80 def wrapper(func: Callable) -> Callable:
81 @functools.wraps(func)
82 def wrapper2(*args, **kwargs) -> Optional[Any]:
84 hosts=scott_secrets.ZOOKEEPER_NODES,
87 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
88 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
89 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
92 logger.debug('We have an active zookeeper connection.')
95 'Trying to obtain %s for contender %s now...',
99 lease = zk.NonBlockingLease(
106 'Successfully obtained %s for contender %s; invoking user function.',
110 if also_pass_zk_client:
113 args = (*args, lease)
114 ret = func(*args, *kwargs)
118 'Failed to obtain %s for contender %s, doing nothing more.',
123 logger.debug('Shutting down zookeeper client.')
135 def run_for_election(
136 f: Optional[Callable] = None,
138 election_id: str = PROGRAM_NAME,
139 contender_id: str = platform.node(),
140 also_pass_zk_client: bool = False,
142 """Run as a contender for a leader election. If/when we become
143 the leader, invoke the user's function.
145 The user's function will be executed on a new thread and must
146 accept a "stop processing" event that it must check regularly.
147 This event will be set automatically by the wrapper in the event
148 that we lose connection to zookeeper (and hence are no longer
149 confident that we are still the leader).
151 The user's function may return at any time which will cause
152 the wrapper to also return and effectively cede leadership.
154 Because the user's code is run in a separate thread, it may
155 not return anything / whatever it returns will be dropped.
158 election_id: global string identifier for the election
159 contender_id: string identifying who is running for leader
160 also_pass_zk_client: pass the zk client into the user code
162 >>> @run_for_election(
163 ... election_id='zookeeper_doctest',
164 ... also_pass_zk_client=True
166 ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
170 ... print(f"Hello, {name}, I'm the leader.")
171 ... if stop_now.is_set():
172 ... print("Oops, not anymore?!")
177 ... print("I'm sick of being leader.")
181 Hello, Scott, I'm the leader.
182 Hello, Scott, I'm the leader.
183 Hello, Scott, I'm the leader.
184 I'm sick of being leader.
187 if not election_id.startswith('/elections/'):
188 election_id = f'/elections/{election_id}'
189 election_id = file_utils.fix_multiple_slashes(election_id)
192 """Helper wrapper class."""
194 def __init__(self, func: Callable) -> None:
195 functools.update_wrapper(self, func)
197 self.zk = KazooClient(
198 hosts=scott_secrets.ZOOKEEPER_NODES,
201 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
202 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
203 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
206 logger.debug('We have an active zookeeper connection.')
207 self.stop_event = threading.Event()
208 self.stop_event.clear()
210 def zk_listener(self, state: KazooState) -> None:
211 logger.debug('Listener received state %s.', state)
212 if state != KazooState.CONNECTED:
214 'Bad connection to zookeeper (state=%s); bailing out.',
217 self.stop_event.set()
219 def runit(self, *args, **kwargs) -> None:
220 # Possibly augment args if requested; always pass stop_event
221 if also_pass_zk_client:
222 args = (*args, self.zk)
223 args = (*args, self.stop_event)
225 logger.debug('Invoking user code on separate thread.')
226 thread = threading.Thread(
233 # Watch the state (fail safe for listener) and the thread.
235 state = self.zk.client_state
236 if state != KazooState.CONNECTED:
238 'Bad connection to zookeeper (state=%s); bailing out.',
241 self.stop_event.set()
242 logger.debug('Waiting for user thread to tear down...')
246 thread.join(timeout=5.0)
247 if not thread.is_alive():
248 logger.info('Child thread exited, I\'m exiting too.')
251 def __call__(self, *args, **kwargs):
252 election = self.zk.Election(election_id, contender_id)
253 self.zk.add_listener(self.zk_listener)
254 election.run(self.runit, *args, **kwargs)
263 if __name__ == '__main__':