2 # -*- coding: utf-8 -*-
4 # © Copyright 2022, Scott Gasch
6 """This is a module for making it easier to deal with Zookeeper / Kazoo."""
15 from typing import Callable, Optional
17 from kazoo.client import KazooClient
18 from kazoo.protocol.states import KazooState
24 logger = logging.getLogger(__name__)
28 f: Optional[Callable] = None,
30 lease_id: str = config.PROGRAM_NAME,
31 contender_id: str = platform.node(),
32 initial_duration: datetime.timedelta = datetime.timedelta(minutes=5),
33 also_pass_lease: bool = False,
34 also_pass_zk_client: bool = False,
36 """Obtain the named lease before invoking a function and skip
37 invoking the function if the lease cannot be obtained.
40 ... lease_id='zookeeper_doctest',
41 ... initial_duration=datetime.timedelta(seconds=10),
43 ... def f(name: str) -> int:
44 ... print(f'Hello, {name}')
52 if not lease_id.startswith('/leases/'):
53 lease_id = f'/leases/{lease_id}'
54 lease_id = file_utils.fix_multiple_slashes(lease_id)
56 hosts=scott_secrets.ZOOKEEPER_NODES,
59 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
60 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
61 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
64 logger.debug('We have an active zookeeper connection.')
66 def wrapper(func: Callable) -> Callable:
67 @functools.wraps(func)
68 def wrapper2(*args, **kwargs):
70 'Trying to obtain %s for contender %s now...',
74 lease = zk.NonBlockingLease(
81 'Successfully obtained %s for contender %s; invoking user function.',
85 if also_pass_zk_client:
89 ret = func(*args, *kwargs)
92 'Failed to obtain %s for contender %s, doing nothing more.',
97 logger.debug('Shutting down zookeeper client.')
109 def run_for_election(
110 f: Optional[Callable] = None,
112 election_id: str = config.PROGRAM_NAME,
113 contender_id: str = platform.node(),
114 also_pass_zk_client: bool = False,
116 """Run as a contender for a leader election. If/when we become
117 the leader, invoke the user's function.
119 The user's function will be executed on a new thread and must
120 accept a "stop processing" event that it must check regularly.
121 This event will be set automatically by the wrapper in the event
122 that we lose connection to zookeeper (and hence are no longer
123 confident that we are still the leader).
125 The user's function may return at any time which will cause
126 the wrapper to also return and effectively cede leadership.
128 Because the user's code is run in a separate thread, it may
131 >>> @run_for_election(
132 ... election_id='zookeeper_doctest',
133 ... also_pass_zk_client=True
135 ... def g(name: str, zk: KazooClient, stop_now: threading.Event):
139 ... print(f"Hello, {name}, I'm the leader.")
140 ... if stop_now.is_set():
141 ... print("Oops, not anymore?!")
146 ... print("I'm sick of being leader.")
150 Hello, Scott, I'm the leader.
151 Hello, Scott, I'm the leader.
152 Hello, Scott, I'm the leader.
153 I'm sick of being leader.
156 if not election_id.startswith('/elections/'):
157 election_id = f'/elections/{election_id}'
158 election_id = file_utils.fix_multiple_slashes(election_id)
160 hosts=scott_secrets.ZOOKEEPER_NODES,
163 keyfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
164 keyfile_password=scott_secrets.ZOOKEEPER_CLIENT_PASS,
165 certfile=scott_secrets.ZOOKEEPER_CLIENT_CERT,
168 logger.debug('We have an active zookeeper connection.')
170 def wrapper(func: Callable) -> Callable:
171 @functools.wraps(func)
172 def runit(func, *args, **kwargs):
173 stop_event = threading.Event()
175 if also_pass_zk_client:
177 args = (*args, stop_event)
178 logger.debug('Invoking user code on separate thread.')
179 thread = threading.Thread(
187 state = zk.client_state
188 if state != KazooState.CONNECTED:
190 'Bad connection to zookeeper (state=%s); bailing out.',
196 if not thread.is_alive():
197 logger.info('Child thread exited, I\'m exiting too.')
201 @functools.wraps(runit)
202 def wrapper2(*args, **kwargs):
203 election = zk.Election(election_id, contender_id)
204 election.run(runit, func, *args, **kwargs)
215 if __name__ == '__main__':