10 import multiprocessing
17 from typing import Any, Callable, Optional
20 # This module is commonly used by others in here and should avoid
21 # taking any unnecessary dependencies back on them.
25 logger = logging.getLogger(__name__)
28 def timed(func: Callable) -> Callable:
29 """Print the runtime of the decorated function.
36 >>> foo() # doctest: +ELLIPSIS
41 @functools.wraps(func)
42 def wrapper_timer(*args, **kwargs):
43 start_time = time.perf_counter()
44 value = func(*args, **kwargs)
45 end_time = time.perf_counter()
46 run_time = end_time - start_time
47 msg = f"Finished {func.__qualname__} in {run_time:.4f}s"
55 def invocation_logged(func: Callable) -> Callable:
56 """Log the call of a function.
58 >>> @invocation_logged
60 ... print('Hello, world.')
69 @functools.wraps(func)
70 def wrapper_invocation_logged(*args, **kwargs):
71 msg = f"Entered {func.__qualname__}"
74 ret = func(*args, **kwargs)
75 msg = f"Exited {func.__qualname__}"
80 return wrapper_invocation_logged
84 n_calls: int, *, per_period_in_seconds: float = 1.0
86 """Limit invocation of a wrapped function to n calls per period.
87 Thread safe. In testing this was relatively fair with multiple
88 threads using it though that hasn't been measured.
91 >>> import decorator_utils
92 >>> import thread_utils
96 >>> @decorator_utils.rate_limited(10, per_period_in_seconds=1.0)
97 ... def limited(x: int):
101 >>> @thread_utils.background_thread
103 ... for _ in range(3):
106 >>> @thread_utils.background_thread
108 ... for _ in range(3):
111 >>> start = time.time()
116 >>> end = time.time()
117 >>> dur = end - start
125 min_interval_seconds = per_period_in_seconds / float(n_calls)
127 def wrapper_rate_limited(func: Callable) -> Callable:
128 cv = threading.Condition()
129 last_invocation_timestamp = [0.0]
131 def may_proceed() -> float:
133 last_invocation = last_invocation_timestamp[0]
134 if last_invocation != 0.0:
135 elapsed_since_last = now - last_invocation
136 wait_time = min_interval_seconds - elapsed_since_last
139 logger.debug(f'@{time.time()}> wait_time = {wait_time}')
142 def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
146 lambda: may_proceed() <= 0.0,
147 timeout=may_proceed(),
151 logger.debug(f'@{time.time()}> calling it...')
152 ret = func(*args, **kargs)
153 last_invocation_timestamp[0] = time.time()
155 f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}'
160 return wrapper_wrapper_rate_limited
162 return wrapper_rate_limited
165 def debug_args(func: Callable) -> Callable:
166 """Print the function signature and return value at each call.
169 ... def foo(a, b, c):
173 ... return (a + b, c)
175 >>> foo(1, 2.0, "test")
176 Calling foo(1:<class 'int'>, 2.0:<class 'float'>, 'test':<class 'str'>)
180 foo returned (3.0, 'test'):<class 'tuple'>
184 @functools.wraps(func)
185 def wrapper_debug_args(*args, **kwargs):
186 args_repr = [f"{repr(a)}:{type(a)}" for a in args]
187 kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
188 signature = ", ".join(args_repr + kwargs_repr)
189 msg = f"Calling {func.__qualname__}({signature})"
192 value = func(*args, **kwargs)
193 msg = f"{func.__qualname__} returned {value!r}:{type(value)}"
198 return wrapper_debug_args
201 def debug_count_calls(func: Callable) -> Callable:
202 """Count function invocations and print a message befor every call.
204 >>> @debug_count_calls
208 ... return x * factoral(x - 1)
211 Call #1 of 'factoral'
212 Call #2 of 'factoral'
213 Call #3 of 'factoral'
214 Call #4 of 'factoral'
215 Call #5 of 'factoral'
220 @functools.wraps(func)
221 def wrapper_debug_count_calls(*args, **kwargs):
222 wrapper_debug_count_calls.num_calls += 1
224 f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
228 return func(*args, **kwargs)
230 wrapper_debug_count_calls.num_calls = 0
231 return wrapper_debug_count_calls
234 class DelayWhen(enum.IntEnum):
241 _func: Callable = None,
243 seconds: float = 1.0,
244 when: DelayWhen = DelayWhen.BEFORE_CALL,
246 """Delay the execution of a function by sleeping before and/or after.
248 Slow down a function by inserting a delay before and/or after its
253 >>> @delay(seconds=1.0)
257 >>> start = time.time()
259 >>> dur = time.time() - start
265 def decorator_delay(func: Callable) -> Callable:
266 @functools.wraps(func)
267 def wrapper_delay(*args, **kwargs):
268 if when & DelayWhen.BEFORE_CALL:
270 f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
273 retval = func(*args, **kwargs)
274 if when & DelayWhen.AFTER_CALL:
276 f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
284 return decorator_delay
286 return decorator_delay(_func)
289 class _SingletonWrapper:
291 A singleton wrapper class. Its instances would be created
292 for each decorated class.
296 def __init__(self, cls):
297 self.__wrapped__ = cls
298 self._instance = None
300 def __call__(self, *args, **kwargs):
301 """Returns a single instance of decorated class"""
303 f"@singleton returning global instance of {self.__wrapped__.__name__}"
305 if self._instance is None:
306 self._instance = self.__wrapped__(*args, **kwargs)
307 return self._instance
312 A singleton decorator. Returns a wrapper objects. A call on that object
313 returns a single instance object of decorated class. Use the __wrapped__
314 attribute to access decorated class directly in unit tests
317 ... class foo(object):
329 return _SingletonWrapper(cls)
332 def memoized(func: Callable) -> Callable:
333 """Keep a cache of previous function call results.
335 The cache here is a dict with a key based on the arguments to the
336 call. Consider also: functools.lru_cache for a more advanced
342 ... def expensive(arg) -> int:
343 ... # Simulate something slow to compute or lookup
347 >>> start = time.time()
348 >>> expensive(5) # Takes about 1 sec
351 >>> expensive(3) # Also takes about 1 sec
354 >>> expensive(5) # Pulls from cache, fast
357 >>> expensive(3) # Pulls from cache again, fast
360 >>> dur = time.time() - start
366 @functools.wraps(func)
367 def wrapper_memoized(*args, **kwargs):
368 cache_key = args + tuple(kwargs.items())
369 if cache_key not in wrapper_memoized.cache:
370 value = func(*args, **kwargs)
372 f"Memoizing {cache_key} => {value} for {func.__name__}"
374 wrapper_memoized.cache[cache_key] = value
376 logger.debug(f"Returning memoized value for {func.__name__}")
377 return wrapper_memoized.cache[cache_key]
379 wrapper_memoized.cache = dict()
380 return wrapper_memoized
386 predicate: Callable[..., bool],
387 delay_sec: float = 3.0,
388 backoff: float = 2.0,
390 """Retries a function or method up to a certain number of times
391 with a prescribed initial delay period and backoff rate.
393 tries is the maximum number of attempts to run the function.
394 delay_sec sets the initial delay period in seconds.
395 backoff is a multiplied (must be >1) used to modify the delay.
396 predicate is a function that will be passed the retval of the
397 decorated function and must return True to stop or False to
402 msg = f"backoff must be greater than or equal to 1, got {backoff}"
404 raise ValueError(msg)
406 tries = math.floor(tries)
408 msg = f"tries must be 0 or greater, got {tries}"
410 raise ValueError(msg)
413 msg = f"delay_sec must be greater than 0, got {delay_sec}"
415 raise ValueError(msg)
419 def f_retry(*args, **kwargs):
420 mtries, mdelay = tries, delay_sec # make mutable
421 logger.debug(f'deco_retry: will make up to {mtries} attempts...')
422 retval = f(*args, **kwargs)
424 if predicate(retval) is True:
425 logger.debug('Predicate succeeded, deco_retry is done.')
427 logger.debug("Predicate failed, sleeping and retrying.")
431 retval = f(*args, **kwargs)
439 def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
440 """A helper for @retry_predicate that retries a decorated
441 function as long as it keeps returning False.
447 >>> @retry_if_false(5, delay_sec=1.0, backoff=1.1)
451 ... return counter >= 3
453 >>> start = time.time()
454 >>> foo() # fail, delay 1.0, fail, delay 1.1, succeed
457 >>> dur = time.time() - start
466 return retry_predicate(
468 predicate=lambda x: x is True,
474 def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
475 """Another helper for @retry_predicate above. Retries up to N
476 times so long as the wrapped function returns None with a delay
477 between each retry and a backoff that can increase the delay.
480 return retry_predicate(
482 predicate=lambda x: x is not None,
488 def deprecated(func):
489 """This is a decorator which can be used to mark functions
490 as deprecated. It will result in a warning being emitted
491 when the function is used.
495 @functools.wraps(func)
496 def wrapper_deprecated(*args, **kwargs):
497 msg = f"Call to deprecated function {func.__qualname__}"
499 warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
500 print(msg, file=sys.stderr)
501 return func(*args, **kwargs)
503 return wrapper_deprecated
508 Make a function immediately return a function of no args which,
509 when called, waits for the result, which will start being
510 processed in another thread.
513 @functools.wraps(func)
514 def lazy_thunked(*args, **kwargs):
515 wait_event = threading.Event()
522 func_result = func(*args, **kwargs)
523 result[0] = func_result
526 exc[1] = sys.exc_info() # (type, value, traceback)
527 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
535 raise exc[1][0](exc[1][1])
538 threading.Thread(target=worker_func).start()
544 ############################################################
546 ############################################################
548 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
550 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
553 def _raise_exception(exception, error_message: Optional[str]):
554 if error_message is None:
557 raise Exception(error_message)
560 def _target(queue, function, *args, **kwargs):
561 """Run a function with arguments and return output via a queue.
563 This is a helper function for the Process created in _Timeout. It runs
564 the function with positional arguments and keyword arguments and then
565 returns the function's output by way of a queue. If an exception gets
566 raised, it is returned to _Timeout to be raised by the value property.
569 queue.put((True, function(*args, **kwargs)))
571 queue.put((False, sys.exc_info()[1]))
574 class _Timeout(object):
575 """Wrap a function and add a timeout to it.
577 Instances of this class are automatically generated by the add_timeout
578 function defined below. Do not use directly.
584 timeout_exception: Exception,
588 self.__limit = seconds
589 self.__function = function
590 self.__timeout_exception = timeout_exception
591 self.__error_message = error_message
592 self.__name__ = function.__name__
593 self.__doc__ = function.__doc__
594 self.__timeout = time.time()
595 self.__process = multiprocessing.Process()
596 self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
598 def __call__(self, *args, **kwargs):
599 """Execute the embedded function object asynchronously.
601 The function given to the constructor is transparently called and
602 requires that "ready" be intermittently polled. If and when it is
603 True, the "value" property may then be checked for returned data.
605 self.__limit = kwargs.pop("timeout", self.__limit)
606 self.__queue = multiprocessing.Queue(1)
607 args = (self.__queue, self.__function) + args
608 self.__process = multiprocessing.Process(
609 target=_target, args=args, kwargs=kwargs
611 self.__process.daemon = True
612 self.__process.start()
613 if self.__limit is not None:
614 self.__timeout = self.__limit + time.time()
615 while not self.ready:
620 """Terminate any possible execution of the embedded function."""
621 if self.__process.is_alive():
622 self.__process.terminate()
623 _raise_exception(self.__timeout_exception, self.__error_message)
627 """Read-only property indicating status of "value" property."""
628 if self.__limit and self.__timeout < time.time():
630 return self.__queue.full() and not self.__queue.empty()
634 """Read-only property containing data returned from function."""
635 if self.ready is True:
636 flag, load = self.__queue.get()
643 seconds: float = 1.0,
644 use_signals: Optional[bool] = None,
645 timeout_exception=exceptions.TimeoutError,
646 error_message="Function call timed out",
648 """Add a timeout parameter to a function and return the function.
650 Note: the use_signals parameter is included in order to support
651 multiprocessing scenarios (signal can only be used from the process'
652 main thread). When not using signals, timeout granularity will be
653 rounded to the nearest 0.1s.
655 Raises an exception when/if the timeout is reached.
657 It is illegal to pass anything other than a function as the first
658 parameter. The function is wrapped and returned to the caller.
661 ... def foo(delay: float):
662 ... time.sleep(delay)
669 Traceback (most recent call last):
671 Exception: Function call timed out
674 if use_signals is None:
677 use_signals = thread_utils.is_current_thread_main_thread()
679 def decorate(function):
682 def handler(signum, frame):
683 _raise_exception(timeout_exception, error_message)
685 @functools.wraps(function)
686 def new_function(*args, **kwargs):
687 new_seconds = kwargs.pop("timeout", seconds)
689 old = signal.signal(signal.SIGALRM, handler)
690 signal.setitimer(signal.ITIMER_REAL, new_seconds)
693 return function(*args, **kwargs)
696 return function(*args, **kwargs)
699 signal.setitimer(signal.ITIMER_REAL, 0)
700 signal.signal(signal.SIGALRM, old)
705 @functools.wraps(function)
706 def new_function(*args, **kwargs):
707 timeout_wrapper = _Timeout(
708 function, timeout_exception, error_message, seconds
710 return timeout_wrapper(*args, **kwargs)
717 class non_reentrant_code(object):
719 self._lock = threading.RLock
720 self._entered = False
722 def __call__(self, f):
723 def _gatekeeper(*args, **kwargs):
729 self._entered = False
734 class rlocked(object):
736 self._lock = threading.RLock
737 self._entered = False
739 def __call__(self, f):
740 def _gatekeeper(*args, **kwargs):
746 self._entered = False
751 def call_with_sample_rate(sample_rate: float) -> Callable:
752 if not 0.0 <= sample_rate <= 1.0:
753 msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
755 raise ValueError(msg)
759 def _call_with_sample_rate(*args, **kwargs):
760 if random.uniform(0, 1) < sample_rate:
761 return f(*args, **kwargs)
764 f"@call_with_sample_rate skipping a call to {f.__name__}"
767 return _call_with_sample_rate
772 def decorate_matching_methods_with(decorator, acl=None):
773 """Apply decorator to all methods in a class whose names begin with
774 prefix. If prefix is None (default), decorate all methods in the
778 def decorate_the_class(cls):
779 for name, m in inspect.getmembers(cls, inspect.isfunction):
781 setattr(cls, name, decorator(m))
784 setattr(cls, name, decorator(m))
787 return decorate_the_class
790 if __name__ == '__main__':