2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
16 from typing import Any, Callable, Optional
18 from kazoo.client import KazooClient
19 from kazoo.exceptions import CancelledError
20 from kazoo.protocol.states import KazooState
21 from kazoo.recipe.lease import NonBlockingLease
26 logger = logging.getLogger(__name__)
29 # On module load, grab what we presume to be our process' program name.
30 # This is used, by default, to construct internal zookeeper paths (e.g.
31 # to identify a lease or election).
32 PROGRAM_NAME: str = os.path.basename(sys.argv[0])
35 class RenewableReleasableLease(NonBlockingLease):
36 """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
37 that adds some behaviors:
39 + Ability to renew the lease if it's already held without
40 going through the effort of reobtaining the same lease.
42 + Ability to release the lease if it's held and not yet
45 It also is more picky than the base class in terms of when it
46 evaluates to "True" (indicating that the lease is held); it will
47 begin to evaluate to "False" as soon as the lease has expired
48 even if you used to hold it. This means client code should be
49 aware that the lease can disappear and it also means that the
50 performance of evaluating the lease requires a round trip to
53 Note that it is not valid to release the lease more than once
54 (since you no longer have it the second time). The code ignores
55 this. It's also not possible to reobtain an expired or released
56 lease by calling renew. Go create a new lease object at that
65 duration: datetime.timedelta,
66 identifier: str = None,
67 utcnow=datetime.datetime.utcnow,
69 super().__init__(client, path, duration, identifier, utcnow)
72 self.identifier = identifier
76 """Release the lease, if it's presently being held."""
77 self.client.ensure_path(self.path)
78 holder_path = self.path + "/lease_holder"
79 lock = self.client.Lock(self.path, self.identifier)
82 if not self._is_lease_held_pre_locked():
83 logger.debug("Can't release lease; I don't have it!")
87 if self.client.exists(holder_path):
88 self.client.delete(holder_path)
89 end_lease = now.strftime(self._date_format)
91 # Release by moving end to now.
92 data = {'version': self._version, 'holder': self.identifier, 'end': end_lease}
93 self.client.create(holder_path, self._encode(data))
96 except CancelledError:
99 def try_renew(self, duration: datetime.timedelta) -> bool:
100 if not self.obtained:
102 self.obtained = False
103 self._attempt_obtaining(self.client, self.path, duration, self.identifier, self.utcnow)
106 def _is_lease_held_pre_locked(self) -> bool:
107 self.client.ensure_path(self.path)
108 holder_path = self.path + "/lease_holder"
110 if self.client.exists(holder_path):
111 raw, _ = self.client.get(holder_path)
112 data = self._decode(raw)
113 if data["version"] != self._version:
115 current_end = datetime.datetime.strptime(data['end'], self._date_format)
116 if data['holder'] != self.identifier or now > current_end:
123 if not self.obtained:
125 lock = self.client.Lock(self.path, self.identifier)
128 ret = self._is_lease_held_pre_locked()
129 except CancelledError:
135 f: Optional[Callable] = None,
137 lease_id: str = PROGRAM_NAME,
138 contender_id: str = platform.node(),
139 duration: datetime.timedelta = datetime.timedelta(minutes=5),
140 also_pass_lease: bool = False,
141 also_pass_zk_client: bool = False,
143 """Obtain an exclusive lease identified by the lease_id name
144 before invoking a function or skip invoking the function if the
145 lease cannot be obtained.
147 Note that we use a hacky "RenewableReleasableLease" and not the
148 kazoo NonBlockingLease because the former allows us to release the
149 lease when the user code returns whereas the latter does not.
152 lease_id: string identifying the lease to obtain
153 contender_id: string identifying who's attempting to obtain
154 duration: how long should the lease be held, if obtained?
155 also_pass_lease: pass the lease into the user function
156 also_pass_zk_client: pass our zk client into the user function
159 ... lease_id='zookeeper_doctest',
160 ... duration=datetime.timedelta(seconds=5),
162 ... def f(name: str) -> int:
163 ... print(f'Hello, {name}')
171 if not lease_id.startswith('/leases/'):
172 lease_id = f'/leases/{lease_id}'
173 lease_id = file_utils.fix_multiple_slashes(lease_id)
175 def wrapper(func: Callable) -> Callable:
176 @functools.wraps(func)
177 def wrapper2(*args, **kwargs) -> Optional[Any]:
179 hosts=scott_secrets.ZOOKEEPER_NODES,
182 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
183 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
184 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
187 logger.debug('We have an active zookeeper connection.')
190 'Trying to obtain %s for contender %s now...',
194 lease = RenewableReleasableLease(
202 'Successfully obtained %s for contender %s; invoking user function.',
206 if also_pass_zk_client:
209 args = (*args, lease)
210 ret = func(*args, *kwargs)
214 'Failed to obtain %s for contender %s, doing nothing more.',
219 logger.debug('Shutting down zookeeper client.')
231 def run_for_election(
232 f: Optional[Callable] = None,
234 election_id: str = PROGRAM_NAME,
235 contender_id: str = platform.node(),
236 also_pass_zk_client: bool = False,
238 """Run as a contender for a leader election. If/when we become
239 the leader, invoke the user's function.
241 The user's function will be executed on a new thread and must
242 accept a "stop processing" event that it must check regularly.
243 This event will be set automatically by the wrapper in the event
244 that we lose connection to zookeeper (and hence are no longer
245 confident that we are still the leader).
247 The user's function may return at any time which will cause
248 the wrapper to also return and effectively cede leadership.
250 Because the user's code is run in a separate thread, it may
251 not return anything / whatever it returns will be dropped.
254 election_id: global string identifier for the election
255 contender_id: string identifying who is running for leader
256 also_pass_zk_client: pass the zk client into the user code
258 >>> @run_for_election(
259 ... election_id='zookeeper_doctest',
260 ... also_pass_zk_client=True
262 ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
266 ... print(f"Hello, {name}, I'm the leader.")
267 ... if stop_now.is_set():
268 ... print("Oops, not anymore?!")
273 ... print("I'm sick of being leader.")
277 Hello, Scott, I'm the leader.
278 Hello, Scott, I'm the leader.
279 Hello, Scott, I'm the leader.
280 I'm sick of being leader.
283 if not election_id.startswith('/elections/'):
284 election_id = f'/elections/{election_id}'
285 election_id = file_utils.fix_multiple_slashes(election_id)
288 """Helper wrapper class."""
290 def __init__(self, func: Callable) -> None:
291 functools.update_wrapper(self, func)
293 self.zk = KazooClient(
294 hosts=scott_secrets.ZOOKEEPER_NODES,
297 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
298 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
299 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
302 logger.debug('We have an active zookeeper connection.')
303 self.stop_event = threading.Event()
304 self.stop_event.clear()
306 def zk_listener(self, state: KazooState) -> None:
307 logger.debug('Listener received state %s.', state)
308 if state != KazooState.CONNECTED:
310 'Bad connection to zookeeper (state=%s); bailing out.',
313 self.stop_event.set()
315 def runit(self, *args, **kwargs) -> None:
316 # Possibly augment args if requested; always pass stop_event
317 if also_pass_zk_client:
318 args = (*args, self.zk)
319 args = (*args, self.stop_event)
321 logger.debug('Invoking user code on separate thread.')
322 thread = threading.Thread(
329 # Watch the state (fail safe for listener) and the thread.
331 state = self.zk.client_state
332 if state != KazooState.CONNECTED:
334 'Bad connection to zookeeper (state=%s); bailing out.',
337 self.stop_event.set()
338 logger.debug('Waiting for user thread to tear down...')
342 thread.join(timeout=5.0)
343 if not thread.is_alive():
344 logger.info('Child thread exited, I\'m exiting too.')
347 def __call__(self, *args, **kwargs):
348 election = self.zk.Election(election_id, contender_id)
349 self.zk.add_listener(self.zk_listener)
350 election.run(self.runit, *args, **kwargs)
359 if __name__ == '__main__':