From 3e6c5ad6717350924ad842bee63a12f687d5b89f Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Sat, 20 Aug 2022 16:13:25 -0700 Subject: [PATCH] Adds a releasable and renewable lease subclass. --- zookeeper.py | 118 ++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 107 insertions(+), 11 deletions(-) diff --git a/zookeeper.py b/zookeeper.py index 480ce1a..0150462 100644 --- a/zookeeper.py +++ b/zookeeper.py @@ -16,7 +16,9 @@ import threading from typing import Any, Callable, Optional from kazoo.client import KazooClient +from kazoo.exceptions import CancelledError from kazoo.protocol.states import KazooState +from kazoo.recipe.lease import NonBlockingLease import file_utils import scott_secrets @@ -30,6 +32,105 @@ logger = logging.getLogger(__name__) PROGRAM_NAME: str = os.path.basename(sys.argv[0]) +class RenewableReleasableLease(NonBlockingLease): + """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease + that adds some behaviors: + + + Ability to renew the lease if it's already held without + going through the effort of reobtaining the same lease. + + + Ability to release the lease if it's held and not yet + expired. + + It also is more picky than the base class in terms of when it + evaluates to "True" (indicating that the lease is held); it will + begin to evaluate to "False" as soon as the lease has expired + even if you use to hold it. This means client code should be + aware that the lease can disappear and it also means that the + performance of evaulating the lease requires a round trip to + zookeeper every time. + + Note that it is not valid to release the lease more than once + (since you no longer have it the second time). The code ignores + this. It's also not possible to reobtain an expired or released + lease by calling renew. Go create a new lease object at that + point. + + """ + + def __init__( + self, + client: KazooClient, + path: str, + duration: datetime.timedelta, + identifier: str = None, + utcnow=datetime.datetime.utcnow, + ): + super().__init__(client, path, duration, identifier, utcnow) + self.client = client + self.path = path + self.identifier = identifier + self.utcnow = utcnow + + def release(self): + """Release the lease, if it's presently being held.""" + self.client.ensure_path(self.path) + holder_path = self.path + "/lease_holder" + lock = self.client.Lock(self.path, self.identifier) + try: + with lock: + if not self._is_lease_held_pre_locked(): + logger.debug("Can't release lease; I don't have it!") + return + + now = self.utcnow() + if self.client.exists(holder_path): + self.client.delete(holder_path) + end_lease = now.strftime(self._date_format) + + # Release by moving end to now. + data = {'version': self._version, 'holder': self.identifier, 'end': end_lease} + self.client.create(holder_path, self._encode(data)) + self.obtained = False + + except CancelledError: + pass + + def try_renew(self, duration: datetime.timedelta) -> bool: + if not self.obtained: + return False + self.obtained = False + self._attempt_obtaining(self.client, self.path, duration, self.identifier, self.utcnow) + return self.obtained + + def _is_lease_held_pre_locked(self) -> bool: + self.client.ensure_path(self.path) + holder_path = self.path + "/lease_holder" + now = self.utcnow() + if self.client.exists(holder_path): + raw, _ = self.client.get(holder_path) + data = self._decode(raw) + if data["version"] != self._version: + return False + current_end = datetime.datetime.strptime(data['end'], self._date_format) + if data['holder'] != self.identifier or now > current_end: + return False + else: + return True + return False + + def __bool__(self): + if not self.obtained: + return False + lock = self.client.Lock(self.path, self.identifier) + try: + with lock: + ret = self._is_lease_held_pre_locked() + except CancelledError: + return False + return ret + + def obtain_lease( f: Optional[Callable] = None, *, @@ -43,15 +144,9 @@ def obtain_lease( before invoking a function or skip invoking the function if the lease cannot be obtained. - There is no method of releasing a lease manually provided on - the Kazoo public lease API. Therefore, letting the lease expire - is the only mechanism by which it becomes re-acquirable at this - time. Thus, due consideration is in order when choosing the - initial duration. - - According to Kazoo docs, "The client may renew the lease without - losing it by obtaining a new lease with the same path (lease_id) - and same identity (contender_id)." + Note that we use a hacky "RenewableReleasableLease" and not the + kazoo NonBlockingLease because the former allows us to release the + lease when the user code returns whereas the latter does not. Args: lease_id: string identifying the lease to obtain @@ -96,7 +191,8 @@ def obtain_lease( lease_id, contender_id, ) - lease = zk.NonBlockingLease( + lease = RenewableReleasableLease( + zk, lease_id, duration, contender_id, @@ -112,7 +208,7 @@ def obtain_lease( if also_pass_lease: args = (*args, lease) ret = func(*args, *kwargs) - # Release the lease? + lease.release() else: logger.debug( 'Failed to obtain %s for contender %s, doing nothing more.', -- 2.47.1