import threading
import time
import traceback
-from typing import Any, Callable, Optional
import warnings
+from typing import Any, Callable, Optional
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
import exceptions
-
logger = logging.getLogger(__name__)
logger.debug(f'@{time.time()}> calling it...')
ret = func(*args, **kargs)
last_invocation_timestamp[0] = time.time()
- logger.debug(
- f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}'
- )
+ logger.debug(f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}')
cv.notify()
return ret
logger.info(msg)
return func(*args, **kwargs)
- wrapper_debug_count_calls.num_calls = 0
+ wrapper_debug_count_calls.num_calls = 0 # type: ignore
return wrapper_debug_count_calls
def __call__(self, *args, **kwargs):
"""Returns a single instance of decorated class"""
- logger.debug(
- f"@singleton returning global instance of {self.__wrapped__.__name__}"
- )
+ logger.debug(f"@singleton returning global instance of {self.__wrapped__.__name__}")
if self._instance is None:
self._instance = self.__wrapped__(*args, **kwargs)
return self._instance
logger.debug(f"Returning memoized value for {func.__name__}")
return wrapper_memoized.cache[cache_key]
- wrapper_memoized.cache = dict()
+ wrapper_memoized.cache = dict() # type: ignore
return wrapper_memoized
self.__limit = kwargs.pop("timeout", self.__limit)
self.__queue = multiprocessing.Queue(1)
args = (self.__queue, self.__function) + args
- self.__process = multiprocessing.Process(
- target=_target, args=args, kwargs=kwargs
- )
+ self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs)
self.__process.daemon = True
self.__process.start()
if self.__limit is not None:
@functools.wraps(function)
def new_function(*args, **kwargs):
- timeout_wrapper = _Timeout(
- function, timeout_exception, error_message, seconds
- )
+ timeout_wrapper = _Timeout(function, timeout_exception, error_message, seconds)
return timeout_wrapper(*args, **kwargs)
return new_function
return decorate
-class non_reentrant_code(object):
- def __init__(self):
- self._lock = threading.RLock
- self._entered = False
-
- def __call__(self, f):
- def _gatekeeper(*args, **kwargs):
- with self._lock:
- if self._entered:
- return
- self._entered = True
- f(*args, **kwargs)
- self._entered = False
+def synchronized(lock):
+ def wrap(f):
+ @functools.wraps(f)
+ def _gatekeeper(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
return _gatekeeper
-
-class rlocked(object):
- def __init__(self):
- self._lock = threading.RLock
- self._entered = False
-
- def __call__(self, f):
- def _gatekeeper(*args, **kwargs):
- with self._lock:
- if self._entered:
- return
- self._entered = True
- f(*args, **kwargs)
- self._entered = False
-
- return _gatekeeper
+ return wrap
def call_with_sample_rate(sample_rate: float) -> Callable: