X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=decorator_utils.py;h=1e0fe18c4063b285e84d561b30b39e5b4245d001;hb=b3bbc27b2ec65b823f8dc3994f02dc3997c2ba1f;hp=76faec6f7be2e8d52f3ed9c7e05e1dc6048d666b;hpb=a08ca309cb5bd7971210a9247a38c9bbe376a6e6;p=python_utils.git diff --git a/decorator_utils.py b/decorator_utils.py index 76faec6..1e0fe18 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -2,7 +2,6 @@ """Decorators.""" -import datetime import enum import functools import inspect @@ -15,7 +14,7 @@ import sys import threading import time import traceback -from typing import Callable, Optional +from typing import Any, Callable, Optional import warnings # This module is commonly used by others in here and should avoid @@ -47,21 +46,95 @@ def invocation_logged(func: Callable) -> Callable: @functools.wraps(func) def wrapper_invocation_logged(*args, **kwargs): - now = datetime.datetime.now() - ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z") - msg = f"[{ts}]: Entered {func.__name__}" + msg = f"Entered {func.__qualname__}" print(msg) logger.info(msg) ret = func(*args, **kwargs) - now = datetime.datetime.now() - ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z") - msg = f"[{ts}]: Exited {func.__name__}" + msg = f"Exited {func.__qualname__}" print(msg) logger.info(msg) return ret return wrapper_invocation_logged +def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable: + """Limit invocation of a wrapped function to n calls per period. + Thread safe. In testing this was relatively fair with multiple + threads using it though that hasn't been measured. + + >>> import time + >>> import decorator_utils + >>> import thread_utils + + >>> calls = 0 + + >>> @decorator_utils.rate_limited(1, per_period_in_seconds=1.0) + ... def limited(x: int): + ... global calls + ... calls += 1 + + >>> @thread_utils.background_thread + ... def a(stop): + ... for _ in range(3): + ... limited(_) + + >>> @thread_utils.background_thread + ... def b(stop): + ... for _ in range(3): + ... limited(_) + + >>> start = time.time() + >>> (t1, e1) = a() + >>> (t2, e2) = b() + >>> t1.join() + >>> t2.join() + >>> end = time.time() + >>> dur = end - start + >>> dur > 5.0 + True + + >>> calls + 6 + + """ + min_interval_seconds = per_period_in_seconds / float(n_calls) + + def wrapper_rate_limited(func: Callable) -> Callable: + cv = threading.Condition() + last_invocation_timestamp = [0.0] + + def may_proceed() -> float: + now = time.time() + last_invocation = last_invocation_timestamp[0] + if last_invocation != 0.0: + elapsed_since_last = now - last_invocation + wait_time = min_interval_seconds - elapsed_since_last + else: + wait_time = 0.0 + logger.debug(f'@{time.time()}> wait_time = {wait_time}') + return wait_time + + def wrapper_wrapper_rate_limited(*args, **kargs) -> Any: + with cv: + while True: + if cv.wait_for( + lambda: may_proceed() <= 0.0, + timeout=may_proceed(), + ): + break + with cv: + logger.debug(f'@{time.time()}> calling it...') + ret = func(*args, **kargs) + last_invocation_timestamp[0] = time.time() + logger.debug( + f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}' + ) + cv.notify() + return ret + return wrapper_wrapper_rate_limited + return wrapper_rate_limited + + def debug_args(func: Callable) -> Callable: """Print the function signature and return value at each call.""" @@ -297,8 +370,8 @@ def thunkify(func): exc[0] = True exc[1] = sys.exc_info() # (type, value, traceback) msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}" - logger.warning(msg) print(msg) + logger.warning(msg) finally: wait_event.set() @@ -435,7 +508,6 @@ def timeout( use_signals = thread_utils.is_current_thread_main_thread() def decorate(function): - if use_signals: def handler(signum, frame): @@ -539,3 +611,10 @@ def decorate_matching_methods_with(decorator, acl=None): setattr(cls, name, decorator(m)) return cls return decorate_the_class + + +if __name__ == '__main__': + import doctest + doctest.testmod() + +