X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=decorator_utils.py;h=70a88d37ad0dbad37edff45aba0130dcc5a26271;hb=44a7740bf6fe18673f8636658256a227e0622880;hp=76faec6f7be2e8d52f3ed9c7e05e1dc6048d666b;hpb=a08ca309cb5bd7971210a9247a38c9bbe376a6e6;p=python_utils.git diff --git a/decorator_utils.py b/decorator_utils.py index 76faec6..70a88d3 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -2,7 +2,6 @@ """Decorators.""" -import datetime import enum import functools import inspect @@ -15,7 +14,7 @@ import sys import threading import time import traceback -from typing import Callable, Optional +from typing import Any, Callable, Optional, Tuple import warnings # This module is commonly used by others in here and should avoid @@ -47,21 +46,56 @@ def invocation_logged(func: Callable) -> Callable: @functools.wraps(func) def wrapper_invocation_logged(*args, **kwargs): - now = datetime.datetime.now() - ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z") - msg = f"[{ts}]: Entered {func.__name__}" + msg = f"Entered {func.__qualname__}" print(msg) logger.info(msg) ret = func(*args, **kwargs) - now = datetime.datetime.now() - ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z") - msg = f"[{ts}]: Exited {func.__name__}" + msg = f"Exited {func.__qualname__}" print(msg) logger.info(msg) return ret return wrapper_invocation_logged +def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable: + """Limit invocation of a wrapped function to n calls per period. + Thread safe. In testing this was relatively fair with multiple + threads using it though that hasn't been measured. + + """ + min_interval_seconds = per_period_in_seconds / float(n_calls) + + def wrapper_rate_limited(func: Callable) -> Callable: + cv = threading.Condition() + last_invocation_timestamp = [0.0] + + def may_proceed() -> float: + now = time.time() + last_invocation = last_invocation_timestamp[0] + if last_invocation != 0.0: + elapsed_since_last = now - last_invocation + wait_time = min_interval_seconds - elapsed_since_last + else: + wait_time = 0.0 + return wait_time + + def wrapper_wrapper_rate_limited(*args, **kargs) -> Any: + with cv: + while True: + cv.wait_for( + lambda: may_proceed() <= 0.0, + timeout=may_proceed(), + ) + break + ret = func(*args, **kargs) + with cv: + last_invocation_timestamp[0] = time.time() + cv.notify() + return ret + return wrapper_wrapper_rate_limited + return wrapper_rate_limited + + def debug_args(func: Callable) -> Callable: """Print the function signature and return value at each call.""" @@ -297,8 +331,8 @@ def thunkify(func): exc[0] = True exc[1] = sys.exc_info() # (type, value, traceback) msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}" - logger.warning(msg) print(msg) + logger.warning(msg) finally: wait_event.set() @@ -435,7 +469,6 @@ def timeout( use_signals = thread_utils.is_current_thread_main_thread() def decorate(function): - if use_signals: def handler(signum, frame):