11 import multiprocessing
18 from typing import Callable, Optional
21 # This module is commonly used by others in here and should avoid
22 # taking any unnecessary dependencies back on them.
26 logger = logging.getLogger(__name__)
def timed(func: Callable) -> Callable:
    """Print the runtime of the decorated function.

    Args:
        func: the callable to measure.

    Returns:
        A wrapper that calls ``func``, reports the elapsed wall-clock time
        measured with ``time.perf_counter`` and passes ``func``'s return
        value through unchanged.  Nothing is reported if ``func`` raises.
    """

    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        start_time = time.perf_counter()
        value = func(*args, **kwargs)
        run_time = time.perf_counter() - start_time
        msg = f"Finished {func.__name__!r} in {run_time:.4f}s"
        # NOTE(review): the statements consuming msg were not visible in the
        # reviewed extract; print (per the docstring) plus the module logger
        # is assumed -- confirm against the original file.
        print(msg)
        logger.info(msg)
        return value

    return wrapper_timer
def invocation_logged(func: Callable) -> Callable:
    """Log the call of a function.

    A timestamped "Entered <name>" message is emitted before ``func`` runs
    and an "Exited <name>" message after it returns.  If ``func`` raises,
    the exception propagates and no "Exited" message is emitted.

    Args:
        func: the callable whose invocations should be reported.

    Returns:
        A wrapper with the same call signature and return value as ``func``.
    """

    def _stamped(event: str) -> str:
        # The timestamp is taken at the moment each message is built, so the
        # enter and exit messages carry their own times.
        ts = datetime.datetime.now().strftime("%Y/%d/%b:%H:%M:%S%Z")
        return f"[{ts}]: {event} {func.__name__}"

    def _emit(msg: str) -> None:
        # NOTE(review): the statements consuming msg were not visible in the
        # reviewed extract; print plus the module logger is assumed --
        # confirm against the original file.
        print(msg)
        logger.info(msg)

    @functools.wraps(func)
    def wrapper_invocation_logged(*args, **kwargs):
        _emit(_stamped("Entered"))
        ret = func(*args, **kwargs)
        _emit(_stamped("Exited"))
        return ret

    return wrapper_invocation_logged
def debug_args(func: Callable) -> Callable:
    """Print the function signature and return value at each call.

    Every positional argument is rendered as ``repr:type`` and every keyword
    argument as ``name=repr:type``; the return value is rendered the same
    way after the call completes.

    Args:
        func: the callable to trace.

    Returns:
        A wrapper that reports the call and its result and returns
        ``func``'s value unchanged.
    """

    def _emit(msg: str) -> None:
        # NOTE(review): the statements consuming msg were not visible in the
        # reviewed extract; print (per the docstring) plus the module logger
        # is assumed -- confirm against the original file.
        print(msg)
        logger.info(msg)

    @functools.wraps(func)
    def wrapper_debug_args(*args, **kwargs):
        positional = [f"{a!r}:{type(a)}" for a in args]
        keyword = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
        signature = ", ".join(positional + keyword)
        _emit(f"Calling {func.__name__}({signature})")
        value = func(*args, **kwargs)
        _emit(f"{func.__name__!r} returned {value!r}:{type(value)}")
        return value

    return wrapper_debug_args
def debug_count_calls(func: Callable) -> Callable:
    """Count function invocations and print a message before every call.

    The running total is stored on the returned wrapper as the attribute
    ``num_calls`` (starting at 0), so callers can inspect or reset it.

    Args:
        func: the callable whose invocations should be counted.

    Returns:
        A wrapper that increments ``num_calls``, reports the call number and
        then returns whatever ``func`` returns.
    """

    @functools.wraps(func)
    def wrapper_debug_count_calls(*args, **kwargs):
        # The counter is bumped before func runs, so a call that raises is
        # still counted.
        wrapper_debug_count_calls.num_calls += 1
        msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
        # NOTE(review): the statements consuming msg were not visible in the
        # reviewed extract; print (per the docstring) plus the module logger
        # is assumed -- confirm against the original file.
        print(msg)
        logger.info(msg)
        return func(*args, **kwargs)

    wrapper_debug_count_calls.num_calls = 0
    return wrapper_debug_count_calls
