+def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
+ """Limit invocation of a wrapped function to n calls per period.
+ Thread safe. In testing this was relatively fair with multiple
+ threads using it though that hasn't been measured.
+
+ >>> import time
+ >>> import decorator_utils
+ >>> import thread_utils
+
+ >>> calls = 0
+
+ >>> @decorator_utils.rate_limited(10, per_period_in_seconds=1.0)
+ ... def limited(x: int):
+ ... global calls
+ ... calls += 1
+
+ >>> @thread_utils.background_thread
+ ... def a(stop):
+ ... for _ in range(3):
+ ... limited(_)
+
+ >>> @thread_utils.background_thread
+ ... def b(stop):
+ ... for _ in range(3):
+ ... limited(_)
+
+ >>> start = time.time()
+ >>> (t1, e1) = a()
+ >>> (t2, e2) = b()
+ >>> t1.join()
+ >>> t2.join()
+ >>> end = time.time()
+ >>> dur = end - start
+ >>> dur > 0.5
+ True
+
+ >>> calls
+ 6
+
+ """
+ min_interval_seconds = per_period_in_seconds / float(n_calls)
+
+ def wrapper_rate_limited(func: Callable) -> Callable:
+ cv = threading.Condition()
+ last_invocation_timestamp = [0.0]
+
+ def may_proceed() -> float:
+ now = time.time()
+ last_invocation = last_invocation_timestamp[0]
+ if last_invocation != 0.0:
+ elapsed_since_last = now - last_invocation
+ wait_time = min_interval_seconds - elapsed_since_last
+ else:
+ wait_time = 0.0
+ logger.debug(f'@{time.time()}> wait_time = {wait_time}')
+ return wait_time
+
+ def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
+ with cv:
+ while True:
+ if cv.wait_for(
+ lambda: may_proceed() <= 0.0,
+ timeout=may_proceed(),
+ ):
+ break
+ with cv:
+ logger.debug(f'@{time.time()}> calling it...')
+ ret = func(*args, **kargs)
+ last_invocation_timestamp[0] = time.time()
+ logger.debug(
+ f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}'
+ )
+ cv.notify()
+ return ret
+ return wrapper_wrapper_rate_limited
+ return wrapper_rate_limited
+
+