2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
15 from typing import Callable, Optional
17 from kazoo.client import KazooClient
18 from kazoo.protocol.states import KazooState
24 logger = logging.getLogger(__name__)
28 f: Optional[Callable] = None,
30 lease_id: str = config.PROGRAM_NAME,
31 contender_id: str = platform.node(),
32 initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
33 also_pass_lease: bool = False,
34 also_pass_zk_client: bool = False,
36 """Obtain the named lease before invoking a function and skip
37 invoking the function if the lease cannot be obtained.
39 >>> @obtain_lease(lease_id='zookeeper_doctest')
40 ... def f(name: str) -> int:
41 ... print(f'Hello, {name}')
49 if not lease_id.startswith('/leases/'):
50 lease_id = f'/leases/{lease_id}'
51 lease_id = file_utils.fix_multiple_slashes(lease_id)
53 hosts=scott_secrets.ZOOKEEPER_NODES,
56 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
57 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
58 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
61 logger.debug('We have an active zookeeper connection.')
63 def wrapper(func: Callable) -> Callable:
64 @functools.wraps(func)
65 def wrapper2(*args, **kwargs):
67 'Trying to obtain %s for contender %s now...',
71 lease = zk.NonBlockingLease(
78 'Successfully obtained %s for contender %s; invoking user function.',
82 if also_pass_zk_client:
86 ret = func(*args, *kwargs)
89 'Failed to obtain %s for contender %s, doing nothing more.',
94 logger.debug('Shutting down zookeeper client.')
106 def run_for_election(
107 f: Optional[Callable] = None,
109 election_id: str = config.PROGRAM_NAME,
110 contender_id: str = platform.node(),
111 also_pass_zk_client: bool = False,
113 """Run as a contender for a leader election. If/when we become
114 the leader, invoke the user's function.
116 The user's function will be executed on a new thread and must
117 accept a "stop processing" event that it must check regularly.
118 This event will be set automatically by the wrapper in the event
119 that we lose connection to zookeeper (and hence are no longer
120 confident that we are still the leader).
122 The user's function may return at any time which will cause
123 the wrapper to also return and effectively cede leadership.
125 Because the user's code is run in a separate thread, it may
128 >>> @run_for_election(
129 ... election_id='zookeeper_doctest',
130 ... also_pass_zk_client=True
132 ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
136 ... print(f"Hello, {name}, I'm the leader.")
137 ... if stop_now.is_set():
138 ... print("Oops, not anymore?!")
143 ... print("I'm sick of being leader.")
147 Hello, Scott, I'm the leader.
148 Hello, Scott, I'm the leader.
149 Hello, Scott, I'm the leader.
150 I'm sick of being leader.
153 if not election_id.startswith('/elections/'):
154 election_id = f'/elections/{election_id}'
155 election_id = file_utils.fix_multiple_slashes(election_id)
157 hosts=scott_secrets.ZOOKEEPER_NODES,
160 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
161 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
162 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
165 logger.debug('We have an active zookeeper connection.')
167 def wrapper(func: Callable) -> Callable:
168 @functools.wraps(func)
169 def runit(func, *args, **kwargs):
170 stop_event = threading.Event()
172 if also_pass_zk_client:
174 args = (*args, stop_event)
175 logger.debug('Invoking user code on separate thread.')
176 thread = threading.Thread(
184 state = zk.client_state
185 if state != KazooState.CONNECTED:
187 'Bad connection to zookeeper (state=%s); bailing out.',
193 if not thread.is_alive():
194 logger.info('Child thread exited, I\'m exiting too.')
198 @functools.wraps(runit)
199 def wrapper2(*args, **kwargs):
200 election = zk.Election(election_id, contender_id)
201 election.run(runit, func, *args, **kwargs)
212 if __name__ == '__main__':