#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+# Portions (marked) below retain the original author's copyright.
+
"""Decorators."""
import enum
import threading
import time
import traceback
-from typing import Any, Callable, Optional
import warnings
+from typing import Any, Callable, List, Optional
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
import exceptions
-
logger = logging.getLogger(__name__)
print(msg)
logger.info(msg)
return value
+
return wrapper_timer
print(msg)
logger.info(msg)
return ret
+
return wrapper_invocation_logged
wait_time = min_interval_seconds - elapsed_since_last
else:
wait_time = 0.0
- logger.debug(f'@{time.time()}> wait_time = {wait_time}')
+ logger.debug('@%.4f> wait_time = %.4f', time.time(), wait_time)
return wait_time
def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
):
break
with cv:
- logger.debug(f'@{time.time()}> calling it...')
+ logger.debug('@%.4f> calling it...', time.time())
ret = func(*args, **kargs)
last_invocation_timestamp[0] = time.time()
logger.debug(
- f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}'
+ '@%.4f> Last invocation <- %.4f', time.time(), last_invocation_timestamp[0]
)
cv.notify()
return ret
+
return wrapper_wrapper_rate_limited
+
return wrapper_rate_limited
print(msg)
logger.info(msg)
return value
+
return wrapper_debug_args
print(msg)
logger.info(msg)
return func(*args, **kwargs)
- wrapper_debug_count_calls.num_calls = 0
+
+ wrapper_debug_count_calls.num_calls = 0 # type: ignore
return wrapper_debug_count_calls
class DelayWhen(enum.IntEnum):
+ """When should we delay: before or after calling the function (or
+ both)?
+
+ """
+
BEFORE_CALL = 1
AFTER_CALL = 2
BEFORE_AND_AFTER = 3
True
"""
+
def decorator_delay(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper_delay(*args, **kwargs):
if when & DelayWhen.BEFORE_CALL:
- logger.debug(
- f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
- )
+ logger.debug("@delay for %fs BEFORE_CALL to %s", seconds, func.__name__)
time.sleep(seconds)
retval = func(*args, **kwargs)
if when & DelayWhen.AFTER_CALL:
- logger.debug(
- f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
- )
+ logger.debug("@delay for %fs AFTER_CALL to %s", seconds, func.__name__)
time.sleep(seconds)
return retval
+
return wrapper_delay
if _func is None:
def __call__(self, *args, **kwargs):
"""Returns a single instance of decorated class"""
- logger.debug(
- f"@singleton returning global instance of {self.__wrapped__.__name__}"
- )
+ logger.debug('@singleton returning global instance of %s', self.__wrapped__.__name__)
if self._instance is None:
self._instance = self.__wrapped__(*args, **kwargs)
return self._instance
True
"""
+
@functools.wraps(func)
def wrapper_memoized(*args, **kwargs):
cache_key = args + tuple(kwargs.items())
if cache_key not in wrapper_memoized.cache:
value = func(*args, **kwargs)
- logger.debug(
- f"Memoizing {cache_key} => {value} for {func.__name__}"
- )
+ logger.debug('Memoizing %s => %s for %s', cache_key, value, func.__name__)
wrapper_memoized.cache[cache_key] = value
else:
- logger.debug(f"Returning memoized value for {func.__name__}")
+ logger.debug('Returning memoized value for %s', {func.__name__})
return wrapper_memoized.cache[cache_key]
- wrapper_memoized.cache = dict()
+
+ wrapper_memoized.cache = {} # type: ignore
return wrapper_memoized
@functools.wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay_sec # make mutable
- logger.debug(f'deco_retry: will make up to {mtries} attempts...')
+ logger.debug('deco_retry: will make up to %d attempts...', mtries)
retval = f(*args, **kwargs)
while mtries > 0:
if predicate(retval) is True:
mdelay *= backoff
retval = f(*args, **kwargs)
return retval
+
return f_retry
+
return deco_retry
3
>>> dur > 2.0
True
- >>> dur < 2.2
+ >>> dur < 2.3
True
"""
when the function is used.
"""
+
@functools.wraps(func)
def wrapper_deprecated(*args, **kwargs):
msg = f"Call to deprecated function {func.__qualname__}"
warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
print(msg, file=sys.stderr)
return func(*args, **kwargs)
+
return wrapper_deprecated
wait_event = threading.Event()
result = [None]
- exc = [False, None]
+ exc: List[Any] = [False, None]
def worker_func():
try:
def thunk():
wait_event.wait()
if exc[0]:
+ assert exc[1]
raise exc[1][0](exc[1][1])
return result[0]
def _raise_exception(exception, error_message: Optional[str]):
if error_message is None:
- raise Exception()
+ raise Exception(exception)
else:
raise Exception(error_message)
self.__limit = kwargs.pop("timeout", self.__limit)
self.__queue = multiprocessing.Queue(1)
args = (self.__queue, self.__function) + args
- self.__process = multiprocessing.Process(
- target=_target, args=args, kwargs=kwargs
- )
+ self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs)
self.__process.daemon = True
self.__process.start()
if self.__limit is not None:
if flag:
return load
raise load
+ return None
def timeout(
"""
if use_signals is None:
import thread_utils
+
use_signals = thread_utils.is_current_thread_main_thread()
def decorate(function):
if use_signals:
- def handler(signum, frame):
+ def handler(unused_signum, unused_frame):
_raise_exception(timeout_exception, error_message)
@functools.wraps(function)
@functools.wraps(function)
def new_function(*args, **kwargs):
- timeout_wrapper = _Timeout(
- function, timeout_exception, error_message, seconds
- )
+ timeout_wrapper = _Timeout(function, timeout_exception, error_message, seconds)
return timeout_wrapper(*args, **kwargs)
return new_function
return decorate
-class non_reentrant_code(object):
- def __init__(self):
- self._lock = threading.RLock
- self._entered = False
+def synchronized(lock):
+ def wrap(f):
+ @functools.wraps(f)
+ def _gatekeeper(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
- def __call__(self, f):
- def _gatekeeper(*args, **kwargs):
- with self._lock:
- if self._entered:
- return
- self._entered = True
- f(*args, **kwargs)
- self._entered = False
return _gatekeeper
-
-class rlocked(object):
- def __init__(self):
- self._lock = threading.RLock
- self._entered = False
-
- def __call__(self, f):
- def _gatekeeper(*args, **kwargs):
- with self._lock:
- if self._entered:
- return
- self._entered = True
- f(*args, **kwargs)
- self._entered = False
- return _gatekeeper
+ return wrap
def call_with_sample_rate(sample_rate: float) -> Callable:
if random.uniform(0, 1) < sample_rate:
return f(*args, **kwargs)
else:
- logger.debug(
- f"@call_with_sample_rate skipping a call to {f.__name__}"
- )
+ logger.debug("@call_with_sample_rate skipping a call to %s", f.__name__)
+ return None
+
return _call_with_sample_rate
+
return decorator
prefix. If prefix is None (default), decorate all methods in the
class.
"""
+
def decorate_the_class(cls):
for name, m in inspect.getmembers(cls, inspect.isfunction):
if acl is None:
if acl(name):
setattr(cls, name, decorator(m))
return cls
+
return decorate_the_class
if __name__ == '__main__':
import doctest
- doctest.testmod()
+ doctest.testmod()