11 import multiprocessing
18 from typing import Callable, Optional
25 logger = logging.getLogger(__name__)
# Decorator that measures how long each call to `func` takes.
# NOTE(review): the tail of this listing (what is done with `msg`, and the
# return statements) is not visible here -- confirm against the full file.
28 def timed(func: Callable) -> Callable:
29 """Print the runtime of the decorated function."""
31 @functools.wraps(func)
32 def wrapper_timer(*args, **kwargs):
# Bracket the call with two perf_counter() readings; their difference is
# the elapsed time in (fractional) seconds.
33 start_time = time.perf_counter()
34 value = func(*args, **kwargs)
35 end_time = time.perf_counter()
36 run_time = end_time - start_time
# The function name is shown via repr (quoted), the duration with 4 decimals.
37 msg = f"Finished {func.__name__!r} in {run_time:.4f}s"
# Decorator that builds a timestamped "Entered"/"Exited" message around
# every call to `func`.
# NOTE(review): the lines that emit `msg` and return `ret` are not visible
# in this listing -- confirm against the full file.
44 def invocation_logged(func: Callable) -> Callable:
45 """Log the call of a function."""
47 @functools.wraps(func)
48 def wrapper_invocation_logged(*args, **kwargs):
49 now = datetime.datetime.now()
# NOTE(review): the format orders the date as year/day/month-abbreviation,
# and datetime.now() without a tz argument is naive, so %Z expands to an
# empty string -- confirm both are intended.
50 ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
51 msg = f"[{ts}]: Entered {func.__name__}"
54 ret = func(*args, **kwargs)
# A fresh timestamp is taken after the call for the "Exited" message.
55 now = datetime.datetime.now()
56 ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
57 msg = f"[{ts}]: Exited {func.__name__}"
61 return wrapper_invocation_logged
# Decorator that describes each call (arguments with their types) and the
# value it produced.
# NOTE(review): the lines that emit `msg` and return `value` are not
# visible in this listing -- confirm against the full file.
64 def debug_args(func: Callable) -> Callable:
65 """Print the function signature and return value at each call."""
67 @functools.wraps(func)
68 def wrapper_debug_args(*args, **kwargs):
# Each positional argument is rendered as "<repr>:<type>", each keyword
# argument as "name=<repr>:<type>"; the pieces are comma-joined.
69 args_repr = [f"{repr(a)}:{type(a)}" for a in args]
70 kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
71 signature = ", ".join(args_repr + kwargs_repr)
72 msg = f"Calling {func.__name__}({signature})"
75 value = func(*args, **kwargs)
# The result is reported the same way: repr followed by its type.
76 msg = f"{func.__name__!r} returned {value!r}:{type(value)}"
79 return wrapper_debug_args
# Decorator that keeps a running call count on the wrapper itself, as the
# attribute `num_calls`.
# NOTE(review): the line that emits `msg` is not visible in this listing.
82 def debug_count_calls(func: Callable) -> Callable:
83 """Count function invocations and print a message before every call."""
85 @functools.wraps(func)
86 def wrapper_debug_count_calls(*args, **kwargs):
# The counter is bumped before the wrapped function runs, so the message
# numbers calls starting from 1.
87 wrapper_debug_count_calls.num_calls += 1
88 msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
91 return func(*args, **kwargs)
# Counter starts at zero when the decorator is applied.
92 wrapper_debug_count_calls.num_calls = 0
93 return wrapper_debug_count_calls
# Enumeration used as the `when` argument of the delay decorator (members
# BEFORE_CALL and AFTER_CALL are referenced further down).
# NOTE(review): the delay wrapper combines members with `&`; members of a
# plain enum.Enum do not support `&` unless the class defines __and__ --
# the class body is not visible here, so confirm whether this should derive
# from enum.Flag / enum.IntFlag.
96 class DelayWhen(enum.Enum):
103 _func: Callable = None,
105 seconds: float = 1.0,
106 when: DelayWhen = DelayWhen.BEFORE_CALL,
108 """Delay the execution of a function by sleeping before and/or after.
110 Slow down a function by inserting a delay before and/or after its
# NOTE(review): the `def` line, the end of the docstring and the sleep /
# logging calls are not visible in this listing; comments below describe
# only the visible lines.
114 def decorator_delay(func: Callable) -> Callable:
115 @functools.wraps(func)
116 def wrapper_delay(*args, **kwargs):
# `when` is tested against BEFORE_CALL with a bitwise AND.
# NOTE(review): this requires DelayWhen members to support `&` -- confirm.
117 if when & DelayWhen.BEFORE_CALL:
119 f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
122 retval = func(*args, **kwargs)
# Same test for the post-call delay.
123 if when & DelayWhen.AFTER_CALL:
125 f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
# Two exits: hand back the decorator itself, or apply it directly to
# `_func`. Presumably chosen by whether `_func` is None -- the condition is
# not visible here.
132 return decorator_delay
134 return decorator_delay(_func)
# Callable wrapper that stands in for a decorated class and hands out one
# cached instance of it.
137 class _SingletonWrapper:
139 A singleton wrapper class. Its instances would be created
140 for each decorated class.
143 def __init__(self, cls):
# The decorated class stays reachable as __wrapped__; no instance exists
# until the first call.
144 self.__wrapped__ = cls
145 self._instance = None
147 def __call__(self, *args, **kwargs):
148 """Returns a single instance of decorated class"""
150 f"@singleton returning global instance of {self.__wrapped__.__name__}"
# The instance is built lazily on the first call, using that call's
# arguments; arguments passed on later calls are ignored because the cached
# instance is returned as-is.
152 if self._instance is None:
153 self._instance = self.__wrapped__(*args, **kwargs)
154 return self._instance
159 A singleton decorator. Returns a wrapper object. A call on that object
160 returns a single instance object of the decorated class. Use the __wrapped__
161 attribute to access the decorated class directly in unit tests.
# The decorated name is rebound to a _SingletonWrapper, not to the class.
163 return _SingletonWrapper(cls)
def memoized(func: Callable) -> Callable:
    """Keep a cache of previous function call results.

    The cache is a dict keyed on the arguments of the call and is exposed
    as the ``cache`` attribute of the returned wrapper.  Consider also
    functools.lru_cache for a more advanced caching strategy.

    Args:
        func: the function whose results are cached.  Every argument it is
            called with must be hashable.

    Returns:
        A wrapper that calls ``func`` once per distinct set of arguments
        and returns the stored result on every later equal call.

    Raises:
        TypeError: at call time, if an argument is unhashable.
    """
    # Marker placed between positional and keyword arguments inside a key,
    # so that f(("a", 1)) and f(a=1) can never share a cache entry.
    kwargs_mark = object()

    @functools.wraps(func)
    def wrapper_memoized(*args, **kwargs):
        # Purely positional calls keep the plain ``args`` tuple as their key.
        cache_key = args
        if kwargs:
            # Sorted by name so f(a=1, b=2) and f(b=2, a=1) hit the same
            # entry; names are unique, so values are never compared.
            cache_key += (kwargs_mark,) + tuple(sorted(kwargs.items()))
        if cache_key not in wrapper_memoized.cache:
            value = func(*args, **kwargs)
            logger.debug(
                "Memoizing %s => %s for %s", cache_key, value, func.__name__
            )
            wrapper_memoized.cache[cache_key] = value
        else:
            logger.debug("Returning memoized value for %s", func.__name__)
        return wrapper_memoized.cache[cache_key]

    wrapper_memoized.cache = {}
    return wrapper_memoized
193 predicate: Callable[..., bool],
194 delay_sec: float = 3,
195 backoff: float = 2.0,
197 """Retries a function or method up to a certain number of times
198 with a prescribed initial delay period and backoff rate.
200 tries is the maximum number of attempts to run the function.
201 delay_sec sets the initial delay period in seconds.
202 backoff is a multiplier (must be >= 1) used to modify the delay.
203 predicate is a function that will be passed the retval of the
204 decorated function and must return True to stop or False to
# NOTE(review): the `def` line, the guarding `if` conditions and the retry
# loop are not visible in this listing; the comments below rely only on the
# visible messages and statements.
# Argument validation: each violated constraint raises ValueError carrying
# the message built just before it.
208 msg = f"backoff must be greater than or equal to 1, got {backoff}"
210 raise ValueError(msg)
# A fractional `tries` is rounded down to a whole number of attempts.
212 tries = math.floor(tries)
214 msg = f"tries must be 0 or greater, got {tries}"
216 raise ValueError(msg)
219 msg = f"delay_sec must be greater than 0, got {delay_sec}"
221 raise ValueError(msg)
225 def f_retry(*args, **kwargs):
# Per-call working copies, so the decorator's own arguments stay untouched.
226 mtries, mdelay = tries, delay_sec # make mutable
227 retval = f(*args, **kwargs)
# Identity test: only the literal True stops retrying; any other truthy
# value returned by the predicate counts as a failure.
229 if predicate(retval) is True:
231 logger.debug("Predicate failed, sleeping and retrying.")
235 retval = f(*args, **kwargs)
# Convenience wrapper around retry_predicate: the decorated function is
# treated as successful only when it returns the literal True.
# NOTE(review): the remaining arguments of the call are not visible here.
241 def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
242 return retry_predicate(
244 predicate=lambda x: x is True,
# Convenience wrapper around retry_predicate: the decorated function is
# treated as successful when it returns anything other than None.
# NOTE(review): the remaining arguments of the call are not visible here.
250 def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
251 return retry_predicate(
253 predicate=lambda x: x is not None,
def deprecated(func):
    """Mark a function as deprecated.

    Every call to the decorated function logs a warning and emits a
    DeprecationWarning before delegating to the original function.

    Args:
        func: the function being deprecated.

    Returns:
        A wrapper with the same signature and return value as ``func``.
    """

    @functools.wraps(func)
    def wrapper_deprecated(*args, **kwargs):
        msg = f"Call to deprecated function {func.__name__}"
        logger.warning(msg)
        # stacklevel=2 attributes the warning to the code that called the
        # deprecated function rather than to this wrapper, so the reported
        # file/line is the one that needs fixing and the default
        # "once per location" filter de-duplicates per call site.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)

    return wrapper_deprecated
277 Make a function immediately return a function of no args which,
278 when called, waits for the result, which will start being
279 processed in another thread.
# NOTE(review): the `def` line, the worker function header, the try/except
# scaffolding and the thunk itself are not visible in this listing.
282 @functools.wraps(func)
283 def lazy_thunked(*args, **kwargs):
# Event presumably used by the worker to signal completion -- the set() and
# wait() calls are not visible here.
284 wait_event = threading.Event()
# Success path of the worker: run the function and stash its result.
291 func_result = func(*args, **kwargs)
292 result[0] = func_result
# Failure path of the worker: stash the full exc_info triple.
295 exc[1] = sys.exc_info() # (type, value, traceback)
296 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
# NOTE(review): this builds a NEW exception by calling the stored exception
# type with the original exception instance as its only argument. The
# original traceback is not attached, and exception types whose constructor
# does not accept a single positional argument would fail here -- consider
# re-raising the stored instance instead.
305 raise exc[1][0](exc[1][1])
# The worker thread is started as soon as the decorated function is called.
308 threading.Thread(target=worker_func).start()
314 ############################################################
316 ############################################################
318 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
320 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
# Helper that raises `exception`, with or without a message.
# `exception` is called like a class, so callers pass an exception type.
# NOTE(review): the branch taken when error_message is None is not visible
# in this listing.
323 def _raise_exception(exception, error_message: Optional[str]):
324 if error_message is None:
327 raise exception(error_message)
330 def _target(queue, function, *args, **kwargs):
331 """Run a function with arguments and return output via a queue.
333 This is a helper function for the Process created in _Timeout. It runs
334 the function with positional arguments and keyword arguments and then
335 returns the function's output by way of a queue. If an exception gets
336 raised, it is returned to _Timeout to be raised by the value property.
# The queue carries a (flag, payload) pair: (True, return value) on
# success, (False, exception instance) on failure.
# NOTE(review): the try/except lines around these two puts are not visible.
339 queue.put((True, function(*args, **kwargs)))
341 queue.put((False, sys.exc_info()[1]))
# Runs a function in a child process and enforces a time limit on it.
# NOTE(review): several lines of this class (method headers, the polling
# loop body, property decorators, the tail of `value`) are not visible in
# this listing; the comments below describe only the visible statements.
# NOTE(review): the docstring mentions an "add_timeout" function; no
# function of that name is visible in this listing -- confirm the name.
344 class _Timeout(object):
345 """Wrap a function and add a timeout (limit) attribute to it.
347 Instances of this class are automatically generated by the add_timeout
348 function defined below.
# NOTE(review): annotated as an Exception instance, but the value is later
# handed to _raise_exception, which calls it like a class -- the annotation
# probably wants to be a type.
354 timeout_exception: Exception,
358 self.__limit = seconds
359 self.__function = function
360 self.__timeout_exception = timeout_exception
361 self.__error_message = error_message
# Mirror the wrapped function's name and docstring on the wrapper object.
362 self.__name__ = function.__name__
363 self.__doc__ = function.__doc__
# Placeholder deadline, process and queue; all three are replaced on each
# call.
364 self.__timeout = time.time()
365 self.__process = multiprocessing.Process()
366 self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
368 def __call__(self, *args, **kwargs):
369 """Execute the embedded function object asynchronously.
371 The function given to the constructor is transparently called and
372 requires that "ready" be intermittently polled. If and when it is
373 True, the "value" property may then be checked for returned data.
# A per-call `timeout=` keyword is consumed here and never reaches the
# wrapped function. It overwrites the stored limit, so it also applies to
# later calls on this instance that omit it.
375 self.__limit = kwargs.pop("timeout", self.__limit)
# One-slot queue: the child puts exactly one (flag, payload) pair.
376 self.__queue = multiprocessing.Queue(1)
# The queue and the target function are prepended to the positional
# arguments, matching the (queue, function, *args) parameters of _target.
377 args = (self.__queue, self.__function) + args
378 self.__process = multiprocessing.Process(
379 target=_target, args=args, kwargs=kwargs
# Daemon process: it will not keep the parent alive at interpreter exit.
381 self.__process.daemon = True
382 self.__process.start()
# Absolute deadline = now + limit; skipped entirely when the limit is None.
383 if self.__limit is not None:
384 self.__timeout = self.__limit + time.time()
385 while not self.ready:
390 """Terminate any possible execution of the embedded function."""
391 if self.__process.is_alive():
392 self.__process.terminate()
# Always raises the configured timeout exception.
393 _raise_exception(self.__timeout_exception, self.__error_message)
397 """Read-only property indicating status of "value" property."""
# Truthiness test: a limit of 0 (like None) skips the deadline check.
398 if self.__limit and self.__timeout < time.time():
# NOTE(review): the multiprocessing documentation describes Queue.full()
# and Queue.empty() as not reliable -- confirm this readiness test is
# adequate.
400 return self.__queue.full() and not self.__queue.empty()
404 """Read-only property containing data returned from function."""
# `flag` tells success (True) from failure (False); `load` is the return
# value or the exception put on the queue by _target.
405 if self.ready is True:
406 flag, load = self.__queue.get()
# NOTE(review): the `def` line, several branch conditions and the final
# return statements of this decorator are not visible in this listing;
# comments describe only the visible statements.
413 seconds: float = 1.0,
414 use_signals: Optional[bool] = None,
415 timeout_exception=exceptions.TimeoutError,
416 error_message="Function call timed out",
418 """Add a timeout parameter to a function and return the function.
420 Note: the use_signals parameter is included in order to support
421 multiprocessing scenarios (signal can only be used from the process'
422 main thread). When not using signals, timeout granularity will be
423 rounded to the nearest 0.1s.
425 Raises an exception when the timeout is reached.
427 It is illegal to pass anything other than a function as the first
428 parameter. The function is wrapped and returned to the caller.
# Default: use signals exactly when the decorator is being applied from the
# main thread (as reported by the project's thread_utils helper).
430 if use_signals is None:
431 use_signals = thread_utils.is_current_thread_main_thread()
433 def decorate(function):
# Signal-based variant: the SIGALRM handler raises the timeout exception
# inside the running call.
437 def handler(signum, frame):
438 _raise_exception(timeout_exception, error_message)
440 @functools.wraps(function)
441 def new_function(*args, **kwargs):
# A per-call `timeout=` keyword overrides `seconds` and is removed from
# kwargs, so the wrapped function never receives it.
442 new_seconds = kwargs.pop("timeout", seconds)
# Install the handler (remembering the previous one) and arm a one-shot
# real-time timer.
444 old = signal.signal(signal.SIGALRM, handler)
445 signal.setitimer(signal.ITIMER_REAL, new_seconds)
# NOTE(review): two call sites -- presumably one with the timer armed and
# one without; the separating condition is not visible here.
448 return function(*args, **kwargs)
451 return function(*args, **kwargs)
# Cleanup: disarm the timer and put the previous SIGALRM handler back.
454 signal.setitimer(signal.ITIMER_REAL, 0)
455 signal.signal(signal.SIGALRM, old)
# Process-based variant: a fresh _Timeout wrapper is built for every call
# and runs the function in a child process.
460 @functools.wraps(function)
461 def new_function(*args, **kwargs):
462 timeout_wrapper = _Timeout(
463 function, timeout_exception, error_message, seconds
465 return timeout_wrapper(*args, **kwargs)
class non_reentrant_code(object):
    """Decorator factory that suppresses re-entrant calls.

    Use as ``@non_reentrant_code()``.  Each instance owns one re-entrant
    lock and one "currently running" flag, shared by every function that
    the instance decorates.  While a guarded function is running, a nested
    call made from inside it is skipped and returns None; calls from other
    threads wait on the lock and run once it is released.
    """

    def __init__(self):
        # Instantiate the lock.  Storing the bare ``threading.RLock``
        # factory (without calling it) is not a context manager, so the
        # ``with self._lock`` below would fail on the very first call.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Wrap ``f`` so that re-entrant invocations become no-ops.

        Returns:
            A wrapper that returns ``f``'s result, or None when the call
            was skipped because a guarded call is already in progress.
        """

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises; otherwise one exception
                    # would disable the guarded code permanently.
                    self._entered = False

        return _gatekeeper
class rlocked(object):
    """Decorator factory that runs the decorated function under an RLock.

    Use as ``@rlocked()``.  Each instance owns one re-entrant lock and one
    "currently running" flag, shared by every function that the instance
    decorates.  Calls from other threads are serialized by the lock; a
    nested call made from inside a guarded function is skipped and returns
    None.
    """

    def __init__(self):
        # Instantiate the lock.  Storing the bare ``threading.RLock``
        # factory (without calling it) is not a context manager, so the
        # ``with self._lock`` below would fail on the very first call.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Wrap ``f`` so that it only runs while holding the lock.

        Returns:
            A wrapper that returns ``f``'s result, or None when the call
            was skipped because a guarded call is already in progress.
        """

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises; otherwise one exception
                    # would disable the guarded code permanently.
                    self._entered = False

        return _gatekeeper
# Decorator factory: the decorated function is executed only for a random
# fraction of its calls, controlled by `sample_rate`.
# Raises ValueError when sample_rate lies outside the closed range [0, 1].
# NOTE(review): the inner decorator header, the logging call and what a
# skipped call returns are not visible in this listing.
505 def call_with_sample_rate(sample_rate: float) -> Callable:
506 if not 0.0 <= sample_rate <= 1.0:
507 msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
509 raise ValueError(msg)
513 def _call_with_sample_rate(*args, **kwargs):
# A fresh uniform draw per call; the call goes through when the draw is
# strictly below the rate, so a rate of 0.0 never calls the function.
514 if random.uniform(0, 1) < sample_rate:
515 return f(*args, **kwargs)
518 f"@call_with_sample_rate skipping a call to {f.__name__}"
520 return _call_with_sample_rate
# Class decorator factory: wraps functions defined on a class with
# `decorator`.
# NOTE(review): the docstring below describes a `prefix`, but the parameter
# is named `acl`, and the conditions that use it are not visible in this
# listing -- confirm the selection rule and update the docstring to match.
524 def decorate_matching_methods_with(decorator, acl=None):
525 """Apply decorator to all methods in a class whose names begin with
526 prefix. If prefix is None (default), decorate all methods in the
529 def decorate_the_class(cls):
# Walk every attribute of the class that is a plain function.
530 for name, m in inspect.getmembers(cls, inspect.isfunction):
# Both visible branches rebind the attribute to its decorated version, in
# place on the class.
532 setattr(cls, name, decorator(m))
535 setattr(cls, name, decorator(m))
537 return decorate_the_class