#!/usr/bin/env python3

"""Decorators."""

import datetime
import enum
import functools
import inspect
import logging
import math
import multiprocessing
import random
import signal
import sys
import threading
import time
import traceback
from typing import Callable, Optional
import warnings

import exceptions
import thread_utils

logger = logging.getLogger(__name__)


def timed(func: Callable) -> Callable:
    """Print (and log at INFO) the runtime of the decorated function."""

    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        end_time = time.perf_counter()
        run_time = end_time - start_time
        msg = f"Finished {func.__name__!r} in {run_time:.4f}s"
        print(msg)
        logger.info(msg)
        return value

    return wrapper_timer


def invocation_logged(func: Callable) -> Callable:
    """Print and log a timestamped message on entry to and exit from a call.

    The exit message is only emitted when the wrapped function returns
    normally; an exception propagates without an "Exited" line.
    """

    @functools.wraps(func)
    def wrapper_invocation_logged(*args, **kwargs):
        now = datetime.datetime.now()
        ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
        msg = f"[{ts}]: Entered {func.__name__}"
        print(msg)
        logger.info(msg)
        ret = func(*args, **kwargs)
        now = datetime.datetime.now()
        ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
        msg = f"[{ts}]: Exited {func.__name__}"
        print(msg)
        logger.info(msg)
        return ret

    return wrapper_invocation_logged


def debug_args(func: Callable) -> Callable:
    """Print the function signature and return value at each call.

    Both messages are also logged at INFO level.
    """

    @functools.wraps(func)
    def wrapper_debug_args(*args, **kwargs):
        args_repr = [f"{repr(a)}:{type(a)}" for a in args]
        kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
        signature = ", ".join(args_repr + kwargs_repr)
        msg = f"Calling {func.__name__}({signature})"
        print(msg)
        logger.info(msg)
        value = func(*args, **kwargs)
        msg = f"{func.__name__!r} returned {value!r}:{type(value)}"
        # Printed as well as logged so the return value is reported the same
        # way as the call itself (and as the docstring promises).
        print(msg)
        logger.info(msg)
        return value

    return wrapper_debug_args


def debug_count_calls(func: Callable) -> Callable:
    """Count function invocations and print a message before every call.

    The running count is exposed as the ``num_calls`` attribute of the
    returned wrapper.
    """

    @functools.wraps(func)
    def wrapper_debug_count_calls(*args, **kwargs):
        wrapper_debug_count_calls.num_calls += 1
        msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
        print(msg)
        logger.info(msg)
        return func(*args, **kwargs)

    wrapper_debug_count_calls.num_calls = 0
    return wrapper_debug_count_calls


class DelayWhen(enum.Enum):
    """When @delay should sleep relative to the wrapped call.

    The values are bit masks: BEFORE_AND_AFTER (3) has both the
    BEFORE_CALL (1) and AFTER_CALL (2) bits set.
    """

    BEFORE_CALL = 1
    AFTER_CALL = 2
    BEFORE_AND_AFTER = 3


