10 import multiprocessing
18 from typing import Any, Callable, Optional
20 # This module is commonly used by others in here and should avoid
21 # taking any unnecessary dependencies back on them.
24 logger = logging.getLogger(__name__)
27 def timed(func: Callable) -> Callable:
28 """Print the runtime of the decorated function.
35 >>> foo() # doctest: +ELLIPSIS
40 @functools.wraps(func)
41 def wrapper_timer(*args, **kwargs):
42 start_time = time.perf_counter()
43 value = func(*args, **kwargs)
44 end_time = time.perf_counter()
45 run_time = end_time - start_time
46 msg = f"Finished {func.__qualname__} in {run_time:.4f}s"
54 def invocation_logged(func: Callable) -> Callable:
55 """Log the call of a function.
57 >>> @invocation_logged
59 ... print('Hello, world.')
68 @functools.wraps(func)
69 def wrapper_invocation_logged(*args, **kwargs):
70 msg = f"Entered {func.__qualname__}"
73 ret = func(*args, **kwargs)
74 msg = f"Exited {func.__qualname__}"
79 return wrapper_invocation_logged
82 def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
83 """Limit invocation of a wrapped function to n calls per period.
84 Thread safe. In testing this was relatively fair with multiple
85 threads using it though that hasn't been measured.
88 >>> import decorator_utils
89 >>> import thread_utils
93 >>> @decorator_utils.rate_limited(10, per_period_in_seconds=1.0)
94 ... def limited(x: int):
98 >>> @thread_utils.background_thread
100 ... for _ in range(3):
103 >>> @thread_utils.background_thread
105 ... for _ in range(3):
108 >>> start = time.time()
113 >>> end = time.time()
114 >>> dur = end - start
122 min_interval_seconds = per_period_in_seconds / float(n_calls)
124 def wrapper_rate_limited(func: Callable) -> Callable:
125 cv = threading.Condition()
126 last_invocation_timestamp = [0.0]
128 def may_proceed() -> float:
130 last_invocation = last_invocation_timestamp[0]
131 if last_invocation != 0.0:
132 elapsed_since_last = now - last_invocation
133 wait_time = min_interval_seconds - elapsed_since_last
136 logger.debug(f'@{time.time()}> wait_time = {wait_time}')
139 def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
143 lambda: may_proceed() <= 0.0,
144 timeout=may_proceed(),
148 logger.debug(f'@{time.time()}> calling it...')
149 ret = func(*args, **kargs)
150 last_invocation_timestamp[0] = time.time()
152 f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}'
157 return wrapper_wrapper_rate_limited
159 return wrapper_rate_limited
162 def debug_args(func: Callable) -> Callable:
163 """Print the function signature and return value at each call.
166 ... def foo(a, b, c):
170 ... return (a + b, c)
172 >>> foo(1, 2.0, "test")
173 Calling foo(1:<class 'int'>, 2.0:<class 'float'>, 'test':<class 'str'>)
177 foo returned (3.0, 'test'):<class 'tuple'>
181 @functools.wraps(func)
182 def wrapper_debug_args(*args, **kwargs):
183 args_repr = [f"{repr(a)}:{type(a)}" for a in args]
184 kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
185 signature = ", ".join(args_repr + kwargs_repr)
186 msg = f"Calling {func.__qualname__}({signature})"
189 value = func(*args, **kwargs)
190 msg = f"{func.__qualname__} returned {value!r}:{type(value)}"
195 return wrapper_debug_args
198 def debug_count_calls(func: Callable) -> Callable:
199 """Count function invocations and print a message before every call.
201 >>> @debug_count_calls
205 ... return x * factoral(x - 1)
208 Call #1 of 'factoral'
209 Call #2 of 'factoral'
210 Call #3 of 'factoral'
211 Call #4 of 'factoral'
212 Call #5 of 'factoral'
217 @functools.wraps(func)
218 def wrapper_debug_count_calls(*args, **kwargs):
219 wrapper_debug_count_calls.num_calls += 1
220 msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
223 return func(*args, **kwargs)
225 wrapper_debug_count_calls.num_calls = 0 # type: ignore
226 return wrapper_debug_count_calls
229 class DelayWhen(enum.IntEnum):
236 _func: Callable = None,
238 seconds: float = 1.0,
239 when: DelayWhen = DelayWhen.BEFORE_CALL,
241 """Delay the execution of a function by sleeping before and/or after.
243 Slow down a function by inserting a delay before and/or after its
248 >>> @delay(seconds=1.0)
252 >>> start = time.time()
254 >>> dur = time.time() - start
260 def decorator_delay(func: Callable) -> Callable:
261 @functools.wraps(func)
262 def wrapper_delay(*args, **kwargs):
263 if when & DelayWhen.BEFORE_CALL:
264 logger.debug(f"@delay for {seconds}s BEFORE_CALL to {func.__name__}")
266 retval = func(*args, **kwargs)
267 if when & DelayWhen.AFTER_CALL:
268 logger.debug(f"@delay for {seconds}s AFTER_CALL to {func.__name__}")
275 return decorator_delay
277 return decorator_delay(_func)
280 class _SingletonWrapper:
282 A singleton wrapper class. Its instances would be created
283 for each decorated class.
287 def __init__(self, cls):
288 self.__wrapped__ = cls
289 self._instance = None
291 def __call__(self, *args, **kwargs):
292 """Returns a single instance of decorated class"""
294 f"@singleton returning global instance of {self.__wrapped__.__name__}"
296 if self._instance is None:
297 self._instance = self.__wrapped__(*args, **kwargs)
298 return self._instance
303 A singleton decorator. Returns a wrapper object. A call on that object
304 returns a single instance object of decorated class. Use the __wrapped__
305 attribute to access decorated class directly in unit tests
308 ... class foo(object):
320 return _SingletonWrapper(cls)
def memoized(func: Callable) -> Callable:
    """Keep a cache of previous function call results.

    The cache here is a dict with a key based on the arguments to the
    call.  Consider also: functools.lru_cache for a more advanced
    implementation.

    The key is the tuple of positional arguments.  When keyword
    arguments are present, a private marker followed by the keyword
    items (sorted by name) is appended, so that:

    * ``f(a=1, b=2)`` and ``f(b=2, a=1)`` share one cache entry, and
    * a positional argument that merely looks like a keyword item,
      e.g. ``f(("a", 1))``, is never confused with ``f(a=1)``.

    All arguments must be hashable; an unhashable argument raises
    TypeError when the cache is consulted.  The cache is unbounded and
    is exposed as the ``cache`` attribute of the returned wrapper.

    >>> calls = []

    >>> @memoized
    ... def expensive(arg) -> int:
    ...     calls.append(arg)  # Record every real invocation
    ...     return arg * arg

    >>> expensive(5)           # Computed
    25

    >>> expensive(3)           # Also computed
    9

    >>> expensive(5)           # Pulls from cache
    25

    >>> calls
    [5, 3]

    """

    # A fresh object() created once per decorated function.  It separates
    # the positional part of a key from the keyword part so the two can
    # never be mistaken for one another.
    kwargs_marker = object()

    @functools.wraps(func)
    def wrapper_memoized(*args, **kwargs):
        # Positional-only calls keep the plain ``args`` tuple as their key.
        cache_key = args
        if kwargs:
            # Keyword names are unique strings, so sorting the items only
            # ever compares names and never the (possibly unorderable) values.
            cache_key += (kwargs_marker,) + tuple(sorted(kwargs.items()))

        # Re-read the attribute on every call so a caller that replaces
        # ``wrapper.cache`` is honoured.
        cache = wrapper_memoized.cache
        if cache_key not in cache:
            value = func(*args, **kwargs)
            logger.debug(f"Memoizing {cache_key} => {value} for {func.__name__}")
            cache[cache_key] = value
        else:
            logger.debug(f"Returning memoized value for {func.__name__}")
        return cache[cache_key]

    wrapper_memoized.cache = dict()  # type: ignore
    return wrapper_memoized
