Add rate limiter decorator.
[python_utils.git] / decorator_utils.py
index e19759f92a5e5b06ea425c2a0755d96ea783a7af..480543ae97e26498b9d4314c927c5ff85b076757 100644 (file)
@@ -2,7 +2,6 @@
 
 """Decorators."""
 
-import datetime
 import enum
 import functools
 import inspect
@@ -15,7 +14,7 @@ import sys
 import threading
 import time
 import traceback
-from typing import Callable, Optional
+from typing import Any, Callable, Optional
 import warnings
 
 # This module is commonly used by others in here and should avoid
@@ -47,21 +46,42 @@ def invocation_logged(func: Callable) -> Callable:
 
     @functools.wraps(func)
     def wrapper_invocation_logged(*args, **kwargs):
-        now = datetime.datetime.now()
-        ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
-        msg = f"[{ts}]: Entered {func.__name__}"
+        msg = f"Entered {func.__qualname__}"
         print(msg)
         logger.info(msg)
         ret = func(*args, **kwargs)
-        now = datetime.datetime.now()
-        ts = now.strftime("%Y/%d/%b:%H:%M:%S%Z")
-        msg = f"[{ts}]: Exited {func.__name__}"
+        msg = f"Exited {func.__qualname__}"
         print(msg)
         logger.info(msg)
         return ret
     return wrapper_invocation_logged
 
 
+def rate_limited(n_per_second: int) -> Callable:
+    """Limit invocation of a wrapped function to n calls per second.
+    Thread safe.
+
+    """
+    min_interval = 1.0 / float(n_per_second)
+
+    def wrapper_rate_limited(func: Callable) -> Callable:
+        last_invocation_time = [0.0]
+
+        def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
+            while True:
+                elapsed = time.clock_gettime(0) - last_invocation_time[0]
+                wait_time = min_interval - elapsed
+                if wait_time > 0.0:
+                    time.sleep(wait_time)
+                else:
+                    break
+            ret = func(*args, **kargs)
+            last_invocation_time[0] = time.clock_gettime(0)
+            return ret
+        return wrapper_wrapper_rate_limited
+    return wrapper_rate_limited
+
+
 def debug_args(func: Callable) -> Callable:
     """Print the function signature and return value at each call."""
 
@@ -297,8 +317,8 @@ def thunkify(func):
                 exc[0] = True
                 exc[1] = sys.exc_info()  # (type, value, traceback)
                 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
-                logger.warning(msg)
                 print(msg)
+                logger.warning(msg)
             finally:
                 wait_event.set()