11 import multiprocessing
18 from typing import Callable, Optional
21 # This module is commonly used by others in here and should avoid
22 # taking any unnecessary dependencies back on them.
26 logger = logging.getLogger(__name__)
def timed(func: Callable) -> Callable:
    """Print the runtime of the decorated function.

    Every call to the wrapped function is timed with
    ``time.perf_counter`` and, once the call returns, a one-line summary
    is printed and logged at INFO level.  Nothing is reported if the
    wrapped function raises.

    Args:
        func: the callable to time.

    Returns:
        A wrapper that forwards all arguments to ``func`` and passes its
        return value through unchanged.
    """

    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        msg = f"Finished {func.__name__!r} in {run_time:.4f}s"
        print(msg)
        logger.info(msg)
        return value

    return wrapper_timer
def invocation_logged(func: Callable) -> Callable:
    """Log the call of a function.

    A timestamped "Entered" message is emitted before the wrapped
    function runs and a matching "Exited" message after it returns.
    Each message is printed and logged at INFO level.  If the wrapped
    function raises, no "Exited" message is emitted.

    Args:
        func: the callable whose invocations should be announced.

    Returns:
        A wrapper that forwards all arguments to ``func`` and passes its
        return value through unchanged.
    """

    def announce(event: str) -> None:
        # The timestamp is taken when the message is emitted.  now() is a
        # naive datetime, so the trailing %Z expands to an empty string.
        ts = datetime.datetime.now().strftime("%Y/%d/%b:%H:%M:%S%Z")
        msg = f"[{ts}]: {event} {func.__name__}"
        print(msg)
        logger.info(msg)

    @functools.wraps(func)
    def wrapper_invocation_logged(*args, **kwargs):
        announce("Entered")
        ret = func(*args, **kwargs)
        announce("Exited")
        return ret

    return wrapper_invocation_logged
def debug_args(func: Callable) -> Callable:
    """Print the function signature and return value at each call.

    Before the wrapped function runs, a message listing every positional
    and keyword argument (``repr`` plus type) is printed and logged at
    INFO level.  After it returns, a second message with the return
    value and its type is emitted the same way.

    Args:
        func: the callable to trace.

    Returns:
        A wrapper that forwards all arguments to ``func`` and passes its
        return value through unchanged.
    """

    @functools.wraps(func)
    def wrapper_debug_args(*args, **kwargs):
        args_repr = [f"{repr(a)}:{type(a)}" for a in args]
        kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
        signature = ", ".join(args_repr + kwargs_repr)
        msg = f"Calling {func.__name__}({signature})"
        print(msg)
        logger.info(msg)
        value = func(*args, **kwargs)
        msg = f"{func.__name__!r} returned {value!r}:{type(value)}"
        print(msg)
        logger.info(msg)
        return value

    return wrapper_debug_args
def debug_count_calls(func: Callable) -> Callable:
    """Count function invocations and print a message before every call.

    The running total is kept on the returned wrapper as the attribute
    ``num_calls`` (starting at 0), so callers can read or reset it.

    Args:
        func: the callable whose invocations should be counted.

    Returns:
        A wrapper that increments ``num_calls``, prints and logs the new
        count at INFO level, then forwards the call to ``func``.
    """

    @functools.wraps(func)
    def wrapper_debug_count_calls(*args, **kwargs):
        wrapper_debug_count_calls.num_calls += 1
        msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
        print(msg)
        logger.info(msg)
        return func(*args, **kwargs)

    # Assigned after wraps() so a copied attribute of the same name on
    # ``func`` cannot leak into the counter.
    wrapper_debug_count_calls.num_calls = 0
    return wrapper_debug_count_calls
class DelayWhen(enum.Flag):
    """When, relative to the wrapped call, ``@delay`` should sleep.

    This is a ``Flag`` (not a plain ``Enum``) because ``delay`` tests
    membership with ``&``, which plain ``Enum`` members do not support.
    """

    BEFORE_CALL = 1
    AFTER_CALL = 2
    BEFORE_AND_AFTER = BEFORE_CALL | AFTER_CALL


