#!/usr/bin/env python3
# © Copyright 2021-2023, Scott Gasch
# A portion (marked) below retain the original author's copyright.
# TODO(scott): Make type hints PEP 612 compliant once python 3.10
# is a reasonable minimum version requirement for this library.
# https://peps.python.org/pep-0612/#specification
"""This is a grab bag of, hopefully, useful decorators."""
import enum
import functools
import inspect
import logging
import math
import multiprocessing
import random
import signal
import sys
import threading
import time
import traceback
import warnings
from typing import Any, Callable, List, NoReturn, Optional, Union
from pyutils.exceptions import PyUtilsException
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
logger = logging.getLogger(__name__)
DEFAULT_LOCK = threading.RLock()
[docs]
def timed(func: Callable) -> Callable:
"""Prints + info logs the runtime of the decorated function at
each invocation.
>>> @timed
... def foo():
... import time
... time.sleep(0.01)
>>> foo() # doctest: +ELLIPSIS
Finished foo in ...
"""
@functools.wraps(func)
def wrapper_timer(*args, **kwargs):
start_time = time.perf_counter()
value = func(*args, **kwargs)
end_time = time.perf_counter()
run_time = end_time - start_time
msg = f"Finished {func.__qualname__} in {run_time:.4f}s"
print(msg)
logger.info(msg)
return value
return wrapper_timer
[docs]
def invocation_logged(func: Callable) -> Callable:
"""Log the call of a function on sys.stdout and the info log.
>>> @invocation_logged
... def foo():
... print('Hello, world.')
>>> foo()
Entered foo
Hello, world.
Exited foo
"""
@functools.wraps(func)
def wrapper_invocation_logged(*args, **kwargs):
msg = f"Entered {func.__qualname__}"
print(msg)
logger.info(msg)
ret = func(*args, **kwargs)
msg = f"Exited {func.__qualname__}"
print(msg)
logger.info(msg)
return ret
return wrapper_invocation_logged
[docs]
def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
"""Limit invocation of a wrapped function to n calls per time period.
Thread-safe. In testing this was relatively fair with multiple
threads using it though that hasn't been measured in detail.
.. note::
The doctest below makes use of
:py:class:`pyutils.parallelize.thread_utils.background_thread`. See
that class' documentation for details.
>>> import time
>>> from pyutils import decorator_utils
>>> from pyutils.parallelize import thread_utils
>>> calls = 0
>>> @decorator_utils.rate_limited(10, per_period_in_seconds=1.0)
... def limited(x: int):
... global calls
... calls += 1
>>> @thread_utils.background_thread
... def a(stop):
... for _ in range(3):
... limited(_)
>>> @thread_utils.background_thread
... def b(stop):
... for _ in range(3):
... limited(_)
>>> start = time.time()
>>> (thread1, event1) = a()
>>> (thread2, event2) = b()
>>> thread1.join()
>>> thread2.join()
>>> end = time.time()
>>> dur = end - start
>>> dur > 0.5
True
>>> calls
6
"""
min_interval_seconds = per_period_in_seconds / float(n_calls)
def wrapper_rate_limited(func: Callable) -> Callable:
cv = threading.Condition()
last_invocation_timestamp = [0.0]
def may_proceed() -> float:
now = time.time()
last_invocation = last_invocation_timestamp[0]
if last_invocation != 0.0:
elapsed_since_last = now - last_invocation
wait_time = min_interval_seconds - elapsed_since_last
else:
wait_time = 0.0
logger.debug("@%.4f> wait_time = %.4f", time.time(), wait_time)
return wait_time
def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
with cv:
while True:
if cv.wait_for(
lambda: may_proceed() <= 0.0,
timeout=may_proceed(),
):
break
with cv:
logger.debug("@%.4f> calling it...", time.time())
ret = func(*args, **kargs)
last_invocation_timestamp[0] = time.time()
logger.debug(
"@%.4f> Last invocation <- %.4f",
time.time(),
last_invocation_timestamp[0],
)
cv.notify()
return ret
return wrapper_wrapper_rate_limited
return wrapper_rate_limited
[docs]
def debug_args(func: Callable) -> Callable:
"""Print the function signature and return value at each call.
>>> @debug_args
... def foo(a, b, c):
... print(a)
... print(b)
... print(c)
... return (a + b, c)
>>> foo(1, 2.0, "test")
Calling foo(1:<class 'int'>, 2.0:<class 'float'>, 'test':<class 'str'>)
1
2.0
test
foo returned (3.0, 'test'):<class 'tuple'>
(3.0, 'test')
"""
@functools.wraps(func)
def wrapper_debug_args(*args, **kwargs):
args_repr = [f"{repr(a)}:{type(a)}" for a in args]
kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
signature = ", ".join(args_repr + kwargs_repr)
msg = f"Calling {func.__qualname__}({signature})"
print(msg)
logger.info(msg)
value = func(*args, **kwargs)
msg = f"{func.__qualname__} returned {value!r}:{type(value)}"
print(msg)
logger.info(msg)
return value
return wrapper_debug_args
[docs]
def debug_count_calls(func: Callable) -> Callable:
"""Count function invocations and print a message befor every call.
>>> @debug_count_calls
... def factoral(x):
... if x == 1:
... return 1
... return x * factoral(x - 1)
>>> factoral(5)
Call #1 of 'factoral'
Call #2 of 'factoral'
Call #3 of 'factoral'
Call #4 of 'factoral'
Call #5 of 'factoral'
120
"""
@functools.wraps(func)
def wrapper_debug_count_calls(*args, **kwargs):
wrapper_debug_count_calls.num_calls += 1
msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
print(msg)
logger.info(msg)
return func(*args, **kwargs)
wrapper_debug_count_calls.num_calls = 0 # type: ignore
return wrapper_debug_count_calls
[docs]
class DelayWhen(enum.IntEnum):
"""This enum is used with the `@delay` decorator to indicate that the
delay should happen before wrapped function invocation, after wrapped
function invocation, or both.
See: :py:meth:`delay`.
"""
BEFORE_CALL = 1
AFTER_CALL = 2
BEFORE_AND_AFTER = 3
[docs]
def delay(
_func: Optional[Callable] = None,
*,
seconds: float = 1.0,
when: DelayWhen = DelayWhen.BEFORE_CALL,
) -> Callable:
"""Slow down a function by inserting a delay before and/or after its
invocation.
Args:
seconds: how long should we delay (via a simple `time.sleep()`)?
when: when should we delay.. before the invocation, after it, or both?
>>> @delay(seconds=1.0)
... def foo():
... pass
>>> import time
>>> start = time.time()
>>> foo()
>>> dur = time.time() - start
>>> dur >= 1.0
True
"""
def decorator_delay(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper_delay(*args, **kwargs):
if when & DelayWhen.BEFORE_CALL:
logger.debug("@delay for %fs BEFORE_CALL to %s", seconds, func.__name__)
time.sleep(seconds)
retval = func(*args, **kwargs)
if when & DelayWhen.AFTER_CALL:
logger.debug("@delay for %fs AFTER_CALL to %s", seconds, func.__name__)
time.sleep(seconds)
return retval
return wrapper_delay
if _func is None:
return decorator_delay
else:
return decorator_delay(_func)
class _SingletonWrapper:
"""An internal singleton wrapper class. Its instances are created
for each decorated class.
"""
def __init__(self, cls):
self.__wrapped__ = cls
self._instance = None
self.lock = threading.Lock()
def __call__(self, *args, **kwargs):
"""Returns a single instance of decorated class"""
logger.debug(
"@singleton returning global instance of %s", self.__wrapped__.__name__
)
with self.lock:
if self._instance is None:
self._instance = self.__wrapped__(*args, **kwargs)
return self._instance
[docs]
def singleton(cls):
"""
A singleton decorator; adding this to a class results in the decorator making
sure that there exists only one instance of that class globally in the
program by creating an instance the first time the class is constructed
and then returning the previously created singleton instance on subsequent
creation requests.
See also :py:meth:`pyutils.persistent.persistent_autoloaded_singleton`.
>>> @singleton
... class global_configuration(object):
... pass
>>> a = global_configuration()
>>> b = global_configuration()
>>> a is b
True
>>> id(a) == id(b)
True
"""
return _SingletonWrapper(cls)
[docs]
def memoized(func: Callable) -> Callable:
"""Keep a cache of previous function call results. Use this with
pure functions without side effects that do expensive work.
The internal cache is a simple dict with a key based on the
arguments to the call so the result of the function must be determined
only by its parameters (i.e. it must be "functional") or this will
introduce errors. See:
https://en.wikipedia.org/wiki/Functional_programming#Pure_functions
Consider also: :py:meth:`functools.cache` for a more advanced
implementation. See:
https://docs.python.org/3/library/functools.html#functools.cache
>>> import time
>>> @memoized
... def expensive(arg) -> int:
... # Simulate something slow to compute or lookup, like a
... # computationally expensive task or a network read of
... # static data (i.e. that should never change).
... time.sleep(1.0)
... return arg * arg
>>> start = time.time()
>>> expensive(5) # Takes about 1 sec
25
>>> expensive(3) # Also takes about 1 sec
9
>>> expensive(5) # Pulls from cache, fast
25
>>> expensive(3) # Pulls from cache again, fast
9
>>> dur = time.time() - start
>>> dur < 3.0
True
"""
@functools.wraps(func)
def wrapper_memoized(*args, **kwargs):
cache_key = args + tuple(kwargs.items())
if cache_key not in wrapper_memoized.cache:
value = func(*args, **kwargs)
logger.debug("Memoizing %s => %s for %s", cache_key, value, func.__name__)
wrapper_memoized.cache[cache_key] = value
else:
logger.debug("Returning memoized value for %s", {func.__name__})
return wrapper_memoized.cache[cache_key]
wrapper_memoized.cache = {} # type: ignore
return wrapper_memoized
[docs]
def normal_delay_helper(delay: float) -> None:
time.sleep(delay)
[docs]
def jittery_delay_helper(delay: float) -> None:
jitter = delay * 0.25
jitter *= random.random()
if random.choice(['add', 'subtract']) == 'add':
delay += jitter
else:
delay -= jitter
normal_delay_helper(delay)
[docs]
def predicated_retry_with_backoff(
tries: int,
*,
predicate: Callable[..., bool],
delay_sec: float = 3.0,
backoff: float = 2.0,
on_attempt: Optional[Callable[..., None]] = None,
on_success: Optional[Callable[..., None]] = None,
on_failure: Optional[Callable[..., None]] = None,
delay_helper: Optional[Callable[[float], None]] = None,
raise_on_repeated_failures: bool = False,
):
"""Retries a function or method up to a certain number of times with a
prescribed initial delay period and backoff rate (multiplier). Note
that :py:meth:`retry_if_false` and :py:meth:`retry_if_none` both
use this class with a predefined predicate but you can also use
it directly with your own custom predicate.
Args:
tries: the maximum number of attempts to run the function
delay_sec: sets the initial delay period in seconds
backoff: a multiplier (must be >=1.0) used to modify the
delay at each subsequent invocation
predicate: a Callable that will be passed the retval of
the decorated function and must return True to indicate
that we should stop calling or False to indicate a retry
is necessary
on_attempt: an optional callable to be invoked at each attempt
on_success: an optional callable to be invoked on success
on_failure: an optional callable to be invoked on failure
raise_on_repeated_failures: if True, raise a PyUtilsException
if the wrapped function never succeeds (as indicated by
the predicate). Otherwise simply returns the final error
result.
Raises:
ValueError: on invalid arguments; e.g. backoff must be >= 1.0,
delay_sec must be >= 0.0, tries must be > 0.
PyUtilsException: if raise_on_repeated_failures is True and
the wrapped function fails tries times. Otherwise simply
returns the final error result.
Example usage that would call `make_the_RPC_call` up to three
times (as long as it returns a tuple with `False` in the second
element) with a delay of 1.0s the first time, 2.0s the second
time, and 4.0s the third time.::
@decorator_utils.predicated_retry_with_backoff(
3,
predicate=lambda _: _[2] is False,
delay_sec=1.0,
backoff=2
)
def make_the_RPC_call() -> Tuple[str, int, bool]:
whatever
"""
if backoff < 1.0:
msg = f"backoff must be greater than or equal to 1, got {backoff}"
logger.critical(msg)
raise ValueError(msg)
tries = math.floor(tries)
if tries < 0:
msg = f"tries must be 0 or greater, got {tries}"
logger.critical(msg)
raise ValueError(msg)
if delay_sec <= 0:
msg = f"delay_sec must be greater than 0, got {delay_sec}"
logger.critical(msg)
raise ValueError(msg)
def deco_retry(f):
@functools.wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay_sec # make mutable
# Delay evaluation of this until execution time to allow
# callers to use 'unittest.mock.patch' on normal_delay_helper.
mdelay_helper = delay_helper if delay_helper else normal_delay_helper
logger.debug("deco_retry: will make up to %d attempts...", mtries)
while True:
logger.debug('Calling wrapped function; up to %d tries remain.', mtries)
if on_attempt:
on_attempt()
retval = f(*args, **kwargs)
if predicate(retval) is True:
logger.debug("Predicate indicates succeess, we're done.")
if on_success:
on_success()
return retval
else:
logger.error("Predicate indicates failure...")
if on_failure:
on_failure()
mtries -= 1
if mtries <= 0:
msg = f'Tried wrapped function {tries}x with backoffs between them; it always failed.'
if raise_on_repeated_failures:
raise PyUtilsException(msg)
else:
logger.error(msg)
return retval
logger.debug('Sleeping for %5.1f sec', mdelay)
mdelay_helper(mdelay)
mdelay *= backoff
return f_retry
return deco_retry
[docs]
def retry_if_false(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0):
"""A helper for `@predicated_retry_with_backoff` that retries a
decorated function as long as it keeps returning False.
Args:
tries: max number of times to retry
delay_sec: initial delay before retry length in seconds
backoff: a multiplier (must be >= 1.0) used to optionally increase
subsequent delays on repeated failures.
.. note::
If after `tries` attempts the wrapped function is still
failing, this code returns the failure result (i.e. False) to
the caller.
>>> import time
>>> counter = 0
>>> @retry_if_false(5, delay_sec=1.0, backoff=1.1)
... def foo():
... global counter
... counter += 1
... return counter >= 3
>>> start = time.time()
>>> foo() # fail, delay 1.0, fail, delay 1.1, succeed
True
>>> dur = time.time() - start
>>> counter
3
>>> dur > 2.0
True
>>> dur < 2.3
True
"""
return predicated_retry_with_backoff(
tries,
predicate=lambda x: x is True,
delay_sec=delay_sec,
backoff=backoff,
)
[docs]
def retry_if_none(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0):
"""A helper for `@predicated_retry_with_backoff` that continues to
invoke the wrapped function as long as it keeps returning None.
Retries up to N times with a delay between each retry and a
backoff that can increase the delay.
Args:
tries: max number of times to retry
delay_sec: initial delay before retry length in seconds
backoff: a multiplier (must be >= 1.0) used to optionally increase
subsequent delays on repeated failures.
.. note::
If after `tries` attempts the wrapped function is still
failing, this code returns the failure result (i.e. None) to
the caller.
Example usage... calls a function that reads a URL from the network
and returns the raw HTTP response or None on error with up to three
retries with an increasing backoff::
@retry_if_none(3, delay_sec=1.0, backoff=4.0)
def fetch_the_image(url: str) -> Optional[bytes]:
r = requests.get(url)
if r.status_code != 200:
return None
return r.content
# Use normally
image_binary_data = fetch_the_image(
'https://www.whatever.com/foo/bar/baz.jpg'
)
# Note: even with retries this might still fail; be prepared
# to still receive a None return value.
if image_binary_data is None:
raise Exception(f"Couldn't read {url}?!")
"""
return predicated_retry_with_backoff(
tries,
predicate=lambda x: x is not None,
delay_sec=delay_sec,
backoff=backoff,
)
[docs]
def deprecated(func):
"""This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
when the function is used. The warning includes the caller
as determined by examining the stack in the warning log.
>>> @deprecated
... def foo() -> None:
... pass
>>> foo() # prints + logs "Call to deprecated function foo"
"""
@functools.wraps(func)
def wrapper_deprecated(*args, **kwargs):
msg = f"Call to deprecated function {func.__qualname__}"
logger.warning(msg)
warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
print(msg, file=sys.stderr)
return func(*args, **kwargs)
return wrapper_deprecated
[docs]
def thunkify(func):
"""Make a function immediately return a function of no args which,
when called, waits for the original result. Meanwhile spin up a
background thread to begin computing the result in parallel.
Example usage... hide a slow network read behind a thunk that will
block only when it is called::
@thunkify
def read_url(url: str) -> Result:
make a slow network read
urls = [ long list of urls ]
results = []
for url in urls:
results.append(read_url(url))
In this example, we will start one background thread per url(!!)
requested. The result of read_url is no longer a `Result` but
rather a `Callable` (see `thunk` below) that, when invoked, awaits
the Result and returns it.
For more control over things like the number of worker threads and
the ability cause work to be done on background processes or even
on other machines, see
:py:class:`pyutils.parallelize.SmartFuture`,
:py:class:`pyutils.parallelize.DeferredOperation` and
:py:mod:`pyutils.parallelize.parallelize`.
"""
@functools.wraps(func)
def lazy_thunked(*args, **kwargs):
wait_event = threading.Event()
result = [None]
exc: List[Any] = [False, None]
def worker_func():
try:
func_result = func(*args, **kwargs)
result[0] = func_result
except Exception:
exc[0] = True
exc[1] = sys.exc_info() # (type, value, traceback)
msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
logger.warning(msg)
finally:
wait_event.set()
def thunk():
wait_event.wait()
if exc[0]:
assert exc[1]
raise exc[1][0](exc[1][1])
return result[0]
threading.Thread(target=worker_func).start()
return thunk
return lazy_thunked
############################################################
# Timeout
############################################################
# http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
# Used work of Stephen "Zero" Chappell <[email protected]>
# in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
# Original work is covered by PSF-2.0:
# 1. This LICENSE AGREEMENT is between the Python Software Foundation
# ("PSF"), and the Individual or Organization ("Licensee") accessing
# and otherwise using this software ("Python") in source or binary
# form and its associated documentation.
#
# 2. Subject to the terms and conditions of this License Agreement,
# PSF hereby grants Licensee a nonexclusive, royalty-free, world-wide
# license to reproduce, analyze, test, perform and/or display
# publicly, prepare derivative works, distribute, and otherwise use
# Python alone or in any derivative version, provided, however, that
# PSF's License Agreement and PSF's notice of copyright, i.e.,
# "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006 Python Software
# Foundation; All Rights Reserved" are retained in Python alone or in
# any derivative version prepared by Licensee.
# 3. In the event Licensee prepares a derivative work that is based on
# or incorporates Python or any part thereof, and wants to make the
# derivative work available to others as provided herein, then
# Licensee hereby agrees to include in any such work a brief summary
# of the changes made to Python.
# (N.B. See `NOTICE <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=NOTICE;hb=HEAD>`__ file in the root of this module for a list
# of changes)
# 4. PSF is making Python available to Licensee on an "AS IS"
# basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
# IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
# DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR
# FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL
# NOT INFRINGE ANY THIRD PARTY RIGHTS.
# 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
# FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A
# RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, OR ANY
# DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
# 6. This License Agreement will automatically terminate upon a
# material breach of its terms and conditions.
# 7. Nothing in this License Agreement shall be deemed to create any
# relationship of agency, partnership, or joint venture between PSF
# and Licensee. This License Agreement does not grant permission to
# use PSF trademarks or trade name in a trademark sense to endorse or
# promote products or services of Licensee, or any third party.
# 8. By copying, installing or otherwise using Python, Licensee agrees
# to be bound by the terms and conditions of this License Agreement.
def _raise_exception(exception, error_message: Optional[str]) -> NoReturn:
"""Internal. Raise a deferred exception"""
if error_message is None:
raise Exception(exception)
else:
raise Exception(error_message)
def _target(queue, function, *args, **kwargs):
"""Run a function with arguments and return output via a queue.
This is a helper function for the Process created in _Timeout. It runs
the function with positional arguments and keyword arguments and then
returns the function's output by way of a queue. If an exception gets
raised, it is returned to _Timeout to be raised by the value property.
"""
try:
queue.put((True, function(*args, **kwargs)))
except Exception:
queue.put((False, sys.exc_info()[1]))
class _Timeout(object):
"""Wrap a function and add a timeout to it.
.. warning::
Instances of this class are automatically generated by the
:py:meth:`timeout` function defined below. Do not use
directly. Example usage on :py:meth:`timeout`.
"""
def __init__(
self,
function: Callable,
timeout_exception: Exception,
error_message: str,
seconds: float,
):
"""
.. warning::
Instances of this class are automatically generated by the
:py:meth:`timeout` function defined below. Do not use
directly. Example usage on :py:meth:`timeout`.
"""
self.__limit = seconds
self.__function = function
self.__timeout_exception = timeout_exception
self.__error_message = error_message
self.__name__ = function.__name__
self.__doc__ = function.__doc__
self.__timeout = time.time()
self.__process = multiprocessing.Process()
self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
def __call__(self, *args, **kwargs):
"""Execute the embedded function object asynchronously.
The function given to the constructor is transparently called and
requires that "ready" be intermittently polled. If and when it is
True, the "value" property may then be checked for returned data.
"""
self.__limit = kwargs.pop("timeout", self.__limit)
self.__queue = multiprocessing.Queue(1)
args = (self.__queue, self.__function) + args
self.__process = multiprocessing.Process(
target=_target, args=args, kwargs=kwargs
)
self.__process.daemon = True
self.__process.start()
if self.__limit is not None:
self.__timeout = self.__limit + time.time()
while not self.ready:
time.sleep(0.1)
return self.value
def cancel(self):
"""Terminate any possible execution of the embedded function."""
if self.__process.is_alive():
self.__process.terminate()
_raise_exception(self.__timeout_exception, self.__error_message)
@property
def ready(self):
"""Read-only property indicating status of "value" property."""
if self.__limit and self.__timeout < time.time():
self.cancel()
return self.__queue.full() and not self.__queue.empty()
@property
def value(self):
"""Read-only property containing data returned from function."""
if self.ready is True:
flag, load = self.__queue.get()
if flag:
return load
raise load
return None
[docs]
def timeout(
seconds: float = 1.0,
use_signals: Optional[bool] = None,
timeout_exception=TimeoutError,
error_message="Function call timed out",
):
"""Add a timeout to a function. If the function takes longer than
the given timeout (in seconds) it will raise an exception and
return control to the caller.
.. note::
the use_signals parameter is included in order to support
multiprocessing scenarios (signal can only be used from the
process' main thread). When not using signals, timeout
granularity will be rounded to the nearest 0.1s and will poll.
.. warning::
Beware that a @timeout on a function inside at the
module-level will be evaluated at module load time and not
when the wrapped function is invoked. This is somewhat
counterintuitive and tricky and it can lead to problems when
relying on the automatic main thread detection code
(`use_signals=None`, the default) since the import probably
happens on the main thread and the invocation can happen on a
different thread (one which can't use signals). If in doubt,
do not use the automatic signal safety logic and set their
`use_signals` argument explicitly.
Raises:
Exception: the timeout was reached
It is illegal to pass anything other than a function as the first
parameter. The function is wrapped and returned to the caller.
>>> @timeout(0.2)
... def foo(delay: float):
... time.sleep(delay)
... return "ok"
>>> foo(0)
'ok'
>>> foo(1.0)
Traceback (most recent call last):
...
Exception: Function call timed out
"""
if use_signals is None:
import pyutils.parallelize.thread_utils as tu
use_signals = tu.is_current_thread_main_thread()
# Please see warning above!!!
def decorate(function):
if use_signals:
def handler(unused_signum, unused_frame):
_raise_exception(timeout_exception, error_message)
@functools.wraps(function)
def new_function(*args, **kwargs):
new_seconds = kwargs.pop("timeout", seconds)
if new_seconds:
old = signal.signal(signal.SIGALRM, handler)
signal.setitimer(signal.ITIMER_REAL, new_seconds)
if not seconds:
return function(*args, **kwargs)
try:
return function(*args, **kwargs)
finally:
if new_seconds:
signal.setitimer(signal.ITIMER_REAL, 0)
signal.signal(signal.SIGALRM, old)
return new_function
else:
@functools.wraps(function)
def new_function(*args, **kwargs):
timeout_wrapper = _Timeout(
function, timeout_exception, error_message, seconds
)
return timeout_wrapper(*args, **kwargs)
return new_function
return decorate
[docs]
def synchronized(
_func=None, *, lock: Union[None, threading.Lock, threading.RLock] = None
) -> Callable:
"""Emulates java's "synchronized" keyword: given a lock, require
that threads take that lock (or wait) before invoking the wrapped
function and automatically releases the lock afterwards.
Args:
lock: the lock that must be held to invoke the wrapped function.
Example usage. Imagine we have shared state between multiple thread
or processes and, to update the shared state, code should take a lock
to ensure only one writer is modifying the state at a time. Any kind
of python lock that has an `acquire` method can be used with the
`@synchronized` decorator and it will handle acquisition and release
automatically::
import threading
lock = threading.Lock()
@synchronized(lock)
def update_shared_state():
do some work
.. note::
If you pass no lock, a default lock will be used. This default
lock is reentrant. e.g.::
@synchronized
def do_something_single_threaded():
whatever
"""
if not lock:
lock = DEFAULT_LOCK
def wrap(func: Callable):
@functools.wraps(func)
def wrapper_synchronized(*args, **kwargs):
lock.acquire()
try:
return func(*args, **kwargs)
finally:
lock.release()
return wrapper_synchronized
if not _func:
return wrap
else:
return wrap(_func)
[docs]
def call_probabilistically(probability_of_call: float) -> Callable:
"""Calls the wrapped function probabilistically given a rate
between 0.0 and 1.0 inclusive (0% probability and 100%
probability).
Args:
probability_of_call: probability with which to invoke the
wrapped function. Must be 0 <= probabilty <= 1.0.
Raises:
ValueError: invalid probability argument
Example usage... this example would skip the invocation of
`log_the_entire_request_message` 95% of the time and only invoke
if 5% of the time.::
@call_probabilistically(0.05)
def log_the_entire_request_message(message: Whatever):
expensive work to save message to the log
"""
if not 0.0 <= probability_of_call <= 1.0:
msg = f"probability_of_call must be between [0, 1]. Got {probability_of_call}."
logger.critical(msg)
raise ValueError(msg)
def decorator(f):
@functools.wraps(f)
def _call_with_probability(*args, **kwargs):
if random.uniform(0, 1) < probability_of_call:
return f(*args, **kwargs)
else:
logger.debug(
"@call_with_probability_of_call skipping a call to %s", f.__name__
)
return None
return _call_with_probability
return decorator
[docs]
def decorate_matching_methods_with(decorator: Callable, acl: Optional[Callable] = None):
"""Apply the given decorator to all methods in a class whose names
begin with prefix. If prefix is None (default), decorate all
methods in the class.
Args:
decorator: the decorator to apply to matching class methods.
acl: the matcher used to predicate decorator application; None,
the default, applies the decorator to all class methods.
See :py:mod:`pyutils.security.acl` for more information
and options.
Example usage to wrap all methods whose names begin with either
"enter" or "exit" with the `@invocation_logged` decorator (see
:py:meth:`invocation_logged`)::
import pyutils.decorator_utils
import pyutils.security.acl as acl
@decorator_utils.decorate_matching_methods_with(
decorator_utils.invocation_logged,
acl.StringWildcardBasedACL(
allowed_patterns=['enter*', 'exit*'],
acl.Order.ALLOW_DENY
)
)
class MyClass:
def __init__(self):
self.name = None
self.rating = None
def __repr__(self) -> str:
return f'{self.name} @ {self.rating}'
def enterName(self, n: str) -> None:
if len(n) > 5:
self.name = n
def exitName(self, n: str) -> None:
pass
def enterRating(self, r: int) -> None:
if 1 <= r <= 5:
self.rating = r
def exitRating(self, r: int) -> None:
pass
"""
def decorate_the_class(cls):
for name, m in inspect.getmembers(cls, inspect.isfunction):
if acl is None or acl(name):
setattr(cls, name, decorator(m))
return cls
return decorate_the_class
if __name__ == "__main__":
import doctest
doctest.testmod()