def delay(
    _func: Optional[Callable] = None,
    *,
    seconds: float = 1.0,
    when: DelayWhen = DelayWhen.BEFORE_CALL,
) -> Callable:
    """Delay the execution of a function by sleeping before and/or after.

    Slow down a function by inserting a delay before and/or after its
    invocation.  Usable both bare (``@delay``) and with keyword arguments
    (``@delay(seconds=2, when=DelayWhen.AFTER_CALL)``).

    Args:
        _func: the function being decorated when used bare; None when the
            decorator is invoked with arguments.
        seconds: how long to sleep at each delay point.
        when: which side(s) of the call to sleep on.
    """

    def decorator_delay(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper_delay(*args, **kwargs):
            # DelayWhen is a plain Enum, whose members do not support `&`;
            # test the underlying integer bit masks instead.
            if when.value & DelayWhen.BEFORE_CALL.value:
                logger.debug(
                    f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
                )
                time.sleep(seconds)
            retval = func(*args, **kwargs)
            if when.value & DelayWhen.AFTER_CALL.value:
                logger.debug(
                    f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
                )
                time.sleep(seconds)
            return retval

        return wrapper_delay

    if _func is None:
        return decorator_delay
    else:
        return decorator_delay(_func)


class _SingletonWrapper:
    """
    A singleton wrapper class. Its instances would be created
    for each decorated class.
    """

    def __init__(self, cls):
        self.__wrapped__ = cls
        self._instance = None

    def __call__(self, *args, **kwargs):
        """Returns a single instance of decorated class.

        The instance is built from the arguments of the first call;
        arguments passed to later calls are ignored.
        """
        logger.debug(
            f"@singleton returning global instance of {self.__wrapped__.__name__}"
        )
        if self._instance is None:
            self._instance = self.__wrapped__(*args, **kwargs)
        return self._instance


def singleton(cls):
    """
    A singleton decorator. Returns a wrapper object. A call on that object
    returns a single instance object of decorated class. Use the __wrapped__
    attribute to access decorated class directly in unit tests
    """
    return _SingletonWrapper(cls)


def memoized(func: Callable) -> Callable:
    """Keep a cache of previous function call results.

    The cache here is a dict with a key based on the arguments to the
    call, exposed as the ``cache`` attribute of the returned wrapper.
    All arguments must be hashable, and keyword arguments are keyed in
    the order they were passed.  Consider also: functools.lru_cache for
    a more advanced implementation.
    """

    @functools.wraps(func)
    def wrapper_memoized(*args, **kwargs):
        cache_key = args + tuple(kwargs.items())
        if cache_key not in wrapper_memoized.cache:
            value = func(*args, **kwargs)
            logger.debug(
                f"Memoizing {cache_key} => {value} for {func.__name__}"
            )
            wrapper_memoized.cache[cache_key] = value
        else:
            logger.debug(f"Returning memoized value for {func.__name__}")
        return wrapper_memoized.cache[cache_key]

    wrapper_memoized.cache = {}
    return wrapper_memoized


def retry_predicate(
    tries: int,
    *,
    predicate: Callable[..., bool],
    delay_sec: float = 3,
    backoff: float = 2.0,
):
    """Retries a function or method up to a certain number of times
    with a prescribed initial delay period and backoff rate.

    tries is the maximum number of retries performed after the initial
    call, so the function runs at most tries + 1 times.
    delay_sec sets the initial delay period in seconds (must be > 0).
    backoff is a multiplier (must be >= 1) applied to the delay after
    each retry.
    predicate is a function that will be passed the retval of the
    decorated function and must return True to stop or False to retry.
    Only a result that ``is True`` stops the retries.

    Raises ValueError when backoff, tries or delay_sec is out of range.
    """
    if backoff < 1:
        msg = f"backoff must be greater than or equal to 1, got {backoff}"
        logger.critical(msg)
        raise ValueError(msg)

    tries = math.floor(tries)
    if tries < 0:
        msg = f"tries must be 0 or greater, got {tries}"
        logger.critical(msg)
        raise ValueError(msg)

    if delay_sec <= 0:
        msg = f"delay_sec must be greater than 0, got {delay_sec}"
        logger.critical(msg)
        raise ValueError(msg)

    def deco_retry(f):
        @functools.wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay_sec  # make mutable
            retval = f(*args, **kwargs)
            while mtries > 0:
                if predicate(retval) is True:
                    return retval
                logger.debug("Predicate failed, sleeping and retrying.")
                mtries -= 1
                time.sleep(mdelay)
                mdelay *= backoff
                retval = f(*args, **kwargs)
            # Out of retries: hand back the last result without checking it.
            return retval

        return f_retry

    return deco_retry


def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
    """Retry the decorated function until it returns exactly True."""
    return retry_predicate(
        tries,
        predicate=lambda x: x is True,
        delay_sec=delay_sec,
        backoff=backoff,
    )


def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
    """Retry the decorated function until it returns something not None."""
    return retry_predicate(
        tries,
        predicate=lambda x: x is not None,
        delay_sec=delay_sec,
        backoff=backoff,
    )


def deprecated(func):
    """This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used.
    """

    @functools.wraps(func)
    def wrapper_deprecated(*args, **kwargs):
        msg = f"Call to deprecated function {func.__name__}"
        logger.warning(msg)
        warnings.warn(msg, category=DeprecationWarning)
        return func(*args, **kwargs)

    return wrapper_deprecated


def thunkify(func):
    """
    Make a function immediately return a function of no args which,
    when called, waits for the result, which will start being
    processed in another thread.

    If the wrapped function raised an Exception, calling the thunk
    re-raises that same exception object.
    """

    @functools.wraps(func)
    def lazy_thunked(*args, **kwargs):
        wait_event = threading.Event()

        # One-element / two-element lists so the nested functions can
        # write results back into the enclosing scope.
        result = [None]
        exc = [False, None]

        def worker_func():
            try:
                func_result = func(*args, **kwargs)
                result[0] = func_result
            except Exception:
                exc[0] = True
                exc[1] = sys.exc_info()  # (type, value, traceback)
                msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
                logger.warning(msg)
                print(msg)
            finally:
                wait_event.set()

        def thunk():
            wait_event.wait()
            if exc[0]:
                # Re-raise the original exception instance.  Constructing a
                # new one from its type fails for exception classes whose
                # constructor takes more than one argument and discards the
                # worker thread's traceback.
                raise exc[1][1].with_traceback(exc[1][2])
            return result[0]

        threading.Thread(target=worker_func).start()
        return thunk

    return lazy_thunked


############################################################
# Timeout
############################################################

# http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
# Used work of Stephen "Zero" Chappell
# in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py


def _raise_exception(exception, error_message: Optional[str]):
    """Raise `exception`, built with `error_message` unless it is None."""
    if error_message is None:
        raise exception()
    else:
        raise exception(error_message)


def _target(queue, function, *args, **kwargs):
    """Run a function with arguments and return output via a queue.

    This is a helper function for the Process created in _Timeout. It runs
    the function with positional arguments and keyword arguments and then
    returns the function's output by way of a queue. If an exception gets
    raised, it is returned to _Timeout to be raised by the value property.
    """
    try:
        queue.put((True, function(*args, **kwargs)))
    except BaseException:
        # Deliberately catches everything: whatever was raised is shipped
        # back through the queue instead of being handled here.
        queue.put((False, sys.exc_info()[1]))


class _Timeout(object):
    """Wrap a function and add a timeout (limit) attribute to it.

    Instances of this class are automatically generated by the add_timeout
    function defined below.
    """

    def __init__(
        self,
        function: Callable,
        timeout_exception: Exception,
        error_message: str,
        seconds: float,
    ):
        self.__limit = seconds
        self.__function = function
        self.__timeout_exception = timeout_exception
        self.__error_message = error_message
        self.__name__ = function.__name__
        self.__doc__ = function.__doc__
        self.__timeout = time.time()
        # Placeholders; both are replaced on every __call__.
        self.__process = multiprocessing.Process()
        self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()

    def __call__(self, *args, **kwargs):
        """Execute the embedded function object asynchronously.

        The function given to the constructor is transparently called and
        requires that "ready" be intermittently polled. If and when it is
        True, the "value" property may then be checked for returned data.

        A "timeout" keyword argument, if given, is consumed here and
        replaces the time limit rather than being passed to the function.
        """
        self.__limit = kwargs.pop("timeout", self.__limit)
        self.__queue = multiprocessing.Queue(1)
        args = (self.__queue, self.__function) + args
        self.__process = multiprocessing.Process(
            target=_target, args=args, kwargs=kwargs
        )
        self.__process.daemon = True
        self.__process.start()
        if self.__limit is not None:
            self.__timeout = self.__limit + time.time()
        while not self.ready:
            time.sleep(0.1)
        return self.value

    def cancel(self):
        """Terminate any possible execution of the embedded function.

        Always raises the configured timeout exception.
        """
        if self.__process.is_alive():
            self.__process.terminate()
        _raise_exception(self.__timeout_exception, self.__error_message)

    @property
    def ready(self):
        """Read-only property indicating status of "value" property.

        Raises the timeout exception (via cancel) once the deadline passes.
        """
        if self.__limit and self.__timeout < time.time():
            self.cancel()
        return self.__queue.full() and not self.__queue.empty()

    @property
    def value(self):
        """Read-only property containing data returned from function.

        Re-raises the exception the function raised, if any.  Evaluates to
        None when no result is ready yet.
        """
        if self.ready is True:
            flag, load = self.__queue.get()
            if flag:
                return load
            raise load


def timeout(
    seconds: float = 1.0,
    use_signals: Optional[bool] = None,
    timeout_exception=exceptions.TimeoutError,
    error_message="Function call timed out",
):
    """Add a timeout parameter to a function and return the function.

    Note: the use_signals parameter is included in order to support
    multiprocessing scenarios (signal can only be used from the process'
    main thread).  When not using signals, timeout granularity will be
    rounded to the nearest 0.1s.

    Raises an exception when the timeout is reached.

    It is illegal to pass anything other than a function as the first
    parameter.  The function is wrapped and returned to the caller.

    The wrapped function accepts an extra "timeout" keyword argument that
    overrides `seconds` for that call; a falsy value disables the timeout
    on the signal-based path.
    """
    if use_signals is None:
        use_signals = thread_utils.is_current_thread_main_thread()

    def decorate(function):
        if use_signals:

            def handler(signum, frame):
                _raise_exception(timeout_exception, error_message)

            @functools.wraps(function)
            def new_function(*args, **kwargs):
                new_seconds = kwargs.pop("timeout", seconds)
                # Decide on the effective per-call value, and do so before
                # touching any signal state, so a call without a timeout
                # never installs (or leaks) a handler or timer.
                if not new_seconds:
                    return function(*args, **kwargs)

                old = signal.signal(signal.SIGALRM, handler)
                signal.setitimer(signal.ITIMER_REAL, new_seconds)
                try:
                    return function(*args, **kwargs)
                finally:
                    # Disarm the timer first, then restore the old handler.
                    signal.setitimer(signal.ITIMER_REAL, 0)
                    signal.signal(signal.SIGALRM, old)

            return new_function
        else:

            @functools.wraps(function)
            def new_function(*args, **kwargs):
                timeout_wrapper = _Timeout(
                    function, timeout_exception, error_message, seconds
                )
                return timeout_wrapper(*args, **kwargs)

            return new_function

    return decorate


class non_reentrant_code(object):
    """Decorator that turns a re-entrant call of the function into a no-op.

    While the decorated function is running, a nested call to it from the
    same thread returns None immediately instead of running it again.
    Calls from other threads wait on the lock.
    """

    def __init__(self):
        # Must be a lock *instance*; the bare threading.RLock factory is
        # not a context manager.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises, otherwise every later call
                    # would be skipped.
                    self._entered = False

        return _gatekeeper


class rlocked(object):
    """Decorator that runs the function under a re-entrant lock.

    Behaves like non_reentrant_code: a nested call from the thread that
    already holds the lock returns None without running the function.
    NOTE(review): this class duplicates non_reentrant_code -- confirm
    whether a distinct behavior was intended.
    """

    def __init__(self):
        # Must be a lock *instance*; the bare threading.RLock factory is
        # not a context manager.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises, otherwise every later call
                    # would be skipped.
                    self._entered = False

        return _gatekeeper


def call_with_sample_rate(sample_rate: float) -> Callable:
    """Run the decorated function on only a random fraction of its calls.

    Each call is executed when a uniform random draw from [0, 1] is below
    sample_rate; a skipped call returns None.

    Raises ValueError if sample_rate is outside [0, 1].
    """
    if not 0.0 <= sample_rate <= 1.0:
        msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
        logger.critical(msg)
        raise ValueError(msg)

    def decorator(f):
        @functools.wraps(f)
        def _call_with_sample_rate(*args, **kwargs):
            if random.uniform(0, 1) < sample_rate:
                return f(*args, **kwargs)
            else:
                logger.debug(
                    f"@call_with_sample_rate skipping a call to {f.__name__}"
                )

        return _call_with_sample_rate

    return decorator


def decorate_matching_methods_with(decorator, acl=None):
    """Apply decorator to all methods in a class whose names are accepted
    by acl.

    acl is called with each method name and should return a true value for
    the methods to decorate.  If acl is None (default), decorate all
    methods in the class.
    """

    def decorate_the_class(cls):
        for name, m in inspect.getmembers(cls, inspect.isfunction):
            if acl is None or acl(name):
                setattr(cls, name, decorator(m))
        return cls

    return decorate_the_class