+def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
+ """Limit invocation of a wrapped function to n calls per period.
+ Thread safe. In testing this was relatively fair with multiple
+ threads using it though that hasn't been measured.
+
+ """
+ min_interval_seconds = per_period_in_seconds / float(n_calls)
+
+ def wrapper_rate_limited(func: Callable) -> Callable:
+ cv = threading.Condition()
+ last_invocation_timestamp = [0.0]
+
+ def may_proceed() -> float:
+ now = time.time()
+ last_invocation = last_invocation_timestamp[0]
+ if last_invocation != 0.0:
+ elapsed_since_last = now - last_invocation
+ wait_time = min_interval_seconds - elapsed_since_last
+ else:
+ wait_time = 0.0
+ return wait_time
+
+ def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
+ with cv:
+ while True:
+ cv.wait_for(
+ lambda: may_proceed() <= 0.0,
+ timeout=may_proceed(),
+ )
+ break
+ ret = func(*args, **kargs)
+ with cv:
+ last_invocation_timestamp[0] = time.time()
+ cv.notify()
+ return ret
+ return wrapper_wrapper_rate_limited
+ return wrapper_rate_limited
+
+