X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=decorator_utils.py;h=70a88d37ad0dbad37edff45aba0130dcc5a26271;hb=44a7740bf6fe18673f8636658256a227e0622880;hp=2817239c88c2396b0e5dcc56e7c535b8afdd99d9;hpb=09e6d10face80d98a4578ff54192b5c8bec007d7;p=python_utils.git diff --git a/decorator_utils.py b/decorator_utils.py index 2817239..70a88d3 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -2,7 +2,6 @@ """Decorators.""" -import datetime import enum import functools import inspect @@ -15,7 +14,7 @@ import sys import threading import time import traceback -from typing import Callable, Optional +from typing import Any, Callable, Optional, Tuple import warnings # This module is commonly used by others in here and should avoid @@ -47,21 +46,56 @@ def invocation_logged(func: Callable) -> Callable: @functools.wraps(func) def wrapper_invocation_logged(*args, **kwargs): - now = datetime.datetime.now() - ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z") - msg = f"[{ts}]: Entered {func.__name__}" + msg = f"Entered {func.__qualname__}" print(msg) logger.info(msg) ret = func(*args, **kwargs) - now = datetime.datetime.now() - ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z") - msg = f"[{ts}]: Exited {func.__name__}" + msg = f"Exited {func.__qualname__}" print(msg) logger.info(msg) return ret return wrapper_invocation_logged +def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable: + """Limit invocation of a wrapped function to n calls per period. + Thread safe. In testing this was relatively fair with multiple + threads using it though that hasn't been measured. + + """ + min_interval_seconds = per_period_in_seconds / float(n_calls) + + def wrapper_rate_limited(func: Callable) -> Callable: + cv = threading.Condition() + last_invocation_timestamp = [0.0] + + def may_proceed() -> float: + now = time.time() + last_invocation = last_invocation_timestamp[0] + if last_invocation != 0.0: + elapsed_since_last = now - last_invocation + wait_time = min_interval_seconds - elapsed_since_last + else: + wait_time = 0.0 + return wait_time + + def wrapper_wrapper_rate_limited(*args, **kargs) -> Any: + with cv: + while True: + cv.wait_for( + lambda: may_proceed() <= 0.0, + timeout=may_proceed(), + ) + break + ret = func(*args, **kargs) + with cv: + last_invocation_timestamp[0] = time.time() + cv.notify() + return ret + return wrapper_wrapper_rate_limited + return wrapper_rate_limited + + def debug_args(func: Callable) -> Callable: """Print the function signature and return value at each call.""" @@ -192,7 +226,7 @@ def retry_predicate( tries: int, *, predicate: Callable[..., bool], - delay_sec: float = 3, + delay_sec: float = 3.0, backoff: float = 2.0, ): """Retries a function or method up to a certain number of times @@ -202,10 +236,10 @@ def retry_predicate( delay_sec sets the initial delay period in seconds. backoff is a multiplied (must be >1) used to modify the delay. predicate is a function that will be passed the retval of the - decorated function and must return True to stop or False to - retry. + decorated function and must return True to stop or False to + retry. """ - if backoff < 1: + if backoff < 1.0: msg = f"backoff must be greater than or equal to 1, got {backoff}" logger.critical(msg) raise ValueError(msg) @@ -225,9 +259,11 @@ def retry_predicate( @functools.wraps(f) def f_retry(*args, **kwargs): mtries, mdelay = tries, delay_sec # make mutable + logger.debug(f'deco_retry: will make up to {mtries} attempts...') retval = f(*args, **kwargs) while mtries > 0: if predicate(retval) is True: + logger.debug('Predicate succeeded, deco_retry is done.') return retval logger.debug("Predicate failed, sleeping and retrying.") mtries -= 1 @@ -295,8 +331,8 @@ def thunkify(func): exc[0] = True exc[1] = sys.exc_info() # (type, value, traceback) msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}" - logger.warning(msg) print(msg) + logger.warning(msg) finally: wait_event.set() @@ -338,7 +374,7 @@ def _target(queue, function, *args, **kwargs): """ try: queue.put((True, function(*args, **kwargs))) - except: + except Exception: queue.put((False, sys.exc_info()[1])) @@ -433,7 +469,6 @@ def timeout( use_signals = thread_utils.is_current_thread_main_thread() def decorate(function): - if use_signals: def handler(signum, frame):