def current_thread_id() -> str:
"""
Returns:
- a string composed of the parent process' id, the current
- process' id and the current thread identifier. The former two are
- numbers (pids) whereas the latter is a thread id passed during thread
- creation time.
-
- >>> ret = current_thread_id()
+ A string composed of the parent process' id, the
+ current process' id and the current thread name that can be used
+ as a unique identifier for the current thread. The former two are
+ numbers (pids) whereas the latter is a thread id passed during
+ thread creation time.
+
+ >>> from pyutils.parallelize import thread_utils
+ >>> ret = thread_utils.current_thread_id()
+ >>> ret # doctest: +SKIP
+ '76891/84444/MainThread:'
>>> (ppid, pid, tid) = ret.split('/')
>>> ppid.isnumeric()
True
>>> pid.isnumeric()
True
-
"""
ppid = os.getppid()
pid = os.getpid()
True is the current (calling) thread is the process' main
thread and False otherwise.
- >>> is_current_thread_main_thread()
+ >>> from pyutils.parallelize import thread_utils
+ >>> thread_utils.is_current_thread_main_thread()
True
>>> result = None
- >>> def thunk():
+ >>> def am_i_the_main_thread():
... global result
- ... result = is_current_thread_main_thread()
+ ... result = thread_utils.is_current_thread_main_thread()
- >>> thunk()
+ >>> am_i_the_main_thread()
>>> result
True
>>> import threading
- >>> thread = threading.Thread(target=thunk)
+ >>> thread = threading.Thread(target=am_i_the_main_thread)
>>> thread.start()
>>> thread.join()
>>> result
False
-
"""
return threading.current_thread() is threading.main_thread()
Example usage::
- @background_thread
+ import threading
+ import time
+
+ from pyutils.parallelize import thread_utils
+
+ @thread_utils.background_thread
def random(a: int, b: str, stop_event: threading.Event) -> None:
while True:
print(f"Hi there {b}: {a}!")
class ThreadWithReturnValue(threading.Thread):
"""A thread whose return value is plumbed back out as the return
- value of :meth:`join`.
+ value of :meth:`join`. Use like a normal thread::
+
+ import threading
+
+ from pyutils.parallelize import thread_utils
+
+ def thread_entry_point(args):
+ # do something interesting...
+ return result
+
+ if __name__ == "__main__":
+ thread = thread_utils.ThreadWithReturnValue(
+ target=thread_entry_point,
+ args=(whatever)
+ )
+ thread.start()
+ result = thread.join()
+ print(f"thread finished and returned {result}")
+
"""
def __init__(
self._target = target
self._return = None
- def run(self):
+ def run(self) -> None:
+ """Create a little wrapper around invoking the real thread entry
+ point so we can pay attention to its return value."""
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
- def join(self, *args):
+ def join(self, *args) -> Any:
+ """Wait until the thread terminates and return the value it terminated with
+ as the result of join.
+
+ Like normal :meth:`join`, this blocks the calling thread until
+ the thread whose :meth:`join` is called terminates – either
+ normally or through an unhandled exception or until the
+ optional timeout occurs.
+
+ When the timeout argument is present and not None, it should
+ be a floating point number specifying a timeout for the
+ operation in seconds (or fractions thereof).
+
+ When the timeout argument is not present or None, the
+ operation will block until the thread terminates.
+
+ A thread can be joined many times.
+
+ :meth:`join` raises a RuntimeError if an attempt is made to join the
+ current thread as that would cause a deadlock. It is also an
+ error to join a thread before it has been started and
+ attempts to do so raises the same exception.
+ """
threading.Thread.join(self, *args)
return self._return
stop_after: Optional[int],
):
"""
- Periodically invoke the decorated function.
+ Periodically invoke the decorated function on a background thread.
Args:
period_sec: the delay period in seconds between invocations
Usage::
- @periodically_invoke(period_sec=0.5, stop_after=None)
- def there(name: str, age: int) -> None:
- print(f" ...there {name}, {age}")
+ from pyutils.parallelize import thread_utils
- @periodically_invoke(period_sec=1.0, stop_after=3)
+ @thread_utils.periodically_invoke(period_sec=1.0, stop_after=3)
def hello(name: str) -> None:
print(f"Hello, {name}")
+
+ @thread_utils.periodically_invoke(period_sec=0.5, stop_after=None)
+ def there(name: str, age: int) -> None:
+ print(f" ...there {name}, {age}")
+
+ Usage as a decorator doesn't give you access to the returned stop event or
+ thread object. To get those, wrap your periodic function manually::
+
+ from pyutils.parallelize import thread_utils
+
+ def periodic(m: str) -> None:
+ print(m)
+
+ f = thread_utils.periodically_invoke(period_sec=5.0, stop_after=None)(periodic)
+ thread, event = f("testing")
+ ...
+ event.set()
+ thread.join()
+
+ See also :mod:`pyutils.state_tracker`.
"""
def decorator_repeat(func):