def delay(
    _func: Optional[Callable] = None,
    *,
    seconds: float = 1.0,
    when: DelayWhen = DelayWhen.BEFORE_CALL,
) -> Callable:
    """Delay the execution of a function by sleeping before and/or after.

    Slow down a function by inserting a delay before and/or after its
    invocation.  Usable both bare (``@delay``) and with keyword
    arguments (``@delay(seconds=2.0, when=DelayWhen.AFTER_CALL)``).

    Args:
        _func: the function being decorated when used bare; leave unset
            when passing keyword arguments.
        seconds: how long each sleep lasts.
        when: which side(s) of the call to sleep on.

    Returns:
        The decorated function when ``_func`` is given, otherwise a
        decorator to apply to a function.
    """

    def decorator_delay(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper_delay(*args, **kwargs):
            if when & DelayWhen.BEFORE_CALL:
                logger.debug(
                    f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
                )
                time.sleep(seconds)
            retval = func(*args, **kwargs)
            if when & DelayWhen.AFTER_CALL:
                logger.debug(
                    f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
                )
                time.sleep(seconds)
            return retval

        return wrapper_delay

    # Bare use passes the function directly; parameterized use does not.
    if _func is None:
        return decorator_delay
    return decorator_delay(_func)
class _SingletonWrapper:
    """
    A singleton wrapper class. Its instances would be created
    for each decorated class.

    The decorated class is kept on ``__wrapped__``; the one instance it
    produces is cached on ``_instance`` after the first call.
    """

    def __init__(self, cls):
        self.__wrapped__ = cls
        self._instance = None

    def __call__(self, *args, **kwargs):
        """Returns a single instance of decorated class.

        The arguments are used only for the call that creates the
        instance; on every later call they are ignored and the cached
        instance is returned.
        """
        logger.debug(
            f"@singleton returning global instance of {self.__wrapped__.__name__}"
        )
        if self._instance is None:
            self._instance = self.__wrapped__(*args, **kwargs)
        return self._instance


def singleton(cls):
    """
    A singleton decorator. Returns a wrapper object. A call on that object
    returns a single instance object of decorated class. Use the __wrapped__
    attribute to access decorated class directly in unit tests
    """
    return _SingletonWrapper(cls)
def memoized(func: Callable) -> Callable:
    """Keep a cache of previous function call results.

    The cache here is a dict with a key based on the arguments to the
    call.  Consider also: functools.lru_cache for a more advanced
    implementation.

    The cache is exposed as the ``cache`` attribute of the returned
    wrapper.  It is unbounded, and all arguments must be hashable.

    Args:
        func: the callable whose results should be cached.

    Returns:
        A wrapper that calls ``func`` once per distinct argument set and
        serves later calls from the cache.
    """

    @functools.wraps(func)
    def wrapper_memoized(*args, **kwargs):
        # Positional and keyword arguments are kept in separate slots so
        # f(("a", 1)) and f(a=1) cannot collide.  The frozenset makes the
        # key independent of keyword order.
        cache_key = (args, frozenset(kwargs.items()))
        # Read the attribute on each call in case a caller replaced it.
        cache = wrapper_memoized.cache
        if cache_key not in cache:
            value = func(*args, **kwargs)
            logger.debug(
                f"Memoizing {cache_key} => {value} for {func.__name__}"
            )
            cache[cache_key] = value
        else:
            logger.debug(f"Returning memoized value for {func.__name__}")
        return cache[cache_key]

    wrapper_memoized.cache = {}
    return wrapper_memoized
def retry_predicate(
    tries: int,
    *,
    predicate: Callable[..., bool],
    delay_sec: float = 3,
    backoff: float = 2.0,
):
    """Retries a function or method up to a certain number of times
    with a prescribed initial delay period and backoff rate.

    The decorated function is always called once.  While its result is
    rejected by ``predicate`` and retries remain, the wrapper sleeps and
    calls it again.  The result of the final call is returned even if the
    predicate would reject it.

    Args:
        tries: the maximum number of retries after the first call
            (floored to an int; must be 0 or greater).
        predicate: a function that will be passed the retval of the
            decorated function and must return True to stop or False to
            retry.  Only the exact value ``True`` stops the retries.
        delay_sec: the initial delay period in seconds (must be > 0).
        backoff: a multiplier (must be >= 1) applied to the delay after
            each sleep.

    Returns:
        A decorator to apply to the function that should be retried.

    Raises:
        ValueError: if ``backoff``, ``tries`` or ``delay_sec`` is out of
            range.
    """
    if backoff < 1:
        msg = f"backoff must be greater than or equal to 1, got {backoff}"
        logger.critical(msg)
        raise ValueError(msg)

    tries = math.floor(tries)
    if tries < 0:
        msg = f"tries must be 0 or greater, got {tries}"
        logger.critical(msg)
        raise ValueError(msg)

    if delay_sec <= 0:
        msg = f"delay_sec must be greater than 0, got {delay_sec}"
        logger.critical(msg)
        raise ValueError(msg)

    def deco_retry(f):
        @functools.wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay_sec  # make mutable
            retval = f(*args, **kwargs)
            while mtries > 0:
                if predicate(retval) is True:
                    return retval
                logger.debug("Predicate failed, sleeping and retrying.")
                mtries -= 1
                time.sleep(mdelay)
                mdelay *= backoff
                retval = f(*args, **kwargs)
            return retval

        return f_retry

    return deco_retry
def retry_if_false(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0):
    """Retry the decorated function until it returns exactly ``True``.

    A thin wrapper around ``retry_predicate`` whose predicate accepts
    only the value ``True``; any other result triggers a retry.

    Args:
        tries: passed through to ``retry_predicate``.
        delay_sec: initial delay between attempts, in seconds.
        backoff: multiplier applied to the delay after each sleep.

    Returns:
        A decorator to apply to the function that should be retried.
    """
    return retry_predicate(
        tries,
        predicate=lambda x: x is True,
        delay_sec=delay_sec,
        backoff=backoff,
    )
def retry_if_none(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0):
    """Retry the decorated function until it returns something other than ``None``.

    A thin wrapper around ``retry_predicate`` whose predicate accepts any
    result that is not ``None``.

    Args:
        tries: passed through to ``retry_predicate``.
        delay_sec: initial delay between attempts, in seconds.
        backoff: multiplier applied to the delay after each sleep.

    Returns:
        A decorator to apply to the function that should be retried.
    """
    return retry_predicate(
        tries,
        predicate=lambda x: x is not None,
        delay_sec=delay_sec,
        backoff=backoff,
    )
def deprecated(func):
    """This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used.

    Each call logs a warning and issues a ``DeprecationWarning`` before
    the wrapped function runs.

    Args:
        func: the callable being deprecated.

    Returns:
        A wrapper that warns and then forwards the call to ``func``.
    """

    @functools.wraps(func)
    def wrapper_deprecated(*args, **kwargs):
        msg = f"Call to deprecated function {func.__name__}"
        logger.warning(msg)
        # stacklevel=2 attributes the warning to the caller of the
        # deprecated function rather than to this wrapper.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)

    return wrapper_deprecated
def thunkify(func):
    """
    Make a function immediately return a function of no args which,
    when called, waits for the result, which will start being
    processed in another thread.

    An exception raised by ``func`` is logged when it happens and raised
    again, as the same exception object, each time the thunk is called.

    Args:
        func: the callable to run in a background thread.

    Returns:
        A wrapper that starts ``func`` in a new thread and returns a
        zero-argument thunk yielding its result.
    """

    @functools.wraps(func)
    def lazy_thunked(*args, **kwargs):
        wait_event = threading.Event()

        # One-element lists give the nested functions mutable slots.
        result = [None]
        exc = [False, None]

        def worker_func():
            try:
                func_result = func(*args, **kwargs)
                result[0] = func_result
            except Exception:
                exc[0] = True
                exc[1] = sys.exc_info()  # (type, value, traceback)
                msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
                logger.warning(msg)
            finally:
                # Always release waiters, on success or failure.
                wait_event.set()

        def thunk():
            wait_event.wait()
            if exc[0]:
                # Re-raise the captured object itself.  Rebuilding it from
                # its type fails for multi-argument constructors and drops
                # the worker's traceback.
                _, value, tb = exc[1]
                raise value.with_traceback(tb)
            return result[0]

        threading.Thread(target=worker_func).start()
        return thunk

    return lazy_thunked
315 ############################################################
317 ############################################################
319 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
321 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
324 def _raise_exception(exception, error_message: Optional[str]):
325 if error_message is None:
328 raise exception(error_message)
331 def _target(queue, function, *args, **kwargs):
332 """Run a function with arguments and return output via a queue.
334 This is a helper function for the Process created in _Timeout. It runs
335 the function with positional arguments and keyword arguments and then
336 returns the function's output by way of a queue. If an exception gets
337 raised, it is returned to _Timeout to be raised by the value property.
340 queue.put((True, function(*args, **kwargs)))
342 queue.put((False, sys.exc_info()[1]))
# Callable wrapper that runs the wrapped function in a separate
# multiprocessing.Process and enforces a time limit on each call.
# NOTE(review): the docstring below refers to "add_timeout"; no function of
# that name is visible in this excerpt -- confirm the reference is current.
345 class _Timeout(object):
346 """Wrap a function and add a timeout (limit) attribute to it.
348 Instances of this class are automatically generated by the add_timeout
349 function defined below.
# timeout_exception is handed to _raise_exception by the code below.
# NOTE(review): the annotation says Exception (an instance); if
# _raise_exception calls it to build the error, a class is expected -- confirm.
355 timeout_exception: Exception,
# State captured at construction time.
359 self.__limit = seconds
360 self.__function = function
361 self.__timeout_exception = timeout_exception
362 self.__error_message = error_message
# Mirror the wrapped function's name and docstring on this wrapper object.
363 self.__name__ = function.__name__
364 self.__doc__ = function.__doc__
# Placeholder deadline, process and queue; __call__ replaces the process and
# queue on every invocation and recomputes the deadline when a limit is set.
365 self.__timeout = time.time()
366 self.__process = multiprocessing.Process()
367 self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
369 def __call__(self, *args, **kwargs):
370 """Execute the embedded function object asynchronously.
372 The function given to the constructor is transparently called and
373 requires that "ready" be intermittently polled. If and when it is
374 True, the "value" property may then be checked for returned data.
# A per-call ``timeout=`` keyword overrides the stored limit and is removed
# from kwargs before they are forwarded; the override persists on the
# instance for later calls.
376 self.__limit = kwargs.pop("timeout", self.__limit)
# Fresh queue with capacity 1 for this call.
377 self.__queue = multiprocessing.Queue(1)
# _target receives the queue and the function ahead of the caller's own
# positional arguments.
378 args = (self.__queue, self.__function) + args
379 self.__process = multiprocessing.Process(
380 target=_target, args=args, kwargs=kwargs
382 self.__process.daemon = True
383 self.__process.start()
# Absolute deadline, computed only when a limit is configured.
384 if self.__limit is not None:
385 self.__timeout = self.__limit + time.time()
# Poll ``ready`` until it is truthy.
# NOTE(review): the loop body and the return statement are not visible in
# this excerpt -- confirm the polling interval and the returned value.
386 while not self.ready:
391 """Terminate any possible execution of the embedded function."""
# Stop the child process if it is still running, then always hand off to
# _raise_exception with the configured exception and message.
392 if self.__process.is_alive():
393 self.__process.terminate()
394 _raise_exception(self.__timeout_exception, self.__error_message)
398 """Read-only property indicating status of "value" property."""
# The deadline is only checked when the limit is truthy, so a limit of 0 is
# treated the same as no limit here.
399 if self.__limit and self.__timeout < time.time():
# Ready once the capacity-1 queue reports full and non-empty.
401 return self.__queue.full() and not self.__queue.empty()
405 """Read-only property containing data returned from function."""
# The queued item is unpacked into a flag and a payload.
# NOTE(review): how flag selects between returning and raising the payload is
# not visible in this excerpt -- confirm against _target.
406 if self.ready is True:
407 flag, load = self.__queue.get()
# Parameters of the timeout decorator factory.
# NOTE(review): the ``def`` header is not visible in this excerpt.
# ``seconds`` is the default time limit; it can be overridden per call with
# a ``timeout=`` keyword (see new_function below).
414 seconds: float = 1.0,
# None means "decide automatically" (resolved at the top of the body).
415 use_signals: Optional[bool] = None,
# NOTE(review): ``exceptions`` here is a module imported elsewhere in this
# file, presumably project-local -- confirm.
416 timeout_exception=exceptions.TimeoutError,
417 error_message="Function call timed out",
419 """Add a timeout parameter to a function and return the function.
421 Note: the use_signals parameter is included in order to support
422 multiprocessing scenarios (signal can only be used from the process'
423 main thread). When not using signals, timeout granularity will be
424 rounded to the nearest 0.1s.
426 Raises an exception when the timeout is reached.
428 It is illegal to pass anything other than a function as the first
429 parameter. The function is wrapped and returned to the caller.
# When the caller did not choose, use signals only if this code is running
# on the main thread (as reported by thread_utils).
431 if use_signals is None:
433 use_signals = thread_utils.is_current_thread_main_thread()
435 def decorate(function):
# Signal-based variant: SIGALRM handler that raises the configured exception.
# NOTE(review): the condition selecting between the two variants is not
# visible in this excerpt -- presumably ``if use_signals:``; confirm.
439 def handler(signum, frame):
440 _raise_exception(timeout_exception, error_message)
442 @functools.wraps(function)
443 def new_function(*args, **kwargs):
# A per-call ``timeout=`` keyword overrides the decorator-level ``seconds``
# and is removed from kwargs before the wrapped function is called.
444 new_seconds = kwargs.pop("timeout", seconds)
# Install the handler (remembering the previous one) and arm a real-time
# interval timer for the requested duration.
446 old = signal.signal(signal.SIGALRM, handler)
447 signal.setitimer(signal.ITIMER_REAL, new_seconds)
# NOTE(review): two call sites follow; the conditions/try-finally that
# separate them are not visible in this excerpt -- confirm control flow.
450 return function(*args, **kwargs)
453 return function(*args, **kwargs)
# Disarm the timer and restore the previously installed SIGALRM handler.
456 signal.setitimer(signal.ITIMER_REAL, 0)
457 signal.signal(signal.SIGALRM, old)
# Non-signal variant: delegate each call to a fresh _Timeout wrapper.
# The decorator-level ``seconds`` is passed in; any ``timeout=`` keyword is
# left in kwargs for _Timeout to handle.
462 @functools.wraps(function)
463 def new_function(*args, **kwargs):
464 timeout_wrapper = _Timeout(
465 function, timeout_exception, error_message, seconds
467 return timeout_wrapper(*args, **kwargs)
class non_reentrant_code(object):
    """Decorator object that suppresses re-entrant calls to the function it wraps.

    While a call to the wrapped function is in progress, a nested call
    made through the same decorator instance returns ``None`` without
    running the function.  Calls from other threads wait on the lock
    until the running call finishes.

    Use an instance as the decorator: ``@non_reentrant_code()``.
    """

    def __init__(self):
        # threading.RLock is a factory; it must be called to obtain a lock
        # object usable in a ``with`` statement.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Wrap ``f`` so that re-entrant invocations are dropped.

        Args:
            f: the callable to guard.

        Returns:
            A wrapper returning ``f``'s result, or ``None`` when the call
            was suppressed because ``f`` was already running.
        """

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises, or every later call would
                    # be silently dropped.
                    self._entered = False

        return _gatekeeper
class rlocked(object):
    """Decorator object that runs the wrapped function under a re-entrant lock.

    Calls made through the same decorator instance are serialized by an
    ``RLock``.  As in the original code, an ``_entered`` flag is kept: a
    nested call made while the function is already running returns
    ``None`` without running it.

    Use an instance as the decorator: ``@rlocked()``.
    """

    def __init__(self):
        # threading.RLock is a factory; it must be called to obtain a lock
        # object usable in a ``with`` statement.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Wrap ``f`` so that it runs while holding this instance's lock.

        Args:
            f: the callable to guard.

        Returns:
            A wrapper returning ``f``'s result, or ``None`` when the call
            was suppressed because ``f`` was already running.
        """

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises, or every later call would
                    # be silently dropped.
                    self._entered = False

        return _gatekeeper
def call_with_sample_rate(sample_rate: float) -> Callable:
    """Run the decorated function on only a random fraction of its calls.

    Each invocation of the wrapped function is executed with probability
    ``sample_rate``.  A skipped call is logged at DEBUG level and returns
    ``None``.

    Args:
        sample_rate: probability in [0, 1] that a given call goes through;
            0 never calls the function, 1 always calls it.

    Returns:
        A decorator to apply to the function that should be sampled.

    Raises:
        ValueError: if ``sample_rate`` is outside [0, 1].
    """
    if not 0.0 <= sample_rate <= 1.0:
        msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
        logger.critical(msg)
        raise ValueError(msg)

    def decorator(f):
        @functools.wraps(f)
        def _call_with_sample_rate(*args, **kwargs):
            # random.random() is in [0.0, 1.0), so a rate of 1.0 always
            # passes and a rate of 0.0 never does.
            if random.random() < sample_rate:
                return f(*args, **kwargs)
            logger.debug(
                f"@call_with_sample_rate skipping a call to {f.__name__}"
            )
            return None

        return _call_with_sample_rate

    return decorator
def decorate_matching_methods_with(decorator, acl=None):
    """Apply decorator to all methods in a class whose names are accepted
    by acl.  If acl is None (default), decorate all methods in the class.

    Args:
        decorator: the decorator to apply to each selected method.
        acl: optional predicate called with each method name; the method
            is decorated only when it returns a truthy value.

    Returns:
        A class decorator that rebinds the selected methods in place and
        returns the same class.
    """

    def decorate_the_class(cls):
        for name, member in inspect.getmembers(cls, inspect.isfunction):
            if acl is not None and not acl(name):
                continue
            wrapped = decorator(member)
            # getmembers yields the plain function behind a staticmethod.
            # Re-wrap it so the rebound attribute is not treated as an
            # instance method.
            if isinstance(inspect.getattr_static(cls, name), staticmethod):
                wrapped = staticmethod(wrapped)
            setattr(cls, name, wrapped)
        return cls

    return decorate_the_class