10 import multiprocessing
17 from typing import Callable, Optional
22 logger = logging.getLogger(__name__)
25 def timed(func: Callable) -> Callable:
26 """Print the runtime of the decorated function."""
28 @functools.wraps(func)
29 def wrapper_timer(*args, **kwargs):
30 start_time = time.perf_counter()
31 value = func(*args, **kwargs)
32 end_time = time.perf_counter()
33 run_time = end_time - start_time
34 msg = f"Finished {func.__name__!r} in {run_time:.4f}s"
41 def invocation_logged(func: Callable) -> Callable:
42 """Log the call of a function."""
44 @functools.wraps(func)
45 def wrapper_invocation_logged(*args, **kwargs):
46 now = datetime.datetime.now()
47 ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
48 msg = f"[{ts}]: Entered {func.__name__}"
51 ret = func(*args, **kwargs)
52 now = datetime.datetime.now()
53 ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
54 msg = f"[{ts}]: Exited {func.__name__}"
58 return wrapper_invocation_logged
61 def debug_args(func: Callable) -> Callable:
62 """Print the function signature and return value at each call."""
64 @functools.wraps(func)
65 def wrapper_debug_args(*args, **kwargs):
66 args_repr = [f"{repr(a)}:{type(a)}" for a in args]
67 kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
68 signature = ", ".join(args_repr + kwargs_repr)
69 msg = f"Calling {func.__name__}({signature})"
72 value = func(*args, **kwargs)
73 msg = f"{func.__name__!r} returned {value!r}:{type(value)}"
76 return wrapper_debug_args
79 def debug_count_calls(func: Callable) -> Callable:
80 """Count function invocations and print a message befor every call."""
82 @functools.wraps(func)
83 def wrapper_debug_count_calls(*args, **kwargs):
84 wrapper_debug_count_calls.num_calls += 1
85 msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
88 return func(*args, **kwargs)
89 wrapper_debug_count_calls.num_calls = 0
90 return wrapper_debug_count_calls
93 class DelayWhen(enum.Enum):
100 _func: Callable = None,
102 seconds: float = 1.0,
103 when: DelayWhen = DelayWhen.BEFORE_CALL,
105 """Delay the execution of a function by sleeping before and/or after.
107 Slow down a function by inserting a delay before and/or after its
111 def decorator_delay(func: Callable) -> Callable:
112 @functools.wraps(func)
113 def wrapper_delay(*args, **kwargs):
114 if when & DelayWhen.BEFORE_CALL:
116 f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
119 retval = func(*args, **kwargs)
120 if when & DelayWhen.AFTER_CALL:
122 f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
129 return decorator_delay
131 return decorator_delay(_func)
134 class _SingletonWrapper:
136 A singleton wrapper class. Its instances would be created
137 for each decorated class.
140 def __init__(self, cls):
141 self.__wrapped__ = cls
142 self._instance = None
144 def __call__(self, *args, **kwargs):
145 """Returns a single instance of decorated class"""
147 f"@singleton returning global instance of {self.__wrapped__.__name__}"
149 if self._instance is None:
150 self._instance = self.__wrapped__(*args, **kwargs)
151 return self._instance
156 A singleton decorator. Returns a wrapper objects. A call on that object
157 returns a single instance object of decorated class. Use the __wrapped__
158 attribute to access decorated class directly in unit tests
160 return _SingletonWrapper(cls)
163 def memoized(func: Callable) -> Callable:
164 """Keep a cache of previous function call results.
166 The cache here is a dict with a key based on the arguments to the
167 call. Consider also: functools.lru_cache for a more advanced
171 @functools.wraps(func)
172 def wrapper_memoized(*args, **kwargs):
173 cache_key = args + tuple(kwargs.items())
174 if cache_key not in wrapper_memoized.cache:
175 value = func(*args, **kwargs)
177 f"Memoizing {cache_key} => {value} for {func.__name__}"
179 wrapper_memoized.cache[cache_key] = value
181 logger.debug(f"Returning memoized value for {func.__name__}")
182 return wrapper_memoized.cache[cache_key]
183 wrapper_memoized.cache = dict()
184 return wrapper_memoized
190 predicate: Callable[..., bool],
191 delay_sec: float = 3,
192 backoff: float = 2.0,
194 """Retries a function or method up to a certain number of times
195 with a prescribed initial delay period and backoff rate.
197 tries is the maximum number of attempts to run the function.
198 delay_sec sets the initial delay period in seconds.
199 backoff is a multiplied (must be >1) used to modify the delay.
200 predicate is a function that will be passed the retval of the
201 decorated function and must return True to stop or False to
205 msg = f"backoff must be greater than or equal to 1, got {backoff}"
207 raise ValueError(msg)
209 tries = math.floor(tries)
211 msg = f"tries must be 0 or greater, got {tries}"
213 raise ValueError(msg)
216 msg = f"delay_sec must be greater than 0, got {delay_sec}"
218 raise ValueError(msg)
222 def f_retry(*args, **kwargs):
223 mtries, mdelay = tries, delay_sec # make mutable
224 retval = f(*args, **kwargs)
226 if predicate(retval) is True:
228 logger.debug("Predicate failed, sleeping and retrying.")
232 retval = f(*args, **kwargs)
238 def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
239 return retry_predicate(
241 predicate=lambda x: x is True,
247 def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
248 return retry_predicate(
250 predicate=lambda x: x is not None,
256 def deprecated(func):
257 """This is a decorator which can be used to mark functions
258 as deprecated. It will result in a warning being emitted
259 when the function is used.
262 @functools.wraps(func)
263 def wrapper_deprecated(*args, **kwargs):
264 msg = f"Call to deprecated function {func.__name__}"
266 warnings.warn(msg, category=DeprecationWarning)
267 return func(*args, **kwargs)
269 return wrapper_deprecated
274 Make a function immediately return a function of no args which,
275 when called, waits for the result, which will start being
276 processed in another thread.
279 @functools.wraps(func)
280 def lazy_thunked(*args, **kwargs):
281 wait_event = threading.Event()
288 func_result = func(*args, **kwargs)
289 result[0] = func_result
292 exc[1] = sys.exc_info() # (type, value, traceback)
293 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
302 raise exc[1][0](exc[1][1])
305 threading.Thread(target=worker_func).start()
311 ############################################################
313 ############################################################
315 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
317 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
320 class TimeoutError(AssertionError):
321 def __init__(self, value: str = "Timed Out"):
325 return repr(self.value)
328 def _raise_exception(exception, error_message: Optional[str]):
329 if error_message is None:
332 raise exception(error_message)
335 def _target(queue, function, *args, **kwargs):
336 """Run a function with arguments and return output via a queue.
338 This is a helper function for the Process created in _Timeout. It runs
339 the function with positional arguments and keyword arguments and then
340 returns the function's output by way of a queue. If an exception gets
341 raised, it is returned to _Timeout to be raised by the value property.
344 queue.put((True, function(*args, **kwargs)))
346 queue.put((False, sys.exc_info()[1]))
349 class _Timeout(object):
350 """Wrap a function and add a timeout (limit) attribute to it.
352 Instances of this class are automatically generated by the add_timeout
353 function defined below.
359 timeout_exception: Exception,
363 self.__limit = seconds
364 self.__function = function
365 self.__timeout_exception = timeout_exception
366 self.__error_message = error_message
367 self.__name__ = function.__name__
368 self.__doc__ = function.__doc__
369 self.__timeout = time.time()
370 self.__process = multiprocessing.Process()
371 self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
373 def __call__(self, *args, **kwargs):
374 """Execute the embedded function object asynchronously.
376 The function given to the constructor is transparently called and
377 requires that "ready" be intermittently polled. If and when it is
378 True, the "value" property may then be checked for returned data.
380 self.__limit = kwargs.pop("timeout", self.__limit)
381 self.__queue = multiprocessing.Queue(1)
382 args = (self.__queue, self.__function) + args
383 self.__process = multiprocessing.Process(
384 target=_target, args=args, kwargs=kwargs
386 self.__process.daemon = True
387 self.__process.start()
388 if self.__limit is not None:
389 self.__timeout = self.__limit + time.time()
390 while not self.ready:
395 """Terminate any possible execution of the embedded function."""
396 if self.__process.is_alive():
397 self.__process.terminate()
398 _raise_exception(self.__timeout_exception, self.__error_message)
402 """Read-only property indicating status of "value" property."""
403 if self.__limit and self.__timeout < time.time():
405 return self.__queue.full() and not self.__queue.empty()
409 """Read-only property containing data returned from function."""
410 if self.ready is True:
411 flag, load = self.__queue.get()
418 seconds: float = 1.0,
419 use_signals: Optional[bool] = None,
420 timeout_exception=TimeoutError,
421 error_message="Function call timed out",
423 """Add a timeout parameter to a function and return the function.
425 Note: the use_signals parameter is included in order to support
426 multiprocessing scenarios (signal can only be used from the process'
427 main thread). When not using signals, timeout granularity will be
428 rounded to the nearest 0.1s.
430 Raises an exception when the timeout is reached.
432 It is illegal to pass anything other than a function as the first
433 parameter. The function is wrapped and returned to the caller.
435 if use_signals is None:
436 use_signals = thread_utils.is_current_thread_main_thread()
438 def decorate(function):
442 def handler(signum, frame):
443 _raise_exception(timeout_exception, error_message)
445 @functools.wraps(function)
446 def new_function(*args, **kwargs):
447 new_seconds = kwargs.pop("timeout", seconds)
449 old = signal.signal(signal.SIGALRM, handler)
450 signal.setitimer(signal.ITIMER_REAL, new_seconds)
453 return function(*args, **kwargs)
456 return function(*args, **kwargs)
459 signal.setitimer(signal.ITIMER_REAL, 0)
460 signal.signal(signal.SIGALRM, old)
465 @functools.wraps(function)
466 def new_function(*args, **kwargs):
467 timeout_wrapper = _Timeout(
468 function, timeout_exception, error_message, seconds
470 return timeout_wrapper(*args, **kwargs)
477 class non_reentrant_code(object):
479 self._lock = threading.RLock
480 self._entered = False
482 def __call__(self, f):
483 def _gatekeeper(*args, **kwargs):
489 self._entered = False
494 class rlocked(object):
496 self._lock = threading.RLock
497 self._entered = False
499 def __call__(self, f):
500 def _gatekeeper(*args, **kwargs):
506 self._entered = False
510 def call_with_sample_rate(sample_rate: float) -> Callable:
511 if not 0.0 <= sample_rate <= 1.0:
512 msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
514 raise ValueError(msg)
518 def _call_with_sample_rate(*args, **kwargs):
519 if random.uniform(0, 1) < sample_rate:
520 return f(*args, **kwargs)
523 f"@call_with_sample_rate skipping a call to {f.__name__}"
525 return _call_with_sample_rate