Add zookeeper.py; some handy wrappers for leader election and leases.
authorScott Gasch <[email protected]>
Sat, 20 Aug 2022 01:24:01 +0000 (18:24 -0700)
committerScott Gasch <[email protected]>
Sat, 20 Aug 2022 01:24:01 +0000 (18:24 -0700)
zookeeper.py [new file with mode: 0755]

diff --git a/zookeeper.py b/zookeeper.py
new file mode 100755 (executable)
index 0000000..42771c9
--- /dev/null
@@ -0,0 +1,215 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+# © Copyright 2022, Scott Gasch
+
+"""This is a module for making it easier to deal with Zookeeper / Kazoo."""
+
+
+import datetime
+import functools
+import logging
+import platform
+import threading
+import time
+from typing import Callable, Optional
+
+from kazoo.client import KazooClient
+from kazoo.protocol.states import KazooState
+
+import config
+import file_utils
+import scott_secrets
+
+logger = logging.getLogger(__name__)
+
+
+def obtain_lease(
+    f: Optional[Callable] = None,
+    *,
+    lease_id: str = config.PROGRAM_NAME,
+    contender_id: str = platform.node(),
+    initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
+    also_pass_lease: bool = False,
+    also_pass_zk_client: bool = False,
+):
+    """Obtain the named lease before invoking a function and skip
+    invoking the function if the lease cannot be obtained.
+
+    >>> @obtain_lease(lease_id='zookeeper_doctest')
+    ... def f(name: str) -> int:
+    ...     print(f'Hello, {name}')
+    ...     return 123
+
+    >>> f('Scott')
+    Hello, Scott
+    123
+
+    """
+    if not lease_id.startswith('/leases/'):
+        lease_id = f'/leases/{lease_id}'
+        lease_id = file_utils.fix_multiple_slashes(lease_id)
+    zk = KazooClient(
+        hosts=scott_secrets.ZOOKEEPER_NODES,
+        use_ssl=True,
+        verify_certs=False,
+        keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+        keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
+        certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+    )
+    zk.start()
+    logger.debug('We have an active zookeeper connection.')
+
+    def wrapper(func: Callable) -> Callable:
+        @functools.wraps(func)
+        def wrapper2(*args, **kwargs):
+            logger.debug(
+                'Trying to obtain %s for contender %s now...',
+                lease_id,
+                contender_id,
+            )
+            lease = zk.NonBlockingLease(
+                lease_id,
+                initial_duration,
+                contender_id,
+            )
+            if lease:
+                logger.debug(
+                    'Successfully obtained %s for contender %s; invoking user function.',
+                    lease_id,
+                    contender_id,
+                )
+                if also_pass_zk_client:
+                    args = (*args, zk)
+                if also_pass_lease:
+                    args = (*args, lease)
+                ret = func(*args, *kwargs)
+            else:
+                logger.debug(
+                    'Failed to obtain %s for contender %s, doing nothing more.',
+                    lease_id,
+                    contender_id,
+                )
+                ret = None
+            logger.debug('Shutting down zookeeper client.')
+            zk.stop()
+            return ret
+
+        return wrapper2
+
+    if f is None:
+        return wrapper
+    else:
+        return wrapper(f)
+
+
+def run_for_election(
+    f: Optional[Callable] = None,
+    *,
+    election_id: str = config.PROGRAM_NAME,
+    contender_id: str = platform.node(),
+    also_pass_zk_client: bool = False,
+):
+    """Run as a contender for a leader election.  If/when we become
+    the leader, invoke the user's function.
+
+    The user's function will be executed on a new thread and must
+    accept a "stop processing" event that it must check regularly.
+    This event will be set automatically by the wrapper in the event
+    that we lose connection to zookeeper (and hence are no longer
+    confident that we are still the leader).
+
+    The user's function may return at any time which will cause
+    the wrapper to also return and effectively cede leadership.
+
+    Because the user's code is run in a separate thread, it may
+    not return anything.
+
+    >>> @run_for_election(
+    ...         election_id='zookeeper_doctest',
+    ...         also_pass_zk_client=True
+    ... )
+    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
+    ...     import time
+    ...     count = 0
+    ...     while True:
+    ...         print(f"Hello, {name}, I'm the leader.")
+    ...         if stop_now.is_set():
+    ...             print("Oops, not anymore?!")
+    ...             return
+    ...         time.sleep(0.1)
+    ...         count += 1
+    ...         if count >= 3:
+    ...             print("I'm sick of being leader.")
+    ...             return
+
+    >>> g("Scott")
+    Hello, Scott, I'm the leader.
+    Hello, Scott, I'm the leader.
+    Hello, Scott, I'm the leader.
+    I'm sick of being leader.
+
+    """
+    if not election_id.startswith('/elections/'):
+        election_id = f'/elections/{election_id}'
+        election_id = file_utils.fix_multiple_slashes(election_id)
+    zk = KazooClient(
+        hosts=scott_secrets.ZOOKEEPER_NODES,
+        use_ssl=True,
+        verify_certs=False,
+        keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+        keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
+        certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
+    )
+    zk.start()
+    logger.debug('We have an active zookeeper connection.')
+
+    def wrapper(func: Callable) -> Callable:
+        @functools.wraps(func)
+        def runit(func, *args, **kwargs):
+            stop_event = threading.Event()
+            stop_event.clear()
+            if also_pass_zk_client:
+                args = (*args, zk)
+            args = (*args, stop_event)
+            logger.debug('Invoking user code on separate thread.')
+            thread = threading.Thread(
+                target=func,
+                args=args,
+                kwargs=kwargs,
+            )
+            thread.start()
+
+            while True:
+                state = zk.client_state
+                if state != KazooState.CONNECTED:
+                    logger.error(
+                        'Bad connection to zookeeper (state=%s); bailing out.',
+                        state,
+                    )
+                    stop_event.set()
+                    thread.join()
+
+                if not thread.is_alive():
+                    logger.info('Child thread exited, I\'m exiting too.')
+                    return
+                time.sleep(5.0)
+
+        @functools.wraps(runit)
+        def wrapper2(*args, **kwargs):
+            election = zk.Election(election_id, contender_id)
+            election.run(runit, func, *args, **kwargs)
+            zk.stop()
+
+        return wrapper2
+
+    if f is None:
+        return wrapper
+    else:
+        return wrapper(f)
+
+
+if __name__ == '__main__':
+    import doctest
+
+    doctest.testmod()