#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# © Copyright 2022, Scott Gasch
"""
This is a module for making it easier to deal with Zookeeper / Kazoo.
Apache Zookeeper (https://zookeeper.apache.org/) is a consistent centralized
datastore. :mod:`pyutils.config` optionally uses it to save/read program
configuration. But it's also very useful for things like distributed
master election, locking, etc...
"""
import datetime
import functools
import json
import logging
import os
import platform
import sys
import threading
from typing import Any, Callable, Optional, Tuple
from kazoo.client import KazooClient
from kazoo.exceptions import CancelledError
from kazoo.protocol.states import KazooState
from kazoo.recipe.lease import NonBlockingLease
from kazoo.retry import KazooRetry
from pyutils import argparse_utils, config
from pyutils.exceptions import PyUtilsException
from pyutils.files import file_utils
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register this module's commandline flag group with pyutils.config.  The
# parsed value of --zookeeper_config is read back elsewhere in this file
# via config.config['zookeeper_config'].
cfg = config.add_commandline_args(
    f'Zookeeper ({__file__})',
    'Args related python-zookeeper interactions',
)
cfg.add_argument(
    '--zookeeper_config',
    type=argparse_utils.valid_filename,
    default=None,  # None => fall back to a default secrets file in the home dir
    metavar='FILENAME',
    help='Path to file zookeeper configuration',
)

# On module load, grab what we presume to be our process' program name.
# This is used, by default, to construct internal zookeeper paths (e.g.
# to identify a lease or election).
PROGRAM_NAME: str = os.path.basename(sys.argv[0])
def get_zookeeper_config() -> Optional[Tuple[str, str, str]]:
    """Locate and parse the zookeeper connection settings file.

    The file path comes from the ``--zookeeper_config`` commandline
    argument; when that is unset, ``~/.zookeeper_secrets`` is used.  The
    file must contain a JSON object with the keys ``zookeeper_nodes``,
    ``zookeeper_client_cert_path`` and ``zookeeper_client_passphrase``.

    This is deliberately best-effort: any problem reading or parsing the
    file is logged and reported to the caller as None rather than raised.

    Returns:
        A ``(zookeeper_nodes, zookeeper_client_cert_path,
        zookeeper_client_passphrase)`` tuple, or None if the file could
        not be read, could not be parsed, or lacks a required key.
    """
    config_file = config.config['zookeeper_config']
    if not config_file:
        # expanduser() does not raise when $HOME is unset, whereas
        # os.environ["HOME"] would raise KeyError before we ever reached
        # the best-effort error handling below.
        config_file = os.path.join(os.path.expanduser('~'), '.zookeeper_secrets')

    required_keys = (
        'zookeeper_nodes',
        'zookeeper_client_cert_path',
        'zookeeper_client_passphrase',
    )
    try:
        with open(config_file, 'r', encoding='utf-8') as rf:
            json_dict = json.load(rf)
        if all(key in json_dict for key in required_keys):
            return (
                json_dict['zookeeper_nodes'],
                json_dict['zookeeper_client_cert_path'],
                json_dict['zookeeper_client_passphrase'],
            )
        logger.warning(
            'Zookeeper config %s is missing one or more required keys.',
            config_file,
        )
    except Exception:
        # Covers missing/unreadable files as well as malformed JSON, so
        # say which file we were looking at instead of blaming the parser.
        logger.exception(
            'Ignoring exception while reading zookeeper config %s', config_file
        )
    return None
def get_started_zk_client() -> KazooClient:
    """Build a KazooClient from the zookeeper config and start it.

    The host list, client certificate path and certificate passphrase
    are taken from :func:`get_zookeeper_config`.

    Returns:
        A KazooClient on which ``start()`` has already been called.

    Raises:
        PyUtilsException: if no usable zookeeper config could be loaded.
    """
    settings = get_zookeeper_config()
    if settings is None:
        raise PyUtilsException("No valid zookeeper config was found.")
    nodes, cert_path, passphrase = settings

    # The same file is handed to kazoo as both the key file and the
    # certificate file.
    # NOTE(review): verify_certs=False presumably disables server
    # certificate verification -- confirm this is intentional.
    client = KazooClient(
        connection_retry=KazooRetry(max_tries=10),
        hosts=nodes,
        use_ssl=True,
        verify_certs=False,
        keyfile=cert_path,
        keyfile_password=passphrase,
        certfile=cert_path,
    )
    client.start()
    logger.debug('We have an active zookeeper connection.')
    return client
