11 import multiprocessing
18 from typing import Callable, Optional
24 logger = logging.getLogger(__name__)
27 def timed(func: Callable) -> Callable:
28 """Print the runtime of the decorated function."""
30 @functools.wraps(func)
31 def wrapper_timer(*args, **kwargs):
32 start_time = time.perf_counter()
33 value = func(*args, **kwargs)
34 end_time = time.perf_counter()
35 run_time = end_time - start_time
36 msg = f"Finished {func.__name__!r} in {run_time:.4f}s"
43 def invocation_logged(func: Callable) -> Callable:
44 """Log the call of a function."""
46 @functools.wraps(func)
47 def wrapper_invocation_logged(*args, **kwargs):
48 now = datetime.datetime.now()
49 ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
50 msg = f"[{ts}]: Entered {func.__name__}"
53 ret = func(*args, **kwargs)
54 now = datetime.datetime.now()
55 ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
56 msg = f"[{ts}]: Exited {func.__name__}"
60 return wrapper_invocation_logged
63 def debug_args(func: Callable) -> Callable:
64 """Print the function signature and return value at each call."""
66 @functools.wraps(func)
67 def wrapper_debug_args(*args, **kwargs):
68 args_repr = [f"{repr(a)}:{type(a)}" for a in args]
69 kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
70 signature = ", ".join(args_repr + kwargs_repr)
71 msg = f"Calling {func.__name__}({signature})"
74 value = func(*args, **kwargs)
75 msg = f"{func.__name__!r} returned {value!r}:{type(value)}"
78 return wrapper_debug_args
81 def debug_count_calls(func: Callable) -> Callable:
82 """Count function invocations and print a message befor every call."""
84 @functools.wraps(func)
85 def wrapper_debug_count_calls(*args, **kwargs):
86 wrapper_debug_count_calls.num_calls += 1
87 msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
90 return func(*args, **kwargs)
91 wrapper_debug_count_calls.num_calls = 0
92 return wrapper_debug_count_calls
95 class DelayWhen(enum.Enum):
102 _func: Callable = None,
104 seconds: float = 1.0,
105 when: DelayWhen = DelayWhen.BEFORE_CALL,
107 """Delay the execution of a function by sleeping before and/or after.
109 Slow down a function by inserting a delay before and/or after its
113 def decorator_delay(func: Callable) -> Callable:
114 @functools.wraps(func)
115 def wrapper_delay(*args, **kwargs):
116 if when & DelayWhen.BEFORE_CALL:
118 f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
121 retval = func(*args, **kwargs)
122 if when & DelayWhen.AFTER_CALL:
124 f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
131 return decorator_delay
133 return decorator_delay(_func)
136 class _SingletonWrapper:
138 A singleton wrapper class. Its instances would be created
139 for each decorated class.
142 def __init__(self, cls):
143 self.__wrapped__ = cls
144 self._instance = None
146 def __call__(self, *args, **kwargs):
147 """Returns a single instance of decorated class"""
149 f"@singleton returning global instance of {self.__wrapped__.__name__}"
151 if self._instance is None:
152 self._instance = self.__wrapped__(*args, **kwargs)
153 return self._instance
158 A singleton decorator. Returns a wrapper objects. A call on that object
159 returns a single instance object of decorated class. Use the __wrapped__
160 attribute to access decorated class directly in unit tests
162 return _SingletonWrapper(cls)
def memoized(func: Callable) -> Callable:
    """Keep a cache of previous function call results.

    The cache here is a dict with a key based on the arguments to the
    call.  Consider also: functools.lru_cache for a more advanced
    implementation.

    For a call made with positional arguments only, the key is the
    ``args`` tuple itself.  When keyword arguments are present the key is
    ``args`` followed by a private marker object and then the keyword
    items sorted by name, so that:

    * ``f(("a", 1))`` and ``f(a=1)`` never share a cache entry, and
    * ``f(a=1, b=2)`` and ``f(b=2, a=1)`` always do.

    Args:
        func: The function to wrap.  Every argument it is called with
            must be hashable; an unhashable argument raises TypeError
            when the cache is consulted.

    Returns:
        A wrapper carrying the metadata of ``func`` plus a ``cache``
        attribute (a dict mapping cache keys to results).  The cache is
        never trimmed.
    """
    # Unique to this decorated function and never equal to anything a
    # caller can pass, so it unambiguously separates the positional part
    # of the key from the keyword part.
    kwargs_marker = object()

    @functools.wraps(func)
    def wrapper_memoized(*args, **kwargs):
        cache_key = args
        if kwargs:
            # Keyword names are unique, so sorting the items never needs
            # to compare the (possibly unorderable) values.
            cache_key += (kwargs_marker, *sorted(kwargs.items()))
        if cache_key not in wrapper_memoized.cache:
            value = func(*args, **kwargs)
            logger.debug(
                f"Memoizing {cache_key} => {value} for {func.__name__}"
            )
            wrapper_memoized.cache[cache_key] = value
        else:
            logger.debug(f"Returning memoized value for {func.__name__}")
        return wrapper_memoized.cache[cache_key]

    wrapper_memoized.cache = dict()
    return wrapper_memoized
192 predicate: Callable[..., bool],
193 delay_sec: float = 3,
194 backoff: float = 2.0,
196 """Retries a function or method up to a certain number of times
197 with a prescribed initial delay period and backoff rate.
199 tries is the maximum number of attempts to run the function.
200 delay_sec sets the initial delay period in seconds.
201 backoff is a multiplied (must be >1) used to modify the delay.
202 predicate is a function that will be passed the retval of the
203 decorated function and must return True to stop or False to
207 msg = f"backoff must be greater than or equal to 1, got {backoff}"
209 raise ValueError(msg)
211 tries = math.floor(tries)
213 msg = f"tries must be 0 or greater, got {tries}"
215 raise ValueError(msg)
218 msg = f"delay_sec must be greater than 0, got {delay_sec}"
220 raise ValueError(msg)
224 def f_retry(*args, **kwargs):
225 mtries, mdelay = tries, delay_sec # make mutable
226 retval = f(*args, **kwargs)
228 if predicate(retval) is True:
230 logger.debug("Predicate failed, sleeping and retrying.")
234 retval = f(*args, **kwargs)
240 def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
241 return retry_predicate(
243 predicate=lambda x: x is True,
249 def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
250 return retry_predicate(
252 predicate=lambda x: x is not None,
def deprecated(func):
    """Mark a function as deprecated.

    This is a decorator; every call to the decorated function emits a
    DeprecationWarning naming the function and then runs it normally.

    Args:
        func: The function being deprecated.

    Returns:
        A wrapper with the metadata of ``func`` that warns and then
        returns whatever ``func`` returns for the same arguments.
    """

    @functools.wraps(func)
    def wrapper_deprecated(*args, **kwargs):
        msg = f"Call to deprecated function {func.__name__}"
        # stacklevel=2 attributes the warning to the code that called the
        # deprecated function rather than to this wrapper, so the reported
        # file/line is actionable and the default "once per location"
        # filter deduplicates per call site instead of globally.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return func(*args, **kwargs)

    return wrapper_deprecated
276 Make a function immediately return a function of no args which,
277 when called, waits for the result, which will start being
278 processed in another thread.
281 @functools.wraps(func)
282 def lazy_thunked(*args, **kwargs):
283 wait_event = threading.Event()
290 func_result = func(*args, **kwargs)
291 result[0] = func_result
294 exc[1] = sys.exc_info() # (type, value, traceback)
295 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
304 raise exc[1][0](exc[1][1])
307 threading.Thread(target=worker_func).start()
313 ############################################################
315 ############################################################
317 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
319 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
322 def _raise_exception(exception, error_message: Optional[str]):
323 if error_message is None:
326 raise exception(error_message)
329 def _target(queue, function, *args, **kwargs):
330 """Run a function with arguments and return output via a queue.
332 This is a helper function for the Process created in _Timeout. It runs
333 the function with positional arguments and keyword arguments and then
334 returns the function's output by way of a queue. If an exception gets
335 raised, it is returned to _Timeout to be raised by the value property.
338 queue.put((True, function(*args, **kwargs)))
340 queue.put((False, sys.exc_info()[1]))
343 class _Timeout(object):
344 """Wrap a function and add a timeout (limit) attribute to it.
346 Instances of this class are automatically generated by the add_timeout
347 function defined below.
353 timeout_exception: Exception,
357 self.__limit = seconds
358 self.__function = function
359 self.__timeout_exception = timeout_exception
360 self.__error_message = error_message
361 self.__name__ = function.__name__
362 self.__doc__ = function.__doc__
363 self.__timeout = time.time()
364 self.__process = multiprocessing.Process()
365 self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
367 def __call__(self, *args, **kwargs):
368 """Execute the embedded function object asynchronously.
370 The function given to the constructor is transparently called and
371 requires that "ready" be intermittently polled. If and when it is
372 True, the "value" property may then be checked for returned data.
374 self.__limit = kwargs.pop("timeout", self.__limit)
375 self.__queue = multiprocessing.Queue(1)
376 args = (self.__queue, self.__function) + args
377 self.__process = multiprocessing.Process(
378 target=_target, args=args, kwargs=kwargs
380 self.__process.daemon = True
381 self.__process.start()
382 if self.__limit is not None:
383 self.__timeout = self.__limit + time.time()
384 while not self.ready:
389 """Terminate any possible execution of the embedded function."""
390 if self.__process.is_alive():
391 self.__process.terminate()
392 _raise_exception(self.__timeout_exception, self.__error_message)
396 """Read-only property indicating status of "value" property."""
397 if self.__limit and self.__timeout < time.time():
399 return self.__queue.full() and not self.__queue.empty()
403 """Read-only property containing data returned from function."""
404 if self.ready is True:
405 flag, load = self.__queue.get()
412 seconds: float = 1.0,
413 use_signals: Optional[bool] = None,
414 timeout_exception=exceptions.TimeoutError,
415 error_message="Function call timed out",
417 """Add a timeout parameter to a function and return the function.
419 Note: the use_signals parameter is included in order to support
420 multiprocessing scenarios (signal can only be used from the process'
421 main thread). When not using signals, timeout granularity will be
422 rounded to the nearest 0.1s.
424 Raises an exception when the timeout is reached.
426 It is illegal to pass anything other than a function as the first
427 parameter. The function is wrapped and returned to the caller.
429 if use_signals is None:
431 use_signals = thread_utils.is_current_thread_main_thread()
433 def decorate(function):
437 def handler(signum, frame):
438 _raise_exception(timeout_exception, error_message)
440 @functools.wraps(function)
441 def new_function(*args, **kwargs):
442 new_seconds = kwargs.pop("timeout", seconds)
444 old = signal.signal(signal.SIGALRM, handler)
445 signal.setitimer(signal.ITIMER_REAL, new_seconds)
448 return function(*args, **kwargs)
451 return function(*args, **kwargs)
454 signal.setitimer(signal.ITIMER_REAL, 0)
455 signal.signal(signal.SIGALRM, old)
460 @functools.wraps(function)
461 def new_function(*args, **kwargs):
462 timeout_wrapper = _Timeout(
463 function, timeout_exception, error_message, seconds
465 return timeout_wrapper(*args, **kwargs)
class non_reentrant_code(object):
    """Decorator object that refuses re-entrant calls to what it wraps.

    Use an instance as the decorator, e.g. ``@non_reentrant_code()``.
    While a guarded call is in progress, any further call through the same
    instance that manages to take the lock (i.e. a nested call from the
    same thread) returns None immediately instead of running the function.
    Calls from other threads block on the lock until the running call
    finishes.

    The lock and the "entered" flag live on the instance, so every
    function decorated with the same instance shares one guard.
    """

    def __init__(self):
        # Must be a lock *instance*: threading.RLock itself is only the
        # factory and cannot be acquired or used in a ``with`` statement.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Return a guarded version of ``f``.

        Args:
            f: The callable to protect.

        Returns:
            A wrapper that returns ``f``'s result, or None when the call
            was refused because a guarded call is already in progress.
        """

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Reset even when f raises; otherwise one failure
                    # would disable the guarded function forever.
                    self._entered = False

        return _gatekeeper
class rlocked(object):
    """Decorator object that runs what it wraps under a reentrant lock.

    Use an instance as the decorator, e.g. ``@rlocked()``.  Every call
    through the instance holds the instance's lock for its duration, so
    calls from different threads are serialized.  An "entered" flag is
    kept alongside the lock: a nested call made while a guarded call is
    already running returns None without invoking the function.

    NOTE(review): the fields and wrapper shape here mirror
    ``non_reentrant_code`` in this file; confirm that refusing nested
    calls (rather than allowing them under the reentrant lock) is the
    intended contract for this class.
    """

    def __init__(self):
        # Must be a lock *instance*: threading.RLock itself is only the
        # factory and cannot be acquired or used in a ``with`` statement.
        self._lock = threading.RLock()
        self._entered = False

    def __call__(self, f):
        """Return a version of ``f`` that runs while holding the lock.

        Args:
            f: The callable to protect.

        Returns:
            A wrapper that returns ``f``'s result, or None when the call
            was made while a guarded call was already in progress.
        """

        @functools.wraps(f)
        def _gatekeeper(*args, **kwargs):
            with self._lock:
                if self._entered:
                    return None
                self._entered = True
                try:
                    return f(*args, **kwargs)
                finally:
                    # Always clear the flag so an exception in f does not
                    # leave the guard permanently closed.
                    self._entered = False

        return _gatekeeper
505 def call_with_sample_rate(sample_rate: float) -> Callable:
506 if not 0.0 <= sample_rate <= 1.0:
507 msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
509 raise ValueError(msg)
513 def _call_with_sample_rate(*args, **kwargs):
514 if random.uniform(0, 1) < sample_rate:
515 return f(*args, **kwargs)
518 f"@call_with_sample_rate skipping a call to {f.__name__}"
520 return _call_with_sample_rate
524 def decorate_matching_methods_with(decorator, acl=None):
525 """Apply decorator to all methods in a class whose names begin with
526 prefix. If prefix is None (default), decorate all methods in the
529 def decorate_the_class(cls):
530 for name, m in inspect.getmembers(cls, inspect.isfunction):
532 setattr(cls, name, decorator(m))
535 setattr(cls, name, decorator(m))
537 return decorate_the_class