97 class DelayWhen(enum.Enum):
104 _func: Callable = None,
106 seconds: float = 1.0,
107 when: DelayWhen = DelayWhen.BEFORE_CALL,
109 """Delay the execution of a function by sleeping before and/or after.
111 Slow down a function by inserting a delay before and/or after its
115 def decorator_delay(func: Callable) -> Callable:
116 @functools.wraps(func)
117 def wrapper_delay(*args, **kwargs):
118 if when & DelayWhen.BEFORE_CALL:
120 f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
123 retval = func(*args, **kwargs)
124 if when & DelayWhen.AFTER_CALL:
126 f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
133 return decorator_delay
135 return decorator_delay(_func)
138 class _SingletonWrapper:
140 A singleton wrapper class. Its instances would be created
141 for each decorated class.
144 def __init__(self, cls):
145 self.__wrapped__ = cls
146 self._instance = None
148 def __call__(self, *args, **kwargs):
149 """Returns a single instance of decorated class"""
151 f"@singleton returning global instance of {self.__wrapped__.__name__}"
153 if self._instance is None:
154 self._instance = self.__wrapped__(*args, **kwargs)
155 return self._instance
160 A singleton decorator. Returns a wrapper objects. A call on that object
161 returns a single instance object of decorated class. Use the __wrapped__
162 attribute to access decorated class directly in unit tests
164 return _SingletonWrapper(cls)
def memoized(func: Callable) -> Callable:
    """Keep a cache of previous function call results.

    The cache here is a dict with a key based on the arguments to the
    call.  Consider also: functools.lru_cache for a more advanced
    implementation.

    The cache is exposed on the returned wrapper as the attribute ``cache``
    and is never evicted.  All positional and keyword argument values must
    be hashable; otherwise the lookup raises TypeError.

    Args:
        func: the callable whose results should be cached.

    Returns:
        A wrapper that calls ``func`` once per distinct set of arguments and
        serves repeated calls from the cache.
    """

    @functools.wraps(func)
    def wrapper_memoized(*args, **kwargs):
        # Positional and keyword arguments are kept in separate slots so a
        # positional ("a", 1) tuple can never collide with the keyword a=1.
        # Keyword items are sorted by name (names are unique, so values are
        # never compared) to make f(a=1, b=2) and f(b=2, a=1) share an entry.
        cache_key = (args, tuple(sorted(kwargs.items())))
        if cache_key not in wrapper_memoized.cache:
            value = func(*args, **kwargs)
            logger.debug(
                f"Memoizing {cache_key} => {value} for {func.__name__}"
            )
            wrapper_memoized.cache[cache_key] = value
        else:
            logger.debug(f"Returning memoized value for {func.__name__}")
        return wrapper_memoized.cache[cache_key]

    wrapper_memoized.cache = dict()
    return wrapper_memoized
194 predicate: Callable[..., bool],
195 delay_sec: float = 3.0,
196 backoff: float = 2.0,
198 """Retries a function or method up to a certain number of times
199 with a prescribed initial delay period and backoff rate.
201 tries is the maximum number of attempts to run the function.
202 delay_sec sets the initial delay period in seconds.
203 backoff is a multiplied (must be >1) used to modify the delay.
204 predicate is a function that will be passed the retval of the
205 decorated function and must return True to stop or False to
209 msg = f"backoff must be greater than or equal to 1, got {backoff}"
211 raise ValueError(msg)
213 tries = math.floor(tries)
215 msg = f"tries must be 0 or greater, got {tries}"
217 raise ValueError(msg)
220 msg = f"delay_sec must be greater than 0, got {delay_sec}"
222 raise ValueError(msg)
226 def f_retry(*args, **kwargs):
227 mtries, mdelay = tries, delay_sec # make mutable
228 logger.debug(f'deco_retry: will make up to {mtries} attempts...')
229 retval = f(*args, **kwargs)
231 if predicate(retval) is True:
232 logger.debug('Predicate succeeded, deco_retry is done.')
234 logger.debug("Predicate failed, sleeping and retrying.")
238 retval = f(*args, **kwargs)
def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
    """Build a retry decorator that stops once the function returns True.

    Thin convenience wrapper around ``retry_predicate``.  The stop test is
    an identity check against ``True``, so a merely truthy return value
    (e.g. ``1``) does not stop the retries.

    Args:
        tries: forwarded to ``retry_predicate``.
        delay_sec: forwarded to ``retry_predicate``.
        backoff: forwarded to ``retry_predicate``.

    Returns:
        Whatever ``retry_predicate`` returns for these settings.
    """
    # NOTE(review): the arguments other than predicate were not visible in
    # the reviewed extract; forwarding tries/delay_sec/backoff is assumed
    # from this function's own signature -- confirm.
    return retry_predicate(
        tries,
        predicate=lambda x: x is True,
        delay_sec=delay_sec,
        backoff=backoff,
    )
