10 import multiprocessing
17 from typing import Any, Callable, Optional
20 # This module is commonly used by others in here and should avoid
21 # taking any unnecessary dependencies back on them.
25 logger = logging.getLogger(__name__)
def timed(func: Callable) -> Callable:
    """Report the runtime of every call to the decorated function.

    Args:
        func: the callable to wrap.

    Returns:
        A wrapper that calls func, measures the elapsed time with
        time.perf_counter(), reports it, and returns func's result
        unchanged.  If func raises, nothing is reported and the
        exception propagates.
    """

    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        msg = f"Finished {func.__name__!r} in {run_time:.4f}s"
        # NOTE(review): the statement that emits msg was not visible in the
        # reviewed text; logging at INFO on the module logger is assumed.
        logger.info(msg)
        return value

    return wrapper_timer
def invocation_logged(func: Callable) -> Callable:
    """Log entry to and exit from every call of the decorated function.

    Args:
        func: the callable to wrap.

    Returns:
        A wrapper that reports "Entered <qualname>" before calling func
        and "Exited <qualname>" after it returns, then hands back func's
        result.  If func raises, the exit message is not produced.
    """

    @functools.wraps(func)
    def wrapper_invocation_logged(*args, **kwargs):
        # NOTE(review): the statements that emit these messages were not
        # visible in the reviewed text; INFO on the module logger is assumed.
        logger.info(f"Entered {func.__qualname__}")
        ret = func(*args, **kwargs)
        logger.info(f"Exited {func.__qualname__}")
        return ret

    return wrapper_invocation_logged
def rate_limited(n_per_second: int) -> Callable:
    """Limit invocation of a wrapped function to n calls per second.

    Args:
        n_per_second: maximum number of calls per second.  The minimum
            spacing enforced between calls is 1 / n_per_second seconds;
            passing 0 raises ZeroDivisionError when the decorator is
            created.

    Returns:
        A decorator.  The function it produces sleeps, when necessary,
        until the minimum interval has elapsed since the previous call
        finished, then calls the wrapped function and returns its result.
    """
    min_interval = 1.0 / float(n_per_second)

    def wrapper_rate_limited(func: Callable) -> Callable:
        # None means "never invoked", so the very first call never waits.
        last_invocation_time = None

        @functools.wraps(func)
        def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
            nonlocal last_invocation_time
            # time.monotonic() is portable and immune to wall-clock
            # adjustments, unlike time.clock_gettime(CLOCK_REALTIME).
            # Loop so the interval is re-checked after every sleep.
            while last_invocation_time is not None:
                elapsed = time.monotonic() - last_invocation_time
                wait_time = min_interval - elapsed
                if wait_time <= 0.0:
                    break
                time.sleep(wait_time)
            ret = func(*args, **kargs)
            # Only successful calls move the timestamp forward.
            last_invocation_time = time.monotonic()
            return ret

        return wrapper_wrapper_rate_limited

    return wrapper_rate_limited
def debug_args(func: Callable) -> Callable:
    """Report the arguments and the return value of every call.

    Each positional argument is rendered as "<repr>:<type>" and each
    keyword argument as "<name>=<repr>:<type>"; the return value is
    rendered the same way after the call completes.

    Args:
        func: the callable to wrap.

    Returns:
        A wrapper that reports the call, invokes func, reports the
        result and returns it unchanged.
    """

    @functools.wraps(func)
    def wrapper_debug_args(*args, **kwargs):
        args_repr = [f"{repr(a)}:{type(a)}" for a in args]
        kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
        signature = ", ".join(args_repr + kwargs_repr)
        # NOTE(review): the statements that emit these messages were not
        # visible in the reviewed text; INFO on the module logger is assumed.
        logger.info(f"Calling {func.__name__}({signature})")
        value = func(*args, **kwargs)
        logger.info(f"{func.__name__!r} returned {value!r}:{type(value)}")
        return value

    return wrapper_debug_args
def debug_count_calls(func: Callable) -> Callable:
    """Count function invocations and report a message before every call.

    The running total is stored on the returned wrapper as the
    ``num_calls`` attribute, which starts at 0 and is incremented before
    the wrapped function runs (so a call that raises is still counted).

    Args:
        func: the callable to wrap.

    Returns:
        The counting wrapper; it returns whatever func returns.
    """

    @functools.wraps(func)
    def wrapper_debug_count_calls(*args, **kwargs):
        wrapper_debug_count_calls.num_calls += 1
        msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
        # NOTE(review): the statement that emits msg was not visible in the
        # reviewed text; INFO on the module logger is assumed.
        logger.info(msg)
        return func(*args, **kwargs)

    wrapper_debug_count_calls.num_calls = 0
    return wrapper_debug_count_calls
