#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# © Copyright 2022, Scott Gasch

"""This is a module for making it easier to deal with Zookeeper / Kazoo."""

import datetime
import functools
import logging
import platform
import threading
import time
from typing import Callable, Optional

from kazoo.client import KazooClient
from kazoo.protocol.states import KazooState

import config
import file_utils
import scott_secrets

logger = logging.getLogger(__name__)

# How often (in seconds) the election wrapper re-checks the zookeeper
# connection while the user's code is running on its own thread.
_LEADER_HEALTH_CHECK_SEC = 5.0


def _make_zk_client() -> KazooClient:
    """Build (but do not start) a KazooClient using the connection
    settings found in scott_secrets."""
    # NOTE(review): verify_certs=False disables server certificate
    # verification and keyfile / certfile both point at
    # ZOOKEEPER_CLIENT_CERT; both are preserved as-is -- confirm that
    # they are intentional.
    return KazooClient(
        hosts=scott_secrets.ZOOKEEPER_NODES,
        use_ssl=True,
        verify_certs=False,
        keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
        keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
        certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
    )


def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = config.PROGRAM_NAME,
    contender_id: str = platform.node(),
    initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
) -> Callable:
    """Obtain the named lease before invoking a function and skip
    invoking the function if the lease cannot be obtained.

    May be used bare (``@obtain_lease``) or with keyword arguments
    (``@obtain_lease(lease_id=...)``).

    Args:
        f: the function to wrap when the decorator is used bare.
        lease_id: name of the lease; '/leases/' is prepended when the
            name does not already start with it.
        contender_id: identifier under which we contend for the lease.
        initial_duration: how long the lease is requested for.
        also_pass_lease: if True, the lease object is appended to the
            positional arguments passed to the wrapped function.
        also_pass_zk_client: if True, the zookeeper client is appended
            to the positional arguments passed to the wrapped function
            (before the lease, when both are requested).

    Returns:
        The wrapped function (or a decorator producing it).  Calling the
        wrapped function returns whatever the user's function returns,
        or None if the lease could not be obtained.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         initial_duration=datetime.timedelta(seconds=10),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123
    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
    # Normalize the path whether or not we had to add the prefix.
    lease_id = file_utils.fix_multiple_slashes(lease_id)

    zk = _make_zk_client()
    zk.start()
    logger.debug('We have an active zookeeper connection.')

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs):
            # The client is stopped at the end of every call, so make
            # sure it is (re)connected before using it.  start() returns
            # right away when the client is already connected.
            zk.start()
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = zk.NonBlockingLease(
                    lease_id,
                    initial_duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, doing nothing more.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                # Keyword arguments must be forwarded with ** so that
                # their values (not just their names) reach the callee.
                return func(*args, **kwargs)
            finally:
                # Always disconnect, even when the user's function or the
                # lease acquisition raised.
                logger.debug('Shutting down zookeeper client.')
                zk.stop()

        return wrapper2

    if f is None:
        return wrapper
    return wrapper(f)


def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = config.PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
) -> Callable:
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything.

    Args:
        f: the function to wrap when the decorator is used bare.
        election_id: name of the election; '/elections/' is prepended
            when the name does not already start with it.
        contender_id: identifier under which we run in the election.
        also_pass_zk_client: if True, the zookeeper client is appended
            to the positional arguments passed to the user's function,
            just before the stop event.

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.
    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
    # Normalize the path whether or not we had to add the prefix.
    election_id = file_utils.fix_multiple_slashes(election_id)

    zk = _make_zk_client()
    zk.start()
    logger.debug('We have an active zookeeper connection.')

    def wrapper(func: Callable) -> Callable:
        def runit(user_func, *args, **kwargs):
            """Invoked once we are the leader: run user_func on its own
            thread and watch the zookeeper connection until it exits."""
            stop_event = threading.Event()
            if also_pass_zk_client:
                args = (*args, zk)
            args = (*args, stop_event)

            logger.debug('Invoking user code on separate thread.')
            thread = threading.Thread(
                target=user_func,
                args=args,
                kwargs=kwargs,
            )
            thread.start()

            while True:
                state = zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    # Tell the user's code to stop and wait for it to
                    # comply before giving up leadership.
                    stop_event.set()
                    thread.join()

                if not thread.is_alive():
                    logger.info("Child thread exited, I'm exiting too.")
                    return

                # Wait for the next health check, but wake up right away
                # if the user's thread finishes in the meantime so that
                # leadership is not held longer than necessary.
                thread.join(timeout=_LEADER_HEALTH_CHECK_SEC)

        @functools.wraps(func)
        def wrapper2(*args, **kwargs):
            # The client is stopped at the end of every call, so make
            # sure it is (re)connected before using it.  start() returns
            # right away when the client is already connected.
            zk.start()
            try:
                election = zk.Election(election_id, contender_id)
                election.run(runit, func, *args, **kwargs)
            finally:
                # Always disconnect, even if the election raised.
                zk.stop()

        return wrapper2

    if f is None:
        return wrapper
    return wrapper(f)


if __name__ == '__main__':
    import doctest

    doctest.testmod()