375 predicate: Callable[..., bool],
376 delay_sec: float = 3.0,
377 backoff: float = 2.0,
379 """Retries a function or method up to a certain number of times
380 with a prescribed initial delay period and backoff rate.
382 tries is the maximum number of attempts to run the function.
383 delay_sec sets the initial delay period in seconds.
384 backoff is a multiplier (must be >= 1) used to modify the delay.
385 predicate is a function that will be passed the retval of the
386 decorated function and must return True to stop or False to
391 msg = f"backoff must be greater than or equal to 1, got {backoff}"
393 raise ValueError(msg)
395 tries = math.floor(tries)
397 msg = f"tries must be 0 or greater, got {tries}"
399 raise ValueError(msg)
402 msg = f"delay_sec must be greater than 0, got {delay_sec}"
404 raise ValueError(msg)
408 def f_retry(*args, **kwargs):
409 mtries, mdelay = tries, delay_sec # make mutable
410 logger.debug(f'deco_retry: will make up to {mtries} attempts...')
411 retval = f(*args, **kwargs)
413 if predicate(retval) is True:
414 logger.debug('Predicate succeeded, deco_retry is done.')
416 logger.debug("Predicate failed, sleeping and retrying.")
420 retval = f(*args, **kwargs)
428 def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
429 """A helper for @retry_predicate that retries a decorated
430 function as long as it keeps returning False.
436 >>> @retry_if_false(5, delay_sec=1.0, backoff=1.1)
440 ... return counter >= 3
442 >>> start = time.time()
443 >>> foo() # fail, delay 1.0, fail, delay 1.1, succeed
446 >>> dur = time.time() - start
455 return retry_predicate(
457 predicate=lambda x: x is True,
def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
    """Retry the decorated function for as long as it returns None.

    A convenience front end for @retry_predicate: the wrapped function
    is retried up to `tries` times, pausing between attempts, until it
    returns something other than None.  `delay_sec` is the initial
    pause and `backoff` is the factor that can grow it between retries;
    both are handed to retry_predicate unchanged.

    """

    def _returned_something(retval) -> bool:
        # Stop retrying as soon as the wrapped function yields a non-None value.
        return retval is not None

    return retry_predicate(
        tries,
        predicate=_returned_something,
        delay_sec=delay_sec,
        backoff=backoff,
    )
477 def deprecated(func):
478 """This is a decorator which can be used to mark functions
479 as deprecated. It will result in a warning being emitted
480 when the function is used.
484 @functools.wraps(func)
485 def wrapper_deprecated(*args, **kwargs):
486 msg = f"Call to deprecated function {func.__qualname__}"
488 warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
489 print(msg, file=sys.stderr)
490 return func(*args, **kwargs)
492 return wrapper_deprecated
497 Make a function immediately return a function of no args which,
498 when called, waits for the result, which will start being
499 processed in another thread.
502 @functools.wraps(func)
503 def lazy_thunked(*args, **kwargs):
504 wait_event = threading.Event()
511 func_result = func(*args, **kwargs)
512 result[0] = func_result
515 exc[1] = sys.exc_info() # (type, value, traceback)
516 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
524 raise exc[1][0](exc[1][1])
527 threading.Thread(target=worker_func).start()
533 ############################################################
535 ############################################################
537 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
539 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
542 def _raise_exception(exception, error_message: Optional[str]):
543 if error_message is None:
546 raise Exception(error_message)
549 def _target(queue, function, *args, **kwargs):
550 """Run a function with arguments and return output via a queue.
552 This is a helper function for the Process created in _Timeout. It runs
553 the function with positional arguments and keyword arguments and then
554 returns the function's output by way of a queue. If an exception gets
555 raised, it is returned to _Timeout to be raised by the value property.
558 queue.put((True, function(*args, **kwargs)))
560 queue.put((False, sys.exc_info()[1]))
563 class _Timeout(object):
564 """Wrap a function and add a timeout to it.
566 Instances of this class are automatically generated by the add_timeout
567 function defined below. Do not use directly.
573 timeout_exception: Exception,
577 self.__limit = seconds
578 self.__function = function
579 self.__timeout_exception = timeout_exception
580 self.__error_message = error_message
581 self.__name__ = function.__name__
582 self.__doc__ = function.__doc__
583 self.__timeout = time.time()
584 self.__process = multiprocessing.Process()
585 self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
587 def __call__(self, *args, **kwargs):
588 """Execute the embedded function object asynchronously.
590 The function given to the constructor is transparently called and
591 requires that "ready" be intermittently polled. If and when it is
592 True, the "value" property may then be checked for returned data.
594 self.__limit = kwargs.pop("timeout", self.__limit)
595 self.__queue = multiprocessing.Queue(1)
596 args = (self.__queue, self.__function) + args
597 self.__process = multiprocessing.Process(
598 target=_target, args=args, kwargs=kwargs
600 self.__process.daemon = True
601 self.__process.start()
602 if self.__limit is not None:
603 self.__timeout = self.__limit + time.time()
604 while not self.ready:
609 """Terminate any possible execution of the embedded function."""
610 if self.__process.is_alive():
611 self.__process.terminate()
612 _raise_exception(self.__timeout_exception, self.__error_message)
616 """Read-only property indicating status of "value" property."""
617 if self.__limit and self.__timeout < time.time():
619 return self.__queue.full() and not self.__queue.empty()
623 """Read-only property containing data returned from function."""
624 if self.ready is True:
625 flag, load = self.__queue.get()
632 seconds: float = 1.0,
633 use_signals: Optional[bool] = None,
634 timeout_exception=exceptions.TimeoutError,
635 error_message="Function call timed out",
637 """Add a timeout parameter to a function and return the function.
639 Note: the use_signals parameter is included in order to support
640 multiprocessing scenarios (signal can only be used from the process'
641 main thread). When not using signals, timeout granularity will be
642 rounded to the nearest 0.1s.
644 Raises an exception when/if the timeout is reached.
646 It is illegal to pass anything other than a function as the first
647 parameter. The function is wrapped and returned to the caller.
650 ... def foo(delay: float):
651 ... time.sleep(delay)
658 Traceback (most recent call last):
660 Exception: Function call timed out
663 if use_signals is None:
666 use_signals = thread_utils.is_current_thread_main_thread()
668 def decorate(function):
671 def handler(signum, frame):
672 _raise_exception(timeout_exception, error_message)
674 @functools.wraps(function)
675 def new_function(*args, **kwargs):
676 new_seconds = kwargs.pop("timeout", seconds)
678 old = signal.signal(signal.SIGALRM, handler)
679 signal.setitimer(signal.ITIMER_REAL, new_seconds)
682 return function(*args, **kwargs)
685 return function(*args, **kwargs)
688 signal.setitimer(signal.ITIMER_REAL, 0)
689 signal.signal(signal.SIGALRM, old)
694 @functools.wraps(function)
695 def new_function(*args, **kwargs):
696 timeout_wrapper = _Timeout(
697 function, timeout_exception, error_message, seconds
699 return timeout_wrapper(*args, **kwargs)
706 def synchronized(lock):
709 def _gatekeeper(*args, **kw):
712 return f(*args, **kw)
721 def call_with_sample_rate(sample_rate: float) -> Callable:
722 if not 0.0 <= sample_rate <= 1.0:
723 msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
725 raise ValueError(msg)
729 def _call_with_sample_rate(*args, **kwargs):
730 if random.uniform(0, 1) < sample_rate:
731 return f(*args, **kwargs)
733 logger.debug(f"@call_with_sample_rate skipping a call to {f.__name__}")
735 return _call_with_sample_rate
740 def decorate_matching_methods_with(decorator, acl=None):
741 """Apply decorator to all methods in a class whose names begin with
742 prefix. If prefix is None (default), decorate all methods in the
746 def decorate_the_class(cls):
747 for name, m in inspect.getmembers(cls, inspect.isfunction):
749 setattr(cls, name, decorator(m))
752 setattr(cls, name, decorator(m))
755 return decorate_the_class
758 if __name__ == '__main__':