X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=decorator_utils.py;h=084e260bab370d9bbb93a1256ff84e98f96055bd;hb=532df2c5b57c7517dfb3dddd8c1358fbadf8baf3;hp=a956a214e2b9d5c2dad7bfb60ee95d528a49ccac;hpb=5e09a33068fcdf6d43f12477dd943e108e11ae06;p=python_utils.git diff --git a/decorator_utils.py b/decorator_utils.py index a956a21..084e260 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -1,5 +1,8 @@ #!/usr/bin/env python3 +# © Copyright 2021-2022, Scott Gasch +# Portions (marked) below retain the original author's copyright. + """Decorators.""" import enum @@ -14,14 +17,13 @@ import sys import threading import time import traceback -from typing import Any, Callable, Optional import warnings +from typing import Any, Callable, Optional # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. import exceptions - logger = logging.getLogger(__name__) @@ -134,7 +136,7 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl wait_time = min_interval_seconds - elapsed_since_last else: wait_time = 0.0 - logger.debug(f'@{time.time()}> wait_time = {wait_time}') + logger.debug('@%.4f> wait_time = %.4f', time.time(), wait_time) return wait_time def wrapper_wrapper_rate_limited(*args, **kargs) -> Any: @@ -146,11 +148,11 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl ): break with cv: - logger.debug(f'@{time.time()}> calling it...') + logger.debug('@%.4f> calling it...', time.time()) ret = func(*args, **kargs) last_invocation_timestamp[0] = time.time() logger.debug( - f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}' + '@%.4f> Last invocation <- %.4f', time.time(), last_invocation_timestamp[0] ) cv.notify() return ret @@ -223,11 +225,16 @@ def debug_count_calls(func: Callable) -> Callable: logger.info(msg) return func(*args, **kwargs) - wrapper_debug_count_calls.num_calls = 0 + wrapper_debug_count_calls.num_calls = 0 # type: ignore return wrapper_debug_count_calls class DelayWhen(enum.IntEnum): + """When should we delay: before or after calling the function (or + both)? + + """ + BEFORE_CALL = 1 AFTER_CALL = 2 BEFORE_AND_AFTER = 3 @@ -262,11 +269,11 @@ def delay( @functools.wraps(func) def wrapper_delay(*args, **kwargs): if when & DelayWhen.BEFORE_CALL: - logger.debug(f"@delay for {seconds}s BEFORE_CALL to {func.__name__}") + logger.debug("@delay for %fs BEFORE_CALL to %s", seconds, func.__name__) time.sleep(seconds) retval = func(*args, **kwargs) if when & DelayWhen.AFTER_CALL: - logger.debug(f"@delay for {seconds}s AFTER_CALL to {func.__name__}") + logger.debug("@delay for %fs AFTER_CALL to %s", seconds, func.__name__) time.sleep(seconds) return retval @@ -291,9 +298,7 @@ class _SingletonWrapper: def __call__(self, *args, **kwargs): """Returns a single instance of decorated class""" - logger.debug( - f"@singleton returning global instance of {self.__wrapped__.__name__}" - ) + logger.debug('@singleton returning global instance of %s', self.__wrapped__.__name__) if self._instance is None: self._instance = self.__wrapped__(*args, **kwargs) return self._instance @@ -360,13 +365,13 @@ def memoized(func: Callable) -> Callable: cache_key = args + tuple(kwargs.items()) if cache_key not in wrapper_memoized.cache: value = func(*args, **kwargs) - logger.debug(f"Memoizing {cache_key} => {value} for {func.__name__}") + logger.debug('Memoizing %s => %s for %s', cache_key, value, func.__name__) wrapper_memoized.cache[cache_key] = value else: - logger.debug(f"Returning memoized value for {func.__name__}") + logger.debug('Returning memoized value for %s', {func.__name__}) return wrapper_memoized.cache[cache_key] - wrapper_memoized.cache = dict() + wrapper_memoized.cache = {} # type: ignore return wrapper_memoized @@ -408,7 +413,7 @@ def retry_predicate( @functools.wraps(f) def f_retry(*args, **kwargs): mtries, mdelay = tries, delay_sec # make mutable - logger.debug(f'deco_retry: will make up to {mtries} attempts...') + logger.debug('deco_retry: will make up to %d attempts...', mtries) retval = f(*args, **kwargs) while mtries > 0: if predicate(retval) is True: @@ -542,7 +547,7 @@ def thunkify(func): def _raise_exception(exception, error_message: Optional[str]): if error_message is None: - raise Exception() + raise Exception(exception) else: raise Exception(error_message) @@ -595,9 +600,7 @@ class _Timeout(object): self.__limit = kwargs.pop("timeout", self.__limit) self.__queue = multiprocessing.Queue(1) args = (self.__queue, self.__function) + args - self.__process = multiprocessing.Process( - target=_target, args=args, kwargs=kwargs - ) + self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs) self.__process.daemon = True self.__process.start() if self.__limit is not None: @@ -627,6 +630,7 @@ class _Timeout(object): if flag: return load raise load + return None def timeout( @@ -694,9 +698,7 @@ def timeout( @functools.wraps(function) def new_function(*args, **kwargs): - timeout_wrapper = _Timeout( - function, timeout_exception, error_message, seconds - ) + timeout_wrapper = _Timeout(function, timeout_exception, error_message, seconds) return timeout_wrapper(*args, **kwargs) return new_function @@ -731,7 +733,8 @@ def call_with_sample_rate(sample_rate: float) -> Callable: if random.uniform(0, 1) < sample_rate: return f(*args, **kwargs) else: - logger.debug(f"@call_with_sample_rate skipping a call to {f.__name__}") + logger.debug("@call_with_sample_rate skipping a call to %s", f.__name__) + return None return _call_with_sample_rate