"""Decorators."""
-import datetime
import enum
import functools
+import inspect
import logging
import math
import multiprocessing
import threading
import time
import traceback
-from typing import Callable, Optional
+from typing import Any, Callable, Optional
import warnings
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
import exceptions
-import thread_utils
+
logger = logging.getLogger(__name__)
@functools.wraps(func)
def wrapper_invocation_logged(*args, **kwargs):
- now = datetime.datetime.now()
- ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
- msg = f"[{ts}]: Entered {func.__name__}"
+ msg = f"Entered {func.__qualname__}"
print(msg)
logger.info(msg)
ret = func(*args, **kwargs)
- now = datetime.datetime.now()
- ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
- msg = f"[{ts}]: Exited {func.__name__}"
+ msg = f"Exited {func.__qualname__}"
print(msg)
logger.info(msg)
return ret
return wrapper_invocation_logged
+def rate_limited(n_per_second: int) -> Callable:
+ """Limit invocation of a wrapped function to n calls per second.
+ Thread safe.
+
+ """
+ min_interval = 1.0 / float(n_per_second)
+
+ def wrapper_rate_limited(func: Callable) -> Callable:
+ last_invocation_time = [0.0]
+
+ def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
+ while True:
+ elapsed = time.clock_gettime(0) - last_invocation_time[0]
+ wait_time = min_interval - elapsed
+ if wait_time > 0.0:
+ time.sleep(wait_time)
+ else:
+ break
+ ret = func(*args, **kargs)
+ last_invocation_time[0] = time.clock_gettime(0)
+ return ret
+ return wrapper_wrapper_rate_limited
+ return wrapper_rate_limited
+
+
def debug_args(func: Callable) -> Callable:
"""Print the function signature and return value at each call."""
tries: int,
*,
predicate: Callable[..., bool],
- delay_sec: float = 3,
+ delay_sec: float = 3.0,
backoff: float = 2.0,
):
"""Retries a function or method up to a certain number of times
delay_sec sets the initial delay period in seconds.
backoff is a multiplied (must be >1) used to modify the delay.
predicate is a function that will be passed the retval of the
- decorated function and must return True to stop or False to
- retry.
+ decorated function and must return True to stop or False to
+ retry.
"""
- if backoff < 1:
+ if backoff < 1.0:
msg = f"backoff must be greater than or equal to 1, got {backoff}"
logger.critical(msg)
raise ValueError(msg)
@functools.wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay_sec # make mutable
+ logger.debug(f'deco_retry: will make up to {mtries} attempts...')
retval = f(*args, **kwargs)
while mtries > 0:
if predicate(retval) is True:
+ logger.debug('Predicate succeeded, deco_retry is done.')
return retval
logger.debug("Predicate failed, sleeping and retrying.")
mtries -= 1
exc[0] = True
exc[1] = sys.exc_info() # (type, value, traceback)
msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
- logger.warning(msg)
print(msg)
+ logger.warning(msg)
finally:
wait_event.set()
"""
try:
queue.put((True, function(*args, **kwargs)))
- except:
+ except Exception:
queue.put((False, sys.exc_info()[1]))
parameter. The function is wrapped and returned to the caller.
"""
if use_signals is None:
+ import thread_utils
use_signals = thread_utils.is_current_thread_main_thread()
def decorate(function):
-
if use_signals:
def handler(signum, frame):
)
return _call_with_sample_rate
return decorator
+
+
+def decorate_matching_methods_with(decorator, acl=None):
+ """Apply decorator to all methods in a class whose names begin with
+ prefix. If prefix is None (default), decorate all methods in the
+ class.
+ """
+ def decorate_the_class(cls):
+ for name, m in inspect.getmembers(cls, inspect.isfunction):
+ if acl is None:
+ setattr(cls, name, decorator(m))
+ else:
+ if acl(name):
+ setattr(cls, name, decorator(m))
+ return cls
+ return decorate_the_class