X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=decorator_utils.py;h=a5c5afecb34c005d16a351cc703f44dc561b567d;hb=31c81f6539969a5eba864d3305f9fb7bf716a367;hp=1e0fe18c4063b285e84d561b30b39e5b4245d001;hpb=b3bbc27b2ec65b823f8dc3994f02dc3997c2ba1f;p=python_utils.git diff --git a/decorator_utils.py b/decorator_utils.py index 1e0fe18..a5c5afe 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -14,19 +14,28 @@ import sys import threading import time import traceback -from typing import Any, Callable, Optional import warnings +from typing import Any, Callable, Optional # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. import exceptions - logger = logging.getLogger(__name__) def timed(func: Callable) -> Callable: - """Print the runtime of the decorated function.""" + """Print the runtime of the decorated function. + + >>> @timed + ... def foo(): + ... import time + ... time.sleep(0.1) + + >>> foo() # doctest: +ELLIPSIS + Finished foo in ... + + """ @functools.wraps(func) def wrapper_timer(*args, **kwargs): @@ -34,15 +43,27 @@ def timed(func: Callable) -> Callable: value = func(*args, **kwargs) end_time = time.perf_counter() run_time = end_time - start_time - msg = f"Finished {func.__name__!r} in {run_time:.4f}s" + msg = f"Finished {func.__qualname__} in {run_time:.4f}s" print(msg) logger.info(msg) return value + return wrapper_timer def invocation_logged(func: Callable) -> Callable: - """Log the call of a function.""" + """Log the call of a function. + + >>> @invocation_logged + ... def foo(): + ... print('Hello, world.') + + >>> foo() + Entered foo + Hello, world. + Exited foo + + """ @functools.wraps(func) def wrapper_invocation_logged(*args, **kwargs): @@ -54,6 +75,7 @@ def invocation_logged(func: Callable) -> Callable: print(msg) logger.info(msg) return ret + return wrapper_invocation_logged @@ -68,7 +90,7 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl >>> calls = 0 - >>> @decorator_utils.rate_limited(1, per_period_in_seconds=1.0) + >>> @decorator_utils.rate_limited(10, per_period_in_seconds=1.0) ... def limited(x: int): ... global calls ... calls += 1 @@ -90,7 +112,7 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl >>> t2.join() >>> end = time.time() >>> dur = end - start - >>> dur > 5.0 + >>> dur > 0.5 True >>> calls @@ -131,30 +153,66 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl ) cv.notify() return ret + return wrapper_wrapper_rate_limited + return wrapper_rate_limited def debug_args(func: Callable) -> Callable: - """Print the function signature and return value at each call.""" + """Print the function signature and return value at each call. + + >>> @debug_args + ... def foo(a, b, c): + ... print(a) + ... print(b) + ... print(c) + ... return (a + b, c) + + >>> foo(1, 2.0, "test") + Calling foo(1:, 2.0:, 'test':) + 1 + 2.0 + test + foo returned (3.0, 'test'): + (3.0, 'test') + """ @functools.wraps(func) def wrapper_debug_args(*args, **kwargs): args_repr = [f"{repr(a)}:{type(a)}" for a in args] kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()] signature = ", ".join(args_repr + kwargs_repr) - msg = f"Calling {func.__name__}({signature})" + msg = f"Calling {func.__qualname__}({signature})" print(msg) logger.info(msg) value = func(*args, **kwargs) - msg = f"{func.__name__!r} returned {value!r}:{type(value)}" + msg = f"{func.__qualname__} returned {value!r}:{type(value)}" + print(msg) logger.info(msg) return value + return wrapper_debug_args def debug_count_calls(func: Callable) -> Callable: - """Count function invocations and print a message befor every call.""" + """Count function invocations and print a message befor every call. + + >>> @debug_count_calls + ... def factoral(x): + ... if x == 1: + ... return 1 + ... return x * factoral(x - 1) + + >>> factoral(5) + Call #1 of 'factoral' + Call #2 of 'factoral' + Call #3 of 'factoral' + Call #4 of 'factoral' + Call #5 of 'factoral' + 120 + + """ @functools.wraps(func) def wrapper_debug_count_calls(*args, **kwargs): @@ -163,11 +221,12 @@ def debug_count_calls(func: Callable) -> Callable: print(msg) logger.info(msg) return func(*args, **kwargs) - wrapper_debug_count_calls.num_calls = 0 + + wrapper_debug_count_calls.num_calls = 0 # type: ignore return wrapper_debug_count_calls -class DelayWhen(enum.Enum): +class DelayWhen(enum.IntEnum): BEFORE_CALL = 1 AFTER_CALL = 2 BEFORE_AND_AFTER = 3 @@ -183,23 +242,33 @@ def delay( Slow down a function by inserting a delay before and/or after its invocation. + + >>> import time + + >>> @delay(seconds=1.0) + ... def foo(): + ... pass + + >>> start = time.time() + >>> foo() + >>> dur = time.time() - start + >>> dur >= 1.0 + True + """ def decorator_delay(func: Callable) -> Callable: @functools.wraps(func) def wrapper_delay(*args, **kwargs): if when & DelayWhen.BEFORE_CALL: - logger.debug( - f"@delay for {seconds}s BEFORE_CALL to {func.__name__}" - ) + logger.debug(f"@delay for {seconds}s BEFORE_CALL to {func.__name__}") time.sleep(seconds) retval = func(*args, **kwargs) if when & DelayWhen.AFTER_CALL: - logger.debug( - f"@delay for {seconds}s AFTER_CALL to {func.__name__}" - ) + logger.debug(f"@delay for {seconds}s AFTER_CALL to {func.__name__}") time.sleep(seconds) return retval + return wrapper_delay if _func is None: @@ -212,6 +281,7 @@ class _SingletonWrapper: """ A singleton wrapper class. Its instances would be created for each decorated class. + """ def __init__(self, cls): @@ -233,6 +303,19 @@ def singleton(cls): A singleton decorator. Returns a wrapper objects. A call on that object returns a single instance object of decorated class. Use the __wrapped__ attribute to access decorated class directly in unit tests + + >>> @singleton + ... class foo(object): + ... pass + + >>> a = foo() + >>> b = foo() + >>> a is b + True + + >>> id(a) == id(b) + True + """ return _SingletonWrapper(cls) @@ -243,6 +326,32 @@ def memoized(func: Callable) -> Callable: The cache here is a dict with a key based on the arguments to the call. Consider also: functools.lru_cache for a more advanced implementation. + + >>> import time + + >>> @memoized + ... def expensive(arg) -> int: + ... # Simulate something slow to compute or lookup + ... time.sleep(1.0) + ... return arg * arg + + >>> start = time.time() + >>> expensive(5) # Takes about 1 sec + 25 + + >>> expensive(3) # Also takes about 1 sec + 9 + + >>> expensive(5) # Pulls from cache, fast + 25 + + >>> expensive(3) # Pulls from cache again, fast + 9 + + >>> dur = time.time() - start + >>> dur < 3.0 + True + """ @functools.wraps(func) @@ -250,14 +359,13 @@ def memoized(func: Callable) -> Callable: cache_key = args + tuple(kwargs.items()) if cache_key not in wrapper_memoized.cache: value = func(*args, **kwargs) - logger.debug( - f"Memoizing {cache_key} => {value} for {func.__name__}" - ) + logger.debug(f"Memoizing {cache_key} => {value} for {func.__name__}") wrapper_memoized.cache[cache_key] = value else: logger.debug(f"Returning memoized value for {func.__name__}") return wrapper_memoized.cache[cache_key] - wrapper_memoized.cache = dict() + + wrapper_memoized.cache = dict() # type: ignore return wrapper_memoized @@ -277,6 +385,7 @@ def retry_predicate( predicate is a function that will be passed the retval of the decorated function and must return True to stop or False to retry. + """ if backoff < 1.0: msg = f"backoff must be greater than or equal to 1, got {backoff}" @@ -310,11 +419,39 @@ def retry_predicate( mdelay *= backoff retval = f(*args, **kwargs) return retval + return f_retry + return deco_retry def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0): + """A helper for @retry_predicate that retries a decorated + function as long as it keeps returning False. + + >>> import time + + >>> counter = 0 + + >>> @retry_if_false(5, delay_sec=1.0, backoff=1.1) + ... def foo(): + ... global counter + ... counter += 1 + ... return counter >= 3 + + >>> start = time.time() + >>> foo() # fail, delay 1.0, fail, delay 1.1, succeed + True + + >>> dur = time.time() - start + >>> counter + 3 + >>> dur > 2.0 + True + >>> dur < 2.3 + True + + """ return retry_predicate( tries, predicate=lambda x: x is True, @@ -324,6 +461,11 @@ def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0): def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0): + """Another helper for @retry_predicate above. Retries up to N + times so long as the wrapped function returns None with a delay + between each retry and a backoff that can increase the delay. + + """ return retry_predicate( tries, predicate=lambda x: x is not None, @@ -336,13 +478,15 @@ def deprecated(func): """This is a decorator which can be used to mark functions as deprecated. It will result in a warning being emitted when the function is used. + """ @functools.wraps(func) def wrapper_deprecated(*args, **kwargs): - msg = f"Call to deprecated function {func.__name__}" + msg = f"Call to deprecated function {func.__qualname__}" logger.warning(msg) - warnings.warn(msg, category=DeprecationWarning) + warnings.warn(msg, category=DeprecationWarning, stacklevel=2) + print(msg, file=sys.stderr) return func(*args, **kwargs) return wrapper_deprecated @@ -370,7 +514,6 @@ def thunkify(func): exc[0] = True exc[1] = sys.exc_info() # (type, value, traceback) msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}" - print(msg) logger.warning(msg) finally: wait_event.set() @@ -398,9 +541,9 @@ def thunkify(func): def _raise_exception(exception, error_message: Optional[str]): if error_message is None: - raise exception() + raise Exception() else: - raise exception(error_message) + raise Exception(error_message) def _target(queue, function, *args, **kwargs): @@ -418,10 +561,10 @@ def _target(queue, function, *args, **kwargs): class _Timeout(object): - """Wrap a function and add a timeout (limit) attribute to it. + """Wrap a function and add a timeout to it. Instances of this class are automatically generated by the add_timeout - function defined below. + function defined below. Do not use directly. """ def __init__( @@ -498,13 +641,28 @@ def timeout( main thread). When not using signals, timeout granularity will be rounded to the nearest 0.1s. - Raises an exception when the timeout is reached. + Raises an exception when/if the timeout is reached. It is illegal to pass anything other than a function as the first parameter. The function is wrapped and returned to the caller. + + >>> @timeout(0.2) + ... def foo(delay: float): + ... time.sleep(delay) + ... return "ok" + + >>> foo(0) + 'ok' + + >>> foo(1.0) + Traceback (most recent call last): + ... + Exception: Function call timed out + """ if use_signals is None: import thread_utils + use_signals = thread_utils.is_current_thread_main_thread() def decorate(function): @@ -545,37 +703,19 @@ def timeout( return decorate -class non_reentrant_code(object): - def __init__(self): - self._lock = threading.RLock - self._entered = False - - def __call__(self, f): - def _gatekeeper(*args, **kwargs): - with self._lock: - if self._entered: - return - self._entered = True - f(*args, **kwargs) - self._entered = False +def synchronized(lock): + def wrap(f): + @functools.wraps(f) + def _gatekeeper(*args, **kw): + lock.acquire() + try: + return f(*args, **kw) + finally: + lock.release() return _gatekeeper - -class rlocked(object): - def __init__(self): - self._lock = threading.RLock - self._entered = False - - def __call__(self, f): - def _gatekeeper(*args, **kwargs): - with self._lock: - if self._entered: - return - self._entered = True - f(*args, **kwargs) - self._entered = False - return _gatekeeper + return wrap def call_with_sample_rate(sample_rate: float) -> Callable: @@ -590,10 +730,10 @@ def call_with_sample_rate(sample_rate: float) -> Callable: if random.uniform(0, 1) < sample_rate: return f(*args, **kwargs) else: - logger.debug( - f"@call_with_sample_rate skipping a call to {f.__name__}" - ) + logger.debug(f"@call_with_sample_rate skipping a call to {f.__name__}") + return _call_with_sample_rate + return decorator @@ -602,6 +742,7 @@ def decorate_matching_methods_with(decorator, acl=None): prefix. If prefix is None (default), decorate all methods in the class. """ + def decorate_the_class(cls): for name, m in inspect.getmembers(cls, inspect.isfunction): if acl is None: @@ -610,11 +751,11 @@ def decorate_matching_methods_with(decorator, acl=None): if acl(name): setattr(cls, name, decorator(m)) return cls + return decorate_the_class if __name__ == '__main__': import doctest - doctest.testmod() - + doctest.testmod()