#!/usr/bin/env python3

"""Thread helpers: identify the calling thread and decorators that run a
function on a background thread, either once or periodically."""

import functools
import logging
import os
import threading
from typing import Any, Callable, Optional, Tuple

# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.

logger = logging.getLogger(__name__)


def current_thread_id() -> str:
    """Return a string identifying the calling thread.

    The format is ``'<parent pid>/<pid>/<thread name>:'``.
    """
    ppid = os.getppid()
    pid = os.getpid()
    tid = threading.current_thread().name
    return f'{ppid}/{pid}/{tid}:'


def is_current_thread_main_thread() -> bool:
    """Return True if the current (calling) thread is the process' main
    thread and False otherwise.
    """
    return threading.current_thread() is threading.main_thread()


def background_thread(
    _funct: Optional[Callable[..., Any]] = None,
) -> Callable[..., Any]:
    """A function decorator to create a background thread.

    *** Please note: the decorated function must take a shutdown  ***
    *** event as an input parameter and should periodically check ***
    *** it and stop if the event is set.                          ***

    Usage:

        @background_thread
        def random(a: int, b: str, stop_event: threading.Event) -> None:
            while True:
                print(f"Hi there {b}: {a}!")
                time.sleep(10.0)
                if stop_event.is_set():
                    return

        def main() -> None:
            (thread, event) = random(22, "dude")
            print("back!")
            time.sleep(30.0)
            event.set()
            thread.join()

    Both ``@background_thread`` and ``@background_thread()`` are accepted.

    Note: in addition to any other arguments the function has, it must
    take a stop_event as the last unnamed argument which it should
    periodically check.  If the event is set, it means the thread has
    been requested to terminate ASAP.

    Args:
        _funct: the function being decorated, or None when the decorator
            is applied with parentheses.

    Returns:
        When _funct is given, a replacement callable that starts the
        thread and returns a ``(threading.Thread, threading.Event)``
        tuple.  When _funct is None, a decorator producing such a
        callable.
    """

    def wrapper(funct: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(funct)
        def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
            # A freshly constructed Event is already clear.
            should_terminate = threading.Event()
            # The stop event is appended after the caller's positional args.
            newargs = (*a, should_terminate)
            thread = threading.Thread(
                target=funct,
                args=newargs,
                kwargs=kwa,
            )
            thread.start()
            logger.debug('Started thread %s tid=%s', thread.name, thread.ident)
            return (thread, should_terminate)

        return inner_wrapper

    if _funct is None:
        # Used as @background_thread(): hand back the real decorator.
        return wrapper
    return wrapper(_funct)


def periodically_invoke(
    period_sec: float,
    stop_after: Optional[int],
):
    """
    Periodically invoke a decorated function.  Stop after N invocations
    (or, if stop_after is None, call forever).  Delay period_sec between
    invocations.

    Calling the decorated function starts the thread and returns a
    Thread object and an Event that, when signaled, will stop the
    invocations.  Note that it is possible to be invoked one time after
    the Event is set.  This event can be used to stop infinite
    invocation style or finite invocation style decorations.

        @periodically_invoke(period_sec=0.5, stop_after=None)
        def there(name: str, age: int) -> None:
            print(f"   ...there {name}, {age}")

        @periodically_invoke(period_sec=1.0, stop_after=3)
        def hello(name: str) -> None:
            print(f"Hello, {name}")

    Args:
        period_sec: seconds to wait on the stop event after each
            invocation.
        stop_after: maximum number of invocations, or None for no limit.
    """

    def decorator_repeat(func):
        def helper_thread(should_terminate, args, kwargs) -> None:
            # args / kwargs arrive as plain values (not unpacked) so that
            # a caller keyword named like one of this helper's parameters
            # cannot collide with it.
            remaining = stop_after
            while remaining is None or remaining > 0:
                func(*args, **kwargs)
                # wait() doubles as the inter-invocation delay; it returns
                # True as soon as the stop event is set.
                if should_terminate.wait(period_sec):
                    return
                if remaining is not None:
                    remaining -= 1

        @functools.wraps(func)
        def wrapper_repeat(*args, **kwargs):
            # A freshly constructed Event is already clear.
            should_terminate = threading.Event()
            thread = threading.Thread(
                target=helper_thread,
                args=(should_terminate, args, kwargs),
            )
            thread.start()
            logger.debug('Started thread %s tid=%s', thread.name, thread.ident)
            return (thread, should_terminate)

        return wrapper_repeat

    return decorator_repeat