def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
    """Build a retry decorator that stops once the function returns non-None.

    Thin convenience wrapper around ``retry_predicate``: the stop test is
    ``retval is not None``, so any value other than ``None`` (including
    falsy ones such as ``0`` or ``""``) ends the retries.

    Args:
        tries: forwarded to ``retry_predicate``.
        delay_sec: forwarded to ``retry_predicate``.
        backoff: forwarded to ``retry_predicate``.

    Returns:
        Whatever ``retry_predicate`` returns for these settings.
    """
    # NOTE(review): the arguments other than predicate were not visible in
    # the reviewed extract; forwarding tries/delay_sec/backoff is assumed
    # from this function's own signature -- confirm.
    return retry_predicate(
        tries,
        predicate=lambda x: x is not None,
        delay_sec=delay_sec,
        backoff=backoff,
    )
def deprecated(func):
    """This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used.

    Args:
        func: the callable being deprecated.

    Returns:
        A wrapper that issues a ``DeprecationWarning`` naming ``func`` on
        every call and then returns ``func``'s result.
    """

    @functools.wraps(func)
    def wrapper_deprecated(*args, **kwargs):
        msg = f"Call to deprecated function {func.__name__}"
        # NOTE(review): one statement between building msg and warnings.warn
        # was not visible in the reviewed extract; a logger.warning call is
        # assumed -- confirm.
        logger.warning(msg)
        # stacklevel=2 attributes the warning to the code calling the
        # deprecated function rather than to this wrapper, which is what
        # makes the reported file/line actionable.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)

    return wrapper_deprecated
