#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# © Copyright 2022, Scott Gasch

"""This is a module for making it easier to deal with Zookeeper / Kazoo."""

import datetime
import functools
import logging
import os
import platform
import sys
import threading
from typing import Callable, Optional

from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState

import file_utils
import scott_secrets

logger = logging.getLogger(__name__)


# On module load, grab what we presume to be our process' program name.
# This is used, by default, to construct internal zookeeper paths (e.g.
# to identify a lease or election).
PROGRAM_NAME: str = os.path.basename(sys.argv[0])


def _make_zk_client() -> KazooClient:
    """Build (but do not start) a KazooClient configured from scott_secrets."""
    # NOTE(review): verify_certs=False disables verification of the server's
    # TLS certificate.  Preserved as-is from the original configuration;
    # confirm that this is intentional.
    return KazooClient(
        hosts=scott_secrets.ZOOKEEPER_NODES,
        use_ssl=True,
        verify_certs=False,
        keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
        keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
        certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
    )


def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    May be used bare (``@obtain_lease``) or with keyword arguments
    (``@obtain_lease(lease_id=...)``).

    Args:
        f: the function to decorate (supplied implicitly when the
            decorator is used without arguments)
        lease_id: string identifying the lease to obtain; it is
            prefixed with '/leases/' unless it already starts with it
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function as an
            extra trailing positional argument
        also_pass_zk_client: pass our zk client into the user function
            as an extra trailing positional argument (placed before
            the lease when both flags are set)

    Returns:
        The decorated function.  When invoked it returns whatever the
        user function returns, or None if the lease could not be
        obtained (in which case the user function is not invoked).

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123

    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs):
            zk = _make_zk_client()
            zk.start()
            # From here on the client is live: always shut it down, even
            # if acquiring the lease or the user function raises.
            try:
                logger.debug('We have an active zookeeper connection.')
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = zk.NonBlockingLease(
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, doing nothing more.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                # Keyword arguments must be forwarded with ** so that they
                # reach the user function as keywords (not as positional
                # copies of the key names).
                return func(*args, **kwargs)
            finally:
                logger.debug('Shutting down zookeeper client.')
                zk.stop()
                # This client is created per call and discarded afterwards;
                # close it so that its resources are released.
                zk.close()

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)


def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    Note that the zookeeper client is created and started when the
    function is decorated (not when it is first called) and is stopped
    at the end of each call to the decorated function.

    Args:
        f: the function to decorate (supplied implicitly when the
            decorator is used without arguments)
        election_id: global string identifier for the election; it is
            prefixed with '/elections/' unless it already starts with it
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code as
            an extra positional argument placed just before the stop
            event (which is always passed last)

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Helper wrapper class."""

        def __init__(self, func: Callable) -> None:
            functools.update_wrapper(self, func)
            self.func = func
            self.zk = _make_zk_client()
            self.zk.start()
            logger.debug('We have an active zookeeper connection.')
            # Set when we are no longer sure that we hold leadership; the
            # user's code is expected to poll it and wind down.
            self.stop_event = threading.Event()
            self.stop_event.clear()

        def zk_listener(self, state: KazooState) -> None:
            """Connection state callback: any state other than CONNECTED
            tells the user code to stop."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.debug(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Run the user code on its own thread and babysit it; invoked
            by the election once we have become the leader."""
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            logger.debug('Invoking user code on separate thread.')
            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            thread.start()

            # Watch the state (fail safe for listener) and the thread.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    return

                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info("Child thread exited, I'm exiting too.")
                    return

        def __call__(self, *args, **kwargs):
            election = self.zk.Election(election_id, contender_id)
            self.zk.add_listener(self.zk_listener)
            # Make sure the client is stopped even if running for (or
            # holding) the election raises.
            try:
                election.run(self.runit, *args, **kwargs)
            finally:
                self.zk.stop()

    if f is None:
        return wrapper
    else:
        return wrapper(f)


if __name__ == '__main__':
    import doctest

    doctest.testmod()