from typing import Any, Callable, Optional
from kazoo.client import KazooClient
+from kazoo.exceptions import CancelledError
from kazoo.protocol.states import KazooState
+from kazoo.recipe.lease import NonBlockingLease
import file_utils
import scott_secrets
PROGRAM_NAME: str = os.path.basename(sys.argv[0])
+class RenewableReleasableLease(NonBlockingLease):
+ """This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease
+ that adds some behaviors:
+
+ + Ability to renew the lease if it's already held without
+ going through the effort of reobtaining the same lease.
+
+ + Ability to release the lease if it's held and not yet
+ expired.
+
+ It also is more picky than the base class in terms of when it
+ evaluates to "True" (indicating that the lease is held); it will
+ begin to evaluate to "False" as soon as the lease has expired
+ even if you use to hold it. This means client code should be
+ aware that the lease can disappear and it also means that the
+ performance of evaulating the lease requires a round trip to
+ zookeeper every time.
+
+ Note that it is not valid to release the lease more than once
+ (since you no longer have it the second time). The code ignores
+ this. It's also not possible to reobtain an expired or released
+ lease by calling renew. Go create a new lease object at that
+ point.
+
+ """
+
+ def __init__(
+ self,
+ client: KazooClient,
+ path: str,
+ duration: datetime.timedelta,
+ identifier: str = None,
+ utcnow=datetime.datetime.utcnow,
+ ):
+ super().__init__(client, path, duration, identifier, utcnow)
+ self.client = client
+ self.path = path
+ self.identifier = identifier
+ self.utcnow = utcnow
+
+ def release(self):
+ """Release the lease, if it's presently being held."""
+ self.client.ensure_path(self.path)
+ holder_path = self.path + "/lease_holder"
+ lock = self.client.Lock(self.path, self.identifier)
+ try:
+ with lock:
+ if not self._is_lease_held_pre_locked():
+ logger.debug("Can't release lease; I don't have it!")
+ return
+
+ now = self.utcnow()
+ if self.client.exists(holder_path):
+ self.client.delete(holder_path)
+ end_lease = now.strftime(self._date_format)
+
+ # Release by moving end to now.
+ data = {'version': self._version, 'holder': self.identifier, 'end': end_lease}
+ self.client.create(holder_path, self._encode(data))
+ self.obtained = False
+
+ except CancelledError:
+ pass
+
+ def try_renew(self, duration: datetime.timedelta) -> bool:
+ if not self.obtained:
+ return False
+ self.obtained = False
+ self._attempt_obtaining(self.client, self.path, duration, self.identifier, self.utcnow)
+ return self.obtained
+
+ def _is_lease_held_pre_locked(self) -> bool:
+ self.client.ensure_path(self.path)
+ holder_path = self.path + "/lease_holder"
+ now = self.utcnow()
+ if self.client.exists(holder_path):
+ raw, _ = self.client.get(holder_path)
+ data = self._decode(raw)
+ if data["version"] != self._version:
+ return False
+ current_end = datetime.datetime.strptime(data['end'], self._date_format)
+ if data['holder'] != self.identifier or now > current_end:
+ return False
+ else:
+ return True
+ return False
+
+ def __bool__(self):
+ if not self.obtained:
+ return False
+ lock = self.client.Lock(self.path, self.identifier)
+ try:
+ with lock:
+ ret = self._is_lease_held_pre_locked()
+ except CancelledError:
+ return False
+ return ret
+
+
def obtain_lease(
f: Optional[Callable] = None,
*,
before invoking a function or skip invoking the function if the
lease cannot be obtained.
- There is no method of releasing a lease manually provided on
- the Kazoo public lease API. Therefore, letting the lease expire
- is the only mechanism by which it becomes re-acquirable at this
- time. Thus, due consideration is in order when choosing the
- initial duration.
-
- According to Kazoo docs, "The client may renew the lease without
- losing it by obtaining a new lease with the same path (lease_id)
- and same identity (contender_id)."
+ Note that we use a hacky "RenewableReleasableLease" and not the
+ kazoo NonBlockingLease because the former allows us to release the
+ lease when the user code returns whereas the latter does not.
Args:
lease_id: string identifying the lease to obtain
lease_id,
contender_id,
)
- lease = zk.NonBlockingLease(
+ lease = RenewableReleasableLease(
+ zk,
lease_id,
duration,
contender_id,
if also_pass_lease:
args = (*args, lease)
ret = func(*args, *kwargs)
- # Release the lease?
+ lease.release()
else:
logger.debug(
'Failed to obtain %s for contender %s, doing nothing more.',