@zookeeper.obtain_lease(also_pass_lease=True, duration=datetime.timedelta(minutes=1))
def test_release_lease(self, lease: zookeeper.RenewableReleasableLease):
self.assertTrue(lease)
- lease.release()
+ self.assertTrue(lease.release())
self.assertFalse(lease)
- lease.release()
+ self.assertFalse(lease.release())
self.assertFalse(lease)
@zookeeper.obtain_lease(also_pass_lease=True, duration=datetime.timedelta(minutes=1))
self.assertTrue(lease)
self.assertTrue(lease.try_renew(datetime.timedelta(minutes=2)))
self.assertTrue(lease)
- lease.release()
+ self.assertTrue(lease.release())
@zookeeper.obtain_lease(
also_pass_lease=True,
)
def test_cant_renew_lease_after_released(self, lease: zookeeper.RenewableReleasableLease):
self.assertTrue(lease)
- lease.release()
+ self.assertTrue(lease.release())
self.assertFalse(lease)
self.assertFalse(lease.try_renew(datetime.timedelta(minutes=2)))
self.identifier = identifier
self.utcnow = utcnow
- def release(self):
+ def release(self) -> bool:
"""Release the lease, if it's presently being held."""
self.client.ensure_path(self.path)
holder_path = self.path + "/lease_holder"
with lock:
if not self._is_lease_held_pre_locked():
logger.debug("Can't release lease; I don't have it!")
- return
+ return False
now = self.utcnow()
if self.client.exists(holder_path):
data = {'version': self._version, 'holder': self.identifier, 'end': end_lease}
self.client.create(holder_path, self._encode(data))
self.obtained = False
+ logger.debug('Successfully released lease')
+ return True
- except CancelledError:
- pass
+ except CancelledError as e:
+ logger.debug('Exception %s in zookeeper?', e)
+ return False
def try_renew(self, duration: datetime.timedelta) -> bool:
if not self.obtained:
if data["version"] != self._version:
return False
current_end = datetime.datetime.strptime(data['end'], self._date_format)
- if data['holder'] != self.identifier or now > current_end:
- return False
- else:
+ if data['holder'] == self.identifier and now <= current_end:
+ logger.debug('Yes, we hold the lease and it isn\'t expired.')
return True
return False
if also_pass_lease:
args = (*args, lease)
ret = func(*args, *kwargs)
+
+ # We don't care if this release operation succeeds;
+ # there are legitimate cases where it will fail such
+ # as when the user code has already voluntarily
+ # released the lease.
lease.release()
else:
logger.debug(
- 'Failed to obtain %s for contender %s, doing nothing more.',
+ 'Failed to obtain %s for contender %s, shutting down.',
lease_id,
contender_id,
)
args = (*args, self.zk)
args = (*args, self.stop_event)
- logger.debug('Invoking user code on separate thread.')
thread = threading.Thread(
target=self.func,
args=args,
kwargs=kwargs,
)
+ logger.debug(
+ 'Invoking user code on separate thread: %s',
+ thread.getName(),
+ )
thread.start()
- # Watch the state (fail safe for listener) and the thread.
+ # Periodically poll the zookeeper state (fail safe for
+ # listener) and the state of the child thread.
while True:
state = self.zk.client_state
if state != KazooState.CONNECTED:
self.stop_event.set()
logger.debug('Waiting for user thread to tear down...')
thread.join()
+ logger.debug('User thread exited after our notification.')
return
thread.join(timeout=5.0)
if not thread.is_alive():
- logger.info('Child thread exited, I\'m exiting too.')
+ logger.info('User thread exited on its own.')
return
def __call__(self, *args, **kwargs):