X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=thread_utils.py;h=c4a293794a99cb1f479da105a499cb2ce93b564e;hb=02302bbd9363facb59c4df2c1f4013087702cfa6;hp=0130cdc510547196d418d6699d1b46b84a6ddf7c;hpb=709370b2198e09f1dbe195fe8813602a3125b7f6;p=python_utils.git diff --git a/thread_utils.py b/thread_utils.py index 0130cdc..c4a2937 100644 --- a/thread_utils.py +++ b/thread_utils.py @@ -1,10 +1,14 @@ #!/usr/bin/env python3 +# © Copyright 2021-2022, Scott Gasch + +"""Utilities for dealing with threads + threading.""" + import functools import logging import os import threading -from typing import Callable, Optional, Tuple +from typing import Any, Callable, Optional, Tuple # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. @@ -13,6 +17,21 @@ logger = logging.getLogger(__name__) def current_thread_id() -> str: + """ + Returns: + a string composed of the parent process' id, the current + process' id and the current thread identifier. The former two are + numbers (pids) whereas the latter is a thread id passed during thread + creation time. + + >>> ret = current_thread_id() + >>> (ppid, pid, tid) = ret.split('/') + >>> ppid.isnumeric() + True + >>> pid.isnumeric() + True + + """ ppid = os.getppid() pid = os.getpid() tid = threading.current_thread().name @@ -20,22 +39,40 @@ def current_thread_id() -> str: def is_current_thread_main_thread() -> bool: - """Returns True is the current (calling) thread is the process' main - thread and False otherwise. + """ + Returns: + True is the current (calling) thread is the process' main + thread and False otherwise. + + >>> is_current_thread_main_thread() + True + + >>> result = None + >>> def thunk(): + ... global result + ... result = is_current_thread_main_thread() + + >>> thunk() + >>> result + True + + >>> import threading + >>> thread = threading.Thread(target=thunk) + >>> thread.start() + >>> thread.join() + >>> result + False + """ return threading.current_thread() is threading.main_thread() def background_thread( - _funct: Optional[Callable] -) -> Tuple[threading.Thread, threading.Event]: + _funct: Optional[Callable[..., Any]], +) -> Callable[..., Tuple[threading.Thread, threading.Event]]: """A function decorator to create a background thread. - *** Please note: the decorated function must take an shutdown *** - *** event as an input parameter and should periodically check *** - *** it and stop if the event is set. *** - - Usage: + Usage:: @background_thread def random(a: int, b: str, stop_event: threading.Event) -> None: @@ -45,7 +82,6 @@ def background_thread( if stop_event.is_set(): return - def main() -> None: (thread, event) = random(22, "dude") print("back!") @@ -53,16 +89,17 @@ def background_thread( event.set() thread.join() - Note: in addition to any other arguments the function has, it must - take a stop_event as the last unnamed argument which it should - periodically check. If the event is set, it means the thread has - been requested to terminate ASAP. + .. warning:: + + In addition to any other arguments the function has, it must + take a stop_event as the last unnamed argument which it should + periodically check. If the event is set, it means the thread has + been requested to terminate ASAP. """ + def wrapper(funct: Callable): @functools.wraps(funct) - def inner_wrapper( - *a, **kwa - ) -> Tuple[threading.Thread, threading.Event]: + def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]: should_terminate = threading.Event() should_terminate.clear() newargs = (*a, should_terminate) @@ -72,42 +109,49 @@ def background_thread( kwargs=kwa, ) thread.start() - logger.debug( - f'Started thread {thread.name} tid={thread.ident}' - ) + logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident) return (thread, should_terminate) + return inner_wrapper if _funct is None: - return wrapper + return wrapper # type: ignore else: return wrapper(_funct) def periodically_invoke( - period_sec: float, - stop_after: Optional[int], + period_sec: float, + stop_after: Optional[int], ): """ - Periodically invoke a decorated function. Stop after N invocations - (or, if stop_after is None, call forever). Delay period_sec between - invocations. + Periodically invoke the decorated function. + + Args: + period_sec: the delay period in seconds between invocations + stop_after: total number of invocations to make or, if None, + call forever + + Returns: + a :class:Thread object and an :class:Event that, when + signaled, will stop the invocations. + + .. note:: + It is possible to be invoked one time after the :class:Event + is set. This event can be used to stop infinite + invocation style or finite invocation style decorations. - Returns a Thread object and an Event that, when signaled, will stop - the invocations. Note that it is possible to be invoked one time - after the Event is set. This event can be used to stop infinite - invocation style or finite invocation style decorations. + Usage:: @periodically_invoke(period_sec=0.5, stop_after=None) def there(name: str, age: int) -> None: print(f" ...there {name}, {age}") - @periodically_invoke(period_sec=1.0, stop_after=3) def hello(name: str) -> None: print(f"Hello, {name}") - """ + def decorator_repeat(func): def helper_thread(should_terminate, *args, **kwargs) -> None: if stop_after is None: @@ -129,15 +173,17 @@ def periodically_invoke( should_terminate = threading.Event() should_terminate.clear() newargs = (should_terminate, *args) - thread = threading.Thread( - target=helper_thread, - args = newargs, - kwargs = kwargs - ) + thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs) thread.start() - logger.debug( - f'Started thread {thread.name} tid={thread.ident}' - ) + logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident) return (thread, should_terminate) + return wrapper_repeat + return decorator_repeat + + +if __name__ == '__main__': + import doctest + + doctest.testmod()