X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=decorator_utils.py;h=1e0fe18c4063b285e84d561b30b39e5b4245d001;hb=b3bbc27b2ec65b823f8dc3994f02dc3997c2ba1f;hp=03e7c880433fad5d359a2bb3acc29a4266204e65;hpb=e0973abee4c917127169795a56fd9c5c5412913c;p=python_utils.git diff --git a/decorator_utils.py b/decorator_utils.py index 03e7c88..1e0fe18 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -2,9 +2,9 @@ """Decorators.""" -import datetime import enum import functools +import inspect import logging import math import multiprocessing @@ -14,11 +14,13 @@ import sys import threading import time import traceback -from typing import Callable, Optional +from typing import Any, Callable, Optional import warnings +# This module is commonly used by others in here and should avoid +# taking any unnecessary dependencies back on them. import exceptions -import thread_utils + logger = logging.getLogger(__name__) @@ -44,21 +46,95 @@ def invocation_logged(func: Callable) -> Callable: @functools.wraps(func) def wrapper_invocation_logged(*args, **kwargs): - now = datetime.datetime.now() - ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z") - msg = f"[{ts}]: Entered {func.__name__}" + msg = f"Entered {func.__qualname__}" print(msg) logger.info(msg) ret = func(*args, **kwargs) - now = datetime.datetime.now() - ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z") - msg = f"[{ts}]: Exited {func.__name__}" + msg = f"Exited {func.__qualname__}" print(msg) logger.info(msg) return ret return wrapper_invocation_logged +def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable: + """Limit invocation of a wrapped function to n calls per period. + Thread safe. In testing this was relatively fair with multiple + threads using it though that hasn't been measured. + + >>> import time + >>> import decorator_utils + >>> import thread_utils + + >>> calls = 0 + + >>> @decorator_utils.rate_limited(1, per_period_in_seconds=1.0) + ... def limited(x: int): + ... global calls + ... calls += 1 + + >>> @thread_utils.background_thread + ... def a(stop): + ... for _ in range(3): + ... limited(_) + + >>> @thread_utils.background_thread + ... def b(stop): + ... for _ in range(3): + ... limited(_) + + >>> start = time.time() + >>> (t1, e1) = a() + >>> (t2, e2) = b() + >>> t1.join() + >>> t2.join() + >>> end = time.time() + >>> dur = end - start + >>> dur > 5.0 + True + + >>> calls + 6 + + """ + min_interval_seconds = per_period_in_seconds / float(n_calls) + + def wrapper_rate_limited(func: Callable) -> Callable: + cv = threading.Condition() + last_invocation_timestamp = [0.0] + + def may_proceed() -> float: + now = time.time() + last_invocation = last_invocation_timestamp[0] + if last_invocation != 0.0: + elapsed_since_last = now - last_invocation + wait_time = min_interval_seconds - elapsed_since_last + else: + wait_time = 0.0 + logger.debug(f'@{time.time()}> wait_time = {wait_time}') + return wait_time + + def wrapper_wrapper_rate_limited(*args, **kargs) -> Any: + with cv: + while True: + if cv.wait_for( + lambda: may_proceed() <= 0.0, + timeout=may_proceed(), + ): + break + with cv: + logger.debug(f'@{time.time()}> calling it...') + ret = func(*args, **kargs) + last_invocation_timestamp[0] = time.time() + logger.debug( + f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}' + ) + cv.notify() + return ret + return wrapper_wrapper_rate_limited + return wrapper_rate_limited + + def debug_args(func: Callable) -> Callable: """Print the function signature and return value at each call.""" @@ -189,7 +265,7 @@ def retry_predicate( tries: int, *, predicate: Callable[..., bool], - delay_sec: float = 3, + delay_sec: float = 3.0, backoff: float = 2.0, ): """Retries a function or method up to a certain number of times @@ -199,10 +275,10 @@ def retry_predicate( delay_sec sets the initial delay period in seconds. backoff is a multiplied (must be >1) used to modify the delay. predicate is a function that will be passed the retval of the - decorated function and must return True to stop or False to - retry. + decorated function and must return True to stop or False to + retry. """ - if backoff < 1: + if backoff < 1.0: msg = f"backoff must be greater than or equal to 1, got {backoff}" logger.critical(msg) raise ValueError(msg) @@ -222,9 +298,11 @@ def retry_predicate( @functools.wraps(f) def f_retry(*args, **kwargs): mtries, mdelay = tries, delay_sec # make mutable + logger.debug(f'deco_retry: will make up to {mtries} attempts...') retval = f(*args, **kwargs) while mtries > 0: if predicate(retval) is True: + logger.debug('Predicate succeeded, deco_retry is done.') return retval logger.debug("Predicate failed, sleeping and retrying.") mtries -= 1 @@ -292,8 +370,8 @@ def thunkify(func): exc[0] = True exc[1] = sys.exc_info() # (type, value, traceback) msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}" - logger.warning(msg) print(msg) + logger.warning(msg) finally: wait_event.set() @@ -335,7 +413,7 @@ def _target(queue, function, *args, **kwargs): """ try: queue.put((True, function(*args, **kwargs))) - except: + except Exception: queue.put((False, sys.exc_info()[1])) @@ -426,10 +504,10 @@ def timeout( parameter. The function is wrapped and returned to the caller. """ if use_signals is None: + import thread_utils use_signals = thread_utils.is_current_thread_main_thread() def decorate(function): - if use_signals: def handler(signum, frame): @@ -517,3 +595,26 @@ def call_with_sample_rate(sample_rate: float) -> Callable: ) return _call_with_sample_rate return decorator + + +def decorate_matching_methods_with(decorator, acl=None): + """Apply decorator to all methods in a class whose names begin with + prefix. If prefix is None (default), decorate all methods in the + class. + """ + def decorate_the_class(cls): + for name, m in inspect.getmembers(cls, inspect.isfunction): + if acl is None: + setattr(cls, name, decorator(m)) + else: + if acl(name): + setattr(cls, name, decorator(m)) + return cls + return decorate_the_class + + +if __name__ == '__main__': + import doctest + doctest.testmod() + +