280 Make a function immediately return a function of no args which,
281 when called, waits for the result, which will start being
282 processed in another thread.
285 @functools.wraps(func)
286 def lazy_thunked(*args, **kwargs):
287 wait_event = threading.Event()
294 func_result = func(*args, **kwargs)
295 result[0] = func_result
298 exc[1] = sys.exc_info() # (type, value, traceback)
299 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
308 raise exc[1][0](exc[1][1])
311 threading.Thread(target=worker_func).start()
317 ############################################################
319 ############################################################
321 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
323 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
326 def _raise_exception(exception, error_message: Optional[str]):
327 if error_message is None:
330 raise exception(error_message)
333 def _target(queue, function, *args, **kwargs):
334 """Run a function with arguments and return output via a queue.
336 This is a helper function for the Process created in _Timeout. It runs
337 the function with positional arguments and keyword arguments and then
338 returns the function's output by way of a queue. If an exception gets
339 raised, it is returned to _Timeout to be raised by the value property.
342 queue.put((True, function(*args, **kwargs)))
344 queue.put((False, sys.exc_info()[1]))
347 class _Timeout(object):
348 """Wrap a function and add a timeout (limit) attribute to it.
350 Instances of this class are automatically generated by the add_timeout
351 function defined below.
357 timeout_exception: Exception,
361 self.__limit = seconds
362 self.__function = function
363 self.__timeout_exception = timeout_exception
364 self.__error_message = error_message
365 self.__name__ = function.__name__
366 self.__doc__ = function.__doc__
367 self.__timeout = time.time()
368 self.__process = multiprocessing.Process()
369 self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
371 def __call__(self, *args, **kwargs):
372 """Execute the embedded function object asynchronously.
374 The function given to the constructor is transparently called and
375 requires that "ready" be intermittently polled. If and when it is
376 True, the "value" property may then be checked for returned data.
378 self.__limit = kwargs.pop("timeout", self.__limit)
379 self.__queue = multiprocessing.Queue(1)
380 args = (self.__queue, self.__function) + args
381 self.__process = multiprocessing.Process(
382 target=_target, args=args, kwargs=kwargs
384 self.__process.daemon = True
385 self.__process.start()
386 if self.__limit is not None:
387 self.__timeout = self.__limit + time.time()
388 while not self.ready:
393 """Terminate any possible execution of the embedded function."""
394 if self.__process.is_alive():
395 self.__process.terminate()
396 _raise_exception(self.__timeout_exception, self.__error_message)
400 """Read-only property indicating status of "value" property."""
401 if self.__limit and self.__timeout < time.time():
403 return self.__queue.full() and not self.__queue.empty()
407 """Read-only property containing data returned from function."""
408 if self.ready is True:
409 flag, load = self.__queue.get()
416 seconds: float = 1.0,
417 use_signals: Optional[bool] = None,
418 timeout_exception=exceptions.TimeoutError,
419 error_message="Function call timed out",
421 """Add a timeout parameter to a function and return the function.
423 Note: the use_signals parameter is included in order to support
424 multiprocessing scenarios (signal can only be used from the process'
425 main thread). When not using signals, timeout granularity will be
426 rounded to the nearest 0.1s.
428 Raises an exception when the timeout is reached.
430 It is illegal to pass anything other than a function as the first
431 parameter. The function is wrapped and returned to the caller.
433 if use_signals is None:
435 use_signals = thread_utils.is_current_thread_main_thread()
437 def decorate(function):
441 def handler(signum, frame):
442 _raise_exception(timeout_exception, error_message)
444 @functools.wraps(function)
445 def new_function(*args, **kwargs):
446 new_seconds = kwargs.pop("timeout", seconds)
448 old = signal.signal(signal.SIGALRM, handler)
449 signal.setitimer(signal.ITIMER_REAL, new_seconds)
452 return function(*args, **kwargs)
455 return function(*args, **kwargs)
458 signal.setitimer(signal.ITIMER_REAL, 0)
459 signal.signal(signal.SIGALRM, old)
464 @functools.wraps(function)
465 def new_function(*args, **kwargs):
466 timeout_wrapper = _Timeout(
467 function, timeout_exception, error_message, seconds
469 return timeout_wrapper(*args, **kwargs)
class non_reentrant_code(object):
    """Decorator object that turns nested calls of the guarded code into no-ops.

    Use an *instance* as the decorator (``@non_reentrant_code()``).  While a
    guarded call is running, any further guarded call made through the same
    instance from the same thread returns None immediately instead of
    running the function.  The guard state lives on the instance, so every
    function decorated with one instance shares one guard.
    """

    def __init__(self):
        # RLock must be instantiated: storing the bare threading.RLock
        # factory makes ``with self._lock`` fail at call time.  A re-entrant
        # lock is needed because the nested call arrives on the thread that
        # already holds it.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Wrap ``f`` so re-entrant invocations are skipped.

        Returns a function that forwards to ``f`` and returns its result, or
        returns None when the call is a re-entry.
        """
        # NOTE(review): parts of _gatekeeper were not visible in the reviewed
        # extract; the lock / check / set / call / reset sequence is inferred
        # from the visible _lock and _entered handling -- confirm.

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises, otherwise one failure would
                    # disable the guarded function for good.
                    self._entered = False

        return _gatekeeper
class rlocked(object):
    """Decorator object that runs the wrapped function under a re-entrant lock.

    Use an *instance* as the decorator (``@rlocked()``).  Calls made through
    the same instance are serialized by one ``threading.RLock``; a nested
    call arriving while a guarded call is already running returns None
    without invoking the function.
    """

    def __init__(self):
        # RLock must be instantiated: storing the bare threading.RLock
        # factory makes ``with self._lock`` fail at call time.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Wrap ``f`` so it executes while holding this instance's lock.

        Returns a function that forwards to ``f`` and returns its result, or
        returns None when the call is a re-entry.
        """
        # NOTE(review): parts of _gatekeeper were not visible in the reviewed
        # extract.  The visible lines match non_reentrant_code line for line,
        # so the same lock / check / set / call / reset sequence is assumed
        # -- confirm that skipping nested calls is intended for "rlocked".

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises so later calls still run.
                    self._entered = False

        return _gatekeeper
def call_with_sample_rate(sample_rate: float) -> Callable:
    """Build a decorator that only executes a fraction of the calls made.

    Each call to the decorated function is executed with probability
    ``sample_rate``; a skipped call is logged and returns None.

    Args:
        sample_rate: probability in [0, 1] that a given call is executed.
            1.0 executes every call and 0.0 executes none.

    Returns:
        A decorator to apply to the function being sampled.

    Raises:
        ValueError: if ``sample_rate`` is outside [0, 1] (this includes NaN).
    """
    if not 0.0 <= sample_rate <= 1.0:
        msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
        # NOTE(review): one statement between building msg and the raise was
        # not visible in the reviewed extract; a logger.warning call is
        # assumed -- confirm.
        logger.warning(msg)
        raise ValueError(msg)

    def decorator(f):
        @functools.wraps(f)
        def _call_with_sample_rate(*args, **kwargs):
            # random.random() draws from [0.0, 1.0), so a rate of 1.0 always
            # runs and 0.0 never does; random.uniform(0, 1) may return 1.0
            # and would then wrongly skip a call at rate 1.0.
            if random.random() < sample_rate:
                return f(*args, **kwargs)
            logger.debug(
                f"@call_with_sample_rate skipping a call to {f.__name__}"
            )
            return None

        return _call_with_sample_rate

    return decorator
528 def decorate_matching_methods_with(decorator, acl=None):
529 """Apply decorator to all methods in a class whose names begin with
530 prefix. If prefix is None (default), decorate all methods in the
533 def decorate_the_class(cls):
534 for name, m in inspect.getmembers(cls, inspect.isfunction):
536 setattr(cls, name, decorator(m))
539 setattr(cls, name, decorator(m))
541 return decorate_the_class