From 4f457d760b6163eec639cf9e7bab4d5b9c1c29f0 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Fri, 19 Aug 2022 18:24:01 -0700 Subject: [PATCH] Add zookeeper.py; some handy wrappers for leader election and leases. --- zookeeper.py | 215 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100755 zookeeper.py diff --git a/zookeeper.py b/zookeeper.py new file mode 100755 index 0000000..42771c9 --- /dev/null +++ b/zookeeper.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# © Copyright 2022, Scott Gasch + +"""This is a module for making it easier to deal with Zookeeper / Kazoo.""" + + +import datetime +import functools +import logging +import platform +import threading +import time +from typing import Callable, Optional + +from kazoo.client import KazooClient +from kazoo.protocol.states import KazooState + +import config +import file_utils +import scott_secrets + +logger = logging.getLogger(__name__) + + +def obtain_lease( + f: Optional[Callable] = None, + *, + lease_id: str = config.PROGRAM_NAME, + contender_id: str = platform.node(), + initial_duration: datetime.timedelta = datetime.timedelta(minutes=5), + also_pass_lease: bool = False, + also_pass_zk_client: bool = False, +): + """Obtain the named lease before invoking a function and skip + invoking the function if the lease cannot be obtained. + + >>> @obtain_lease(lease_id='zookeeper_doctest') + ... def f(name: str) -> int: + ... print(f'Hello, {name}') + ... return 123 + + >>> f('Scott') + Hello, Scott + 123 + + """ + if not lease_id.startswith('/leases/'): + lease_id = f'/leases/{lease_id}' + lease_id = file_utils.fix_multiple_slashes(lease_id) + zk = KazooClient( + hosts=scott_secrets.ZOOKEEPER_NODES, + use_ssl=True, + verify_certs=False, + keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT, + keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS, + certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT, + ) + zk.start() + logger.debug('We have an active zookeeper connection.') + + def wrapper(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper2(*args, **kwargs): + logger.debug( + 'Trying to obtain %s for contender %s now...', + lease_id, + contender_id, + ) + lease = zk.NonBlockingLease( + lease_id, + initial_duration, + contender_id, + ) + if lease: + logger.debug( + 'Successfully obtained %s for contender %s; invoking user function.', + lease_id, + contender_id, + ) + if also_pass_zk_client: + args = (*args, zk) + if also_pass_lease: + args = (*args, lease) + ret = func(*args, *kwargs) + else: + logger.debug( + 'Failed to obtain %s for contender %s, doing nothing more.', + lease_id, + contender_id, + ) + ret = None + logger.debug('Shutting down zookeeper client.') + zk.stop() + return ret + + return wrapper2 + + if f is None: + return wrapper + else: + return wrapper(f) + + +def run_for_election( + f: Optional[Callable] = None, + *, + election_id: str = config.PROGRAM_NAME, + contender_id: str = platform.node(), + also_pass_zk_client: bool = False, +): + """Run as a contender for a leader election. If/when we become + the leader, invoke the user's function. + + The user's function will be executed on a new thread and must + accept a "stop processing" event that it must check regularly. + This event will be set automatically by the wrapper in the event + that we lose connection to zookeeper (and hence are no longer + confident that we are still the leader). + + The user's function may return at any time which will cause + the wrapper to also return and effectively cede leadership. + + Because the user's code is run in a separate thread, it may + not return anything. + + >>> @run_for_election( + ... election_id='zookeeper_doctest', + ... also_pass_zk_client=True + ... ) + ... def g(name: str, zk: KazooClient, stop_now: threading.Event): + ... import time + ... count = 0 + ... while True: + ... print(f"Hello, {name}, I'm the leader.") + ... if stop_now.is_set(): + ... print("Oops, not anymore?!") + ... return + ... time.sleep(0.1) + ... count += 1 + ... if count >= 3: + ... print("I'm sick of being leader.") + ... return + + >>> g("Scott") + Hello, Scott, I'm the leader. + Hello, Scott, I'm the leader. + Hello, Scott, I'm the leader. + I'm sick of being leader. + + """ + if not election_id.startswith('/elections/'): + election_id = f'/elections/{election_id}' + election_id = file_utils.fix_multiple_slashes(election_id) + zk = KazooClient( + hosts=scott_secrets.ZOOKEEPER_NODES, + use_ssl=True, + verify_certs=False, + keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT, + keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS, + certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT, + ) + zk.start() + logger.debug('We have an active zookeeper connection.') + + def wrapper(func: Callable) -> Callable: + @functools.wraps(func) + def runit(func, *args, **kwargs): + stop_event = threading.Event() + stop_event.clear() + if also_pass_zk_client: + args = (*args, zk) + args = (*args, stop_event) + logger.debug('Invoking user code on separate thread.') + thread = threading.Thread( + target=func, + args=args, + kwargs=kwargs, + ) + thread.start() + + while True: + state = zk.client_state + if state != KazooState.CONNECTED: + logger.error( + 'Bad connection to zookeeper (state=%s); bailing out.', + state, + ) + stop_event.set() + thread.join() + + if not thread.is_alive(): + logger.info('Child thread exited, I\'m exiting too.') + return + time.sleep(5.0) + + @functools.wraps(runit) + def wrapper2(*args, **kwargs): + election = zk.Election(election_id, contender_id) + election.run(runit, func, *args, **kwargs) + zk.stop() + + return wrapper2 + + if f is None: + return wrapper + else: + return wrapper(f) + + +if __name__ == '__main__': + import doctest + + doctest.testmod() -- 2.45.0