Adds a releasable and renewable lease subclass.
authorScott Gasch <[email protected]>
Sat, 20 Aug 2022 23:13:25 +0000 (16:13 -0700)
committerScott Gasch <[email protected]>
Sat, 20 Aug 2022 23:13:25 +0000 (16:13 -0700)
zookeeper.py

index 480ce1a041497abfaedd6f6a159060b57ec484da..01504621d5e33c898c782b7f83bf71a51eaf465a 100644 (file)
@@ -16,7 +16,9 @@ import threading
 from typing import Any, Callable, Optional
 
 from kazoo.client import KazooClient
+from kazoo.exceptions import CancelledError
 from kazoo.protocol.states import KazooState
+from kazoo.recipe.lease import NonBlockingLease
 
 import file_utils
 import scott_secrets
@@ -30,6 +32,105 @@ logger = logging.getLogger(__name__)
 PROGRAM_NAME: str = os.path.basename(sys.argv[0])
 
 
+class RenewableReleasableLease(NonBlockingLease):
+    """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
+    that adds some behaviors:
+
+        + Ability to renew the lease if it's already held without
+          going through the effort of reobtaining the same lease.
+
+        + Ability to release the lease if it's held and not yet
+          expired.
+
+    It also is more picky than the base class in terms of when it
+    evaluates to "True" (indicating that the lease is held); it will
+    begin to evaluate to "False" as soon as the lease has expired
+    even if you use to hold it.  This means client code should be
+    aware that the lease can disappear and it also means that the
+    performance of evaulating the lease requires a round trip to
+    zookeeper every time.
+
+    Note that it is not valid to release the lease more than once
+    (since you no longer have it the second time).  The code ignores
+    this.  It's also not possible to reobtain an expired or released
+    lease by calling renew.  Go create a new lease object at that
+    point.
+
+    """
+
+    def __init__(
+        self,
+        client: KazooClient,
+        path: str,
+        duration: datetime.timedelta,
+        identifier: str = None,
+        utcnow=datetime.datetime.utcnow,
+    ):
+        super().__init__(client, path, duration, identifier, utcnow)
+        self.client = client
+        self.path = path
+        self.identifier = identifier
+        self.utcnow = utcnow
+
+    def release(self):
+        """Release the lease, if it's presently being held."""
+        self.client.ensure_path(self.path)
+        holder_path = self.path + "/lease_holder"
+        lock = self.client.Lock(self.path, self.identifier)
+        try:
+            with lock:
+                if not self._is_lease_held_pre_locked():
+                    logger.debug("Can't release lease; I don't have it!")
+                    return
+
+                now = self.utcnow()
+                if self.client.exists(holder_path):
+                    self.client.delete(holder_path)
+                end_lease = now.strftime(self._date_format)
+
+                # Release by moving end to now.
+                data = {'version': self._version, 'holder': self.identifier, 'end': end_lease}
+                self.client.create(holder_path, self._encode(data))
+                self.obtained = False
+
+        except CancelledError:
+            pass
+
+    def try_renew(self, duration: datetime.timedelta) -> bool:
+        if not self.obtained:
+            return False
+        self.obtained = False
+        self._attempt_obtaining(self.client, self.path, duration, self.identifier, self.utcnow)
+        return self.obtained
+
+    def _is_lease_held_pre_locked(self) -> bool:
+        self.client.ensure_path(self.path)
+        holder_path = self.path + "/lease_holder"
+        now = self.utcnow()
+        if self.client.exists(holder_path):
+            raw, _ = self.client.get(holder_path)
+            data = self._decode(raw)
+            if data["version"] != self._version:
+                return False
+            current_end = datetime.datetime.strptime(data['end'], self._date_format)
+            if data['holder'] != self.identifier or now > current_end:
+                return False
+            else:
+                return True
+        return False
+
+    def __bool__(self):
+        if not self.obtained:
+            return False
+        lock = self.client.Lock(self.path, self.identifier)
+        try:
+            with lock:
+                ret = self._is_lease_held_pre_locked()
+        except CancelledError:
+            return False
+        return ret
+
+
 def obtain_lease(
     f: Optional[Callable] = None,
     *,
@@ -43,15 +144,9 @@ def obtain_lease(
     before invoking a function or skip invoking the function if the
     lease cannot be obtained.
 
-    There is no method of releasing a lease manually provided on
-    the Kazoo public lease API.  Therefore, letting the lease expire
-    is the only mechanism by which it becomes re-acquirable at this
-    time.  Thus, due consideration is in order when choosing the
-    initial duration.
-
-    According to Kazoo docs, "The client may renew the lease without
-    losing it by obtaining a new lease with the same path (lease_id)
-    and same identity (contender_id)."
+    Note that we use a hacky "RenewableReleasableLease" and not the
+    kazoo NonBlockingLease because the former allows us to release the
+    lease when the user code returns whereas the latter does not.
 
     Args:
         lease_id: string identifying the lease to obtain
@@ -96,7 +191,8 @@ def obtain_lease(
                 lease_id,
                 contender_id,
             )
-            lease = zk.NonBlockingLease(
+            lease = RenewableReleasableLease(
+                zk,
                 lease_id,
                 duration,
                 contender_id,
@@ -112,7 +208,7 @@ def obtain_lease(
                 if also_pass_lease:
                     args = (*args, lease)
                 ret = func(*args, *kwargs)
-                # Release the lease?
+                lease.release()
             else:
                 logger.debug(
                     'Failed to obtain %s for contender %s, doing nothing more.',