class DelayWhen(enum.Flag):
    """Selects when @delay sleeps relative to the wrapped call.

    This is an enum.Flag rather than a plain enum.Enum because delay()
    tests the selection with the ``&`` operator, which a plain Enum does
    not implement (it raises TypeError).
    """

    BEFORE_CALL = 1
    AFTER_CALL = 2
    # NOTE(review): only BEFORE_CALL and AFTER_CALL are referenced in the
    # reviewed text; the name and value of the combined member are assumed.
    BEFORE_AND_AFTER = BEFORE_CALL | AFTER_CALL


def delay(
    _func: Optional[Callable] = None,
    *,
    seconds: float = 1.0,
    when: DelayWhen = DelayWhen.BEFORE_CALL,
) -> Callable:
    """Delay the execution of a function by sleeping before and/or after.

    Slow down a function by inserting a delay before and/or after its
    invocation.  Usable both bare (``@delay``) and with keyword
    arguments (``@delay(seconds=2.0, when=DelayWhen.AFTER_CALL)``).

    Args:
        _func: the function being decorated when used bare; leave unset
            when passing keyword arguments.
        seconds: how long each sleep lasts.
        when: which side(s) of the call to sleep on.

    Returns:
        The wrapped function when used bare, otherwise a decorator.
        The wrapped function returns the original function's result.
    """

    def decorator_delay(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper_delay(*args, **kwargs):
            if when & DelayWhen.BEFORE_CALL:
                logger.debug(
                    f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
                )
                time.sleep(seconds)
            retval = func(*args, **kwargs)
            if when & DelayWhen.AFTER_CALL:
                logger.debug(
                    f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
                )
                time.sleep(seconds)
            return retval

        return wrapper_delay

    # Bare usage passes the function itself as _func; parameterised usage
    # leaves it None and expects a decorator back.
    if _func is None:
        return decorator_delay
    return decorator_delay(_func)
158 class _SingletonWrapper:
160 A singleton wrapper class. Its instances would be created
161 for each decorated class.
164 def __init__(self, cls):
165 self.__wrapped__ = cls
166 self._instance = None
168 def __call__(self, *args, **kwargs):
169 """Returns a single instance of decorated class"""
171 f"@singleton returning global instance of {self.__wrapped__.__name__}"
173 if self._instance is None:
174 self._instance = self.__wrapped__(*args, **kwargs)
175 return self._instance
def singleton(cls):
    """
    A singleton decorator. Returns a wrapper object. A call on that object
    returns the single instance of the decorated class. Use the __wrapped__
    attribute to access the decorated class directly in unit tests.
    """
    return _SingletonWrapper(cls)
def memoized(func: Callable) -> Callable:
    """Keep a cache of previous function call results.

    The cache here is a dict with a key based on the arguments to the
    call.  Consider also: functools.lru_cache for a more advanced
    implementation.

    The key keeps positional and keyword arguments apart and sorts the
    keyword items by name, so ``f(("x", 1))`` can never be confused with
    ``f(x=1)`` and ``f(a=1, b=2)`` shares an entry with ``f(b=2, a=1)``.
    All arguments must be hashable.  The cache never evicts and is
    exposed as the ``cache`` attribute of the returned wrapper.

    Args:
        func: the callable whose results should be cached.

    Returns:
        The caching wrapper.
    """

    @functools.wraps(func)
    def wrapper_memoized(*args, **kwargs):
        # Keyword names are unique strings, so sorting never needs to
        # compare the (possibly unorderable) values.
        cache_key = (args, tuple(sorted(kwargs.items())))
        if cache_key not in wrapper_memoized.cache:
            value = func(*args, **kwargs)
            logger.debug(
                f"Memoizing {cache_key} => {value} for {func.__name__}"
            )
            wrapper_memoized.cache[cache_key] = value
        else:
            logger.debug(f"Returning memoized value for {func.__name__}")
        return wrapper_memoized.cache[cache_key]

    wrapper_memoized.cache = dict()
    return wrapper_memoized
214 predicate: Callable[..., bool],
215 delay_sec: float = 3.0,
216 backoff: float = 2.0,
218 """Retries a function or method up to a certain number of times
219 with a prescribed initial delay period and backoff rate.
221 tries is the maximum number of attempts to run the function.
222 delay_sec sets the initial delay period in seconds.
    backoff is a multiplier (must be >= 1) used to modify the delay.
224 predicate is a function that will be passed the retval of the
225 decorated function and must return True to stop or False to
229 msg = f"backoff must be greater than or equal to 1, got {backoff}"
231 raise ValueError(msg)
233 tries = math.floor(tries)
235 msg = f"tries must be 0 or greater, got {tries}"
237 raise ValueError(msg)
240 msg = f"delay_sec must be greater than 0, got {delay_sec}"
242 raise ValueError(msg)
246 def f_retry(*args, **kwargs):
247 mtries, mdelay = tries, delay_sec # make mutable
248 logger.debug(f'deco_retry: will make up to {mtries} attempts...')
249 retval = f(*args, **kwargs)
251 if predicate(retval) is True:
252 logger.debug('Predicate succeeded, deco_retry is done.')
254 logger.debug("Predicate failed, sleeping and retrying.")
258 retval = f(*args, **kwargs)
def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
    """Retry the decorated function until it returns exactly True.

    Thin wrapper around retry_predicate whose stop condition is
    ``retval is True``; truthy values other than True do not stop it.

    Args:
        tries: forwarded to retry_predicate as its tries argument.
        delay_sec: forwarded to retry_predicate.
        backoff: forwarded to retry_predicate.

    Returns:
        Whatever retry_predicate returns for these arguments.
    """
    return retry_predicate(
        tries,
        predicate=lambda x: x is True,
        delay_sec=delay_sec,
        backoff=backoff,
    )
def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
    """Retry the decorated function until it returns something other than None.

    Thin wrapper around retry_predicate whose stop condition is
    ``retval is not None``.

    Args:
        tries: forwarded to retry_predicate as its tries argument.
        delay_sec: forwarded to retry_predicate.
        backoff: forwarded to retry_predicate.

    Returns:
        Whatever retry_predicate returns for these arguments.
    """
    return retry_predicate(
        tries,
        predicate=lambda x: x is not None,
        delay_sec=delay_sec,
        backoff=backoff,
    )
def deprecated(func):
    """This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used.

    Args:
        func: the deprecated callable.

    Returns:
        A wrapper that issues a DeprecationWarning on every call and then
        returns the result of calling func with the same arguments.
    """

    @functools.wraps(func)
    def wrapper_deprecated(*args, **kwargs):
        msg = f"Call to deprecated function {func.__name__}"
        # stacklevel=2 attributes the warning to the code that called the
        # deprecated function instead of to this wrapper, so the reported
        # file/line is actionable.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)

    return wrapper_deprecated
300 Make a function immediately return a function of no args which,
301 when called, waits for the result, which will start being
302 processed in another thread.
305 @functools.wraps(func)
306 def lazy_thunked(*args, **kwargs):
307 wait_event = threading.Event()
314 func_result = func(*args, **kwargs)
315 result[0] = func_result
318 exc[1] = sys.exc_info() # (type, value, traceback)
319 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
328 raise exc[1][0](exc[1][1])
331 threading.Thread(target=worker_func).start()
337 ############################################################
339 ############################################################
341 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
343 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
346 def _raise_exception(exception, error_message: Optional[str]):
347 if error_message is None:
350 raise exception(error_message)
353 def _target(queue, function, *args, **kwargs):
354 """Run a function with arguments and return output via a queue.
356 This is a helper function for the Process created in _Timeout. It runs
357 the function with positional arguments and keyword arguments and then
358 returns the function's output by way of a queue. If an exception gets
359 raised, it is returned to _Timeout to be raised by the value property.
362 queue.put((True, function(*args, **kwargs)))
364 queue.put((False, sys.exc_info()[1]))
367 class _Timeout(object):
368 """Wrap a function and add a timeout (limit) attribute to it.
370 Instances of this class are automatically generated by the add_timeout
371 function defined below.
377 timeout_exception: Exception,
381 self.__limit = seconds
382 self.__function = function
383 self.__timeout_exception = timeout_exception
384 self.__error_message = error_message
385 self.__name__ = function.__name__
386 self.__doc__ = function.__doc__
387 self.__timeout = time.time()
388 self.__process = multiprocessing.Process()
389 self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
391 def __call__(self, *args, **kwargs):
392 """Execute the embedded function object asynchronously.
394 The function given to the constructor is transparently called and
395 requires that "ready" be intermittently polled. If and when it is
396 True, the "value" property may then be checked for returned data.
398 self.__limit = kwargs.pop("timeout", self.__limit)
399 self.__queue = multiprocessing.Queue(1)
400 args = (self.__queue, self.__function) + args
401 self.__process = multiprocessing.Process(
402 target=_target, args=args, kwargs=kwargs
404 self.__process.daemon = True
405 self.__process.start()
406 if self.__limit is not None:
407 self.__timeout = self.__limit + time.time()
408 while not self.ready:
413 """Terminate any possible execution of the embedded function."""
414 if self.__process.is_alive():
415 self.__process.terminate()
416 _raise_exception(self.__timeout_exception, self.__error_message)
420 """Read-only property indicating status of "value" property."""
421 if self.__limit and self.__timeout < time.time():
423 return self.__queue.full() and not self.__queue.empty()
427 """Read-only property containing data returned from function."""
428 if self.ready is True:
429 flag, load = self.__queue.get()
436 seconds: float = 1.0,
437 use_signals: Optional[bool] = None,
438 timeout_exception=exceptions.TimeoutError,
439 error_message="Function call timed out",
441 """Add a timeout parameter to a function and return the function.
443 Note: the use_signals parameter is included in order to support
444 multiprocessing scenarios (signal can only be used from the process'
445 main thread). When not using signals, timeout granularity will be
446 rounded to the nearest 0.1s.
448 Raises an exception when the timeout is reached.
450 It is illegal to pass anything other than a function as the first
451 parameter. The function is wrapped and returned to the caller.
453 if use_signals is None:
455 use_signals = thread_utils.is_current_thread_main_thread()
457 def decorate(function):
460 def handler(signum, frame):
461 _raise_exception(timeout_exception, error_message)
463 @functools.wraps(function)
464 def new_function(*args, **kwargs):
465 new_seconds = kwargs.pop("timeout", seconds)
467 old = signal.signal(signal.SIGALRM, handler)
468 signal.setitimer(signal.ITIMER_REAL, new_seconds)
471 return function(*args, **kwargs)
474 return function(*args, **kwargs)
477 signal.setitimer(signal.ITIMER_REAL, 0)
478 signal.signal(signal.SIGALRM, old)
483 @functools.wraps(function)
484 def new_function(*args, **kwargs):
485 timeout_wrapper = _Timeout(
486 function, timeout_exception, error_message, seconds
488 return timeout_wrapper(*args, **kwargs)
class non_reentrant_code(object):
    """Decorator object that suppresses re-entrant calls.

    While a function wrapped by a given instance is executing, any nested
    call made through the same instance returns None immediately instead
    of running the function again.  Usage: ``@non_reentrant_code()``.
    """

    def __init__(self):
        # The lock must be an RLock *instance* (the class object itself is
        # not a context manager).  It is re-entrant so that a nested call
        # on the same thread reaches the _entered check instead of
        # deadlocking.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Wrap f so that nested calls are skipped.

        Returns:
            A wrapper that returns f's result, or None when the call was
            skipped because another call is already in progress.
        """

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises, otherwise every later call
                    # would be silently skipped.
                    self._entered = False

        return _gatekeeper
class rlocked(object):
    """Decorator object that runs the wrapped function under an RLock.

    NOTE(review): the state visible in the reviewed text (an RLock plus an
    ``_entered`` flag that is cleared after the call) matches
    non_reentrant_code line for line, so the same behaviour is assumed
    here: a nested call made while the function is running returns None
    without running it again.  Confirm this is intended.
    Usage: ``@rlocked()``.
    """

    def __init__(self):
        # Must be an RLock *instance*; the class object itself cannot be
        # used in a ``with`` statement.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Wrap f so it executes while holding this instance's lock.

        Returns:
            A wrapper that returns f's result, or None when the call was
            skipped because another call is already in progress.
        """

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises so later calls still run.
                    self._entered = False

        return _gatekeeper
def call_with_sample_rate(sample_rate: float) -> Callable:
    """Run the decorated function on only a random fraction of its calls.

    Args:
        sample_rate: probability, in [0, 1], that any given call is
            actually executed.

    Returns:
        A decorator.  The function it produces draws
        ``random.uniform(0, 1)`` on every call; when the draw is below
        sample_rate the wrapped function runs and its result is returned,
        otherwise the call is skipped and None is returned.

    Raises:
        ValueError: if sample_rate is outside [0, 1].
    """
    if not 0.0 <= sample_rate <= 1.0:
        msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
        raise ValueError(msg)

    def decorator(f):
        @functools.wraps(f)
        def _call_with_sample_rate(*args, **kwargs):
            if random.uniform(0, 1) < sample_rate:
                return f(*args, **kwargs)
            # NOTE(review): only the message text was visible in the
            # reviewed text; logging it at DEBUG is assumed.
            logger.debug(
                f"@call_with_sample_rate skipping a call to {f.__name__}"
            )
            return None

        return _call_with_sample_rate

    return decorator
def decorate_matching_methods_with(decorator, acl=None):
    """Apply decorator to the methods of a class that acl admits.

    Args:
        decorator: callable applied to each selected function; its return
            value replaces the function on the class.
        acl: selects which methods to decorate.  If None (default),
            every function found by inspect.getmembers(cls,
            inspect.isfunction) is decorated.
            NOTE(review): how acl is consulted was not visible in the
            reviewed text; it is assumed to be called with the method
            name and to return a truthy value for methods to decorate.

    Returns:
        A class decorator that modifies the class in place and returns it.
    """

    def decorate_the_class(cls):
        for name, m in inspect.getmembers(cls, inspect.isfunction):
            if acl is None or acl(name):
                setattr(cls, name, decorator(m))
        return cls

    return decorate_the_class