2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
17 from typing import Callable, Optional
19 from kazoo.client import KazooClient
20 from kazoo.protocol.states import KazooState
25 logger = logging.getLogger(__name__)
28 # On module load, grab what we presume to be our process' program name.
29 # This is used, by default, as part of internal zookeeper paths (e.g.
30 # to name a lease or election).
31 PROGRAM_NAME: str = os.path.basename(sys.argv[0])
35 f: Optional[Callable] = None,
37 lease_id: str = PROGRAM_NAME,
38 contender_id: str = platform.node(),
39 initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
40 also_pass_lease: bool = False,
41 also_pass_zk_client: bool = False,
43 """Obtain the named lease before invoking a function and skip
44 invoking the function if the lease cannot be obtained.
47 ... lease_id='zookeeper_doctest',
48 ... initial_duration=datetime.timedelta(seconds=10),
50 ... def f(name: str) -> int:
51 ... print(f'Hello, {name}')
59 if not lease_id.startswith('/leases/'):
60 lease_id = f'/leases/{lease_id}'
61 lease_id = file_utils.fix_multiple_slashes(lease_id)
63 hosts=scott_secrets.ZOOKEEPER_NODES,
66 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
67 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
68 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
71 logger.debug('We have an active zookeeper connection.')
73 def wrapper(func: Callable) -> Callable:
74 @functools.wraps(func)
75 def wrapper2(*args, **kwargs):
77 'Trying to obtain %s for contender %s now...',
81 lease = zk.NonBlockingLease(
88 'Successfully obtained %s for contender %s; invoking user function.',
92 if also_pass_zk_client:
96 ret = func(*args, *kwargs)
99 'Failed to obtain %s for contender %s, doing nothing more.',
104 logger.debug('Shutting down zookeeper client.')
116 def run_for_election(
117 f: Optional[Callable] = None,
119 election_id: str = PROGRAM_NAME,
120 contender_id: str = platform.node(),
121 also_pass_zk_client: bool = False,
123 """Run as a contender for a leader election. If/when we become
124 the leader, invoke the user's function.
126 The user's function will be executed on a new thread and must
127 accept a "stop processing" event that it must check regularly.
128 This event will be set automatically by the wrapper in the event
129 that we lose connection to zookeeper (and hence are no longer
130 confident that we are still the leader).
132 The user's function may return at any time which will cause
133 the wrapper to also return and effectively cede leadership.
135 Because the user's code is run in a separate thread, it may
138 >>> @run_for_election(
139 ... election_id='zookeeper_doctest',
140 ... also_pass_zk_client=True
142 ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
146 ... print(f"Hello, {name}, I'm the leader.")
147 ... if stop_now.is_set():
148 ... print("Oops, not anymore?!")
153 ... print("I'm sick of being leader.")
157 Hello, Scott, I'm the leader.
158 Hello, Scott, I'm the leader.
159 Hello, Scott, I'm the leader.
160 I'm sick of being leader.
163 if not election_id.startswith('/elections/'):
164 election_id = f'/elections/{election_id}'
165 election_id = file_utils.fix_multiple_slashes(election_id)
167 hosts=scott_secrets.ZOOKEEPER_NODES,
170 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
171 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
172 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
175 logger.debug('We have an active zookeeper connection.')
177 def wrapper(func: Callable) -> Callable:
178 @functools.wraps(func)
179 def runit(func, *args, **kwargs):
180 stop_event = threading.Event()
182 if also_pass_zk_client:
184 args = (*args, stop_event)
185 logger.debug('Invoking user code on separate thread.')
186 thread = threading.Thread(
194 state = zk.client_state
195 if state != KazooState.CONNECTED:
197 'Bad connection to zookeeper (state=%s); bailing out.',
203 if not thread.is_alive():
204 logger.info('Child thread exited, I\'m exiting too.')
208 @functools.wraps(runit)
209 def wrapper2(*args, **kwargs):
210 election = zk.Election(election_id, contender_id)
211 election.run(runit, func, *args, **kwargs)
222 if __name__ == '__main__':