X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=decorator_utils.py;h=68a9d69633f6babe78c80153753d5c4a41c150af;hb=309ddadb402e09bf09bc38ca455dcaef7e08619e;hp=daae64e75348e973dc8a27cf387faf7f404ef2b2;hpb=36fea7f15ed17150691b5b3ead75450e575229ef;p=python_utils.git diff --git a/decorator_utils.py b/decorator_utils.py index daae64e..68a9d69 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -14,14 +14,13 @@ import sys import threading import time import traceback -from typing import Any, Callable, Optional import warnings +from typing import Any, Callable, Optional # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. import exceptions - logger = logging.getLogger(__name__) @@ -80,9 +79,7 @@ def invocation_logged(func: Callable) -> Callable: return wrapper_invocation_logged -def rate_limited( - n_calls: int, *, per_period_in_seconds: float = 1.0 -) -> Callable: +def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable: """Limit invocation of a wrapped function to n calls per period. Thread safe. In testing this was relatively fair with multiple threads using it though that hasn't been measured. @@ -151,9 +148,7 @@ def rate_limited( logger.debug(f'@{time.time()}> calling it...') ret = func(*args, **kargs) last_invocation_timestamp[0] = time.time() - logger.debug( - f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}' - ) + logger.debug(f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}') cv.notify() return ret @@ -220,14 +215,12 @@ def debug_count_calls(func: Callable) -> Callable: @functools.wraps(func) def wrapper_debug_count_calls(*args, **kwargs): wrapper_debug_count_calls.num_calls += 1 - msg = ( - f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}" - ) + msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}" print(msg) logger.info(msg) return func(*args, **kwargs) - wrapper_debug_count_calls.num_calls = 0 + wrapper_debug_count_calls.num_calls = 0 # type: ignore return wrapper_debug_count_calls @@ -266,15 +259,11 @@ def delay( @functools.wraps(func) def wrapper_delay(*args, **kwargs): if when & DelayWhen.BEFORE_CALL: - logger.debug( - f"@delay for {seconds}s BEFORE_CALL to {func.__name__}" - ) + logger.debug(f"@delay for {seconds}s BEFORE_CALL to {func.__name__}") time.sleep(seconds) retval = func(*args, **kwargs) if when & DelayWhen.AFTER_CALL: - logger.debug( - f"@delay for {seconds}s AFTER_CALL to {func.__name__}" - ) + logger.debug(f"@delay for {seconds}s AFTER_CALL to {func.__name__}") time.sleep(seconds) return retval @@ -299,9 +288,7 @@ class _SingletonWrapper: def __call__(self, *args, **kwargs): """Returns a single instance of decorated class""" - logger.debug( - f"@singleton returning global instance of {self.__wrapped__.__name__}" - ) + logger.debug(f"@singleton returning global instance of {self.__wrapped__.__name__}") if self._instance is None: self._instance = self.__wrapped__(*args, **kwargs) return self._instance @@ -368,15 +355,13 @@ def memoized(func: Callable) -> Callable: cache_key = args + tuple(kwargs.items()) if cache_key not in wrapper_memoized.cache: value = func(*args, **kwargs) - logger.debug( - f"Memoizing {cache_key} => {value} for {func.__name__}" - ) + logger.debug(f"Memoizing {cache_key} => {value} for {func.__name__}") wrapper_memoized.cache[cache_key] = value else: logger.debug(f"Returning memoized value for {func.__name__}") return wrapper_memoized.cache[cache_key] - wrapper_memoized.cache = dict() + wrapper_memoized.cache = dict() # type: ignore return wrapper_memoized @@ -605,9 +590,7 @@ class _Timeout(object): self.__limit = kwargs.pop("timeout", self.__limit) self.__queue = multiprocessing.Queue(1) args = (self.__queue, self.__function) + args - self.__process = multiprocessing.Process( - target=_target, args=args, kwargs=kwargs - ) + self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs) self.__process.daemon = True self.__process.start() if self.__limit is not None: @@ -704,9 +687,7 @@ def timeout( @functools.wraps(function) def new_function(*args, **kwargs): - timeout_wrapper = _Timeout( - function, timeout_exception, error_message, seconds - ) + timeout_wrapper = _Timeout(function, timeout_exception, error_message, seconds) return timeout_wrapper(*args, **kwargs) return new_function @@ -714,38 +695,19 @@ def timeout( return decorate -class non_reentrant_code(object): - def __init__(self): - self._lock = threading.RLock - self._entered = False - - def __call__(self, f): - def _gatekeeper(*args, **kwargs): - with self._lock: - if self._entered: - return - self._entered = True - f(*args, **kwargs) - self._entered = False +def synchronized(lock): + def wrap(f): + @functools.wraps(f) + def _gatekeeper(*args, **kw): + lock.acquire() + try: + return f(*args, **kw) + finally: + lock.release() return _gatekeeper - -class rlocked(object): - def __init__(self): - self._lock = threading.RLock - self._entered = False - - def __call__(self, f): - def _gatekeeper(*args, **kwargs): - with self._lock: - if self._entered: - return - self._entered = True - f(*args, **kwargs) - self._entered = False - - return _gatekeeper + return wrap def call_with_sample_rate(sample_rate: float) -> Callable: @@ -760,9 +722,7 @@ def call_with_sample_rate(sample_rate: float) -> Callable: if random.uniform(0, 1) < sample_rate: return f(*args, **kwargs) else: - logger.debug( - f"@call_with_sample_rate skipping a call to {f.__name__}" - ) + logger.debug(f"@call_with_sample_rate skipping a call to {f.__name__}") return _call_with_sample_rate