2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
16 from typing import Callable, Optional
18 from kazoo.client import KazooClient
19 from kazoo.protocol.states import KazooState
# Module-scoped logger, named after this module.
logger = logging.getLogger(__name__)

# Captured once, at import time: the final path component of argv[0],
# which we take to be this process's program name.  Unless a caller
# supplies its own identifier, this string is what internal zookeeper
# paths (e.g. the one identifying a lease or an election) are built from.
PROGRAM_NAME: str = os.path.split(sys.argv[0])[1]
34 f: Optional[Callable] = None,
36 lease_id: str = PROGRAM_NAME,
37 contender_id: str = platform.node(),
38 duration: datetime.timedelta = datetime.timedelta(minutes=5),
39 also_pass_lease: bool = False,
40 also_pass_zk_client: bool = False,
42 """Obtain an exclusive lease identified by the lease_id name
43 before invoking a function or skip invoking the function if the
44 lease cannot be obtained.
47 lease_id: string identifying the lease to obtain
48 contender_id: string identifying who's attempting to obtain
49 duration: how long should the lease be held, if obtained?
50 also_pass_lease: pass the lease into the user function
51 also_pass_zk_client: pass our zk client into the user function
54 ... lease_id='zookeeper_doctest',
55 ... duration=datetime.timedelta(seconds=5),
57 ... def f(name: str) -> int:
58 ... print(f'Hello, {name}')
66 if not lease_id.startswith('/leases/'):
67 lease_id = f'/leases/{lease_id}'
68 lease_id = file_utils.fix_multiple_slashes(lease_id)
def wrapper(func: Callable) -> Callable:
    """Wrap ``func`` so that each call first tries to obtain the lease.

    Every invocation of the returned callable opens its own zookeeper
    connection, makes a single non-blocking attempt at the lease and
    calls ``func`` only if that attempt succeeds.  The connection is
    shut down again before the call returns, whether ``func`` ran,
    was skipped, or raised.

    Args:
        func: the user function to run while holding the lease.

    Returns:
        A callable carrying ``func``'s metadata.  It returns whatever
        ``func`` returns, or None when the lease was not obtained (in
        which case ``func`` is not invoked).
    """

    @functools.wraps(func)
    def wrapper2(*args, **kwargs):
        # NOTE(review): the excerpt this was reviewed from elides two
        # keyword arguments between hosts= and keyfile= (presumably TLS
        # switches) -- carry them over from the full file.  The log
        # level of the three lease messages below is likewise not
        # visible; debug is assumed, matching the neighbouring calls.
        zk = KazooClient(
            hosts=scott_secrets.ZOOKEEPER_NODES,
            keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
            certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
        )
        zk.start()
        logger.debug('We have an active zookeeper connection.')

        # From here on the client is live: make sure it is stopped even
        # if the lease attempt or the user function raises.
        try:
            logger.debug(
                'Trying to obtain %s for contender %s now...',
                lease_id,
                contender_id,
            )
            lease = zk.NonBlockingLease(
                lease_id,
                duration,
                contender_id,
            )
            if not lease:
                logger.debug(
                    'Failed to obtain %s for contender %s, doing nothing more.',
                    lease_id,
                    contender_id,
                )
                return None

            logger.debug(
                'Successfully obtained %s for contender %s; invoking user function.',
                lease_id,
                contender_id,
            )
            # Optional extra positional arguments are appended in this
            # order: the zookeeper client first, then the lease.
            if also_pass_zk_client:
                args = (*args, zk)
            if also_pass_lease:
                args = (*args, lease)
            # Keyword arguments must be forwarded with ** -- a single *
            # would pass the dict's keys as extra positional arguments
            # and silently drop the values.
            return func(*args, **kwargs)
        finally:
            logger.debug('Shutting down zookeeper client.')
            zk.stop()

    return wrapper2
def run_for_election(
    f: Optional[Callable] = None,
    *,
    election_id: str = PROGRAM_NAME,
    contender_id: str = platform.node(),
    also_pass_zk_client: bool = False,
):
    """Compete in a zookeeper leader election and invoke the decorated
    function if/when this contender becomes the leader.

    The user's function is executed on a new thread.  In addition to
    its own arguments it always receives, as its last positional
    argument, a "stop processing" :class:`threading.Event` that it must
    check regularly.  The wrapper sets this event when the connection
    to zookeeper is no longer healthy, i.e. when we can no longer be
    confident that we are still the leader.

    The user's function may return at any time; the wrapper then
    returns too, which effectively cedes leadership.  Because the user
    code runs on a separate thread, whatever it returns is dropped.

    Args:
        f: the function being decorated, or None when the decorator is
            applied with arguments.
        election_id: global string identifier for the election; it is
            placed under ``/elections/`` unless it already starts with
            that prefix.
        contender_id: string identifying who is running for leader.
        also_pass_zk_client: pass the zk client into the user code
            (positionally, just before the stop event).

    >>> @run_for_election(
    ...         election_id='zookeeper_doctest',
    ...         also_pass_zk_client=True
    ... )
    ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
    ...     import time
    ...     count = 0
    ...     while True:
    ...         print(f"Hello, {name}, I'm the leader.")
    ...         if stop_now.is_set():
    ...             print("Oops, not anymore?!")
    ...             return
    ...         time.sleep(0.1)
    ...         count += 1
    ...         if count >= 3:
    ...             print("I'm sick of being leader.")
    ...             return

    >>> g("Scott")
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    Hello, Scott, I'm the leader.
    I'm sick of being leader.

    """
    # NOTE(review): rebuilt from an excerpt with stripped indentation
    # and elided lines.  Inferred rather than seen: the keyword-only
    # marker in the signature, the helper-loop lines of the doctest,
    # that the slash fix-up sits inside the prefix check, the local
    # class header, and the trailing dispatch on ``f``.  Confirm
    # against the full file.
    elections_root = '/elections/'
    if not election_id.startswith(elections_root):
        election_id = file_utils.fix_multiple_slashes(
            f'{elections_root}{election_id}'
        )

    class wrapper:
        """Helper wrapper class."""

        def __init__(self, func: Callable) -> None:
            # Make this instance look like the function it decorates.
            functools.update_wrapper(self, func)
            # NOTE(review): the attribute the user function is stored
            # under is not visible in the excerpt; ``func`` is assumed.
            self.func = func
            # A new Event starts out unset, so no explicit clear().
            self.stop_event = threading.Event()
            # NOTE(review): the excerpt elides two keyword arguments
            # between hosts= and keyfile= (presumably TLS switches) --
            # carry them over from the full file.
            self.zk = KazooClient(
                hosts=scott_secrets.ZOOKEEPER_NODES,
                keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
                keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
                certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
            )
            self.zk.start()
            logger.debug('We have an active zookeeper connection.')

        def _signal_stop(self, state: KazooState) -> None:
            """Report an unhealthy connection and tell user code to stop."""
            # NOTE(review): the log level is not visible in the
            # excerpt; error is assumed.
            logger.error(
                'Bad connection to zookeeper (state=%s); bailing out.',
                state,
            )
            self.stop_event.set()

        def zk_listener(self, state: KazooState) -> None:
            """Connection-state callback registered with the zk client."""
            logger.debug('Listener received state %s.', state)
            if state == KazooState.CONNECTED:
                return
            self._signal_stop(state)

        def runit(self, *args, **kwargs) -> None:
            """Run the user code on its own thread and babysit it.

            Invoked by the election once we are the leader.  Returns
            when the user thread has exited, either on its own or
            after we asked it to stop because the connection went bad.
            """
            # The stop event is always the last positional argument;
            # the zk client, when requested, goes right before it.
            if also_pass_zk_client:
                extras = (self.zk, self.stop_event)
            else:
                extras = (self.stop_event,)

            logger.debug('Invoking user code on separate thread.')
            worker = threading.Thread(
                target=self.func,
                args=(*args, *extras),
                kwargs=kwargs,
            )
            worker.start()

            # Poll the connection state (a fail safe in case the
            # listener is not called) and the thread, waking at least
            # every five seconds.
            while True:
                state = self.zk.client_state
                if state != KazooState.CONNECTED:
                    break
                worker.join(timeout=5.0)
                if not worker.is_alive():
                    logger.info("Child thread exited, I'm exiting too.")
                    return

            self._signal_stop(state)
            logger.debug('Waiting for user thread to tear down...')
            worker.join()

        def __call__(self, *args, **kwargs):
            """Join the election; blocks until our turn has come and gone."""
            contest = self.zk.Election(election_id, contender_id)
            self.zk.add_listener(self.zk_listener)
            contest.run(self.runit, *args, **kwargs)
            # NOTE(review): one statement follows election.run() in the
            # original but is elided from the excerpt; shutting the
            # client down is assumed -- confirm.
            self.zk.stop()

    # Support both the bare form (@run_for_election) and the
    # parameterized form (@run_for_election(...)).
    return wrapper if f is None else wrapper(f)
252 if __name__ == '__main__':