More cleanup, yey!
[python_utils.git] / thread_utils.py
index af6e0e1abe840946a8754e426ca93bd653545ca5..4db4cf68b4ef916f323f90bae492d1c59dca361f 100644 (file)
@@ -1,15 +1,33 @@
 #!/usr/bin/env python3
 
 #!/usr/bin/env python3
 
+"""Utilities for dealing with threads + threading."""
+
 import functools
 import logging
 import os
 import threading
 from typing import Callable, Optional, Tuple
 
 import functools
 import logging
 import os
 import threading
 from typing import Callable, Optional, Tuple
 
+# This module is commonly used by others in here and should avoid
+# taking any unnecessary dependencies back on them.
+
 logger = logging.getLogger(__name__)
 
 
 def current_thread_id() -> str:
 logger = logging.getLogger(__name__)
 
 
 def current_thread_id() -> str:
+    """Returns a string composed of the parent process' id, the current
+    process' id and the current thread identifier.  The former two are
+    numbers (pids) whereas the latter is a thread id passed during thread
+    creation time.
+
+    >>> ret = current_thread_id()
+    >>> (ppid, pid, tid) = ret.split('/')
+    >>> ppid.isnumeric()
+    True
+    >>> pid.isnumeric()
+    True
+
+    """
     ppid = os.getppid()
     pid = os.getpid()
     tid = threading.current_thread().name
     ppid = os.getppid()
     pid = os.getpid()
     tid = threading.current_thread().name
@@ -17,12 +35,35 @@ def current_thread_id() -> str:
 
 
 def is_current_thread_main_thread() -> bool:
 
 
 def is_current_thread_main_thread() -> bool:
+    """Returns True is the current (calling) thread is the process' main
+    thread and False otherwise.
+
+    >>> is_current_thread_main_thread()
+    True
+
+    >>> result = None
+    >>> def thunk():
+    ...     global result
+    ...     result = is_current_thread_main_thread()
+
+    >>> thunk()
+    >>> result
+    True
+
+    >>> import threading
+    >>> thread = threading.Thread(target=thunk)
+    >>> thread.start()
+    >>> thread.join()
+    >>> result
+    False
+
+    """
     return threading.current_thread() is threading.main_thread()
 
 
 def background_thread(
     return threading.current_thread() is threading.main_thread()
 
 
 def background_thread(
-        _funct: Optional[Callable]
-) -> Tuple[threading.Thread, threading.Event]:
+    _funct: Optional[Callable],
+) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
     """A function decorator to create a background thread.
 
     *** Please note: the decorated function must take an shutdown ***
     """A function decorator to create a background thread.
 
     *** Please note: the decorated function must take an shutdown ***
@@ -52,11 +93,10 @@ def background_thread(
     periodically check.  If the event is set, it means the thread has
     been requested to terminate ASAP.
     """
     periodically check.  If the event is set, it means the thread has
     been requested to terminate ASAP.
     """
+
     def wrapper(funct: Callable):
         @functools.wraps(funct)
     def wrapper(funct: Callable):
         @functools.wraps(funct)
-        def inner_wrapper(
-                *a, **kwa
-        ) -> Tuple[threading.Thread, threading.Event]:
+        def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
             should_terminate = threading.Event()
             should_terminate.clear()
             newargs = (*a, should_terminate)
             should_terminate = threading.Event()
             should_terminate.clear()
             newargs = (*a, should_terminate)
@@ -66,21 +106,20 @@ def background_thread(
                 kwargs=kwa,
             )
             thread.start()
                 kwargs=kwa,
             )
             thread.start()
-            logger.debug(
-                f'Started thread {thread.name} tid={thread.ident}'
-            )
+            logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
             return (thread, should_terminate)
             return (thread, should_terminate)
+
         return inner_wrapper
 
     if _funct is None:
         return inner_wrapper
 
     if _funct is None:
-        return wrapper
+        return wrapper  # type: ignore
     else:
         return wrapper(_funct)
 
 
 def periodically_invoke(
     else:
         return wrapper(_funct)
 
 
 def periodically_invoke(
-        period_sec: float,
-        stop_after: Optional[int],
+    period_sec: float,
+    stop_after: Optional[int],
 ):
     """
     Periodically invoke a decorated function.  Stop after N invocations
 ):
     """
     Periodically invoke a decorated function.  Stop after N invocations
@@ -102,6 +141,7 @@ def periodically_invoke(
             print(f"Hello, {name}")
 
     """
             print(f"Hello, {name}")
 
     """
+
     def decorator_repeat(func):
         def helper_thread(should_terminate, *args, **kwargs) -> None:
             if stop_after is None:
     def decorator_repeat(func):
         def helper_thread(should_terminate, *args, **kwargs) -> None:
             if stop_after is None:
@@ -123,15 +163,17 @@ def periodically_invoke(
             should_terminate = threading.Event()
             should_terminate.clear()
             newargs = (should_terminate, *args)
             should_terminate = threading.Event()
             should_terminate.clear()
             newargs = (should_terminate, *args)
-            thread = threading.Thread(
-                target=helper_thread,
-                args = newargs,
-                kwargs = kwargs
-            )
+            thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
             thread.start()
             thread.start()
-            logger.debug(
-                f'Started thread {thread.name} tid={thread.ident}'
-            )
+            logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
             return (thread, should_terminate)
             return (thread, should_terminate)
+
         return wrapper_repeat
         return wrapper_repeat
+
     return decorator_repeat
     return decorator_repeat
+
+
+if __name__ == '__main__':
+    import doctest
+
+    doctest.testmod()