X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=decorator_utils.py;h=4615fec6f8960e0083ce48546ba9421c25243d42;hb=e46158e49121b8a955bb07b73f5bcf9928b79c90;hp=a5c5afecb34c005d16a351cc703f44dc561b567d;hpb=31c81f6539969a5eba864d3305f9fb7bf716a367;p=python_utils.git diff --git a/decorator_utils.py b/decorator_utils.py index a5c5afe..4615fec 100644 --- a/decorator_utils.py +++ b/decorator_utils.py @@ -1,6 +1,9 @@ #!/usr/bin/env python3 -"""Decorators.""" +# © Copyright 2021-2022, Scott Gasch +# Portions (marked) below retain the original author's copyright. + +"""Useful(?) decorators.""" import enum import functools @@ -15,7 +18,7 @@ import threading import time import traceback import warnings -from typing import Any, Callable, Optional +from typing import Any, Callable, List, Optional # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. @@ -30,7 +33,7 @@ def timed(func: Callable) -> Callable: >>> @timed ... def foo(): ... import time - ... time.sleep(0.1) + ... time.sleep(0.01) >>> foo() # doctest: +ELLIPSIS Finished foo in ... @@ -52,7 +55,7 @@ def timed(func: Callable) -> Callable: def invocation_logged(func: Callable) -> Callable: - """Log the call of a function. + """Log the call of a function on stdout and the info log. >>> @invocation_logged ... def foo(): @@ -80,9 +83,9 @@ def invocation_logged(func: Callable) -> Callable: def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable: - """Limit invocation of a wrapped function to n calls per period. + """Limit invocation of a wrapped function to n calls per time period. Thread safe. In testing this was relatively fair with multiple - threads using it though that hasn't been measured. + threads using it though that hasn't been measured in detail. >>> import time >>> import decorator_utils @@ -133,7 +136,7 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl wait_time = min_interval_seconds - elapsed_since_last else: wait_time = 0.0 - logger.debug(f'@{time.time()}> wait_time = {wait_time}') + logger.debug('@%.4f> wait_time = %.4f', time.time(), wait_time) return wait_time def wrapper_wrapper_rate_limited(*args, **kargs) -> Any: @@ -145,11 +148,11 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl ): break with cv: - logger.debug(f'@{time.time()}> calling it...') + logger.debug('@%.4f> calling it...', time.time()) ret = func(*args, **kargs) last_invocation_timestamp[0] = time.time() logger.debug( - f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}' + '@%.4f> Last invocation <- %.4f', time.time(), last_invocation_timestamp[0] ) cv.notify() return ret @@ -227,6 +230,11 @@ def debug_count_calls(func: Callable) -> Callable: class DelayWhen(enum.IntEnum): + """When should we delay: before or after calling the function (or + both)? + + """ + BEFORE_CALL = 1 AFTER_CALL = 2 BEFORE_AND_AFTER = 3 @@ -238,9 +246,7 @@ def delay( seconds: float = 1.0, when: DelayWhen = DelayWhen.BEFORE_CALL, ) -> Callable: - """Delay the execution of a function by sleeping before and/or after. - - Slow down a function by inserting a delay before and/or after its + """Slow down a function by inserting a delay before and/or after its invocation. >>> import time @@ -261,11 +267,11 @@ def delay( @functools.wraps(func) def wrapper_delay(*args, **kwargs): if when & DelayWhen.BEFORE_CALL: - logger.debug(f"@delay for {seconds}s BEFORE_CALL to {func.__name__}") + logger.debug("@delay for %fs BEFORE_CALL to %s", seconds, func.__name__) time.sleep(seconds) retval = func(*args, **kwargs) if when & DelayWhen.AFTER_CALL: - logger.debug(f"@delay for {seconds}s AFTER_CALL to {func.__name__}") + logger.debug("@delay for %fs AFTER_CALL to %s", seconds, func.__name__) time.sleep(seconds) return retval @@ -290,9 +296,7 @@ class _SingletonWrapper: def __call__(self, *args, **kwargs): """Returns a single instance of decorated class""" - logger.debug( - f"@singleton returning global instance of {self.__wrapped__.__name__}" - ) + logger.debug('@singleton returning global instance of %s', self.__wrapped__.__name__) if self._instance is None: self._instance = self.__wrapped__(*args, **kwargs) return self._instance @@ -324,8 +328,9 @@ def memoized(func: Callable) -> Callable: """Keep a cache of previous function call results. The cache here is a dict with a key based on the arguments to the - call. Consider also: functools.lru_cache for a more advanced - implementation. + call. Consider also: functools.cache for a more advanced + implementation. See: + https://docs.python.org/3/library/functools.html#functools.cache >>> import time @@ -359,13 +364,13 @@ def memoized(func: Callable) -> Callable: cache_key = args + tuple(kwargs.items()) if cache_key not in wrapper_memoized.cache: value = func(*args, **kwargs) - logger.debug(f"Memoizing {cache_key} => {value} for {func.__name__}") + logger.debug('Memoizing %s => %s for %s', cache_key, value, func.__name__) wrapper_memoized.cache[cache_key] = value else: - logger.debug(f"Returning memoized value for {func.__name__}") + logger.debug('Returning memoized value for %s', {func.__name__}) return wrapper_memoized.cache[cache_key] - wrapper_memoized.cache = dict() # type: ignore + wrapper_memoized.cache = {} # type: ignore return wrapper_memoized @@ -376,17 +381,20 @@ def retry_predicate( delay_sec: float = 3.0, backoff: float = 2.0, ): - """Retries a function or method up to a certain number of times - with a prescribed initial delay period and backoff rate. - - tries is the maximum number of attempts to run the function. - delay_sec sets the initial delay period in seconds. - backoff is a multiplied (must be >1) used to modify the delay. - predicate is a function that will be passed the retval of the - decorated function and must return True to stop or False to - retry. - + """Retries a function or method up to a certain number of times with a + prescribed initial delay period and backoff rate (multiplier). + + Args: + tries: the maximum number of attempts to run the function + delay_sec: sets the initial delay period in seconds + backoff: a multiplier (must be >=1.0) used to modify the + delay at each subsequent invocation + predicate: a Callable that will be passed the retval of + the decorated function and must return True to indicate + that we should stop calling or False to indicate a retry + is necessary """ + if backoff < 1.0: msg = f"backoff must be greater than or equal to 1, got {backoff}" logger.critical(msg) @@ -407,7 +415,7 @@ def retry_predicate( @functools.wraps(f) def f_retry(*args, **kwargs): mtries, mdelay = tries, delay_sec # make mutable - logger.debug(f'deco_retry: will make up to {mtries} attempts...') + logger.debug('deco_retry: will make up to %d attempts...', mtries) retval = f(*args, **kwargs) while mtries > 0: if predicate(retval) is True: @@ -432,7 +440,6 @@ def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0): >>> import time >>> counter = 0 - >>> @retry_if_false(5, delay_sec=1.0, backoff=1.1) ... def foo(): ... global counter @@ -464,8 +471,8 @@ def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0): """Another helper for @retry_predicate above. Retries up to N times so long as the wrapped function returns None with a delay between each retry and a backoff that can increase the delay. - """ + return retry_predicate( tries, predicate=lambda x: x is not None, @@ -478,7 +485,6 @@ def deprecated(func): """This is a decorator which can be used to mark functions as deprecated. It will result in a warning being emitted when the function is used. - """ @functools.wraps(func) @@ -504,7 +510,7 @@ def thunkify(func): wait_event = threading.Event() result = [None] - exc = [False, None] + exc: List[Any] = [False, None] def worker_func(): try: @@ -521,6 +527,7 @@ def thunkify(func): def thunk(): wait_event.wait() if exc[0]: + assert exc[1] raise exc[1][0](exc[1][1]) return result[0] @@ -541,7 +548,7 @@ def thunkify(func): def _raise_exception(exception, error_message: Optional[str]): if error_message is None: - raise Exception() + raise Exception(exception) else: raise Exception(error_message) @@ -594,9 +601,7 @@ class _Timeout(object): self.__limit = kwargs.pop("timeout", self.__limit) self.__queue = multiprocessing.Queue(1) args = (self.__queue, self.__function) + args - self.__process = multiprocessing.Process( - target=_target, args=args, kwargs=kwargs - ) + self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs) self.__process.daemon = True self.__process.start() if self.__limit is not None: @@ -626,6 +631,7 @@ class _Timeout(object): if flag: return load raise load + return None def timeout( @@ -641,6 +647,13 @@ def timeout( main thread). When not using signals, timeout granularity will be rounded to the nearest 0.1s. + Beware that an @timeout on a function inside a module will be + evaluated at module load time and not when the wrapped function is + invoked. This can lead to problems when relying on the automatic + main thread detection code (use_signals=None, the default) since + the import probably happens on the main thread and the invocation + can happen on a different thread (which can't use signals). + Raises an exception when/if the timeout is reached. It is illegal to pass anything other than a function as the first @@ -668,7 +681,7 @@ def timeout( def decorate(function): if use_signals: - def handler(signum, frame): + def handler(unused_signum, unused_frame): _raise_exception(timeout_exception, error_message) @functools.wraps(function) @@ -693,9 +706,7 @@ def timeout( @functools.wraps(function) def new_function(*args, **kwargs): - timeout_wrapper = _Timeout( - function, timeout_exception, error_message, seconds - ) + timeout_wrapper = _Timeout(function, timeout_exception, error_message, seconds) return timeout_wrapper(*args, **kwargs) return new_function @@ -704,6 +715,11 @@ def timeout( def synchronized(lock): + """Emulates java's synchronized keyword: given a lock, require that + threads take that lock (or wait) before invoking the wrapped + function and automatically releases the lock afterwards. + """ + def wrap(f): @functools.wraps(f) def _gatekeeper(*args, **kw): @@ -719,6 +735,10 @@ def synchronized(lock): def call_with_sample_rate(sample_rate: float) -> Callable: + """Calls the wrapped function probabilistically given a rate between + 0.0 and 1.0 inclusive (0% probability and 100% probability). + """ + if not 0.0 <= sample_rate <= 1.0: msg = f"sample_rate must be between [0, 1]. Got {sample_rate}." logger.critical(msg) @@ -730,7 +750,8 @@ def call_with_sample_rate(sample_rate: float) -> Callable: if random.uniform(0, 1) < sample_rate: return f(*args, **kwargs) else: - logger.debug(f"@call_with_sample_rate skipping a call to {f.__name__}") + logger.debug("@call_with_sample_rate skipping a call to %s", f.__name__) + return None return _call_with_sample_rate @@ -738,9 +759,9 @@ def call_with_sample_rate(sample_rate: float) -> Callable: def decorate_matching_methods_with(decorator, acl=None): - """Apply decorator to all methods in a class whose names begin with - prefix. If prefix is None (default), decorate all methods in the - class. + """Apply the given decorator to all methods in a class whose names + begin with prefix. If prefix is None (default), decorate all + methods in the class. """ def decorate_the_class(cls):