"""Decorators."""
-import datetime
import enum
import functools
import inspect
import threading
import time
import traceback
-from typing import Callable, Optional
+from typing import Any, Callable, Optional, Tuple
import warnings
# This module is commonly used by others in here and should avoid
@functools.wraps(func)
def wrapper_invocation_logged(*args, **kwargs):
- now = datetime.datetime.now()
- ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
- msg = f"[{ts}]: Entered {func.__name__}"
+ msg = f"Entered {func.__qualname__}"
print(msg)
logger.info(msg)
ret = func(*args, **kwargs)
- now = datetime.datetime.now()
- ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
- msg = f"[{ts}]: Exited {func.__name__}"
+ msg = f"Exited {func.__qualname__}"
print(msg)
logger.info(msg)
return ret
return wrapper_invocation_logged
+def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
+ """Limit invocation of a wrapped function to n calls per period.
+ Thread safe. In testing this was relatively fair with multiple
+ threads using it though that hasn't been measured.
+
+ """
+ min_interval_seconds = per_period_in_seconds / float(n_calls)
+
+ def wrapper_rate_limited(func: Callable) -> Callable:
+ cv = threading.Condition()
+ last_invocation_timestamp = [0.0]
+
+ def may_proceed() -> float:
+ now = time.time()
+ last_invocation = last_invocation_timestamp[0]
+ if last_invocation != 0.0:
+ elapsed_since_last = now - last_invocation
+ wait_time = min_interval_seconds - elapsed_since_last
+ else:
+ wait_time = 0.0
+ return wait_time
+
+ def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
+ with cv:
+ while True:
+ cv.wait_for(
+ lambda: may_proceed() <= 0.0,
+ timeout=may_proceed(),
+ )
+ break
+ ret = func(*args, **kargs)
+ with cv:
+ last_invocation_timestamp[0] = time.time()
+ cv.notify()
+ return ret
+ return wrapper_wrapper_rate_limited
+ return wrapper_rate_limited
+
+
def debug_args(func: Callable) -> Callable:
"""Print the function signature and return value at each call."""
tries: int,
*,
predicate: Callable[..., bool],
- delay_sec: float = 3,
+ delay_sec: float = 3.0,
backoff: float = 2.0,
):
"""Retries a function or method up to a certain number of times
delay_sec sets the initial delay period in seconds.
backoff is a multiplied (must be >1) used to modify the delay.
predicate is a function that will be passed the retval of the
- decorated function and must return True to stop or False to
- retry.
+ decorated function and must return True to stop or False to
+ retry.
"""
- if backoff < 1:
+ if backoff < 1.0:
msg = f"backoff must be greater than or equal to 1, got {backoff}"
logger.critical(msg)
raise ValueError(msg)
@functools.wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay_sec # make mutable
+ logger.debug(f'deco_retry: will make up to {mtries} attempts...')
retval = f(*args, **kwargs)
while mtries > 0:
if predicate(retval) is True:
+ logger.debug('Predicate succeeded, deco_retry is done.')
return retval
logger.debug("Predicate failed, sleeping and retrying.")
mtries -= 1
exc[0] = True
exc[1] = sys.exc_info() # (type, value, traceback)
msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
- logger.warning(msg)
print(msg)
+ logger.warning(msg)
finally:
wait_event.set()
"""
try:
queue.put((True, function(*args, **kwargs)))
- except:
+ except Exception:
queue.put((False, sys.exc_info()[1]))
use_signals = thread_utils.is_current_thread_main_thread()
def decorate(function):
-
if use_signals:
def handler(signum, frame):