Optionally surface exceptions that happen under executors by reading
[python_utils.git] / thread_utils.py
index 0130cdc510547196d418d6699d1b46b84a6ddf7c..51078a4e57ebe9193a3eee4669a1cf33a55bb4e0 100644 (file)
@@ -13,6 +13,19 @@ logger = logging.getLogger(__name__)
 
 
 def current_thread_id() -> str:
 
 
 def current_thread_id() -> str:
+    """Returns a string composed of the parent process' id, the current
+    process' id and the current thread identifier.  The former two are
+    numbers (pids) whereas the latter is a thread id passed during thread
+    creation time.
+
+    >>> ret = current_thread_id()
+    >>> (ppid, pid, tid) = ret.split('/')
+    >>> ppid.isnumeric()
+    True
+    >>> pid.isnumeric()
+    True
+
+    """
     ppid = os.getppid()
     pid = os.getpid()
     tid = threading.current_thread().name
     ppid = os.getppid()
     pid = os.getpid()
     tid = threading.current_thread().name
@@ -22,12 +35,32 @@ def current_thread_id() -> str:
 def is_current_thread_main_thread() -> bool:
     """Returns True is the current (calling) thread is the process' main
     thread and False otherwise.
 def is_current_thread_main_thread() -> bool:
     """Returns True is the current (calling) thread is the process' main
     thread and False otherwise.
+
+    >>> is_current_thread_main_thread()
+    True
+
+    >>> result = None
+    >>> def thunk():
+    ...     global result
+    ...     result = is_current_thread_main_thread()
+
+    >>> thunk()
+    >>> result
+    True
+
+    >>> import threading
+    >>> thread = threading.Thread(target=thunk)
+    >>> thread.start()
+    >>> thread.join()
+    >>> result
+    False
+
     """
     return threading.current_thread() is threading.main_thread()
 
 
 def background_thread(
     """
     return threading.current_thread() is threading.main_thread()
 
 
 def background_thread(
-        _funct: Optional[Callable]
+    _funct: Optional[Callable],
 ) -> Tuple[threading.Thread, threading.Event]:
     """A function decorator to create a background thread.
 
 ) -> Tuple[threading.Thread, threading.Event]:
     """A function decorator to create a background thread.
 
@@ -58,11 +91,10 @@ def background_thread(
     periodically check.  If the event is set, it means the thread has
     been requested to terminate ASAP.
     """
     periodically check.  If the event is set, it means the thread has
     been requested to terminate ASAP.
     """
+
     def wrapper(funct: Callable):
         @functools.wraps(funct)
     def wrapper(funct: Callable):
         @functools.wraps(funct)
-        def inner_wrapper(
-                *a, **kwa
-        ) -> Tuple[threading.Thread, threading.Event]:
+        def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
             should_terminate = threading.Event()
             should_terminate.clear()
             newargs = (*a, should_terminate)
             should_terminate = threading.Event()
             should_terminate.clear()
             newargs = (*a, should_terminate)
@@ -72,10 +104,9 @@ def background_thread(
                 kwargs=kwa,
             )
             thread.start()
                 kwargs=kwa,
             )
             thread.start()
-            logger.debug(
-                f'Started thread {thread.name} tid={thread.ident}'
-            )
+            logger.debug(f'Started thread {thread.name} tid={thread.ident}')
             return (thread, should_terminate)
             return (thread, should_terminate)
+
         return inner_wrapper
 
     if _funct is None:
         return inner_wrapper
 
     if _funct is None:
@@ -85,8 +116,8 @@ def background_thread(
 
 
 def periodically_invoke(
 
 
 def periodically_invoke(
-        period_sec: float,
-        stop_after: Optional[int],
+    period_sec: float,
+    stop_after: Optional[int],
 ):
     """
     Periodically invoke a decorated function.  Stop after N invocations
 ):
     """
     Periodically invoke a decorated function.  Stop after N invocations
@@ -108,6 +139,7 @@ def periodically_invoke(
             print(f"Hello, {name}")
 
     """
             print(f"Hello, {name}")
 
     """
+
     def decorator_repeat(func):
         def helper_thread(should_terminate, *args, **kwargs) -> None:
             if stop_after is None:
     def decorator_repeat(func):
         def helper_thread(should_terminate, *args, **kwargs) -> None:
             if stop_after is None:
@@ -129,15 +161,17 @@ def periodically_invoke(
             should_terminate = threading.Event()
             should_terminate.clear()
             newargs = (should_terminate, *args)
             should_terminate = threading.Event()
             should_terminate.clear()
             newargs = (should_terminate, *args)
-            thread = threading.Thread(
-                target=helper_thread,
-                args = newargs,
-                kwargs = kwargs
-            )
+            thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
             thread.start()
             thread.start()
-            logger.debug(
-                f'Started thread {thread.name} tid={thread.ident}'
-            )
+            logger.debug(f'Started thread {thread.name} tid={thread.ident}')
             return (thread, should_terminate)
             return (thread, should_terminate)
+
         return wrapper_repeat
         return wrapper_repeat
+
     return decorator_repeat
     return decorator_repeat
+
+
+if __name__ == '__main__':
+    import doctest
+
+    doctest.testmod()