X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=src%2Fpyutils%2Fparallelize%2Fthread_utils.py;h=062f0646bf174ecdbbc47ae994a013f85cdbea31;hb=a89fc804288579e5c285a28db8756c40570a037e;hp=aaef13bf6ea8d53a788699f37c45f02bef47ccbf;hpb=69566c003b4f1c3a4905f37d3735d7921502d14a;p=pyutils.git diff --git a/src/pyutils/parallelize/thread_utils.py b/src/pyutils/parallelize/thread_utils.py index aaef13b..062f064 100644 --- a/src/pyutils/parallelize/thread_utils.py +++ b/src/pyutils/parallelize/thread_utils.py @@ -19,18 +19,21 @@ logger = logging.getLogger(__name__) def current_thread_id() -> str: """ Returns: - a string composed of the parent process' id, the current - process' id and the current thread identifier. The former two are - numbers (pids) whereas the latter is a thread id passed during thread - creation time. - - >>> ret = current_thread_id() + A string composed of the parent process' id, the + current process' id and the current thread name that can be used + as a unique identifier for the current thread. The former two are + numbers (pids) whereas the latter is a thread id passed during + thread creation time. + + >>> from pyutils.parallelize import thread_utils + >>> ret = thread_utils.current_thread_id() + >>> ret # doctest: +SKIP + '76891/84444/MainThread:' >>> (ppid, pid, tid) = ret.split('/') >>> ppid.isnumeric() True >>> pid.isnumeric() True - """ ppid = os.getppid() pid = os.getpid() @@ -44,25 +47,25 @@ def is_current_thread_main_thread() -> bool: True is the current (calling) thread is the process' main thread and False otherwise. - >>> is_current_thread_main_thread() + >>> from pyutils.parallelize import thread_utils + >>> thread_utils.is_current_thread_main_thread() True >>> result = None - >>> def thunk(): + >>> def am_i_the_main_thread(): ... global result - ... result = is_current_thread_main_thread() + ... result = thread_utils.is_current_thread_main_thread() - >>> thunk() + >>> am_i_the_main_thread() >>> result True >>> import threading - >>> thread = threading.Thread(target=thunk) + >>> thread = threading.Thread(target=am_i_the_main_thread) >>> thread.start() >>> thread.join() >>> result False - """ return threading.current_thread() is threading.main_thread() @@ -72,9 +75,18 @@ def background_thread( ) -> Callable[..., Tuple[threading.Thread, threading.Event]]: """A function decorator to create a background thread. - Usage:: + Args: + _funct: The function being wrapped such that it is invoked + on a background thread. + + Example usage:: + + import threading + import time + + from pyutils.parallelize import thread_utils - @background_thread + @thread_utils.background_thread def random(a: int, b: str, stop_event: threading.Event) -> None: while True: print(f"Hi there {b}: {a}!") @@ -122,7 +134,25 @@ def background_thread( class ThreadWithReturnValue(threading.Thread): """A thread whose return value is plumbed back out as the return - value of :meth:`join`. + value of :meth:`join`. Use like a normal thread:: + + import threading + + from pyutils.parallelize import thread_utils + + def thread_entry_point(args): + # do something interesting... + return result + + if __name__ == "__main__": + thread = thread_utils.ThreadWithReturnValue( + target=thread_entry_point, + args=(whatever) + ) + thread.start() + result = thread.join() + print(f"thread finished and returned {result}") + """ def __init__( @@ -134,11 +164,35 @@ class ThreadWithReturnValue(threading.Thread): self._target = target self._return = None - def run(self): + def run(self) -> None: + """Create a little wrapper around invoking the real thread entry + point so we can pay attention to its return value.""" if self._target is not None: self._return = self._target(*self._args, **self._kwargs) - def join(self, *args): + def join(self, *args) -> Any: + """Wait until the thread terminates and return the value it terminated with + as the result of join. + + Like normal :meth:`join`, this blocks the calling thread until + the thread whose :meth:`join` is called terminates – either + normally or through an unhandled exception or until the + optional timeout occurs. + + When the timeout argument is present and not None, it should + be a floating point number specifying a timeout for the + operation in seconds (or fractions thereof). + + When the timeout argument is not present or None, the + operation will block until the thread terminates. + + A thread can be joined many times. + + :meth:`join` raises a RuntimeError if an attempt is made to join the + current thread as that would cause a deadlock. It is also an + error to join a thread before it has been started and + attempts to do so raises the same exception. + """ threading.Thread.join(self, *args) return self._return @@ -148,7 +202,7 @@ def periodically_invoke( stop_after: Optional[int], ): """ - Periodically invoke the decorated function. + Periodically invoke the decorated function on a background thread. Args: period_sec: the delay period in seconds between invocations @@ -166,13 +220,31 @@ def periodically_invoke( Usage:: - @periodically_invoke(period_sec=0.5, stop_after=None) - def there(name: str, age: int) -> None: - print(f" ...there {name}, {age}") + from pyutils.parallelize import thread_utils - @periodically_invoke(period_sec=1.0, stop_after=3) + @thread_utils.periodically_invoke(period_sec=1.0, stop_after=3) def hello(name: str) -> None: print(f"Hello, {name}") + + @thread_utils.periodically_invoke(period_sec=0.5, stop_after=None) + def there(name: str, age: int) -> None: + print(f" ...there {name}, {age}") + + Usage as a decorator doesn't give you access to the returned stop event or + thread object. To get those, wrap your periodic function manually:: + + from pyutils.parallelize import thread_utils + + def periodic(m: str) -> None: + print(m) + + f = thread_utils.periodically_invoke(period_sec=5.0, stop_after=None)(periodic) + thread, event = f("testing") + ... + event.set() + thread.join() + + See also :mod:`pyutils.state_tracker`. """ def decorator_repeat(func):