class RenewableReleasableLease(NonBlockingLease):
    """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
    (see https://kazoo.readthedocs.io/en/latest/api/recipe/lease.html#kazoo.recipe.lease.NonBlockingLease)
    that adds some behaviors:

        + Ability to renew the lease if it's already held without
          going through the effort of reobtaining the same lease
          name.
        + Ability to release the lease if it's held and not yet
          expired.

    It also is more picky than the base class in terms of when it
    evaluates to "True" (indicating that the lease is held); it will
    begin to evaluate to "False" as soon as the lease has expired even
    if you used to hold it.  This means client code should be aware
    that the lease can disappear (expire) while held and it also means
    that the performance of evaluating the lease (i.e. if lease:)
    requires a round trip to zookeeper every time.

    Note that it is not valid to release the lease more than once
    (since you no longer have it the second time).  The code ignores
    the 2nd..nth attempt.  Once released, the lease cannot be
    reobtained by calling :meth:`try_renew` (it returns False); go
    create a new lease object at that point.  Finally, note that when
    you renew the lease it will evaluate to False briefly as it is
    reobtained.

    NOTE(review): for a lease that has expired but was never released,
    :meth:`try_renew` still delegates to the base class's obtain logic;
    whether that succeeds depends on the base class -- confirm.
    """

    def __init__(
        self,
        client: KazooClient,
        path: str,
        duration: datetime.timedelta,
        identifier: Optional[str] = None,
        utcnow=datetime.datetime.utcnow,
    ):
        """Construct the RenewableReleasableLease.

        Args:
            client: a KazooClient that is connected and started
            path: the path to the lease in zookeeper
            duration: duration during which the lease is reserved
            identifier: unique name to use for this lease holder.
                Reuse in order to renew the lease.  Defaults to this
                machine's hostname.
            utcnow: clock function, by default returning
                :meth:`datetime.datetime.utcnow`.  Used for testing.
        """
        import socket  # only needed to resolve the default identifier

        # Resolve the default identifier *before* handing it to the base
        # class so that the holder name written to zookeeper and the name
        # we later compare against (self.identifier) are guaranteed to be
        # the same string.  Previously self.identifier stayed None when no
        # identifier was given, so the holder check could never match and
        # the lease always evaluated to False / could not be released.
        if not identifier:
            identifier = socket.gethostname()

        super().__init__(client, path, duration, identifier, utcnow)
        self.client = client
        self.path = path
        self.identifier = identifier
        self.utcnow = utcnow

    def release(self) -> bool:
        """Release the lease, if it's presently being held.

        Returns:
            True if the lease was successfully released,
            False otherwise.
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                if not self._is_lease_held_pre_locked():
                    logger.debug("Can't release lease; I don't have it!")
                    return False

                now = self.utcnow()
                if self.client.exists(holder_path):
                    self.client.delete(holder_path)
                end_lease = now.strftime(self._date_format)

                # Release by moving end to now.
                data = {
                    'version': self._version,
                    'holder': self.identifier,
                    'end': end_lease,
                }
                self.client.create(holder_path, self._encode(data))
                self.obtained = False
                logger.debug('Successfully released lease')
                return True
        except CancelledError as e:
            logger.debug('Exception %s in zookeeper?', e)
        return False

    def try_renew(self, duration: datetime.timedelta) -> bool:
        """Attempt to renew a lease that is currently held.  Note that
        this will cause self to evaluate to False briefly as the lease
        is renewed.

        Args:
            duration: the lease duration to request; it is passed through
                unchanged to the base class's obtain logic.

        Returns:
            True if the lease was successfully renewed,
            False otherwise (including when this object never obtained
            the lease or has already released it).
        """
        if not self.obtained:
            return False

        # The base class only ever flips obtained to True on success, so
        # clear it first to be able to tell whether the attempt worked.
        self.obtained = False
        self._attempt_obtaining(
            self.client, self.path, duration, self.identifier, self.utcnow
        )
        return self.obtained

    def _is_lease_held_pre_locked(self) -> bool:
        """Check zookeeper for whether we hold an unexpired lease.

        Callers in this class invoke this while holding the lock on
        self.path.

        Returns:
            True only if the lease_holder node exists, has the expected
            version, names self.identifier as holder and has an end time
            that is not before the current time.
        """
        self.client.ensure_path(self.path)
        holder_path = self.path + "/lease_holder"
        now = self.utcnow()
        if self.client.exists(holder_path):
            raw, _ = self.client.get(holder_path)
            data = self._decode(raw)
            if data["version"] != self._version:
                return False
            current_end = datetime.datetime.strptime(data['end'], self._date_format)
            if data['holder'] == self.identifier and now <= current_end:
                logger.debug('Yes, we hold the lease and it isn\'t expired.')
                return True
        return False

    def __bool__(self):
        """
        .. note::
            This implementation differs from that of the base class in
            that it probes zookeeper to ensure that the lease is not yet
            expired and is therefore more expensive.
        """
        if not self.obtained:
            return False
        lock = self.client.Lock(self.path, self.identifier)
        try:
            with lock:
                ret = self._is_lease_held_pre_locked()
        except CancelledError:
            return False
        return ret
def obtain_lease(
    f: Optional[Callable] = None,
    *,
    lease_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    duration: datetime.timedelta = datetime.timedelta(minutes=5),
    also_pass_lease: bool = False,
    also_pass_zk_client: bool = False,
):
    """Obtain an exclusive lease identified by the lease_id name
    before invoking a function or skip invoking the function if the
    lease cannot be obtained.

    Note that we use a hacky "RenewableReleasableLease" and not the
    kazoo NonBlockingLease because the former allows us to release the
    lease when the user code returns whereas the latter does not.

    The lease is released and the zookeeper client is stopped when the
    wrapped function finishes, whether it returns or raises.

    Args:
        f: the function to wrap; None when the decorator is used with
            arguments.
        lease_id: string identifying the lease to obtain; it is
            prefixed with ``/leases/`` unless it already starts with it
        contender_id: string identifying who's attempting to obtain
        duration: how long should the lease be held, if obtained?
        also_pass_lease: pass the lease into the user function as an
            extra trailing positional argument
        also_pass_zk_client: pass our zk client into the user function
            as an extra trailing positional argument (before the lease,
            if both are requested)

    Returns:
        The wrapped function (or a decorator producing it).  When
        invoked, the wrapped function returns whatever the user function
        returns, or None if the lease could not be obtained.

    >>> @obtain_lease(
    ...         lease_id='zookeeper_doctest',
    ...         duration=datetime.timedelta(seconds=5),
    ... )
    ... def f(name: str) -> int:
    ...     print(f'Hello, {name}')
    ...     return 123

    >>> f('Scott')
    Hello, Scott
    123
    """
    if not lease_id.startswith('/leases/'):
        lease_id = f'/leases/{lease_id}'
        lease_id = file_utils.fix_multiple_slashes(lease_id)

    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper2(*args, **kwargs) -> Optional[Any]:
            zk = get_started_zk_client()
            try:
                logger.debug(
                    'Trying to obtain %s for contender %s now...',
                    lease_id,
                    contender_id,
                )
                lease = RenewableReleasableLease(
                    zk,
                    lease_id,
                    duration,
                    contender_id,
                )
                if not lease:
                    logger.debug(
                        'Failed to obtain %s for contender %s, shutting down.',
                        lease_id,
                        contender_id,
                    )
                    return None

                logger.debug(
                    'Successfully obtained %s for contender %s; invoking user function.',
                    lease_id,
                    contender_id,
                )
                if also_pass_zk_client:
                    args = (*args, zk)
                if also_pass_lease:
                    args = (*args, lease)
                try:
                    # Keyword arguments must be forwarded with ** so they
                    # reach the user function by name, not as extra
                    # positional values.
                    return func(*args, **kwargs)
                finally:
                    # We don't care if this release operation succeeds;
                    # there are legitimate cases where it will fail such
                    # as when the user code has already voluntarily
                    # released the lease.
                    lease.release()
            finally:
                # Always tear the client down, even if the user function
                # (or obtaining the lease) raised.
                logger.debug('Shutting down zookeeper client.')
                zk.stop()

        return wrapper2

    if f is None:
        return wrapper
    else:
        return wrapper(f)
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Run as a contender for a leader election.  If/when we become
    the leader, invoke the user's function.

    The user's function will be executed on a new thread and must
    accept a "stop processing" event that it must check regularly.
    This event will be set automatically by the wrapper in the event
    that we lose connection to zookeeper (and hence are no longer
    confident that we are still the leader).

    The user's function may return at any time which will cause
    the wrapper to also return and effectively cede leadership.

    Because the user's code is run in a separate thread, it may
    not return anything / whatever it returns will be dropped.

    Args:
        f: the function to wrap; None when the decorator is used with
            arguments.
        election_id: global string identifier for the election; it is
            prefixed with ``/elections/`` unless it already starts with it
        contender_id: string identifying who is running for leader
        also_pass_zk_client: pass the zk client into the user code as an
            extra trailing positional argument (before the stop event)

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.
    """
    if not election_id.startswith('/elections/'):
        election_id = f'/elections/{election_id}'
        election_id = file_utils.fix_multiple_slashes(election_id)

    class wrapper:
        """Helper wrapper class.

        Wraps the user function, owns the zookeeper client and the stop
        event that is handed to the user function.

        NOTE(review): the client is started when the wrapper is
        constructed (i.e. at decoration time) and stopped at the end of
        ``__call__``, and the stop event is never reset, so an instance
        looks single-use -- confirm before calling it repeatedly.
        """

        def __init__(self, func: Callable) -> None:
            functools.update_wrapper(self, func)
            self.func = func
            self.zk = get_started_zk_client()
            # A new Event starts out unset; no explicit clear() needed.
            self.stop_event = threading.Event()

        def zk_listener(self, state: KazooState) -> None:
            """Connection-state callback: signal the user code to stop
            whenever the client reports anything but CONNECTED."""
            logger.debug('Listener received state %s.', state)
            if state != KazooState.CONNECTED:
                logger.debug(
                    'Bad connection to zookeeper (state=%s); bailing out.',
                    state,
                )
                self.stop_event.set()

        def runit(self, *args, **kwargs) -> None:
            """Run the user function on its own thread and babysit it
            until it exits or the zookeeper connection goes bad."""
            # Possibly augment args if requested; always pass stop_event
            if also_pass_zk_client:
                args = (*args, self.zk)
            args = (*args, self.stop_event)

            thread = threading.Thread(
                target=self.func,
                args=args,
                kwargs=kwargs,
            )
            logger.debug(
                'Invoking user code on separate thread: %s',
                thread.name,
            )
            thread.start()

            # Periodically poll the zookeeper state (fail safe for
            # listener) and the state of the child thread.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    logger.error(
                        'Bad connection to zookeeper (state=%s); bailing out.',
                        state,
                    )
                    self.stop_event.set()
                    logger.debug('Waiting for user thread to tear down...')
                    thread.join()
                    logger.debug('User thread exited after our notification.')
                    return

                thread.join(timeout=5.0)
                if not thread.is_alive():
                    logger.info('User thread exited on its own.')
                    return

        def __call__(self, *args, **kwargs):
            """Contend in the election, running the user function via
            :meth:`runit`; always stop the zookeeper client afterwards."""
            try:
                election = self.zk.Election(election_id, contender_id)
                self.zk.add_listener(self.zk_listener)
                election.run(self.runit, *args, **kwargs)
            finally:
                # Stop the client even if the election or the user code
                # raised, so the connection is not leaked.
                self.zk.stop()

    if f is None:
        return wrapper
    else:
        return wrapper(f)
if __name__ == '__main__':
    # When executed directly (rather than imported), run the doctests
    # embedded in this module's docstrings.
    import doctest

    doctest.testmod()