Update sphinx links to anonymous references to work around a warning
[pyutils.git] / src / pyutils / decorator_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4 # A portion (marked) below retain the original author's copyright.
5
6 """This is a grab bag of, hopefully, useful decorators."""
7
8 import enum
9 import functools
10 import inspect
11 import logging
12 import math
13 import multiprocessing
14 import random
15 import signal
16 import sys
17 import threading
18 import time
19 import traceback
20 import warnings
21 from typing import Any, Callable, List, Optional
22
23 # This module is commonly used by others in here and should avoid
24 # taking any unnecessary dependencies back on them.
25
26 logger = logging.getLogger(__name__)
27
28
29 def timed(func: Callable) -> Callable:
30     """Prints + info logs the runtime of the decorated function at
31     each invocation.
32
33     >>> @timed
34     ... def foo():
35     ...     import time
36     ...     time.sleep(0.01)
37
38     >>> foo()  # doctest: +ELLIPSIS
39     Finished foo in ...
40     """
41
42     @functools.wraps(func)
43     def wrapper_timer(*args, **kwargs):
44         start_time = time.perf_counter()
45         value = func(*args, **kwargs)
46         end_time = time.perf_counter()
47         run_time = end_time - start_time
48         msg = f"Finished {func.__qualname__} in {run_time:.4f}s"
49         print(msg)
50         logger.info(msg)
51         return value
52
53     return wrapper_timer
54
55
56 def invocation_logged(func: Callable) -> Callable:
57     """Log the call of a function on sys.stdout and the info log.
58
59     >>> @invocation_logged
60     ... def foo():
61     ...     print('Hello, world.')
62
63     >>> foo()
64     Entered foo
65     Hello, world.
66     Exited foo
67     """
68
69     @functools.wraps(func)
70     def wrapper_invocation_logged(*args, **kwargs):
71         msg = f"Entered {func.__qualname__}"
72         print(msg)
73         logger.info(msg)
74         ret = func(*args, **kwargs)
75         msg = f"Exited {func.__qualname__}"
76         print(msg)
77         logger.info(msg)
78         return ret
79
80     return wrapper_invocation_logged
81
82
83 def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
84     """Limit invocation of a wrapped function to n calls per time period.
85     Thread-safe.  In testing this was relatively fair with multiple
86     threads using it though that hasn't been measured in detail.
87
88     .. note::
89
90         The doctest below makes use of
91         :py:class:`pyutils.parallelize.thread_utils.background_thread`.  See
92         that class' documentation for details.
93
94     >>> import time
95     >>> from pyutils import decorator_utils
96     >>> from pyutils.parallelize import thread_utils
97
98     >>> calls = 0
99
100     >>> @decorator_utils.rate_limited(10, per_period_in_seconds=1.0)
101     ... def limited(x: int):
102     ...     global calls
103     ...     calls += 1
104
105     >>> @thread_utils.background_thread
106     ... def a(stop):
107     ...     for _ in range(3):
108     ...         limited(_)
109
110     >>> @thread_utils.background_thread
111     ... def b(stop):
112     ...     for _ in range(3):
113     ...         limited(_)
114
115     >>> start = time.time()
116     >>> (thread1, event1) = a()
117     >>> (thread2, event2) = b()
118     >>> thread1.join()
119     >>> thread2.join()
120     >>> end = time.time()
121     >>> dur = end - start
122     >>> dur > 0.5
123     True
124     >>> calls
125     6
126     """
127
128     min_interval_seconds = per_period_in_seconds / float(n_calls)
129
130     def wrapper_rate_limited(func: Callable) -> Callable:
131         cv = threading.Condition()
132         last_invocation_timestamp = [0.0]
133
134         def may_proceed() -> float:
135             now = time.time()
136             last_invocation = last_invocation_timestamp[0]
137             if last_invocation != 0.0:
138                 elapsed_since_last = now - last_invocation
139                 wait_time = min_interval_seconds - elapsed_since_last
140             else:
141                 wait_time = 0.0
142             logger.debug('@%.4f> wait_time = %.4f', time.time(), wait_time)
143             return wait_time
144
145         def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
146             with cv:
147                 while True:
148                     if cv.wait_for(
149                         lambda: may_proceed() <= 0.0,
150                         timeout=may_proceed(),
151                     ):
152                         break
153             with cv:
154                 logger.debug('@%.4f> calling it...', time.time())
155                 ret = func(*args, **kargs)
156                 last_invocation_timestamp[0] = time.time()
157                 logger.debug(
158                     '@%.4f> Last invocation <- %.4f',
159                     time.time(),
160                     last_invocation_timestamp[0],
161                 )
162                 cv.notify()
163             return ret
164
165         return wrapper_wrapper_rate_limited
166
167     return wrapper_rate_limited
168
169
170 def debug_args(func: Callable) -> Callable:
171     """Print the function signature and return value at each call.
172
173     >>> @debug_args
174     ... def foo(a, b, c):
175     ...     print(a)
176     ...     print(b)
177     ...     print(c)
178     ...     return (a + b, c)
179
180     >>> foo(1, 2.0, "test")
181     Calling foo(1:<class 'int'>, 2.0:<class 'float'>, 'test':<class 'str'>)
182     1
183     2.0
184     test
185     foo returned (3.0, 'test'):<class 'tuple'>
186     (3.0, 'test')
187     """
188
189     @functools.wraps(func)
190     def wrapper_debug_args(*args, **kwargs):
191         args_repr = [f"{repr(a)}:{type(a)}" for a in args]
192         kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
193         signature = ", ".join(args_repr + kwargs_repr)
194         msg = f"Calling {func.__qualname__}({signature})"
195         print(msg)
196         logger.info(msg)
197         value = func(*args, **kwargs)
198         msg = f"{func.__qualname__} returned {value!r}:{type(value)}"
199         print(msg)
200         logger.info(msg)
201         return value
202
203     return wrapper_debug_args
204
205
206 def debug_count_calls(func: Callable) -> Callable:
207     """Count function invocations and print a message befor every call.
208
209     >>> @debug_count_calls
210     ... def factoral(x):
211     ...     if x == 1:
212     ...         return 1
213     ...     return x * factoral(x - 1)
214
215     >>> factoral(5)
216     Call #1 of 'factoral'
217     Call #2 of 'factoral'
218     Call #3 of 'factoral'
219     Call #4 of 'factoral'
220     Call #5 of 'factoral'
221     120
222     """
223
224     @functools.wraps(func)
225     def wrapper_debug_count_calls(*args, **kwargs):
226         wrapper_debug_count_calls.num_calls += 1
227         msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
228         print(msg)
229         logger.info(msg)
230         return func(*args, **kwargs)
231
232     wrapper_debug_count_calls.num_calls = 0  # type: ignore
233     return wrapper_debug_count_calls
234
235
236 class DelayWhen(enum.IntEnum):
237     """This enum is used with the `@delay` decorator to indicate that the
238     delay should happen before wrapped function invocation, after wrapped
239     function invocation, or both.
240
241     See: :py:meth:`delay`.
242     """
243
244     BEFORE_CALL = 1
245     AFTER_CALL = 2
246     BEFORE_AND_AFTER = 3
247
248
249 def delay(
250     _func: Callable = None,
251     *,
252     seconds: float = 1.0,
253     when: DelayWhen = DelayWhen.BEFORE_CALL,
254 ) -> Callable:
255     """Slow down a function by inserting a delay before and/or after its
256     invocation.
257
258     Args:
259         seconds: how long should we delay (via a simple `time.sleep()`)?
260         when: when should we delay.. before the invocation, after it, or both?
261
262     >>> @delay(seconds=1.0)
263     ... def foo():
264     ...     pass
265
266     >>> import time
267     >>> start = time.time()
268     >>> foo()
269     >>> dur = time.time() - start
270     >>> dur >= 1.0
271     True
272     """
273
274     def decorator_delay(func: Callable) -> Callable:
275         @functools.wraps(func)
276         def wrapper_delay(*args, **kwargs):
277             if when & DelayWhen.BEFORE_CALL:
278                 logger.debug("@delay for %fs BEFORE_CALL to %s", seconds, func.__name__)
279                 time.sleep(seconds)
280             retval = func(*args, **kwargs)
281             if when & DelayWhen.AFTER_CALL:
282                 logger.debug("@delay for %fs AFTER_CALL to %s", seconds, func.__name__)
283                 time.sleep(seconds)
284             return retval
285
286         return wrapper_delay
287
288     if _func is None:
289         return decorator_delay
290     else:
291         return decorator_delay(_func)
292
293
294 class _SingletonWrapper:
295     """An internal singleton wrapper class. Its instances are created
296     for each decorated class.
297     """
298
299     def __init__(self, cls):
300         self.__wrapped__ = cls
301         self._instance = None
302
303     def __call__(self, *args, **kwargs):
304         """Returns a single instance of decorated class"""
305         logger.debug(
306             '@singleton returning global instance of %s', self.__wrapped__.__name__
307         )
308         if self._instance is None:
309             self._instance = self.__wrapped__(*args, **kwargs)
310         return self._instance
311
312
313 def singleton(cls):
314     """
315     A singleton decorator; adding this to a class results in the decorator making
316     sure that there exists only one instance of that class globally in the
317     program by creating an instance the first time the class is constructed
318     and then returning the previously created singleton instance on subsequent
319     creation requests.
320
321     See also :py:meth:`pyutils.persistent.persistent_autoloaded_singleton`.
322
323     >>> @singleton
324     ... class global_configuration(object):
325     ...     pass
326
327     >>> a = global_configuration()
328     >>> b = global_configuration()
329     >>> a is b
330     True
331     >>> id(a) == id(b)
332     True
333     """
334     return _SingletonWrapper(cls)
335
336
337 def memoized(func: Callable) -> Callable:
338     """Keep a cache of previous function call results.  Use this with
339     pure functions without side effects that do expensive work.
340
341     The internal cache is a simple dict with a key based on the
342     arguments to the call so the result of the function must be determined
343     only by its parameters (i.e. it must be "functional") or this will
344     introduce errors.  See:
345     https://en.wikipedia.org/wiki/Functional_programming#Pure_functions
346
347     Consider also: :py:meth:`functools.cache` for a more advanced
348     implementation.  See:
349     https://docs.python.org/3/library/functools.html#functools.cache
350
351     >>> import time
352     >>> @memoized
353     ... def expensive(arg) -> int:
354     ...     # Simulate something slow to compute or lookup, like a
355     ...     # computationally expensive task or a network read of
356     ...     # static data (i.e. that should never change).
357     ...     time.sleep(1.0)
358     ...     return arg * arg
359
360     >>> start = time.time()
361     >>> expensive(5)           # Takes about 1 sec
362     25
363     >>> expensive(3)           # Also takes about 1 sec
364     9
365     >>> expensive(5)           # Pulls from cache, fast
366     25
367     >>> expensive(3)           # Pulls from cache again, fast
368     9
369     >>> dur = time.time() - start
370     >>> dur < 3.0
371     True
372
373     """
374
375     @functools.wraps(func)
376     def wrapper_memoized(*args, **kwargs):
377         cache_key = args + tuple(kwargs.items())
378         if cache_key not in wrapper_memoized.cache:
379             value = func(*args, **kwargs)
380             logger.debug('Memoizing %s => %s for %s', cache_key, value, func.__name__)
381             wrapper_memoized.cache[cache_key] = value
382         else:
383             logger.debug('Returning memoized value for %s', {func.__name__})
384         return wrapper_memoized.cache[cache_key]
385
386     wrapper_memoized.cache = {}  # type: ignore
387     return wrapper_memoized
388
389
390 def predicated_retry_with_backoff(
391     tries: int,
392     *,
393     predicate: Callable[..., bool],
394     delay_sec: float = 3.0,
395     backoff: float = 2.0,
396 ):
397     """Retries a function or method up to a certain number of times with a
398     prescribed initial delay period and backoff rate (multiplier).  Note
399     that :py:meth:`retry_if_false` and :py:meth:`retry_if_none` both
400     use this class with a predefined predicate but you can also use
401     it directly with your own custom predicate.
402
403     Args:
404         tries: the maximum number of attempts to run the function
405         delay_sec: sets the initial delay period in seconds
406         backoff: a multiplier (must be >=1.0) used to modify the
407             delay at each subsequent invocation
408         predicate: a Callable that will be passed the retval of
409             the decorated function and must return True to indicate
410             that we should stop calling or False to indicate a retry
411             is necessary
412
413     .. note::
414
415         If after `tries` attempts the wrapped function is still
416         failing, this code returns the failure result to the caller.
417
418     Example usage that would call `make_the_RPC_call` up to three
419     times (as long as it returns a tuple with `False` in the second
420     element) with a delay of 1.0s the first time, 2.0s the second
421     time, and 4.0s the third time.::
422
423         @decorator_utils.predicated_retry_with_backoff(
424             3,
425             predicate=lambda _: _[2] is False,
426             delay_sec=1.0,
427             backoff=2
428         )
429         def make_the_RPC_call() -> Tuple[str, int, bool]:
430             whatever
431
432     """
433
434     if backoff < 1.0:
435         msg = f"backoff must be greater than or equal to 1, got {backoff}"
436         logger.critical(msg)
437         raise ValueError(msg)
438
439     tries = math.floor(tries)
440     if tries < 0:
441         msg = f"tries must be 0 or greater, got {tries}"
442         logger.critical(msg)
443         raise ValueError(msg)
444
445     if delay_sec <= 0:
446         msg = f"delay_sec must be greater than 0, got {delay_sec}"
447         logger.critical(msg)
448         raise ValueError(msg)
449
450     def deco_retry(f):
451         @functools.wraps(f)
452         def f_retry(*args, **kwargs):
453             mtries, mdelay = tries, delay_sec  # make mutable
454             logger.debug('deco_retry: will make up to %d attempts...', mtries)
455             retval = f(*args, **kwargs)
456             while mtries > 0:
457                 if predicate(retval) is True:
458                     logger.debug('Predicate succeeded, deco_retry is done.')
459                     return retval
460                 logger.debug("Predicate failed, sleeping and retrying.")
461                 mtries -= 1
462                 time.sleep(mdelay)
463                 mdelay *= backoff
464                 retval = f(*args, **kwargs)
465             return retval
466
467         return f_retry
468
469     return deco_retry
470
471
472 def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
473     """A helper for `@predicated_retry_with_backoff` that retries a
474     decorated function as long as it keeps returning False.
475
476     Args:
477         tries: max number of times to retry
478         delay_sec: initial delay before retry length in seconds
479         backoff: a multiplier (must be >= 1.0) used to optionally increase
480             subsequent delays on repeated failures.
481
482     .. note::
483
484         If after `tries` attempts the wrapped function is still
485         failing, this code returns the failure result (i.e. False) to
486         the caller.
487
488     >>> import time
489     >>> counter = 0
490     >>> @retry_if_false(5, delay_sec=1.0, backoff=1.1)
491     ... def foo():
492     ...     global counter
493     ...     counter += 1
494     ...     return counter >= 3
495
496     >>> start = time.time()
497     >>> foo()  # fail, delay 1.0, fail, delay 1.1, succeed
498     True
499
500     >>> dur = time.time() - start
501     >>> counter
502     3
503     >>> dur > 2.0
504     True
505     >>> dur < 2.3
506     True
507
508     """
509     return predicated_retry_with_backoff(
510         tries,
511         predicate=lambda x: x is True,
512         delay_sec=delay_sec,
513         backoff=backoff,
514     )
515
516
517 def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
518     """A helper for `@predicated_retry_with_backoff` that continues to
519     invoke the wrapped function as long as it keeps returning None.
520     Retries up to N times with a delay between each retry and a
521     backoff that can increase the delay.
522
523     Args:
524         tries: max number of times to retry
525         delay_sec: initial delay before retry length in seconds
526         backoff: a multiplier (must be >= 1.0) used to optionally increase
527             subsequent delays on repeated failures.
528
529     .. note::
530
531         If after `tries` attempts the wrapped function is still
532         failing, this code returns the failure result (i.e. None) to
533         the caller.
534
535     Example usage... calls a function that reads a URL from the network
536     and returns the raw HTTP response or None on error with up to three
537     retries with an increasing backoff::
538
539         @retry_if_none(3, delay_sec=1.0, backoff=4.0)
540         def fetch_the_image(url: str) -> Optional[bytes]:
541             r = requests.get(url)
542             if r.status_code != 200:
543                 return None
544             return r.content
545
546         # Use normally
547         image_binary_data = fetch_the_image(
548             'https://www.whatever.com/foo/bar/baz.jpg'
549         )
550
551         # Note: even with retries this might still fail; be prepared
552         # to still receive a None return value.
553         if image_binary_data is None:
554             raise Exception(f"Couldn't read {url}?!")
555     """
556     return predicated_retry_with_backoff(
557         tries,
558         predicate=lambda x: x is not None,
559         delay_sec=delay_sec,
560         backoff=backoff,
561     )
562
563
564 def deprecated(func):
565     """This is a decorator which can be used to mark functions
566     as deprecated. It will result in a warning being emitted
567     when the function is used.  The warning includes the caller
568     as determined by examining the stack in the warning log.
569
570     >>> @deprecated
571     ... def foo() -> None:
572     ...     pass
573     >>> foo()   # prints + logs "Call to deprecated function foo"
574     """
575
576     @functools.wraps(func)
577     def wrapper_deprecated(*args, **kwargs):
578         msg = f"Call to deprecated function {func.__qualname__}"
579         logger.warning(msg)
580         warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
581         print(msg, file=sys.stderr)
582         return func(*args, **kwargs)
583
584     return wrapper_deprecated
585
586
587 def thunkify(func):
588     """Make a function immediately return a function of no args which,
589     when called, waits for the original result.  Meanwhile spin up a
590     background thread to begin computing the result in parallel.
591
592     Example usage... hide a slow network read behind a thunk that will
593     block only when it is called::
594
595         @thunkify
596         def read_url(url: str) -> Result:
597             make a slow network read
598
599         urls = [ long list of urls ]
600         results = []
601
602         for url in urls:
603             results.append(read_url(url))
604
605     In this example, we will start one background thread per url(!!)
606     requested.  The result of read_url is no longer a `Result` but
607     rather a `Callable` (see `thunk` below) that, when invoked, awaits
608     the Result and returns it.
609
610     For more control over things like the number of worker threads and
611     the ability cause work to be done on background processes or even
612     on other machines, see
613     :py:class:`pyutils.parallelize.SmartFuture`,
614     :py:class:`pyutils.parallelize.DeferredOperation` and
615     :py:mod:`pyutils.parallelize.parallelize`.
616     """
617
618     @functools.wraps(func)
619     def lazy_thunked(*args, **kwargs):
620         wait_event = threading.Event()
621
622         result = [None]
623         exc: List[Any] = [False, None]
624
625         def worker_func():
626             try:
627                 func_result = func(*args, **kwargs)
628                 result[0] = func_result
629             except Exception:
630                 exc[0] = True
631                 exc[1] = sys.exc_info()  # (type, value, traceback)
632                 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
633                 logger.warning(msg)
634             finally:
635                 wait_event.set()
636
637         def thunk():
638             wait_event.wait()
639             if exc[0]:
640                 assert exc[1]
641                 raise exc[1][0](exc[1][1])
642             return result[0]
643
644         threading.Thread(target=worker_func).start()
645         return thunk
646
647     return lazy_thunked
648
649
650 ############################################################
651 # Timeout
652 ############################################################
653
654 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
655 # Used work of Stephen "Zero" Chappell <[email protected]>
656 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
657
658 # Original work is covered by PSF-2.0:
659
660 # 1. This LICENSE AGREEMENT is between the Python Software Foundation
661 # ("PSF"), and the Individual or Organization ("Licensee") accessing
662 # and otherwise using this software ("Python") in source or binary
663 # form and its associated documentation.
664 #
665 # 2. Subject to the terms and conditions of this License Agreement,
666 # PSF hereby grants Licensee a nonexclusive, royalty-free, world-wide
667 # license to reproduce, analyze, test, perform and/or display
668 # publicly, prepare derivative works, distribute, and otherwise use
669 # Python alone or in any derivative version, provided, however, that
670 # PSF's License Agreement and PSF's notice of copyright, i.e.,
671 # "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006 Python Software
672 # Foundation; All Rights Reserved" are retained in Python alone or in
673 # any derivative version prepared by Licensee.
674
675 # 3. In the event Licensee prepares a derivative work that is based on
676 # or incorporates Python or any part thereof, and wants to make the
677 # derivative work available to others as provided herein, then
678 # Licensee hereby agrees to include in any such work a brief summary
679 # of the changes made to Python.
680
681 # (N.B. See `NOTICE <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=NOTICE;hb=HEAD>`__ file in the root of this module for a list
682 # of changes)
683
684 # 4. PSF is making Python available to Licensee on an "AS IS"
685 # basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
686 # IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
687 # DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR
688 # FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL
689 # NOT INFRINGE ANY THIRD PARTY RIGHTS.
690
691 # 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
692 # FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A
693 # RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, OR ANY
694 # DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
695
696 # 6. This License Agreement will automatically terminate upon a
697 # material breach of its terms and conditions.
698
699 # 7. Nothing in this License Agreement shall be deemed to create any
700 # relationship of agency, partnership, or joint venture between PSF
701 # and Licensee. This License Agreement does not grant permission to
702 # use PSF trademarks or trade name in a trademark sense to endorse or
703 # promote products or services of Licensee, or any third party.
704
705 # 8. By copying, installing or otherwise using Python, Licensee agrees
706 # to be bound by the terms and conditions of this License Agreement.
707
708
709 def _raise_exception(exception, error_message: Optional[str]):
710     """Internal.  Raise a deferred exception"""
711     if error_message is None:
712         raise Exception(exception)
713     else:
714         raise Exception(error_message)
715
716
717 def _target(queue, function, *args, **kwargs):
718     """Run a function with arguments and return output via a queue.
719
720     This is a helper function for the Process created in _Timeout. It runs
721     the function with positional arguments and keyword arguments and then
722     returns the function's output by way of a queue. If an exception gets
723     raised, it is returned to _Timeout to be raised by the value property.
724     """
725     try:
726         queue.put((True, function(*args, **kwargs)))
727     except Exception:
728         queue.put((False, sys.exc_info()[1]))
729
730
731 class _Timeout(object):
732     """Wrap a function and add a timeout to it.
733
734     .. warning::
735
736         Instances of this class are automatically generated by the
737         :py:meth:`timeout` function defined below.  Do not use
738         directly.  Example usage on :py:meth:`timeout`.
739
740     """
741
742     def __init__(
743         self,
744         function: Callable,
745         timeout_exception: Exception,
746         error_message: str,
747         seconds: float,
748     ):
749         """
750         .. warning::
751
752             Instances of this class are automatically generated by the
753             :py:meth:`timeout` function defined below.  Do not use
754             directly.  Example usage on :py:meth:`timeout`.
755         """
756         self.__limit = seconds
757         self.__function = function
758         self.__timeout_exception = timeout_exception
759         self.__error_message = error_message
760         self.__name__ = function.__name__
761         self.__doc__ = function.__doc__
762         self.__timeout = time.time()
763         self.__process = multiprocessing.Process()
764         self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
765
766     def __call__(self, *args, **kwargs):
767         """Execute the embedded function object asynchronously.
768
769         The function given to the constructor is transparently called and
770         requires that "ready" be intermittently polled. If and when it is
771         True, the "value" property may then be checked for returned data.
772         """
773         self.__limit = kwargs.pop("timeout", self.__limit)
774         self.__queue = multiprocessing.Queue(1)
775         args = (self.__queue, self.__function) + args
776         self.__process = multiprocessing.Process(
777             target=_target, args=args, kwargs=kwargs
778         )
779         self.__process.daemon = True
780         self.__process.start()
781         if self.__limit is not None:
782             self.__timeout = self.__limit + time.time()
783         while not self.ready:
784             time.sleep(0.1)
785         return self.value
786
787     def cancel(self):
788         """Terminate any possible execution of the embedded function."""
789         if self.__process.is_alive():
790             self.__process.terminate()
791         _raise_exception(self.__timeout_exception, self.__error_message)
792
793     @property
794     def ready(self):
795         """Read-only property indicating status of "value" property."""
796         if self.__limit and self.__timeout < time.time():
797             self.cancel()
798         return self.__queue.full() and not self.__queue.empty()
799
800     @property
801     def value(self):
802         """Read-only property containing data returned from function."""
803         if self.ready is True:
804             flag, load = self.__queue.get()
805             if flag:
806                 return load
807             raise load
808         return None
809
810
811 def timeout(
812     seconds: float = 1.0,
813     use_signals: Optional[bool] = None,
814     timeout_exception=TimeoutError,
815     error_message="Function call timed out",
816 ):
817     """Add a timeout to a function.  If the function takes longer than
818     the given timeout (in seconds) it will raise an exception and
819     return control to the caller.
820
821     .. note::
822
823         the use_signals parameter is included in order to support
824         multiprocessing scenarios (signal can only be used from the
825         process' main thread).  When not using signals, timeout
826         granularity will be rounded to the nearest 0.1s and will poll.
827
828     .. warning::
829
830         Beware that a @timeout on a function inside at the
831         module-level will be evaluated at module load time and not
832         when the wrapped function is invoked.  This is somewhat
833         counterintuitive and tricky and it can lead to problems when
834         relying on the automatic main thread detection code
835         (`use_signals=None`, the default) since the import probably
836         happens on the main thread and the invocation can happen on a
837         different thread (one which can't use signals).  If in doubt,
838         do not use the automatic signal safety logic and set their
839         `use_signals` argument explicitly.
840
841     Raises:
842
843         An Exception with a timed out message when/if the timeout is
844         reached.
845
846     It is illegal to pass anything other than a function as the first
847     parameter.  The function is wrapped and returned to the caller.
848
849     >>> @timeout(0.2)
850     ... def foo(delay: float):
851     ...     time.sleep(delay)
852     ...     return "ok"
853
854     >>> foo(0)
855     'ok'
856
857     >>> foo(1.0)
858     Traceback (most recent call last):
859     ...
860     Exception: Function call timed out
861
862     """
863     if use_signals is None:
864         import pyutils.parallelize.thread_utils as tu
865
866         use_signals = tu.is_current_thread_main_thread()
867         # Please see warning above!!!
868
869     def decorate(function):
870         if use_signals:
871
872             def handler(unused_signum, unused_frame):
873                 _raise_exception(timeout_exception, error_message)
874
875             @functools.wraps(function)
876             def new_function(*args, **kwargs):
877                 new_seconds = kwargs.pop("timeout", seconds)
878                 if new_seconds:
879                     old = signal.signal(signal.SIGALRM, handler)
880                     signal.setitimer(signal.ITIMER_REAL, new_seconds)
881
882                 if not seconds:
883                     return function(*args, **kwargs)
884
885                 try:
886                     return function(*args, **kwargs)
887                 finally:
888                     if new_seconds:
889                         signal.setitimer(signal.ITIMER_REAL, 0)
890                         signal.signal(signal.SIGALRM, old)
891
892             return new_function
893         else:
894
895             @functools.wraps(function)
896             def new_function(*args, **kwargs):
897                 timeout_wrapper = _Timeout(
898                     function, timeout_exception, error_message, seconds
899                 )
900                 return timeout_wrapper(*args, **kwargs)
901
902             return new_function
903
904     return decorate
905
906
907 def synchronized(lock):
908     """Emulates java's "synchronized" keyword: given a lock, require
909     that threads take that lock (or wait) before invoking the wrapped
910     function and automatically releases the lock afterwards.
911
912     Args:
913         lock: the lock that must be held to invoke the wrapped function.
914
915     Example usage.  Imagine we have shared state between multiple thread
916     or processes and, to update the shared state, code should take a lock
917     to ensure only one writer is modifying the state at a time.  Any kind
918     of python lock that has an `acquire` method can be used with the
919     `@synchronized` decorator and it will handle acquisition and release
920     automatically::
921
922         import threading
923
924         lock = threading.Lock()
925
926         @synchronized(lock)
927         def update_shared_state():
928             do some work
929
930     """
931
932     def wrap(f):
933         @functools.wraps(f)
934         def _gatekeeper(*args, **kw):
935             lock.acquire()
936             try:
937                 return f(*args, **kw)
938             finally:
939                 lock.release()
940
941         return _gatekeeper
942
943     return wrap
944
945
946 def call_probabilistically(probability_of_call: float) -> Callable:
947     """Calls the wrapped function probabilistically given a rate
948     between 0.0 and 1.0 inclusive (0% probability and 100%
949     probability).
950
951     Args:
952         probability_of_call: probability with which to invoke the
953             wrapped function.  Must be 0 <= probabilty <= 1.0.
954
955     Example usage... this example would skip the invocation of
956     `log_the_entire_request_message` 95% of the time and only invoke
957     if 5% of the time.::
958
959         @call_probabilistically(0.05)
960         def log_the_entire_request_message(message: Whatever):
961             expensive work to save message to the log
962
963     """
964     if not 0.0 <= probability_of_call <= 1.0:
965         msg = f"probability_of_call must be between [0, 1]. Got {probability_of_call}."
966         logger.critical(msg)
967         raise ValueError(msg)
968
969     def decorator(f):
970         @functools.wraps(f)
971         def _call_with_probability(*args, **kwargs):
972             if random.uniform(0, 1) < probability_of_call:
973                 return f(*args, **kwargs)
974             else:
975                 logger.debug(
976                     "@call_with_probability_of_call skipping a call to %s", f.__name__
977                 )
978                 return None
979
980         return _call_with_probability
981
982     return decorator
983
984
985 def decorate_matching_methods_with(decorator, acl=None):
986     """Apply the given decorator to all methods in a class whose names
987     begin with prefix.  If prefix is None (default), decorate all
988     methods in the class.
989
990     Args:
991         decorator: the decorator to apply to matching class methods.
992         acl: the matcher used to predicate decorator application; None,
993             the default, applies the decorator to all class methods.
994             See :py:mod:`pyutils.security.acl` for more information
995             and options.
996
997     Example usage to wrap all methods whose names begin with either
998     "enter" or "exit" with the `@invocation_logged` decorator (see
999     :py:meth:`invocation_logged`)::
1000
1001         import pyutils.decorator_utils
1002         import pyutils.security.acl as acl
1003
1004         @decorator_utils.decorate_matching_methods_with(
1005             decorator_utils.invocation_logged,
1006             acl.StringWildcardBasedACL(
1007                 allowed_patterns=['enter*', 'exit*'],
1008                 acl.Order.ALLOW_DENY
1009             )
1010         )
1011         class MyClass:
1012             def __init__(self):
1013                 self.name = None
1014                 self.rating = None
1015
1016             def __repr__(self) -> str:
1017                 return f'{self.name} @ {self.rating}'
1018
1019             def enterName(self, n: str) -> None:
1020                 if len(n) > 5:
1021                     self.name = n
1022
1023             def exitName(self, n: str) -> None:
1024                 pass
1025
1026             def enterRating(self, r: int) -> None:
1027                 if 1 <= r <= 5:
1028                     self.rating = r
1029
1030             def exitRating(self, r: int) -> None:
1031                 pass
1032     """
1033
1034     def decorate_the_class(cls):
1035         for name, m in inspect.getmembers(cls, inspect.isfunction):
1036             if acl is None:
1037                 setattr(cls, name, decorator(m))
1038             else:
1039                 if acl(name):
1040                     setattr(cls, name, decorator(m))
1041         return cls
1042
1043     return decorate_the_class
1044
1045
1046 if __name__ == '__main__':
1047     import doctest
1048
1049     doctest.testmod()