Adds a __repr__ to graph.
[pyutils.git] / src / pyutils / decorator_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4 # A portion (marked) below retain the original author's copyright.
5
6 """This is a grab bag of, hopefully, useful decorators."""
7
8 import enum
9 import functools
10 import inspect
11 import logging
12 import math
13 import multiprocessing
14 import random
15 import signal
16 import sys
17 import threading
18 import time
19 import traceback
20 import warnings
21 from typing import Any, Callable, List, NoReturn, Optional, Union
22
23 # This module is commonly used by others in here and should avoid
24 # taking any unnecessary dependencies back on them.
25
26 logger = logging.getLogger(__name__)
27
28
29 def timed(func: Callable) -> Callable:
30     """Prints + info logs the runtime of the decorated function at
31     each invocation.
32
33     >>> @timed
34     ... def foo():
35     ...     import time
36     ...     time.sleep(0.01)
37
38     >>> foo()  # doctest: +ELLIPSIS
39     Finished foo in ...
40     """
41
42     @functools.wraps(func)
43     def wrapper_timer(*args, **kwargs):
44         start_time = time.perf_counter()
45         value = func(*args, **kwargs)
46         end_time = time.perf_counter()
47         run_time = end_time - start_time
48         msg = f"Finished {func.__qualname__} in {run_time:.4f}s"
49         print(msg)
50         logger.info(msg)
51         return value
52
53     return wrapper_timer
54
55
56 def invocation_logged(func: Callable) -> Callable:
57     """Log the call of a function on sys.stdout and the info log.
58
59     >>> @invocation_logged
60     ... def foo():
61     ...     print('Hello, world.')
62
63     >>> foo()
64     Entered foo
65     Hello, world.
66     Exited foo
67     """
68
69     @functools.wraps(func)
70     def wrapper_invocation_logged(*args, **kwargs):
71         msg = f"Entered {func.__qualname__}"
72         print(msg)
73         logger.info(msg)
74         ret = func(*args, **kwargs)
75         msg = f"Exited {func.__qualname__}"
76         print(msg)
77         logger.info(msg)
78         return ret
79
80     return wrapper_invocation_logged
81
82
83 def rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) -> Callable:
84     """Limit invocation of a wrapped function to n calls per time period.
85     Thread-safe.  In testing this was relatively fair with multiple
86     threads using it though that hasn't been measured in detail.
87
88     .. note::
89
90         The doctest below makes use of
91         :py:class:`pyutils.parallelize.thread_utils.background_thread`.  See
92         that class' documentation for details.
93
94     >>> import time
95     >>> from pyutils import decorator_utils
96     >>> from pyutils.parallelize import thread_utils
97
98     >>> calls = 0
99
100     >>> @decorator_utils.rate_limited(10, per_period_in_seconds=1.0)
101     ... def limited(x: int):
102     ...     global calls
103     ...     calls += 1
104
105     >>> @thread_utils.background_thread
106     ... def a(stop):
107     ...     for _ in range(3):
108     ...         limited(_)
109
110     >>> @thread_utils.background_thread
111     ... def b(stop):
112     ...     for _ in range(3):
113     ...         limited(_)
114
115     >>> start = time.time()
116     >>> (thread1, event1) = a()
117     >>> (thread2, event2) = b()
118     >>> thread1.join()
119     >>> thread2.join()
120     >>> end = time.time()
121     >>> dur = end - start
122     >>> dur > 0.5
123     True
124     >>> calls
125     6
126     """
127
128     min_interval_seconds = per_period_in_seconds / float(n_calls)
129
130     def wrapper_rate_limited(func: Callable) -> Callable:
131         cv = threading.Condition()
132         last_invocation_timestamp = [0.0]
133
134         def may_proceed() -> float:
135             now = time.time()
136             last_invocation = last_invocation_timestamp[0]
137             if last_invocation != 0.0:
138                 elapsed_since_last = now - last_invocation
139                 wait_time = min_interval_seconds - elapsed_since_last
140             else:
141                 wait_time = 0.0
142             logger.debug("@%.4f> wait_time = %.4f", time.time(), wait_time)
143             return wait_time
144
145         def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
146             with cv:
147                 while True:
148                     if cv.wait_for(
149                         lambda: may_proceed() <= 0.0,
150                         timeout=may_proceed(),
151                     ):
152                         break
153             with cv:
154                 logger.debug("@%.4f> calling it...", time.time())
155                 ret = func(*args, **kargs)
156                 last_invocation_timestamp[0] = time.time()
157                 logger.debug(
158                     "@%.4f> Last invocation <- %.4f",
159                     time.time(),
160                     last_invocation_timestamp[0],
161                 )
162                 cv.notify()
163             return ret
164
165         return wrapper_wrapper_rate_limited
166
167     return wrapper_rate_limited
168
169
170 def debug_args(func: Callable) -> Callable:
171     """Print the function signature and return value at each call.
172
173     >>> @debug_args
174     ... def foo(a, b, c):
175     ...     print(a)
176     ...     print(b)
177     ...     print(c)
178     ...     return (a + b, c)
179
180     >>> foo(1, 2.0, "test")
181     Calling foo(1:<class 'int'>, 2.0:<class 'float'>, 'test':<class 'str'>)
182     1
183     2.0
184     test
185     foo returned (3.0, 'test'):<class 'tuple'>
186     (3.0, 'test')
187     """
188
189     @functools.wraps(func)
190     def wrapper_debug_args(*args, **kwargs):
191         args_repr = [f"{repr(a)}:{type(a)}" for a in args]
192         kwargs_repr = [f"{k}={v!r}:{type(v)}" for k, v in kwargs.items()]
193         signature = ", ".join(args_repr + kwargs_repr)
194         msg = f"Calling {func.__qualname__}({signature})"
195         print(msg)
196         logger.info(msg)
197         value = func(*args, **kwargs)
198         msg = f"{func.__qualname__} returned {value!r}:{type(value)}"
199         print(msg)
200         logger.info(msg)
201         return value
202
203     return wrapper_debug_args
204
205
206 def debug_count_calls(func: Callable) -> Callable:
207     """Count function invocations and print a message befor every call.
208
209     >>> @debug_count_calls
210     ... def factoral(x):
211     ...     if x == 1:
212     ...         return 1
213     ...     return x * factoral(x - 1)
214
215     >>> factoral(5)
216     Call #1 of 'factoral'
217     Call #2 of 'factoral'
218     Call #3 of 'factoral'
219     Call #4 of 'factoral'
220     Call #5 of 'factoral'
221     120
222     """
223
224     @functools.wraps(func)
225     def wrapper_debug_count_calls(*args, **kwargs):
226         wrapper_debug_count_calls.num_calls += 1
227         msg = f"Call #{wrapper_debug_count_calls.num_calls} of {func.__name__!r}"
228         print(msg)
229         logger.info(msg)
230         return func(*args, **kwargs)
231
232     wrapper_debug_count_calls.num_calls = 0  # type: ignore
233     return wrapper_debug_count_calls
234
235
236 class DelayWhen(enum.IntEnum):
237     """This enum is used with the `@delay` decorator to indicate that the
238     delay should happen before wrapped function invocation, after wrapped
239     function invocation, or both.
240
241     See: :py:meth:`delay`.
242     """
243
244     BEFORE_CALL = 1
245     AFTER_CALL = 2
246     BEFORE_AND_AFTER = 3
247
248
249 def delay(
250     _func: Callable = None,
251     *,
252     seconds: float = 1.0,
253     when: DelayWhen = DelayWhen.BEFORE_CALL,
254 ) -> Callable:
255     """Slow down a function by inserting a delay before and/or after its
256     invocation.
257
258     Args:
259         seconds: how long should we delay (via a simple `time.sleep()`)?
260         when: when should we delay.. before the invocation, after it, or both?
261
262     >>> @delay(seconds=1.0)
263     ... def foo():
264     ...     pass
265
266     >>> import time
267     >>> start = time.time()
268     >>> foo()
269     >>> dur = time.time() - start
270     >>> dur >= 1.0
271     True
272     """
273
274     def decorator_delay(func: Callable) -> Callable:
275         @functools.wraps(func)
276         def wrapper_delay(*args, **kwargs):
277             if when & DelayWhen.BEFORE_CALL:
278                 logger.debug("@delay for %fs BEFORE_CALL to %s", seconds, func.__name__)
279                 time.sleep(seconds)
280             retval = func(*args, **kwargs)
281             if when & DelayWhen.AFTER_CALL:
282                 logger.debug("@delay for %fs AFTER_CALL to %s", seconds, func.__name__)
283                 time.sleep(seconds)
284             return retval
285
286         return wrapper_delay
287
288     if _func is None:
289         return decorator_delay
290     else:
291         return decorator_delay(_func)
292
293
294 class _SingletonWrapper:
295     """An internal singleton wrapper class. Its instances are created
296     for each decorated class.
297     """
298
299     def __init__(self, cls):
300         self.__wrapped__ = cls
301         self._instance = None
302
303     def __call__(self, *args, **kwargs):
304         """Returns a single instance of decorated class"""
305         logger.debug(
306             "@singleton returning global instance of %s", self.__wrapped__.__name__
307         )
308         if self._instance is None:
309             self._instance = self.__wrapped__(*args, **kwargs)
310         return self._instance
311
312
313 def singleton(cls):
314     """
315     A singleton decorator; adding this to a class results in the decorator making
316     sure that there exists only one instance of that class globally in the
317     program by creating an instance the first time the class is constructed
318     and then returning the previously created singleton instance on subsequent
319     creation requests.
320
321     See also :py:meth:`pyutils.persistent.persistent_autoloaded_singleton`.
322
323     >>> @singleton
324     ... class global_configuration(object):
325     ...     pass
326
327     >>> a = global_configuration()
328     >>> b = global_configuration()
329     >>> a is b
330     True
331     >>> id(a) == id(b)
332     True
333     """
334     return _SingletonWrapper(cls)
335
336
337 def memoized(func: Callable) -> Callable:
338     """Keep a cache of previous function call results.  Use this with
339     pure functions without side effects that do expensive work.
340
341     The internal cache is a simple dict with a key based on the
342     arguments to the call so the result of the function must be determined
343     only by its parameters (i.e. it must be "functional") or this will
344     introduce errors.  See:
345     https://en.wikipedia.org/wiki/Functional_programming#Pure_functions
346
347     Consider also: :py:meth:`functools.cache` for a more advanced
348     implementation.  See:
349     https://docs.python.org/3/library/functools.html#functools.cache
350
351     >>> import time
352     >>> @memoized
353     ... def expensive(arg) -> int:
354     ...     # Simulate something slow to compute or lookup, like a
355     ...     # computationally expensive task or a network read of
356     ...     # static data (i.e. that should never change).
357     ...     time.sleep(1.0)
358     ...     return arg * arg
359
360     >>> start = time.time()
361     >>> expensive(5)           # Takes about 1 sec
362     25
363     >>> expensive(3)           # Also takes about 1 sec
364     9
365     >>> expensive(5)           # Pulls from cache, fast
366     25
367     >>> expensive(3)           # Pulls from cache again, fast
368     9
369     >>> dur = time.time() - start
370     >>> dur < 3.0
371     True
372
373     """
374
375     @functools.wraps(func)
376     def wrapper_memoized(*args, **kwargs):
377         cache_key = args + tuple(kwargs.items())
378         if cache_key not in wrapper_memoized.cache:
379             value = func(*args, **kwargs)
380             logger.debug("Memoizing %s => %s for %s", cache_key, value, func.__name__)
381             wrapper_memoized.cache[cache_key] = value
382         else:
383             logger.debug("Returning memoized value for %s", {func.__name__})
384         return wrapper_memoized.cache[cache_key]
385
386     wrapper_memoized.cache = {}  # type: ignore
387     return wrapper_memoized
388
389
390 def predicated_retry_with_backoff(
391     tries: int,
392     *,
393     predicate: Callable[..., bool],
394     delay_sec: float = 3.0,
395     backoff: float = 2.0,
396 ):
397     """Retries a function or method up to a certain number of times with a
398     prescribed initial delay period and backoff rate (multiplier).  Note
399     that :py:meth:`retry_if_false` and :py:meth:`retry_if_none` both
400     use this class with a predefined predicate but you can also use
401     it directly with your own custom predicate.
402
403     Args:
404         tries: the maximum number of attempts to run the function
405         delay_sec: sets the initial delay period in seconds
406         backoff: a multiplier (must be >=1.0) used to modify the
407             delay at each subsequent invocation
408         predicate: a Callable that will be passed the retval of
409             the decorated function and must return True to indicate
410             that we should stop calling or False to indicate a retry
411             is necessary
412
413     Raises:
414         ValueError: on invalid arguments; e.g. backoff must be >= 1.0,
415             delay_sec must be >= 0.0, tries must be > 0.
416
417     .. note::
418
419         If after `tries` attempts the wrapped function is still
420         failing, this code returns the failure result to the caller.
421
422     Example usage that would call `make_the_RPC_call` up to three
423     times (as long as it returns a tuple with `False` in the second
424     element) with a delay of 1.0s the first time, 2.0s the second
425     time, and 4.0s the third time.::
426
427         @decorator_utils.predicated_retry_with_backoff(
428             3,
429             predicate=lambda _: _[2] is False,
430             delay_sec=1.0,
431             backoff=2
432         )
433         def make_the_RPC_call() -> Tuple[str, int, bool]:
434             whatever
435
436     """
437
438     if backoff < 1.0:
439         msg = f"backoff must be greater than or equal to 1, got {backoff}"
440         logger.critical(msg)
441         raise ValueError(msg)
442
443     tries = math.floor(tries)
444     if tries < 0:
445         msg = f"tries must be 0 or greater, got {tries}"
446         logger.critical(msg)
447         raise ValueError(msg)
448
449     if delay_sec <= 0:
450         msg = f"delay_sec must be greater than 0, got {delay_sec}"
451         logger.critical(msg)
452         raise ValueError(msg)
453
454     def deco_retry(f):
455         @functools.wraps(f)
456         def f_retry(*args, **kwargs):
457             mtries, mdelay = tries, delay_sec  # make mutable
458             logger.debug("deco_retry: will make up to %d attempts...", mtries)
459             retval = f(*args, **kwargs)
460             while mtries > 0:
461                 if predicate(retval) is True:
462                     logger.debug("Predicate succeeded, deco_retry is done.")
463                     return retval
464                 logger.debug("Predicate failed, sleeping and retrying.")
465                 mtries -= 1
466                 time.sleep(mdelay)
467                 mdelay *= backoff
468                 retval = f(*args, **kwargs)
469             return retval
470
471         return f_retry
472
473     return deco_retry
474
475
476 def retry_if_false(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0):
477     """A helper for `@predicated_retry_with_backoff` that retries a
478     decorated function as long as it keeps returning False.
479
480     Args:
481         tries: max number of times to retry
482         delay_sec: initial delay before retry length in seconds
483         backoff: a multiplier (must be >= 1.0) used to optionally increase
484             subsequent delays on repeated failures.
485
486     .. note::
487
488         If after `tries` attempts the wrapped function is still
489         failing, this code returns the failure result (i.e. False) to
490         the caller.
491
492     >>> import time
493     >>> counter = 0
494     >>> @retry_if_false(5, delay_sec=1.0, backoff=1.1)
495     ... def foo():
496     ...     global counter
497     ...     counter += 1
498     ...     return counter >= 3
499
500     >>> start = time.time()
501     >>> foo()  # fail, delay 1.0, fail, delay 1.1, succeed
502     True
503
504     >>> dur = time.time() - start
505     >>> counter
506     3
507     >>> dur > 2.0
508     True
509     >>> dur < 2.3
510     True
511
512     """
513     return predicated_retry_with_backoff(
514         tries,
515         predicate=lambda x: x is True,
516         delay_sec=delay_sec,
517         backoff=backoff,
518     )
519
520
521 def retry_if_none(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0):
522     """A helper for `@predicated_retry_with_backoff` that continues to
523     invoke the wrapped function as long as it keeps returning None.
524     Retries up to N times with a delay between each retry and a
525     backoff that can increase the delay.
526
527     Args:
528         tries: max number of times to retry
529         delay_sec: initial delay before retry length in seconds
530         backoff: a multiplier (must be >= 1.0) used to optionally increase
531             subsequent delays on repeated failures.
532
533     .. note::
534
535         If after `tries` attempts the wrapped function is still
536         failing, this code returns the failure result (i.e. None) to
537         the caller.
538
539     Example usage... calls a function that reads a URL from the network
540     and returns the raw HTTP response or None on error with up to three
541     retries with an increasing backoff::
542
543         @retry_if_none(3, delay_sec=1.0, backoff=4.0)
544         def fetch_the_image(url: str) -> Optional[bytes]:
545             r = requests.get(url)
546             if r.status_code != 200:
547                 return None
548             return r.content
549
550         # Use normally
551         image_binary_data = fetch_the_image(
552             'https://www.whatever.com/foo/bar/baz.jpg'
553         )
554
555         # Note: even with retries this might still fail; be prepared
556         # to still receive a None return value.
557         if image_binary_data is None:
558             raise Exception(f"Couldn't read {url}?!")
559     """
560     return predicated_retry_with_backoff(
561         tries,
562         predicate=lambda x: x is not None,
563         delay_sec=delay_sec,
564         backoff=backoff,
565     )
566
567
568 def deprecated(func):
569     """This is a decorator which can be used to mark functions
570     as deprecated. It will result in a warning being emitted
571     when the function is used.  The warning includes the caller
572     as determined by examining the stack in the warning log.
573
574     >>> @deprecated
575     ... def foo() -> None:
576     ...     pass
577     >>> foo()   # prints + logs "Call to deprecated function foo"
578     """
579
580     @functools.wraps(func)
581     def wrapper_deprecated(*args, **kwargs):
582         msg = f"Call to deprecated function {func.__qualname__}"
583         logger.warning(msg)
584         warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
585         print(msg, file=sys.stderr)
586         return func(*args, **kwargs)
587
588     return wrapper_deprecated
589
590
591 def thunkify(func):
592     """Make a function immediately return a function of no args which,
593     when called, waits for the original result.  Meanwhile spin up a
594     background thread to begin computing the result in parallel.
595
596     Example usage... hide a slow network read behind a thunk that will
597     block only when it is called::
598
599         @thunkify
600         def read_url(url: str) -> Result:
601             make a slow network read
602
603         urls = [ long list of urls ]
604         results = []
605
606         for url in urls:
607             results.append(read_url(url))
608
609     In this example, we will start one background thread per url(!!)
610     requested.  The result of read_url is no longer a `Result` but
611     rather a `Callable` (see `thunk` below) that, when invoked, awaits
612     the Result and returns it.
613
614     For more control over things like the number of worker threads and
615     the ability cause work to be done on background processes or even
616     on other machines, see
617     :py:class:`pyutils.parallelize.SmartFuture`,
618     :py:class:`pyutils.parallelize.DeferredOperation` and
619     :py:mod:`pyutils.parallelize.parallelize`.
620     """
621
622     @functools.wraps(func)
623     def lazy_thunked(*args, **kwargs):
624         wait_event = threading.Event()
625
626         result = [None]
627         exc: List[Any] = [False, None]
628
629         def worker_func():
630             try:
631                 func_result = func(*args, **kwargs)
632                 result[0] = func_result
633             except Exception:
634                 exc[0] = True
635                 exc[1] = sys.exc_info()  # (type, value, traceback)
636                 msg = f"Thunkify has thrown an exception (will be raised on thunk()):\n{traceback.format_exc()}"
637                 logger.warning(msg)
638             finally:
639                 wait_event.set()
640
641         def thunk():
642             wait_event.wait()
643             if exc[0]:
644                 assert exc[1]
645                 raise exc[1][0](exc[1][1])
646             return result[0]
647
648         threading.Thread(target=worker_func).start()
649         return thunk
650
651     return lazy_thunked
652
653
654 ############################################################
655 # Timeout
656 ############################################################
657
658 # http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
659 # Used work of Stephen "Zero" Chappell <[email protected]>
660 # in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
661
662 # Original work is covered by PSF-2.0:
663
664 # 1. This LICENSE AGREEMENT is between the Python Software Foundation
665 # ("PSF"), and the Individual or Organization ("Licensee") accessing
666 # and otherwise using this software ("Python") in source or binary
667 # form and its associated documentation.
668 #
669 # 2. Subject to the terms and conditions of this License Agreement,
670 # PSF hereby grants Licensee a nonexclusive, royalty-free, world-wide
671 # license to reproduce, analyze, test, perform and/or display
672 # publicly, prepare derivative works, distribute, and otherwise use
673 # Python alone or in any derivative version, provided, however, that
674 # PSF's License Agreement and PSF's notice of copyright, i.e.,
675 # "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006 Python Software
676 # Foundation; All Rights Reserved" are retained in Python alone or in
677 # any derivative version prepared by Licensee.
678
679 # 3. In the event Licensee prepares a derivative work that is based on
680 # or incorporates Python or any part thereof, and wants to make the
681 # derivative work available to others as provided herein, then
682 # Licensee hereby agrees to include in any such work a brief summary
683 # of the changes made to Python.
684
685 # (N.B. See `NOTICE <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=NOTICE;hb=HEAD>`__ file in the root of this module for a list
686 # of changes)
687
688 # 4. PSF is making Python available to Licensee on an "AS IS"
689 # basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
690 # IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
691 # DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR
692 # FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL
693 # NOT INFRINGE ANY THIRD PARTY RIGHTS.
694
695 # 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
696 # FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A
697 # RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, OR ANY
698 # DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
699
700 # 6. This License Agreement will automatically terminate upon a
701 # material breach of its terms and conditions.
702
703 # 7. Nothing in this License Agreement shall be deemed to create any
704 # relationship of agency, partnership, or joint venture between PSF
705 # and Licensee. This License Agreement does not grant permission to
706 # use PSF trademarks or trade name in a trademark sense to endorse or
707 # promote products or services of Licensee, or any third party.
708
709 # 8. By copying, installing or otherwise using Python, Licensee agrees
710 # to be bound by the terms and conditions of this License Agreement.
711
712
713 def _raise_exception(exception, error_message: Optional[str]) -> NoReturn:
714     """Internal.  Raise a deferred exception"""
715     if error_message is None:
716         raise Exception(exception)
717     else:
718         raise Exception(error_message)
719
720
721 def _target(queue, function, *args, **kwargs):
722     """Run a function with arguments and return output via a queue.
723
724     This is a helper function for the Process created in _Timeout. It runs
725     the function with positional arguments and keyword arguments and then
726     returns the function's output by way of a queue. If an exception gets
727     raised, it is returned to _Timeout to be raised by the value property.
728     """
729     try:
730         queue.put((True, function(*args, **kwargs)))
731     except Exception:
732         queue.put((False, sys.exc_info()[1]))
733
734
735 class _Timeout(object):
736     """Wrap a function and add a timeout to it.
737
738     .. warning::
739
740         Instances of this class are automatically generated by the
741         :py:meth:`timeout` function defined below.  Do not use
742         directly.  Example usage on :py:meth:`timeout`.
743
744     """
745
746     def __init__(
747         self,
748         function: Callable,
749         timeout_exception: Exception,
750         error_message: str,
751         seconds: float,
752     ):
753         """
754         .. warning::
755
756             Instances of this class are automatically generated by the
757             :py:meth:`timeout` function defined below.  Do not use
758             directly.  Example usage on :py:meth:`timeout`.
759         """
760         self.__limit = seconds
761         self.__function = function
762         self.__timeout_exception = timeout_exception
763         self.__error_message = error_message
764         self.__name__ = function.__name__
765         self.__doc__ = function.__doc__
766         self.__timeout = time.time()
767         self.__process = multiprocessing.Process()
768         self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
769
770     def __call__(self, *args, **kwargs):
771         """Execute the embedded function object asynchronously.
772
773         The function given to the constructor is transparently called and
774         requires that "ready" be intermittently polled. If and when it is
775         True, the "value" property may then be checked for returned data.
776         """
777         self.__limit = kwargs.pop("timeout", self.__limit)
778         self.__queue = multiprocessing.Queue(1)
779         args = (self.__queue, self.__function) + args
780         self.__process = multiprocessing.Process(
781             target=_target, args=args, kwargs=kwargs
782         )
783         self.__process.daemon = True
784         self.__process.start()
785         if self.__limit is not None:
786             self.__timeout = self.__limit + time.time()
787         while not self.ready:
788             time.sleep(0.1)
789         return self.value
790
791     def cancel(self):
792         """Terminate any possible execution of the embedded function."""
793         if self.__process.is_alive():
794             self.__process.terminate()
795         _raise_exception(self.__timeout_exception, self.__error_message)
796
797     @property
798     def ready(self):
799         """Read-only property indicating status of "value" property."""
800         if self.__limit and self.__timeout < time.time():
801             self.cancel()
802         return self.__queue.full() and not self.__queue.empty()
803
804     @property
805     def value(self):
806         """Read-only property containing data returned from function."""
807         if self.ready is True:
808             flag, load = self.__queue.get()
809             if flag:
810                 return load
811             raise load
812         return None
813
814
815 def timeout(
816     seconds: float = 1.0,
817     use_signals: Optional[bool] = None,
818     timeout_exception=TimeoutError,
819     error_message="Function call timed out",
820 ):
821     """Add a timeout to a function.  If the function takes longer than
822     the given timeout (in seconds) it will raise an exception and
823     return control to the caller.
824
825     .. note::
826
827         the use_signals parameter is included in order to support
828         multiprocessing scenarios (signal can only be used from the
829         process' main thread).  When not using signals, timeout
830         granularity will be rounded to the nearest 0.1s and will poll.
831
832     .. warning::
833
834         Beware that a @timeout on a function inside at the
835         module-level will be evaluated at module load time and not
836         when the wrapped function is invoked.  This is somewhat
837         counterintuitive and tricky and it can lead to problems when
838         relying on the automatic main thread detection code
839         (`use_signals=None`, the default) since the import probably
840         happens on the main thread and the invocation can happen on a
841         different thread (one which can't use signals).  If in doubt,
842         do not use the automatic signal safety logic and set their
843         `use_signals` argument explicitly.
844
845     Raises:
846         Exception: the timeout was reached
847
848     It is illegal to pass anything other than a function as the first
849     parameter.  The function is wrapped and returned to the caller.
850
851     >>> @timeout(0.2)
852     ... def foo(delay: float):
853     ...     time.sleep(delay)
854     ...     return "ok"
855
856     >>> foo(0)
857     'ok'
858
859     >>> foo(1.0)
860     Traceback (most recent call last):
861     ...
862     Exception: Function call timed out
863
864     """
865     if use_signals is None:
866         import pyutils.parallelize.thread_utils as tu
867
868         use_signals = tu.is_current_thread_main_thread()
869         # Please see warning above!!!
870
871     def decorate(function):
872         if use_signals:
873
874             def handler(unused_signum, unused_frame):
875                 _raise_exception(timeout_exception, error_message)
876
877             @functools.wraps(function)
878             def new_function(*args, **kwargs):
879                 new_seconds = kwargs.pop("timeout", seconds)
880                 if new_seconds:
881                     old = signal.signal(signal.SIGALRM, handler)
882                     signal.setitimer(signal.ITIMER_REAL, new_seconds)
883
884                 if not seconds:
885                     return function(*args, **kwargs)
886
887                 try:
888                     return function(*args, **kwargs)
889                 finally:
890                     if new_seconds:
891                         signal.setitimer(signal.ITIMER_REAL, 0)
892                         signal.signal(signal.SIGALRM, old)
893
894             return new_function
895         else:
896
897             @functools.wraps(function)
898             def new_function(*args, **kwargs):
899                 timeout_wrapper = _Timeout(
900                     function, timeout_exception, error_message, seconds
901                 )
902                 return timeout_wrapper(*args, **kwargs)
903
904             return new_function
905
906     return decorate
907
908
909 def synchronized(lock: Union[threading.Lock, threading.RLock]):
910     """Emulates java's "synchronized" keyword: given a lock, require
911     that threads take that lock (or wait) before invoking the wrapped
912     function and automatically releases the lock afterwards.
913
914     Args:
915         lock: the lock that must be held to invoke the wrapped function.
916
917     Example usage.  Imagine we have shared state between multiple thread
918     or processes and, to update the shared state, code should take a lock
919     to ensure only one writer is modifying the state at a time.  Any kind
920     of python lock that has an `acquire` method can be used with the
921     `@synchronized` decorator and it will handle acquisition and release
922     automatically::
923
924         import threading
925
926         lock = threading.Lock()
927
928         @synchronized(lock)
929         def update_shared_state():
930             do some work
931
932     """
933
934     def wrap(f):
935         @functools.wraps(f)
936         def _gatekeeper(*args, **kw):
937             lock.acquire()
938             try:
939                 return f(*args, **kw)
940             finally:
941                 lock.release()
942
943         return _gatekeeper
944
945     return wrap
946
947
948 def call_probabilistically(probability_of_call: float) -> Callable:
949     """Calls the wrapped function probabilistically given a rate
950     between 0.0 and 1.0 inclusive (0% probability and 100%
951     probability).
952
953     Args:
954         probability_of_call: probability with which to invoke the
955             wrapped function.  Must be 0 <= probabilty <= 1.0.
956
957     Raises:
958         ValueError: invalid probability argument
959
960     Example usage... this example would skip the invocation of
961     `log_the_entire_request_message` 95% of the time and only invoke
962     if 5% of the time.::
963
964         @call_probabilistically(0.05)
965         def log_the_entire_request_message(message: Whatever):
966             expensive work to save message to the log
967
968     """
969     if not 0.0 <= probability_of_call <= 1.0:
970         msg = f"probability_of_call must be between [0, 1]. Got {probability_of_call}."
971         logger.critical(msg)
972         raise ValueError(msg)
973
974     def decorator(f):
975         @functools.wraps(f)
976         def _call_with_probability(*args, **kwargs):
977             if random.uniform(0, 1) < probability_of_call:
978                 return f(*args, **kwargs)
979             else:
980                 logger.debug(
981                     "@call_with_probability_of_call skipping a call to %s", f.__name__
982                 )
983                 return None
984
985         return _call_with_probability
986
987     return decorator
988
989
990 def decorate_matching_methods_with(decorator: Callable, acl: Optional[Callable] = None):
991     """Apply the given decorator to all methods in a class whose names
992     begin with prefix.  If prefix is None (default), decorate all
993     methods in the class.
994
995     Args:
996         decorator: the decorator to apply to matching class methods.
997         acl: the matcher used to predicate decorator application; None,
998             the default, applies the decorator to all class methods.
999             See :py:mod:`pyutils.security.acl` for more information
1000             and options.
1001
1002     Example usage to wrap all methods whose names begin with either
1003     "enter" or "exit" with the `@invocation_logged` decorator (see
1004     :py:meth:`invocation_logged`)::
1005
1006         import pyutils.decorator_utils
1007         import pyutils.security.acl as acl
1008
1009         @decorator_utils.decorate_matching_methods_with(
1010             decorator_utils.invocation_logged,
1011             acl.StringWildcardBasedACL(
1012                 allowed_patterns=['enter*', 'exit*'],
1013                 acl.Order.ALLOW_DENY
1014             )
1015         )
1016         class MyClass:
1017             def __init__(self):
1018                 self.name = None
1019                 self.rating = None
1020
1021             def __repr__(self) -> str:
1022                 return f'{self.name} @ {self.rating}'
1023
1024             def enterName(self, n: str) -> None:
1025                 if len(n) > 5:
1026                     self.name = n
1027
1028             def exitName(self, n: str) -> None:
1029                 pass
1030
1031             def enterRating(self, r: int) -> None:
1032                 if 1 <= r <= 5:
1033                     self.rating = r
1034
1035             def exitRating(self, r: int) -> None:
1036                 pass
1037     """
1038
1039     def decorate_the_class(cls):
1040         for name, m in inspect.getmembers(cls, inspect.isfunction):
1041             if acl is None or acl(name):
1042                 setattr(cls, name, decorator(m))
1043         return cls
1044
1045     return decorate_the_class
1046
1047
1048 if __name__ == "__main__":
1049     import doctest
1050
1051     doctest.testmod()