--- /dev/null
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# © Copyright 2022, Scott Gasch
+
+"""This is a module for making it easier to deal with Zookeeper / Kazoo."""
+
+
+import datetime
+import functools
+import logging
+import platform
+import threading
+import time
+from typing import Callable, Optional
+
+from kazoo.client import KazooClient
+from kazoo.protocol.states import KazooState
+
+import config
+import file_utils
+import scott_secrets
+
+logger = logging.getLogger(__name__)
+
+
+def obtain_lease(
+ f: Optional[Callable] = None,
+ *,
+ lease_id: str = config.PROGRAM_NAME,
+ contender_id: str = platform.node(),
+ initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
+ also_pass_lease: bool = False,
+ also_pass_zk_client: bool = False,
+):
+ """Obtain the named lease before invoking a function and skip
+ invoking the function if the lease cannot be obtained.
+
+ >>> @obtain_lease(lease_id='zookeeper_doctest')
+ ... def f(name: str) -> int:
+ ... print(f'Hello, {name}')
+ ... return 123
+
+ >>> f('Scott')
+ Hello, Scott
+ 123
+
+ """
+ if not lease_id.startswith('/leases/'):
+ lease_id = f'/leases/{lease_id}'
+ lease_id = file_utils.fix_multiple_slashes(lease_id)
+ zk = KazooClient(
+ hosts=scott_secrets.ZOOKEEPER_NODES,
+ use_ssl=True,
+ verify_certs=False,
+ keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+ keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
+ certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+ )
+ zk.start()
+ logger.debug('We have an active zookeeper connection.')
+
+ def wrapper(func: Callable) -> Callable:
+ @functools.wraps(func)
+ def wrapper2(*args, **kwargs):
+ logger.debug(
+ 'Trying to obtain %s for contender %s now...',
+ lease_id,
+ contender_id,
+ )
+ lease = zk.NonBlockingLease(
+ lease_id,
+ initial_duration,
+ contender_id,
+ )
+ if lease:
+ logger.debug(
+ 'Successfully obtained %s for contender %s; invoking user function.',
+ lease_id,
+ contender_id,
+ )
+ if also_pass_zk_client:
+ args = (*args, zk)
+ if also_pass_lease:
+ args = (*args, lease)
+ ret = func(*args, *kwargs)
+ else:
+ logger.debug(
+ 'Failed to obtain %s for contender %s, doing nothing more.',
+ lease_id,
+ contender_id,
+ )
+ ret = None
+ logger.debug('Shutting down zookeeper client.')
+ zk.stop()
+ return ret
+
+ return wrapper2
+
+ if f is None:
+ return wrapper
+ else:
+ return wrapper(f)
+
+
+def run_for_election(
+ f: Optional[Callable] = None,
+ *,
+ election_id: str = config.PROGRAM_NAME,
+ contender_id: str = platform.node(),
+ also_pass_zk_client: bool = False,
+):
+ """Run as a contender for a leader election. If/when we become
+ the leader, invoke the user's function.
+
+ The user's function will be executed on a new thread and must
+ accept a "stop processing" event that it must check regularly.
+ This event will be set automatically by the wrapper in the event
+ that we lose connection to zookeeper (and hence are no longer
+ confident that we are still the leader).
+
+ The user's function may return at any time which will cause
+ the wrapper to also return and effectively cede leadership.
+
+ Because the user's code is run in a separate thread, it may
+ not return anything.
+
+ >>> @run_for_election(
+ ... election_id='zookeeper_doctest',
+ ... also_pass_zk_client=True
+ ... )
+ ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
+ ... import time
+ ... count = 0
+ ... while True:
+ ... print(f"Hello, {name}, I'm the leader.")
+ ... if stop_now.is_set():
+ ... print("Oops, not anymore?!")
+ ... return
+ ... time.sleep(0.1)
+ ... count += 1
+ ... if count >= 3:
+ ... print("I'm sick of being leader.")
+ ... return
+
+ >>> g("Scott")
+ Hello, Scott, I'm the leader.
+ Hello, Scott, I'm the leader.
+ Hello, Scott, I'm the leader.
+ I'm sick of being leader.
+
+ """
+ if not election_id.startswith('/elections/'):
+ election_id = f'/elections/{election_id}'
+ election_id = file_utils.fix_multiple_slashes(election_id)
+ zk = KazooClient(
+ hosts=scott_secrets.ZOOKEEPER_NODES,
+ use_ssl=True,
+ verify_certs=False,
+ keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+ keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
+ certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+ )
+ zk.start()
+ logger.debug('We have an active zookeeper connection.')
+
+ def wrapper(func: Callable) -> Callable:
+ @functools.wraps(func)
+ def runit(func, *args, **kwargs):
+ stop_event = threading.Event()
+ stop_event.clear()
+ if also_pass_zk_client:
+ args = (*args, zk)
+ args = (*args, stop_event)
+ logger.debug('Invoking user code on separate thread.')
+ thread = threading.Thread(
+ target=func,
+ args=args,
+ kwargs=kwargs,
+ )
+ thread.start()
+
+ while True:
+ state = zk.client_state
+ if state != KazooState.CONNECTED:
+ logger.error(
+ 'Bad connection to zookeeper (state=%s); bailing out.',
+ state,
+ )
+ stop_event.set()
+ thread.join()
+
+ if not thread.is_alive():
+ logger.info('Child thread exited, I\'m exiting too.')
+ return
+ time.sleep(5.0)
+
+ @functools.wraps(runit)
+ def wrapper2(*args, **kwargs):
+ election = zk.Election(election_id, contender_id)
+ election.run(runit, func, *args, **kwargs)
+ zk.stop()
+
+ return wrapper2
+
+ if f is None:
+ return wrapper
+ else:
+ return wrapper(f)
+
+
+if __name__ == '__main__':
+ import doctest
+
+ doctest.testmod()