#!/usr/bin/env python3
# © Copyright 2021-2022, Scott Gasch
-# Portions (marked) below retain the original author's copyright.
+# A portion (marked) below retains the original author's copyright.
-"""Useful(?) decorators."""
+"""This is a grab bag of, hopefully, useful decorators."""
import enum
import functools
def timed(func: Callable) -> Callable:
- """Print the runtime of the decorated function.
+ """Prints + info logs the runtime of the decorated function at
+ each invocation.
>>> @timed
... def foo():
>>> foo() # doctest: +ELLIPSIS
Finished foo in ...
-
"""
@functools.wraps(func)
def invocation_logged(func: Callable) -> Callable:
- """Log the call of a function on stdout and the info log.
+ """Log the call of a function on sys.stdout and the info log.
>>> @invocation_logged
... def foo():
Entered foo
Hello, world.
Exited foo
-
"""
@functools.wraps(func)
def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
"""Limit invocation of a wrapped function to n calls per time period.
- Thread safe. In testing this was relatively fair with multiple
+ Thread-safe. In testing this was relatively fair with multiple
threads using it though that hasn't been measured in detail.
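+
+ For example, a minimal sketch (`chatty_rpc` is a hypothetical
+ function used here only for illustration) that limits invocations
+ to at most two calls per second::
+
+     @rate_limited(2, per_period_in_seconds=1.0)
+     def chatty_rpc():
+         pass
+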
+ .. note::
+
+ The doctest below makes use of
+ :py:class:`pyutils.parallelize.thread_utils.background_thread`. See
+ that class' documentation for details.
+
>>> import time
>>> from pyutils import decorator_utils
>>> from pyutils.parallelize import thread_utils
... limited(_)
>>> start = time.time()
- >>> (t1, e1) = a()
- >>> (t2, e2) = b()
- >>> t1.join()
- >>> t2.join()
+ >>> (thread1, event1) = a()
+ >>> (thread2, event2) = b()
+ >>> thread1.join()
+ >>> thread2.join()
>>> end = time.time()
>>> dur = end - start
>>> dur > 0.5
True
-
>>> calls
6
-
"""
+
min_interval_seconds = per_period_in_seconds / float(n_calls)
def wrapper_rate_limited(func: Callable) -> Callable:
Call #4 of 'factoral'
Call #5 of 'factoral'
120
-
"""
@functools.wraps(func)
class DelayWhen(enum.IntEnum):
- """When should we delay: before or after calling the function (or
- both)?
+ """This enum is used with the `@delay` decorator to indicate that the
+ delay should happen before wrapped function invocation, after wrapped
+ function invocation, or both.
+ See: :py:meth:`delay`.
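+
+ For example, a sketch (this assumes the enum also defines a
+ `BEFORE_AND_AFTER` member in addition to `BEFORE_CALL` below)::
+
+     @delay(seconds=0.5, when=DelayWhen.BEFORE_AND_AFTER)
+     def poke():
+         pass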
"""
BEFORE_CALL = 1
"""Slow down a function by inserting a delay before and/or after its
invocation.
- >>> import time
+ Args:
+ seconds: how long should we delay (via a simple `time.sleep()`)?
+ when: when should we delay: before the invocation, after it, or both?
>>> @delay(seconds=1.0)
... def foo():
... pass
+ >>> import time
>>> start = time.time()
>>> foo()
>>> dur = time.time() - start
>>> dur >= 1.0
True
-
"""
def decorator_delay(func: Callable) -> Callable:
class _SingletonWrapper:
- """
- A singleton wrapper class. Its instances would be created
+ """An internal singleton wrapper class. Its instances are created
for each decorated class.
-
"""
def __init__(self, cls):
def singleton(cls):
"""
- A singleton decorator. Returns a wrapper objects. A call on that object
- returns a single instance object of decorated class. Use the __wrapped__
- attribute to access decorated class directly in unit tests
+ A singleton decorator: adding this to a class ensures that only one
+ instance of the class exists globally in the program. The first
+ construction creates an instance; every subsequent construction
+ request returns that same previously created instance.
+
+ See also :py:meth:`pyutils.persistent.persistent_autoloaded_singleton`.
>>> @singleton
- ... class foo(object):
+ ... class global_configuration(object):
... pass
- >>> a = foo()
- >>> b = foo()
+ >>> a = global_configuration()
+ >>> b = global_configuration()
>>> a is b
True
-
>>> id(a) == id(b)
True
-
"""
return _SingletonWrapper(cls)
def memoized(func: Callable) -> Callable:
- """Keep a cache of previous function call results.
+ """Keep a cache of previous function call results. Use this with
+ pure functions without side effects that do expensive work.
+
+ The internal cache is a simple dict with a key based on the
+ arguments to the call so the result of the function must be determined
+ only by its parameters (i.e. it must be "functional") or this will
+ introduce errors. See:
+ https://en.wikipedia.org/wiki/Functional_programming#Pure_functions
- The cache here is a dict with a key based on the arguments to the
- call. Consider also: functools.cache for a more advanced
+ Consider also: :py:meth:`functools.cache` for a more advanced
implementation. See:
https://docs.python.org/3/library/functools.html#functools.cache
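+
+ For comparison, a sketch of the same idea using the standard
+ library's :py:meth:`functools.cache` (available in Python 3.9+)::
+
+     import functools
+
+     @functools.cache
+     def expensive(arg) -> int:
+         ...
+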
>>> import time
-
>>> @memoized
... def expensive(arg) -> int:
- ... # Simulate something slow to compute or lookup
+ ... # Simulate something slow to compute or lookup, like a
+ ... # computationally expensive task or a network read of
+ ... # static data (i.e. that should never change).
... time.sleep(1.0)
... return arg * arg
>>> start = time.time()
>>> expensive(5) # Takes about 1 sec
25
-
>>> expensive(3) # Also takes about 1 sec
9
-
>>> expensive(5) # Pulls from cache, fast
25
-
>>> expensive(3) # Pulls from cache again, fast
9
-
>>> dur = time.time() - start
>>> dur < 3.0
True
return wrapper_memoized
-def retry_predicate(
+def predicated_retry_with_backoff(
tries: int,
*,
predicate: Callable[..., bool],
backoff: float = 2.0,
):
"""Retries a function or method up to a certain number of times with a
- prescribed initial delay period and backoff rate (multiplier).
+ prescribed initial delay period and backoff rate (multiplier). Note
+ that :py:meth:`retry_if_false` and :py:meth:`retry_if_none` both
+ use this decorator with a predefined predicate, but you can also
+ use it directly with your own custom predicate.
Args:
tries: the maximum number of attempts to run the function
the decorated function and must return True to indicate
that we should stop calling or False to indicate a retry
is necessary
+
+ .. note::
+
+ If after `tries` attempts the wrapped function is still
+ failing, this code returns the failure result to the caller.
+
+ Example usage that would call `make_the_RPC_call` up to three
+ times (as long as it keeps returning a tuple with `False` in its
+ third element) with a delay of 1.0s the first time, 2.0s the
+ second time, and 4.0s the third time::
+
+     @decorator_utils.predicated_retry_with_backoff(
+         3,
+         predicate=lambda ret: ret[2] is True,
+         delay_sec=1.0,
+         backoff=2.0,
+     )
+     def make_the_RPC_call() -> Tuple[str, int, bool]:
+         whatever
+
"""
if backoff < 1.0:
def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
- """A helper for @retry_predicate that retries a decorated
- function as long as it keeps returning False.
+ """A helper for `@predicated_retry_with_backoff` that retries a
+ decorated function as long as it keeps returning False.
- >>> import time
+ Args:
+ tries: max number of times to retry
+ delay_sec: length, in seconds, of the initial delay before the first retry
+ backoff: a multiplier (must be >= 1.0) used to optionally increase
+ subsequent delays on repeated failures.
+
+ .. note::
+
+ If after `tries` attempts the wrapped function is still
+ failing, this code returns the failure result (i.e. False) to
+ the caller.
+ >>> import time
>>> counter = 0
>>> @retry_if_false(5, delay_sec=1.0, backoff=1.1)
... def foo():
True
"""
- return retry_predicate(
+ return predicated_retry_with_backoff(
tries,
predicate=lambda x: x is True,
delay_sec=delay_sec,
def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
- """Another helper for @retry_predicate above. Retries up to N
- times so long as the wrapped function returns None with a delay
- between each retry and a backoff that can increase the delay.
- """
+ """A helper for `@predicated_retry_with_backoff` that continues to
+ invoke the wrapped function as long as it keeps returning None.
+ Retries up to N times with a delay between each retry and a
+ backoff that can increase the delay.
+
+ Args:
+ tries: max number of times to retry
+ delay_sec: length, in seconds, of the initial delay before the first retry
+ backoff: a multiplier (must be >= 1.0) used to optionally increase
+ subsequent delays on repeated failures.
+
+ .. note::
+
+ If after `tries` attempts the wrapped function is still
+ failing, this code returns the failure result (i.e. None) to
+ the caller.
+ Example usage... calls a function that fetches an image over the
+ network and returns its raw bytes, or None on error, making up to
+ three attempts with an increasing backoff::
+
+     @retry_if_none(3, delay_sec=1.0, backoff=4.0)
+     def fetch_the_image(url: str) -> Optional[bytes]:
+         r = requests.get(url)
+         if r.status_code != 200:
+             return None
+         return r.content
+
+     # Use normally
+     url = 'https://www.whatever.com/foo/bar/baz.jpg'
+     image_binary_data = fetch_the_image(url)
+
+     # Note: even with retries this might still fail; be prepared
+     # to still receive a None return value.
+     if image_binary_data is None:
+         raise Exception(f"Couldn't read {url}?!")
+ """
- return retry_predicate(
+ return predicated_retry_with_backoff(
tries,
predicate=lambda x: x is not None,
delay_sec=delay_sec,
def deprecated(func):
"""This is a decorator which can be used to mark functions
as deprecated. It will result in a warning being emitted
- when the function is used.
+ when the function is used. The warning, written to the warning
+ log, includes the caller as determined by examining the stack.
>>> @deprecated
... def foo() -> None:
def thunkify(func):
- """
- Make a function immediately return a function of no args which,
- when called, waits for the result, which will start being
- processed in another thread.
+ """Make a function immediately return a function of no args which,
+ when called, waits for the original result. Meanwhile spin up a
+ background thread to begin computing the result in parallel.
+
+ Example usage... hide a slow network read behind a thunk that will
+ block only when it is called::
+
+     @thunkify
+     def read_url(url: str) -> Result:
+         make a slow network read
+
+     urls = [ long list of urls ]
+     results = []
+
+     for url in urls:
+         results.append(read_url(url))
+
+ In this example, we will start one background thread per url(!!)
+ requested. The result of read_url is no longer a `Result` but
+ rather a `Callable` (see `thunk` below) that, when invoked, awaits
+ the Result and returns it.
+
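+ Continuing the sketch above (hypothetical code, not part of the
+ original example), each collected result is a thunk; calling it
+ blocks until the background thread has produced the real value::
+
+     for thunk in results:
+         result = thunk()  # blocks here until ready, if necessary
+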
+ For more control over things like the number of worker threads and
+ the ability to cause work to be done on background processes or
+ even on other machines, see
+ :py:class:`pyutils.parallelize.SmartFuture`,
+ :py:class:`pyutils.parallelize.DeferredOperation` and
+ :py:mod:`pyutils.parallelize.parallelize`.
"""
@functools.wraps(func)
# Licensee hereby agrees to include in any such work a brief summary
# of the changes made to Python.
-# (N.B. See NOTICE file in the root of this module for a list of
-# changes)
+# (N.B. See `NOTICE <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=NOTICE;hb=HEAD>`__ file in the root of this module for a list
+# of changes)
# 4. PSF is making Python available to Licensee on an "AS IS"
# basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
def _raise_exception(exception, error_message: Optional[str]):
+ """Internal. Raise a deferred exception"""
if error_message is None:
raise Exception(exception)
else:
class _Timeout(object):
"""Wrap a function and add a timeout to it.
- Instances of this class are automatically generated by the add_timeout
- function defined below. Do not use directly.
+ .. warning::
+
+ Instances of this class are automatically generated by the
+ :py:meth:`timeout` function defined below; do not use this
+ class directly. See :py:meth:`timeout` for example usage.
+
"""
def __init__(
error_message: str,
seconds: float,
):
+ """
+ .. warning::
+
+ Instances of this class are automatically generated by the
+ :py:meth:`timeout` function defined below; do not use this
+ class directly. See :py:meth:`timeout` for example usage.
+ """
self.__limit = seconds
self.__function = function
self.__timeout_exception = timeout_exception
timeout_exception=TimeoutError,
error_message="Function call timed out",
):
- """Add a timeout parameter to a function and return the function.
+ """Add a timeout to a function. If the function takes longer than
+ the given timeout (in seconds) it will raise an exception and
+ return control to the caller.
+
+ .. note::
+
+ The use_signals parameter is included in order to support
+ multiprocessing scenarios (signal can only be used from the
+ process' main thread). When not using signals, the wrapper
+ polls, and timeout granularity is rounded to the nearest 0.1s.
+
+ .. warning::
- Note: the use_signals parameter is included in order to support
- multiprocessing scenarios (signal can only be used from the process'
- main thread). When not using signals, timeout granularity will be
- rounded to the nearest 0.1s.
+ Beware that a @timeout decorator on a function defined at
+ module level will be evaluated at module load time and not
+ when the wrapped function is invoked. This is somewhat
+ counterintuitive and tricky and it can lead to problems when
+ relying on the automatic main thread detection code
+ (`use_signals=None`, the default) since the import probably
+ happens on the main thread while the invocation can happen on
+ a different thread (one which can't use signals). If in doubt,
+ do not rely on the automatic signal safety logic; set the
+ `use_signals` argument explicitly.
- Beware that an @timeout on a function inside a module will be
- evaluated at module load time and not when the wrapped function is
- invoked. This can lead to problems when relying on the automatic
- main thread detection code (use_signals=None, the default) since
- the import probably happens on the main thread and the invocation
- can happen on a different thread (which can't use signals).
+ Raises:
- Raises an exception when/if the timeout is reached.
+ An Exception with a "timed out" error message when/if the
+ timeout is reached.
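+
+ Example usage, a sketch (`fetch_slowly` is a hypothetical function
+ used only for illustration, and this assumes the decorator's first
+ parameter is the timeout length in seconds)::
+
+     @timeout(5.0, use_signals=False)
+     def fetch_slowly():
+         # Raises TimeoutError (the default timeout_exception)
+         # if this body takes longer than 5 seconds.
+         ...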
It is illegal to pass anything other than a function as the first
parameter. The function is wrapped and returned to the caller.
import pyutils.parallelize.thread_utils as tu
use_signals = tu.is_current_thread_main_thread()
+ # Please see warning above!!!
def decorate(function):
if use_signals:
def synchronized(lock):
- """Emulates java's synchronized keyword: given a lock, require that
- threads take that lock (or wait) before invoking the wrapped
+ """Emulates java's "synchronized" keyword: given a lock, require
+ that threads take that lock (or wait) before invoking the wrapped
function and automatically releases the lock afterwards.
+
+ Args:
+ lock: the lock that must be held to invoke the wrapped function.
+
+ Example usage. Imagine we have shared state between multiple threads
+ or processes and, to update the shared state, code should take a lock
+ to ensure only one writer is modifying the state at a time. Any kind
+ of python lock that has an `acquire` method can be used with the
+ `@synchronized` decorator and it will handle acquisition and release
+ automatically::
+
+     import threading
+
+     lock = threading.Lock()
+
+     @synchronized(lock)
+     def update_shared_state():
+         do some work
+
"""
def wrap(f):
return wrap
-def call_with_sample_rate(sample_rate: float) -> Callable:
- """Calls the wrapped function probabilistically given a rate between
- 0.0 and 1.0 inclusive (0% probability and 100% probability).
- """
+def call_probabilistically(probability_of_call: float) -> Callable:
+ """Calls the wrapped function probabilistically given a rate
+ between 0.0 and 1.0 inclusive (0% probability and 100%
+ probability).
+
+ Args:
+ probability_of_call: probability with which to invoke the
+ wrapped function. Must be 0 <= probability_of_call <= 1.0.
+
+ Example usage... this example would skip the invocation of
+ `log_the_entire_request_message` 95% of the time, invoking it
+ only 5% of the time::
+
+     @call_probabilistically(0.05)
+     def log_the_entire_request_message(message: Whatever):
+         expensive work to save message to the log
- if not 0.0 <= sample_rate <= 1.0:
- msg = f"sample_rate must be between [0, 1]. Got {sample_rate}."
+ """
+ if not 0.0 <= probability_of_call <= 1.0:
+ msg = f"probability_of_call must be between [0, 1]. Got {probability_of_call}."
logger.critical(msg)
raise ValueError(msg)
def decorator(f):
@functools.wraps(f)
- def _call_with_sample_rate(*args, **kwargs):
- if random.uniform(0, 1) < sample_rate:
+ def _call_with_probability(*args, **kwargs):
+ if random.uniform(0, 1) < probability_of_call:
return f(*args, **kwargs)
else:
- logger.debug("@call_with_sample_rate skipping a call to %s", f.__name__)
+ logger.debug(
+     "@call_probabilistically skipping a call to %s", f.__name__
+ )
return None
- return _call_with_sample_rate
+ return _call_with_probability
return decorator
"""Apply the given decorator to all methods in a class whose names
begin with prefix. If prefix is None (default), decorate all
methods in the class.
+
+ Args:
+ decorator: the decorator to apply to matching class methods.
+ acl: the matcher used to predicate decorator application; None,
+ the default, applies the decorator to all class methods.
+ See :py:mod:`pyutils.security.acl` for more information
+ and options.
+
+ Example usage to wrap all methods whose names begin with either
+ "enter" or "exit" with the `@invocation_logged` decorator (see
+ :py:meth:`invocation_logged`)::
+
+     from pyutils import decorator_utils
+     import pyutils.security.acl as acl
+
+     @decorator_utils.decorate_matching_methods_with(
+         decorator_utils.invocation_logged,
+         acl.StringWildcardBasedACL(
+             allowed_patterns=['enter*', 'exit*'],
+             order_to_check_allow_deny=acl.Order.ALLOW_DENY,
+         )
+     )
+     class MyClass:
+         def __init__(self):
+             self.name = None
+             self.rating = None
+
+         def __repr__(self) -> str:
+             return f'{self.name} @ {self.rating}'
+
+         def enterName(self, n: str) -> None:
+             if len(n) > 5:
+                 self.name = n
+
+         def exitName(self, n: str) -> None:
+             pass
+
+         def enterRating(self, r: int) -> None:
+             if 1 <= r <= 5:
+                 self.rating = r
+
+         def exitRating(self, r: int) -> None:
+             pass
"""
def decorate_the_class(cls):