Add a doctest to type_utils.py.
[python_utils.git] / decorator_utils.py
index 1ecbce3e2b10003ecf315cbd9db0e04168860d49..68a9d69633f6babe78c80153753d5c4a41c150af 100644 (file)
@@ -14,14 +14,13 @@ import sys
 import threading
 import time
 import traceback
-from typing import Any, Callable, Optional
 import warnings
+from typing import Any, Callable, Optional
 
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
 import exceptions
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -149,9 +148,7 @@ def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callabl
                 logger.debug(f'@{time.time()}> calling it...')
                 ret = func(*args, **kargs)
                 last_invocation_timestamp[0] = time.time()
-                logger.debug(
-                    f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}'
-                )
+                logger.debug(f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}')
                 cv.notify()
             return ret
 
@@ -223,7 +220,7 @@ def debug_count_calls(func: Callable) -> Callable:
         logger.info(msg)
         return func(*args, **kwargs)
 
-    wrapper_debug_count_calls.num_calls = 0
+    wrapper_debug_count_calls.num_calls = 0  # type: ignore
     return wrapper_debug_count_calls
 
 
@@ -291,9 +288,7 @@ class _SingletonWrapper:
 
     def __call__(self, *args, **kwargs):
         """Returns a single instance of decorated class"""
-        logger.debug(
-            f"@singleton returning global instance of {self.__wrapped__.__name__}"
-        )
+        logger.debug(f"@singleton returning global instance of {self.__wrapped__.__name__}")
         if self._instance is None:
             self._instance = self.__wrapped__(*args, **kwargs)
         return self._instance
@@ -366,7 +361,7 @@ def memoized(func: Callable) -> Callable:
             logger.debug(f"Returning memoized value for {func.__name__}")
         return wrapper_memoized.cache[cache_key]
 
-    wrapper_memoized.cache = dict()
+    wrapper_memoized.cache = dict()  # type: ignore
     return wrapper_memoized
 
 
@@ -595,9 +590,7 @@ class _Timeout(object):
         self.__limit = kwargs.pop("timeout", self.__limit)
         self.__queue = multiprocessing.Queue(1)
         args = (self.__queue, self.__function) + args
-        self.__process = multiprocessing.Process(
-            target=_target, args=args, kwargs=kwargs
-        )
+        self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs)
         self.__process.daemon = True
         self.__process.start()
         if self.__limit is not None:
@@ -694,9 +687,7 @@ def timeout(
 
             @functools.wraps(function)
             def new_function(*args, **kwargs):
-                timeout_wrapper = _Timeout(
-                    function, timeout_exception, error_message, seconds
-                )
+                timeout_wrapper = _Timeout(function, timeout_exception, error_message, seconds)
                 return timeout_wrapper(*args, **kwargs)
 
             return new_function
@@ -704,38 +695,19 @@ def timeout(
     return decorate
 
 
-class non_reentrant_code(object):
-    def __init__(self):
-        self._lock = threading.RLock
-        self._entered = False
-
-    def __call__(self, f):
-        def _gatekeeper(*args, **kwargs):
-            with self._lock:
-                if self._entered:
-                    return
-                self._entered = True
-                f(*args, **kwargs)
-                self._entered = False
+def synchronized(lock):
+    def wrap(f):
+        @functools.wraps(f)
+        def _gatekeeper(*args, **kw):
+            lock.acquire()
+            try:
+                return f(*args, **kw)
+            finally:
+                lock.release()
 
         return _gatekeeper
 
-
-class rlocked(object):
-    def __init__(self):
-        self._lock = threading.RLock
-        self._entered = False
-
-    def __call__(self, f):
-        def _gatekeeper(*args, **kwargs):
-            with self._lock:
-                if self._entered:
-                    return
-                self._entered = True
-                f(*args, **kwargs)
-                self._entered = False
-
-        return _gatekeeper
+    return wrap
 
 
 def call_with_sample_rate(sample_rate: float) -> Callable: