From f38b4e71714093d2804c18a963f05e85a75a8ffe Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Sat, 20 Aug 2022 19:06:09 -0700 Subject: [PATCH] Make release return a bool. Improve tests and logging. --- tests/zookeeper_test.py | 8 ++++---- zookeeper.py | 34 +++++++++++++++++++++++----------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/tests/zookeeper_test.py b/tests/zookeeper_test.py index 6c00314..05749c1 100644 --- a/tests/zookeeper_test.py +++ b/tests/zookeeper_test.py @@ -20,9 +20,9 @@ class TestZookeeper(unittest.TestCase): @zookeeper.obtain_lease(also_pass_lease=True, duration=datetime.timedelta(minutes=1)) def test_release_lease(self, lease: zookeeper.RenewableReleasableLease): self.assertTrue(lease) - lease.release() + self.assertTrue(lease.release()) self.assertFalse(lease) - lease.release() + self.assertFalse(lease.release()) self.assertFalse(lease) @zookeeper.obtain_lease(also_pass_lease=True, duration=datetime.timedelta(minutes=1)) @@ -30,7 +30,7 @@ class TestZookeeper(unittest.TestCase): self.assertTrue(lease) self.assertTrue(lease.try_renew(datetime.timedelta(minutes=2))) self.assertTrue(lease) - lease.release() + self.assertTrue(lease.release()) @zookeeper.obtain_lease( also_pass_lease=True, @@ -38,7 +38,7 @@ class TestZookeeper(unittest.TestCase): ) def test_cant_renew_lease_after_released(self, lease: zookeeper.RenewableReleasableLease): self.assertTrue(lease) - lease.release() + self.assertTrue(lease.release()) self.assertFalse(lease) self.assertFalse(lease.try_renew(datetime.timedelta(minutes=2))) diff --git a/zookeeper.py b/zookeeper.py index 0150462..a1966dc 100644 --- a/zookeeper.py +++ b/zookeeper.py @@ -72,7 +72,7 @@ class RenewableReleasableLease(NonBlockingLease): self.identifier = identifier self.utcnow = utcnow - def release(self): + def release(self) -> bool: """Release the lease, if it's presently being held.""" self.client.ensure_path(self.path) holder_path = self.path + "/lease_holder" @@ -81,7 +81,7 @@ class RenewableReleasableLease(NonBlockingLease): with lock: if not self._is_lease_held_pre_locked(): logger.debug("Can't release lease; I don't have it!") - return + return False now = self.utcnow() if self.client.exists(holder_path): @@ -92,9 +92,12 @@ class RenewableReleasableLease(NonBlockingLease): data = {'version': self._version, 'holder': self.identifier, 'end': end_lease} self.client.create(holder_path, self._encode(data)) self.obtained = False + logger.debug('Successfully released lease') + return True - except CancelledError: - pass + except CancelledError as e: + logger.debug('Exception %s in zookeeper?', e) + return False def try_renew(self, duration: datetime.timedelta) -> bool: if not self.obtained: @@ -113,9 +116,8 @@ class RenewableReleasableLease(NonBlockingLease): if data["version"] != self._version: return False current_end = datetime.datetime.strptime(data['end'], self._date_format) - if data['holder'] != self.identifier or now > current_end: - return False - else: + if data['holder'] == self.identifier and now <= current_end: + logger.debug('Yes, we hold the lease and it isn\'t expired.') return True return False @@ -208,10 +210,15 @@ def obtain_lease( if also_pass_lease: args = (*args, lease) ret = func(*args, *kwargs) + + # We don't care if this release operation succeeds; + # there are legitimate cases where it will fail such + # as when the user code has already voluntarily + # released the lease. lease.release() else: logger.debug( - 'Failed to obtain %s for contender %s, doing nothing more.', + 'Failed to obtain %s for contender %s, shutting down.', lease_id, contender_id, ) @@ -318,15 +325,19 @@ def run_for_election( args = (*args, self.zk) args = (*args, self.stop_event) - logger.debug('Invoking user code on separate thread.') thread = threading.Thread( target=self.func, args=args, kwargs=kwargs, ) + logger.debug( + 'Invoking user code on separate thread: %s', + thread.getName(), + ) thread.start() - # Watch the state (fail safe for listener) and the thread. + # Periodically poll the zookeeper state (fail safe for + # listener) and the state of the child thread. while True: state = self.zk.client_state if state != KazooState.CONNECTED: @@ -337,11 +348,12 @@ def run_for_election( self.stop_event.set() logger.debug('Waiting for user thread to tear down...') thread.join() + logger.debug('User thread exited after our notification.') return thread.join(timeout=5.0) if not thread.is_alive(): - logger.info('Child thread exited, I\'m exiting too.') + logger.info('User thread exited on its own.') return def __call__(self, *args, **kwargs): -- 2.47.1