#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
+"""Utilities for dealing with threads + threading."""
+
import functools
import logging
import os
import threading
-from typing import Callable, Optional, Tuple
+from typing import Any, Callable, Optional, Tuple
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
def current_thread_id() -> str:
+ """Returns a string composed of the parent process' id, the current
+ process' id and the current thread identifier. The former two are
+ numbers (pids) whereas the latter is a thread id passed during thread
+ creation time.
+
+ >>> ret = current_thread_id()
+ >>> (ppid, pid, tid) = ret.split('/')
+ >>> ppid.isnumeric()
+ True
+ >>> pid.isnumeric()
+ True
+
+ """
ppid = os.getppid()
pid = os.getpid()
tid = threading.current_thread().name
def is_current_thread_main_thread() -> bool:
"""Returns True is the current (calling) thread is the process' main
thread and False otherwise.
+
+ >>> is_current_thread_main_thread()
+ True
+
+ >>> result = None
+ >>> def thunk():
+ ... global result
+ ... result = is_current_thread_main_thread()
+
+ >>> thunk()
+ >>> result
+ True
+
+ >>> import threading
+ >>> thread = threading.Thread(target=thunk)
+ >>> thread.start()
+ >>> thread.join()
+ >>> result
+ False
+
"""
return threading.current_thread() is threading.main_thread()
def background_thread(
- _funct: Optional[Callable]
-) -> Tuple[threading.Thread, threading.Event]:
+ _funct: Optional[Callable[..., Any]],
+) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
"""A function decorator to create a background thread.
*** Please note: the decorated function must take an shutdown ***
periodically check. If the event is set, it means the thread has
been requested to terminate ASAP.
"""
+
def wrapper(funct: Callable):
@functools.wraps(funct)
- def inner_wrapper(
- *a, **kwa
- ) -> Tuple[threading.Thread, threading.Event]:
+ def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
should_terminate = threading.Event()
should_terminate.clear()
newargs = (*a, should_terminate)
kwargs=kwa,
)
thread.start()
- logger.debug(
- f'Started thread {thread.name} tid={thread.ident}'
- )
+ logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
return (thread, should_terminate)
+
return inner_wrapper
if _funct is None:
- return wrapper
+ return wrapper # type: ignore
else:
return wrapper(_funct)
def periodically_invoke(
- period_sec: float,
- stop_after: Optional[int],
+ period_sec: float,
+ stop_after: Optional[int],
):
"""
Periodically invoke a decorated function. Stop after N invocations
print(f"Hello, {name}")
"""
+
def decorator_repeat(func):
def helper_thread(should_terminate, *args, **kwargs) -> None:
if stop_after is None:
should_terminate = threading.Event()
should_terminate.clear()
newargs = (should_terminate, *args)
- thread = threading.Thread(
- target=helper_thread,
- args = newargs,
- kwargs = kwargs
- )
+ thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
thread.start()
- logger.debug(
- f'Started thread {thread.name} tid={thread.ident}'
- )
+ logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
return (thread, should_terminate)
+
return wrapper_repeat
+
return decorator_repeat
+
+
+if __name__ == '__main__':
+ import doctest
+
+ doctest.testmod()