X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=thread_utils.py;h=4db4cf68b4ef916f323f90bae492d1c59dca361f;hb=e8fbbb7306430478dec55d2c963eed116d8330cc;hp=ad1f0bf9029b3232ba9cbd28b085afade91f0186;hpb=36fea7f15ed17150691b5b3ead75450e575229ef;p=python_utils.git diff --git a/thread_utils.py b/thread_utils.py index ad1f0bf..4db4cf6 100644 --- a/thread_utils.py +++ b/thread_utils.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +"""Utilities for dealing with threads + threading.""" + import functools import logging import os @@ -13,6 +15,19 @@ logger = logging.getLogger(__name__) def current_thread_id() -> str: + """Returns a string composed of the parent process' id, the current + process' id and the current thread identifier. The former two are + numbers (pids) whereas the latter is a thread id passed during thread + creation time. + + >>> ret = current_thread_id() + >>> (ppid, pid, tid) = ret.split('/') + >>> ppid.isnumeric() + True + >>> pid.isnumeric() + True + + """ ppid = os.getppid() pid = os.getpid() tid = threading.current_thread().name @@ -22,13 +37,33 @@ def current_thread_id() -> str: def is_current_thread_main_thread() -> bool: """Returns True is the current (calling) thread is the process' main thread and False otherwise. + + >>> is_current_thread_main_thread() + True + + >>> result = None + >>> def thunk(): + ... global result + ... result = is_current_thread_main_thread() + + >>> thunk() + >>> result + True + + >>> import threading + >>> thread = threading.Thread(target=thunk) + >>> thread.start() + >>> thread.join() + >>> result + False + """ return threading.current_thread() is threading.main_thread() def background_thread( _funct: Optional[Callable], -) -> Tuple[threading.Thread, threading.Event]: +) -> Callable[..., Tuple[threading.Thread, threading.Event]]: """A function decorator to create a background thread. *** Please note: the decorated function must take an shutdown *** @@ -61,9 +96,7 @@ def background_thread( def wrapper(funct: Callable): @functools.wraps(funct) - def inner_wrapper( - *a, **kwa - ) -> Tuple[threading.Thread, threading.Event]: + def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]: should_terminate = threading.Event() should_terminate.clear() newargs = (*a, should_terminate) @@ -73,13 +106,13 @@ def background_thread( kwargs=kwa, ) thread.start() - logger.debug(f'Started thread {thread.name} tid={thread.ident}') + logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident) return (thread, should_terminate) return inner_wrapper if _funct is None: - return wrapper + return wrapper # type: ignore else: return wrapper(_funct) @@ -130,13 +163,17 @@ def periodically_invoke( should_terminate = threading.Event() should_terminate.clear() newargs = (should_terminate, *args) - thread = threading.Thread( - target=helper_thread, args=newargs, kwargs=kwargs - ) + thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs) thread.start() - logger.debug(f'Started thread {thread.name} tid={thread.ident}') + logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident) return (thread, should_terminate) return wrapper_repeat return decorator_repeat + + +if __name__ == '__main__': + import doctest + + doctest.testmod()