projects
/
python_utils.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Used isort to sort imports. Also added to the git pre-commit hook.
[python_utils.git]
/
decorator_utils.py
diff --git
a/decorator_utils.py
b/decorator_utils.py
index daae64e75348e973dc8a27cf387faf7f404ef2b2..a5c5afecb34c005d16a351cc703f44dc561b567d 100644
(file)
--- a/
decorator_utils.py
+++ b/
decorator_utils.py
@@
-14,14
+14,13
@@
import sys
import threading
import time
import traceback
import threading
import time
import traceback
-from typing import Any, Callable, Optional
import warnings
import warnings
+from typing import Any, Callable, Optional
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
import exceptions
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
import exceptions
-
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
@@
-80,9
+79,7
@@
def invocation_logged(func: Callable) -> Callable:
return wrapper_invocation_logged
return wrapper_invocation_logged
-def rate_limited(
- n_calls: int, *, per_period_in_seconds: float = 1.0
-) -> Callable:
+def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
"""Limit invocation of a wrapped function to n calls per period.
Thread safe. In testing this was relatively fair with multiple
threads using it though that hasn't been measured.
"""Limit invocation of a wrapped function to n calls per period.
Thread safe. In testing this was relatively fair with multiple
threads using it though that hasn't been measured.
@@
-220,14
+217,12
@@
def debug_count_calls(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper_debug_count_calls(*args, **kwargs):
wrapper_debug_count_calls.num_calls += 1
@functools.wraps(func)
def wrapper_debug_count_calls(*args, **kwargs):
wrapper_debug_count_calls.num_calls += 1
- msg = (
- f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
- )
+ msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
print(msg)
logger.info(msg)
return func(*args, **kwargs)
print(msg)
logger.info(msg)
return func(*args, **kwargs)
- wrapper_debug_count_calls.num_calls = 0
+ wrapper_debug_count_calls.num_calls = 0
# type: ignore
return wrapper_debug_count_calls
return wrapper_debug_count_calls
@@
-266,15
+261,11
@@
def delay(
@functools.wraps(func)
def wrapper_delay(*args, **kwargs):
if when & DelayWhen.BEFORE_CALL:
@functools.wraps(func)
def wrapper_delay(*args, **kwargs):
if when & DelayWhen.BEFORE_CALL:
- logger.debug(
- f"@delay for {seconds}s BEFORE_CALL to {func.__name__}"
- )
+ logger.debug(f"@delay for {seconds}s BEFORE_CALL to {func.__name__}")
time.sleep(seconds)
retval = func(*args, **kwargs)
if when & DelayWhen.AFTER_CALL:
time.sleep(seconds)
retval = func(*args, **kwargs)
if when & DelayWhen.AFTER_CALL:
- logger.debug(
- f"@delay for {seconds}s AFTER_CALL to {func.__name__}"
- )
+ logger.debug(f"@delay for {seconds}s AFTER_CALL to {func.__name__}")
time.sleep(seconds)
return retval
time.sleep(seconds)
return retval
@@
-368,15
+359,13
@@
def memoized(func: Callable) -> Callable:
cache_key = args + tuple(kwargs.items())
if cache_key not in wrapper_memoized.cache:
value = func(*args, **kwargs)
cache_key = args + tuple(kwargs.items())
if cache_key not in wrapper_memoized.cache:
value = func(*args, **kwargs)
- logger.debug(
- f"Memoizing {cache_key} => {value} for {func.__name__}"
- )
+ logger.debug(f"Memoizing {cache_key} => {value} for {func.__name__}")
wrapper_memoized.cache[cache_key] = value
else:
logger.debug(f"Returning memoized value for {func.__name__}")
return wrapper_memoized.cache[cache_key]
wrapper_memoized.cache[cache_key] = value
else:
logger.debug(f"Returning memoized value for {func.__name__}")
return wrapper_memoized.cache[cache_key]
- wrapper_memoized.cache = dict()
+ wrapper_memoized.cache = dict()
# type: ignore
return wrapper_memoized
return wrapper_memoized
@@
-714,38
+703,19
@@
def timeout(
return decorate
return decorate
-class non_reentrant_code(object):
- def __init__(self):
- self._lock = threading.RLock
- self._entered = False
-
- def __call__(self, f):
- def _gatekeeper(*args, **kwargs):
- with self._lock:
- if self._entered:
- return
- self._entered = True
- f(*args, **kwargs)
- self._entered = False
+def synchronized(lock):
+ def wrap(f):
+ @functools.wraps(f)
+ def _gatekeeper(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
return _gatekeeper
return _gatekeeper
-
-class rlocked(object):
- def __init__(self):
- self._lock = threading.RLock
- self._entered = False
-
- def __call__(self, f):
- def _gatekeeper(*args, **kwargs):
- with self._lock:
- if self._entered:
- return
- self._entered = True
- f(*args, **kwargs)
- self._entered = False
-
- return _gatekeeper
+ return wrap
def call_with_sample_rate(sample_rate: float) -> Callable:
def call_with_sample_rate(sample_rate: float) -> Callable:
@@
-760,9
+730,7
@@
def call_with_sample_rate(sample_rate: float) -> Callable:
if random.uniform(0, 1) < sample_rate:
return f(*args, **kwargs)
else:
if random.uniform(0, 1) < sample_rate:
return f(*args, **kwargs)
else:
- logger.debug(
- f"@call_with_sample_rate skipping a call to {f.__name__}"
- )
+ logger.debug(f"@call_with_sample_rate skipping a call to {f.__name__}")
return _call_with_sample_rate
return _call_with_sample_rate