pyutils.parallelize package
This package contains code related to parallelization, including some utilities (thread_utils.py) and a framework for simple parallelization (everything else).
Submodules
pyutils.parallelize.deferred_operand module
This is the base class of pyutils.parallelize.smart_future.SmartFuture, which is a piece of the simple parallelization framework.
This base class essentially tries to define every Python __dunder__ method with a reasonable default implementation so that, when the object is used in a manner that requires its value to be known, it calls DeferredOperand.resolve() and either gets the requisite value or blocks until the data necessary to resolve the value is ready. This is meant to enable more transparent Future objects that can simply be used directly.
See pyutils.parallelize.smart_future.SmartFuture for more information.
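To illustrate the pattern (a minimal sketch, not the library’s actual implementation; the LazyValue name and its compute callable are hypothetical), a deferred operand defines dunder methods that resolve the wrapped value on first use:

from typing import Any, Callable

class LazyValue:
    """Sketch of the deferred-operand pattern: wraps a zero-argument
    callable and resolves it only when the value is actually used."""

    def __init__(self, compute: Callable[[], Any]) -> None:
        self._compute = compute
        self._resolved = False
        self._value = None

    def _resolve(self) -> Any:
        # In the real framework this may block until a Future is ready;
        # here we just invoke the callable once and cache the answer.
        if not self._resolved:
            self._value = self._compute()
            self._resolved = True
        return self._value

    # Dunder methods force resolution so the wrapper can be used
    # transparently in expressions.
    def __add__(self, other: Any) -> Any:
        return self._resolve() + other

    def __str__(self) -> str:
        return str(self._resolve())

x = LazyValue(lambda: 40)
print(x + 2)  # resolution happens here; prints 42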
- class pyutils.parallelize.deferred_operand.DeferredOperand(local_attributes: Set[str] = None)[source]
Bases:
ABC, Generic[T]
A wrapper around an operand whose value is deferred until it is needed (i.e. accessed). See the subclass pyutils.parallelize.smart_future.SmartFuture for an example usage and/or a more useful pattern.
- Parameters:
local_attributes (Set[str]) – because this class attempts to act as a transparent wrapper around a normal Future, it needs to be able to differentiate between attempts to set a property of it/its subclasses or the wrapped object. The local_attributes argument is a set of names that, when we intercept a set operation on them, refer to an attribute in the wrapper and not the wrapped class.
- static resolve(x: Any) Any [source]
When this object is used in a manner that requires it to know its value, this method is called to either return the value or block until it can do so.
- Parameters:
x (Any) – the object whose value is required
- Returns:
The value of x… immediately if possible, eventually if not possible.
- Return type:
Any
pyutils.parallelize.executors module
This module defines a BaseExecutor interface and three implementations:
The ThreadExecutor is used to dispatch work to background threads in the same Python process for parallelized work. Of course, until the Global Interpreter Lock (GIL) bottleneck is resolved, this is not terribly useful for compute-bound code. But it’s good for work that is mostly I/O bound.
The ProcessExecutor is used to dispatch work to other processes on the same machine and is more useful for compute-bound workloads.
The RemoteExecutor is used in conjunction with ssh, the cloudpickle dependency, and the remote_worker.py file to dispatch work to a set of remote worker machines on your network. You can configure this pool via a JSON configuration file, an example of which can be found in examples. If this file is not present when you attempt to use multiple-machine parallelization, you will get an error.
Finally, this file defines a DefaultExecutors pool that contains a pre-created and ready instance of each of the three executors discussed. It has the added benefit of being automatically cleaned up at process termination time.
See instructions in pyutils.parallelize.parallelize for setting up and using the framework.
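For orientation, here is a minimal sketch of the shared interface using only the constructor and methods documented on this page (ThreadExecutor, submit, shutdown); it is a sketch, not a complete treatment of the framework:

from pyutils.parallelize.executors import ThreadExecutor

def fetch(n: int) -> int:
    # Stand-in for some mostly I/O-bound work.
    return n * n

executor = ThreadExecutor(max_workers=4)
futures = [executor.submit(fetch, n) for n in range(8)]
print([f.result() for f in futures])  # normal concurrent.futures.Future API
executor.shutdown(wait=True)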
- class pyutils.parallelize.executors.BaseExecutor(*, title='')[source]
Bases:
ABC
The base executor interface definition. The interface for ProcessExecutor, RemoteExecutor, and ThreadExecutor.
- Parameters:
title – the name of this executor.
- adjust_task_count(delta: int) None [source]
Change the task count. Note: do not call this method from a worker; it should only be called by the launcher process / thread / machine.
- Parameters:
delta (int) – the delta value by which to adjust task count.
- Return type:
None
- get_task_count() int [source]
Get the task count. Note: do not call this method from a worker; it should only be called by the launcher process / thread / machine.
- Returns:
The executor’s current task count.
- Return type:
int
- abstract shutdown(*, wait: bool = True, quiet: bool = False) None [source]
Shutdown the executor.
- Parameters:
wait (bool) – wait for the shutdown to complete before returning?
quiet (bool) – keep it quiet, please.
- Return type:
None
- shutdown_if_idle(*, quiet: bool = False) bool [source]
Shutdown the executor and return True if the executor is idle (i.e. there are no pending or active tasks). Return False otherwise. Note: this should only be called by the launcher process.
- Parameters:
quiet (bool) – keep it quiet, please.
- Returns:
True if the executor could be shut down because it has no pending work, False otherwise.
- Return type:
bool
- abstract submit(function: Callable, *args, **kwargs) Future [source]
Submit work for the executor to do.
- Parameters:
function (Callable) – the Callable to be executed.
*args – the arguments to function
**kwargs – the keyword arguments to function
- Returns:
A concurrent Future representing the result of the work.
- Return type:
Future
- class pyutils.parallelize.executors.BundleDetails(pickled_code: bytes, uuid: str, function_name: str, worker: RemoteWorkerRecord | None, username: str | None, machine: str | None, controller: str, code_file: str, result_file: str, pid: int, start_ts: float, end_ts: float, slower_than_local_p95: bool, slower_than_global_p95: bool, src_bundle: BundleDetails | None, is_cancelled: Event, was_cancelled: bool, backup_bundles: List[BundleDetails] | None, failure_count: int)[source]
Bases:
object
All info necessary to define some unit of work that needs to be done, where it is being run, its state, whether it is an original bundle or a backup bundle, how many times it has failed, etc…
- Parameters:
pickled_code (bytes) –
uuid (str) –
function_name (str) –
worker (RemoteWorkerRecord | None) –
username (str | None) –
machine (str | None) –
controller (str) –
code_file (str) –
result_file (str) –
pid (int) –
start_ts (float) –
end_ts (float) –
slower_than_local_p95 (bool) –
slower_than_global_p95 (bool) –
src_bundle (BundleDetails | None) –
is_cancelled (Event) –
was_cancelled (bool) –
backup_bundles (List[BundleDetails] | None) –
failure_count (int) –
- backup_bundles: List[BundleDetails] | None
If we’ve created backups of this bundle, this is the list of them
- code_file: str
A unique filename to hold the work to be done
- controller: str
The controller machine
- end_ts: float
Ending time
- failure_count: int
How many times has this bundle failed already?
- function_name: str
The name of the function we pickled
- is_cancelled: Event
An event that can be signaled to indicate this bundle is cancelled. This is set when another copy (backup or original) of this work has completed successfully elsewhere.
- machine: str | None
The remote machine running this bundle or None if none (yet)
- pickled_code: bytes
The code to run, cloudpickled
- pid: int
The process id of the local subprocess watching the ssh connection to the remote machine
- result_file: str
Where the results should be placed / read from
- slower_than_global_p95: bool
Currently slower than 95% of other bundles globally
- slower_than_local_p95: bool
Currently slower than 95% of other bundles on remote host
- src_bundle: BundleDetails | None
If this is a backup bundle, this points to the original bundle that it’s backing up. None otherwise.
- start_ts: float
Starting time
- username: str | None
The remote username running this bundle or None if none (yet)
- uuid: str
A unique identifier
- was_cancelled: bool
True if this bundle was cancelled, False if it finished normally
- worker: RemoteWorkerRecord | None
The remote worker running this bundle or None if none (yet)
- pyutils.parallelize.executors.ConfigRemoteWorkerPoolProvider(json_remote_worker_pool: Dict[str, Any])[source]
- Parameters:
json_remote_worker_pool (Dict[str, Any]) –
- class pyutils.parallelize.executors.ProcessExecutor(max_workers=None)[source]
Bases:
BaseExecutor
An executor which runs tasks in child processes.
See also ThreadExecutor and RemoteExecutor.
- Parameters:
max_workers – the max number of worker processes to create.
- class pyutils.parallelize.executors.RemoteExecutor(workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy)[source]
Bases:
BaseExecutor
An executor that uses processes on remote machines to do work. To do so, it requires a pool of remote workers to be properly configured. See instructions in pyutils.parallelize.parallelize.
Each machine in a worker pool has a weight and a count. A weight captures the relative speed of a processor on that worker and a count captures the number of synchronous tasks the worker can accept (i.e. the number of cpus on the machine).
To dispatch work to a remote machine, this class pickles the code to be executed remotely using cloudpickle. For that to work, the remote machine should be running the same version of Python as this machine, ideally in a virtual environment with the same import libraries installed. Differences in operating system and/or processor architecture don’t seem to matter for most code, though.
Warning
Mismatches in Python version or in the version numbers of third-party libraries between machines can cause problems when trying to unpickle and run code remotely.
Work to be dispatched is represented in this code by creating a “bundle”. Each bundle is assigned to a remote worker based on heuristics captured in a RemoteWorkerSelectionPolicy. In general, it attempts to load all workers in the pool and maximize throughput. Once assigned to a remote worker, pickled code is copied to that worker via scp and a remote command is issued via ssh to execute a remote_worker.py process on the remote machine. This process unpickles the code, runs it, and produces a result which is then copied back to the local machine (again via scp) where it can be processed by local code.
You can and probably must override the path of remote_worker.py on your pool machines using the --remote_worker_helper_path commandline argument (or by just changing the default in code; see above in this file’s code).
During remote work execution, this local machine acts as a controller dispatching all work to the network, copying pickled tasks out, and copying results back in. It may also be a worker in the pool but do not underestimate the cost of being a controller: it takes some cpu and a lot of network bandwidth. The work dispatcher logic attempts to detect when a controller is also a worker and reduce its load.
Some redundancy and safety provisions are made when scheduling tasks to the worker pool; e.g. slower-than-expected tasks have redundant backup tasks created, especially if there are otherwise idle workers. If a task fails repeatedly, the dispatcher considers it poisoned and gives up on it.
Warning
This executor probably only makes sense to use with computationally expensive tasks such as jobs that will execute for ~30 seconds or longer.
The network overhead and latency of copying work from the controller (local) machine to the remote workers and copying results back again is relatively high. Especially at startup, the network can become a bottleneck. Future versions of this code may attempt to split the responsibility of being a controller (distributing work to pool machines).
Instructions for how to set this up are provided in pyutils.parallelize.parallelize.
See also ProcessExecutor and ThreadExecutor.
- Parameters:
workers (List[RemoteWorkerRecord]) – A list of remote workers we can call on to do tasks.
policy (RemoteWorkerSelectionPolicy) – A policy for selecting remote workers for tasks.
- Raises:
PyUtilsException – unable to find a place to schedule work.
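For orientation, constructing this executor directly might look like the sketch below; the usernames and hostnames are placeholders, and most programs should instead rely on the @parallelize(method=Method.REMOTE) decorator, which manages executors for you:

from pyutils.parallelize.executors import (
    RemoteExecutor,
    RemoteWorkerRecord,
    WeightedRandomRemoteWorkerSelectionPolicy,
)

# Placeholder pool: two machines we can ssh into non-interactively.
workers = [
    RemoteWorkerRecord(username="me", machine="fast-host", weight=2, count=8),
    RemoteWorkerRecord(username="me", machine="slow-host", weight=1, count=4),
]
policy = WeightedRandomRemoteWorkerSelectionPolicy()
executor = RemoteExecutor(workers, policy)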
- class pyutils.parallelize.executors.RemoteExecutorStatus(total_worker_count: int)[source]
Bases:
object
A status ‘scoreboard’ for a remote executor tracking various metrics and able to render a periodic dump of global state.
- Parameters:
total_worker_count (int) – number of workers in the pool
- periodic_dump(total_bundles_submitted: int) None [source]
- Parameters:
total_bundles_submitted (int) –
- Return type:
None
- record_acquire_worker(worker: RemoteWorkerRecord, uuid: str) None [source]
Record that bundle with uuid is assigned to a particular worker.
- Parameters:
worker (RemoteWorkerRecord) – the record of the worker to which uuid is assigned
uuid (str) – the uuid of a bundle that has been assigned to a worker
- Return type:
None
- record_acquire_worker_already_locked(worker: RemoteWorkerRecord, uuid: str) None [source]
Same as above but an entry point that doesn’t acquire the lock for codepaths where it’s already held.
- Parameters:
worker (RemoteWorkerRecord) –
uuid (str) –
- Return type:
None
- record_bundle_details(details: BundleDetails) None [source]
Register the details about a bundle of work.
- Parameters:
details (BundleDetails) –
- Return type:
None
- record_bundle_details_already_locked(details: BundleDetails) None [source]
Same as above but for codepaths that already hold the lock.
- Parameters:
details (BundleDetails) –
- Return type:
None
- record_processing_began(uuid: str)[source]
Record when work on a bundle begins.
- Parameters:
uuid (str) –
- record_release_worker(worker: RemoteWorkerRecord, uuid: str, was_cancelled: bool) None [source]
Record that a bundle has released a worker.
- Parameters:
worker (RemoteWorkerRecord) –
uuid (str) –
was_cancelled (bool) –
- Return type:
None
- record_release_worker_already_locked(worker: RemoteWorkerRecord, uuid: str, was_cancelled: bool) None [source]
Same as above but for codepaths that already hold the lock.
- Parameters:
worker (RemoteWorkerRecord) –
uuid (str) –
was_cancelled (bool) –
- Return type:
None
- class pyutils.parallelize.executors.RemoteWorkerPoolProvider[source]
Bases:
object
- abstract get_remote_workers() List[RemoteWorkerRecord] [source]
- Return type:
List[RemoteWorkerRecord]
- class pyutils.parallelize.executors.RemoteWorkerRecord(username: str, machine: str, weight: int, count: int)[source]
Bases:
object
A record of info about a remote worker.
- Parameters:
username (str) –
machine (str) –
weight (int) –
count (int) –
- count: int
If this machine is selected, what is the maximum number of tasks that it can handle?
- machine: str
Machine address / name.
- username: str
Username we can ssh into on this machine to run work.
- weight: int
Relative probability for the weighted policy to select this machine for scheduling work.
- class pyutils.parallelize.executors.RemoteWorkerSelectionPolicy[source]
Bases:
ABC
An interface definition of a policy for selecting a remote worker.
- abstract acquire_worker(machine_to_avoid: str | None = None) RemoteWorkerRecord | None [source]
- Parameters:
machine_to_avoid (str | None) –
- Return type:
RemoteWorkerRecord | None
- register_worker_pool(workers: List[RemoteWorkerRecord])[source]
- Parameters:
workers (List[RemoteWorkerRecord]) –
- class pyutils.parallelize.executors.RoundRobinRemoteWorkerSelectionPolicy[source]
Bases:
RemoteWorkerSelectionPolicy
A remote worker selector that just round robins.
- acquire_worker(machine_to_avoid: str | None = None) RemoteWorkerRecord | None [source]
- Parameters:
machine_to_avoid (str | None) –
- Return type:
RemoteWorkerRecord | None
- class pyutils.parallelize.executors.ThreadExecutor(max_workers: int | None = None)[source]
Bases:
BaseExecutor
A threadpool executor. This executor uses Python threads to schedule tasks. Note that, at least as of python3.10, because of the global lock in the interpreter itself, these do not parallelize very well so this class is useful mostly for non-CPU intensive tasks.
See also ProcessExecutor and RemoteExecutor.
- Parameters:
max_workers (Optional[int]) – maximum number of threads to create in the pool.
- class pyutils.parallelize.executors.WeightedRandomRemoteWorkerSelectionPolicy[source]
Bases:
RemoteWorkerSelectionPolicy
A remote worker selector that uses weighted RNG.
- acquire_worker(machine_to_avoid: str | None = None) RemoteWorkerRecord | None [source]
- Parameters:
machine_to_avoid (str | None) –
- Return type:
RemoteWorkerRecord | None
pyutils.parallelize.parallelize module
A decorator to help with simple parallelization. When decorated functions are invoked they execute on a background thread, process or remote machine depending on the style of decoration:
from pyutils.parallelize import parallelize as p

@p.parallelize  # defaults to thread-mode
def my_function(a, b, c) -> int:
    ...do some slow / expensive work, e.g., an http request

# Run with background subprocess
@p.parallelize(method=Method.PROCESS)
def my_other_function(d, e, f) -> str:
    ...do more really expensive work, e.g., a network read

# Run in a helper process on another machine.
@p.parallelize(method=Method.REMOTE)
def my_other_other_function(g, h) -> int:
    ...this work will be distributed to a remote machine pool
This will just work out of the box with Method.THREAD (the default) and Method.PROCESS but in order to use Method.REMOTE you need to do some setup work:
- To use @parallelize(method=Method.REMOTE) with your code you need to hook your code into pyutils.config to enable commandline flags from pyutil files. You can do this by either wrapping your main entry point with the pyutils.bootstrap.initialize() decorator or just calling config.parse() early in your program. See instructions in pyutils.bootstrap and pyutils.config for more information.
- You need to create and configure a pool of worker machines. All of these machines should run the same version of Python, ideally in a virtual environment (venv) with the same Python dependencies installed. See: https://docs.python.org/3/library/venv.html
Warning
Different versions of code, libraries, or of the interpreter itself can cause issues with running cloudpickled code.
- You need an account that can ssh into any / all of these pool machines non-interactively to perform tasks such as copying code to the worker machine and running Python in the aforementioned virtual environment. This likely means setting up ssh / scp with key-based authentication. See: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys-2
- You need to tell this parallelization framework about the pool of machines you created by editing a JSON-based configuration file. The location of this file defaults to .remote_worker_records in your home directory but can be overridden via the --remote_worker_records_file commandline argument. An example JSON configuration can be found under examples.
- Finally, you will also need to tell the pyutils.parallelize.executors.RemoteExecutor how to invoke remote_worker.py on remote machines by passing its path on remote worker machines in your setup via the --remote_worker_helper_path commandline flag (or, honestly, if you made it this far, just update the default in this code: find executors.py under site-packages in your virtual environment and update the default value of the --remote_worker_helper_path flag).
If you’re trying to set this up and struggling, email me at scott.gasch@gmail.com. I’m happy to help.
What you get back when you call a decorated function (using threads, processes or a remote worker) is a pyutils.parallelize.smart_future.SmartFuture. This class attempts to transparently wrap a normal Python Future (see https://docs.python.org/3/library/concurrent.futures.html#future-objects). If your code just uses the result of a parallelized method, it will block waiting on the result of the wrapped function as soon as it uses that result in a manner that requires its value to be known (e.g. using it in an expression, calling a method on it, passing it into a method, hashing it / using it as a dict key, etc…). But you can do operations that do not require the value to be known (e.g. storing it in a list, storing it as a value in a dict, etc…) safely without blocking.
There are two helper methods in pyutils.parallelize.smart_future to help deal with these SmartFuture objects called pyutils.parallelize.smart_future.wait_all() and pyutils.parallelize.smart_future.wait_any(). These, when given a collection of SmartFuture objects, will block until one (any) or all (all) are finished and yield the finished objects to the caller. Callers can be confident that any objects returned from these methods will not block when accessed. See documentation in pyutils.parallelize.smart_future for more details.
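A short sketch of these semantics using thread mode (which needs no remote setup); square is a hypothetical example function:

from pyutils.parallelize import parallelize as p
from pyutils.parallelize import smart_future

@p.parallelize  # thread mode by default
def square(n: int) -> int:
    return n * n

futures = [square(n) for n in range(5)]  # returns immediately; storing in a list does not block
smart_future.wait_all(futures)           # block until every result is ready
for f in futures:
    print(f)                             # using a value resolves it; no blocking now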
- class pyutils.parallelize.parallelize.Method(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
Enum
How should we parallelize; by threads, processes or remote workers?
- PROCESS = 2
- REMOTE = 3
- THREAD = 1
- pyutils.parallelize.parallelize.parallelize(_funct: Callable | None = None, *, method: Method = Method.THREAD) Callable [source]
This is a decorator that was created to make multi-threading, multi-processing and remote machine parallelism simple in Python.
Sample usage:
from pyutils.parallelize import parallelize as p

@p.parallelize  # defaults to thread-mode
def my_function(a, b, c) -> int:
    ...do some slow / expensive work, e.g., an http request

# Run with background subprocess
@p.parallelize(method=Method.PROCESS)
def my_other_function(d, e, f) -> str:
    ...do more really expensive work, e.g., a network read

# Run in a helper process on another machine.
@p.parallelize(method=Method.REMOTE)
def my_other_other_function(g, h) -> int:
    ...this work will be distributed to a remote machine pool
This decorator will invoke the wrapped function on:
Method.THREAD (default): a background thread
Method.PROCESS: a background process
Method.REMOTE: a process on a remote host
The wrapped function returns immediately with a value that is wrapped in a pyutils.parallelize.smart_future.SmartFuture. This value will block if it is either read directly (via a call to _resolve()) or indirectly (by using the result in an expression, printing it, hashing it, passing it as a function argument, etc…). See comments on SmartFuture for details. The value can be safely stored (without hashing) or passed as an argument without causing it to block waiting on a result. There are some convenience methods for dealing with collections of SmartFuture objects defined in smart_future.py, namely pyutils.parallelize.smart_future.wait_any() and pyutils.parallelize.smart_future.wait_all().
Warning
You may stack @parallelized methods and it will “work”. That said, having multiple layers of Method.PROCESS or Method.REMOTE will prove to be problematic because each process in the stack will use its own independent pool which will likely overload your machine with processes or your network with remote processes beyond the control mechanisms built into one instance of the pool. Be careful.
Warning
There is non-trivial overhead of pickling code and copying it over the network when you use Method.REMOTE. There’s a smaller but still considerable cost of creating a new process and passing code to/from it when you use Method.PROCESS.
- Parameters:
_funct (Callable | None) –
method (Method) –
- Return type:
Callable
pyutils.parallelize.selectable_event module
An object that adheres to the threading.Event interface (https://docs.python.org/3/library/threading.html#event-objects) that, unlike threading.Event, can be used with select() allowing for efficient waiting on many events. Idea stolen from https://lat.sk/2015/02/multiple-event-waiting-python-3/.
>>> events = []
>>> for n in range(10):
... events.append(SelectableEvent())
>>> t = wait_for_multiple(events, timeout=0.5)
>>> t is None
True
>>> import random
>>> e = random.choice(events)
>>> e.set()
>>> t = wait_for_multiple(events, timeout=0.5)
>>> t == e
True
- class pyutils.parallelize.selectable_event.SelectableEvent[source]
Bases:
object
Create and store two file descriptors connected by a pipe. We’ll use data written to the pipe (and available on the read file descriptor) to indicate that the event is signaled. The event is created in a “not signaled” state initially.
>>> e = SelectableEvent()
>>> e.is_set()
False
- clear()[source]
Make the event not set. Note: like threading.Event, this method must be called to reset the event state; simply calling wait() doesn’t change the state of the event.
The lock is to ensure that we do not have a race condition between the is_set check and the subsequent read.
- fileno()[source]
- Returns:
The file descriptor number of the read side of the pipe, which allows this event object to be used with select.select().
- is_set()[source]
- Returns:
True if the event is signaled and False otherwise.
>>> e = SelectableEvent()
>>> e.is_set()
False
>>> e.set()
>>> e.is_set()
True
- set()[source]
Signal the event. Wake any waiters. The lock is to ensure that we do not accidentally write more than one byte to the pipe because of a race condition between the call to is_set() and the call to write.
>>> e = SelectableEvent()
>>> e.is_set()
False
>>> e.set()
>>> e.is_set()
True

>>> e.set()
>>> e.set()
>>> e.set()
>>> rfds, _, _ = select.select([e._read_fd], [], [], 0)
>>> e._read_fd in rfds
True

>>> e.wait(0)
True
>>> e.clear()
>>> rfds, _, _ = select.select([e._read_fd], [], [], 0)
>>> e._read_fd in rfds
False
- wait(timeout: float | None = None) bool [source]
Use select to check if a byte is ready to read from the read side of the pipe thus indicating the event is signaled.
- Parameters:
timeout (float | None) – number of seconds to wait, at max; defaults to no time limit (wait forever).
- Returns:
True if the event is signaled on exit and False otherwise.
- Return type:
bool
>>> e = SelectableEvent()
>>> e.wait(1.0)
False

>>> e.set()
>>> e.wait()
True
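Because fileno() returns the read side of the pipe, SelectableEvent objects can be handed directly to select.select(); a small sketch:

import select

from pyutils.parallelize.selectable_event import SelectableEvent

e1, e2 = SelectableEvent(), SelectableEvent()
e2.set()

# select() accepts any object with a fileno() method.
ready, _, _ = select.select([e1, e2], [], [], 1.0)
assert e2 in ready
assert e1 not in ready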
- pyutils.parallelize.selectable_event.wait_for_multiple(events: Iterable[SelectableEvent], *, timeout: float | None = None) SelectableEvent | None [source]
A helper function that, given a list of SelectableEvent, will wait until one becomes triggered with an optional timeout.
- Parameters:
events (Iterable[SelectableEvent]) – the list of events to wait for
timeout (float | None) – an optional max number of seconds to wait; the default is to wait forever.
- Returns:
A reference to the event that has become signaled, or None if the timeout expired with no event signaled.
- Return type:
SelectableEvent | None
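For example (a sketch), a background thread can signal one of several events that the caller is waiting on:

import threading
import time

from pyutils.parallelize.selectable_event import (
    SelectableEvent,
    wait_for_multiple,
)

events = [SelectableEvent() for _ in range(3)]

def worker(e: SelectableEvent) -> None:
    time.sleep(0.1)  # pretend to do some work first
    e.set()

threading.Thread(target=worker, args=(events[1],)).start()
signaled = wait_for_multiple(events, timeout=5.0)
assert signaled is events[1]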
pyutils.parallelize.smart_future module
A Future that can be treated as a substitute for the result that it contains and will not block until it is used. At that point, if the underlying value is not yet available, it will block until the internal result actually becomes available.
Results from parallelize.parallelize are returned wrapped in SmartFuture instances.
Also contains some utility code for waiting for one/many futures.
- class pyutils.parallelize.smart_future.SmartFuture(wrapped_future: Future)[source]
Bases:
DeferredOperand
This is a SmartFuture, a class that wraps a normal Future and can then be used, mostly, like a normal (non-Future) identifier of the type of that SmartFuture’s result.
Using a SmartFuture in expressions will block and wait until the result of the deferred operation is known.
- Parameters:
wrapped_future (fut.Future) – a normal Python concurrent.futures.Future object that we are wrapping.
- pyutils.parallelize.smart_future.wait_all(futures: List[SmartFuture], *, log_exceptions: bool = True) None [source]
Wait for all of the SmartFutures in the collection to finish before returning.
- Parameters:
futures (List[SmartFuture]) – A collection of futures that we’re waiting for
log_exceptions (bool) – Should we log (warning + exception) any underlying exceptions raised during future processing or silently ignore them?
- Returns:
Only when all futures in the input list are ready. Blocks until such time.
- Return type:
None
- pyutils.parallelize.smart_future.wait_any(futures: List[SmartFuture], *, callback: Callable = None, log_exceptions: bool = True, timeout: float = None)[source]
Await the completion of any of a collection of SmartFutures and invoke callback each time one completes, repeatedly, until they are all finished.
- Parameters:
futures (List[SmartFuture]) – A collection of SmartFutures to wait on
callback (Callable) – An optional callback to invoke whenever one of the futures completes
log_exceptions (bool) – Should we log (warning + exception) any underlying exceptions raised during future processing or silently ignore them?
timeout (float) – invoke callback with a periodicity of timeout while awaiting futures
- Returns:
A SmartFuture from the futures list with a result available without blocking.
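A sketch that consumes results as they complete, assuming thread-mode decoration and relying on wait_any yielding each finished future as described above; work is a hypothetical example function:

from pyutils.parallelize import parallelize as p
from pyutils.parallelize import smart_future

@p.parallelize  # thread mode by default
def work(n: int) -> int:
    return n * 2

futures = [work(n) for n in range(4)]
for finished in smart_future.wait_any(futures):
    # Each yielded SmartFuture is safe to use without blocking.
    print(finished)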
pyutils.parallelize.thread_utils module
Utilities for dealing with threads + threading.
- class pyutils.parallelize.thread_utils.ThreadWithReturnValue(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)[source]
Bases:
Thread
,Runnable
A thread whose return value is plumbed back out as the return value of join(). Use like a normal thread:

import threading

from pyutils.parallelize import thread_utils

def thread_entry_point(args):
    # do something interesting...
    return result

if __name__ == "__main__":
    thread = thread_utils.ThreadWithReturnValue(
        target=thread_entry_point,
        args=(whatever,)  # note: args must be a tuple
    )
    thread.start()
    result = thread.join()
    print(f"thread finished and returned {result}")
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.
args is a list or tuple of arguments for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.
- join(*args) Any [source]
Wait until the thread terminates and return the value it terminated with as the result of join.
Like normal join(), this blocks the calling thread until the thread whose join() is called terminates, either normally or through an unhandled exception, or until the optional timeout occurs.
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).
When the timeout argument is not present or None, the operation will block until the thread terminates.
A thread can be joined many times.
- Raises:
RuntimeError – an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join a thread before it has been started; attempts to do so raise the same exception.
- Return type:
Any
- pyutils.parallelize.thread_utils.background_thread(_funct: Callable[[...], Any] | None) Callable[[...], Tuple[Thread, Event]] [source]
A function decorator to create a background thread.
- Parameters:
_funct (Callable[[...], Any] | None) – The function being wrapped such that it is invoked on a background thread.
- Return type:
Callable[[…], Tuple[Thread, Event]]
Example usage:
import threading
import time

from pyutils.parallelize import thread_utils

@thread_utils.background_thread
def random(a: int, b: str, stop_event: threading.Event) -> None:
    while True:
        print(f"Hi there {b}: {a}!")
        time.sleep(10.0)
        if stop_event.is_set():
            return

def main() -> None:
    (thread, event) = random(22, "dude")
    print("back!")
    time.sleep(30.0)
    event.set()
    thread.join()
Warning
In addition to any other arguments the function has, it must take a stop_event as the last unnamed argument which it should periodically check. If the event is set, it means the thread has been requested to terminate ASAP.
- pyutils.parallelize.thread_utils.current_thread_id() str [source]
- Returns:
A string composed of the parent process’ id, the current process’ id and the current thread name that can be used as a unique identifier for the current thread. The former two are numbers (pids) whereas the latter is a thread id passed during thread creation time.
- Return type:
str
>>> from pyutils.parallelize import thread_utils
>>> ret = thread_utils.current_thread_id()
>>> ret
'76891/84444/MainThread:'
>>> (ppid, pid, tid) = ret.split('/')
>>> ppid.isnumeric()
True
>>> pid.isnumeric()
True
- pyutils.parallelize.thread_utils.is_current_thread_main_thread() bool [source]
- Returns:
True if the current (calling) thread is the process’ main thread and False otherwise.
- Return type:
bool
>>> from pyutils.parallelize import thread_utils
>>> thread_utils.is_current_thread_main_thread()
True

>>> result = None
>>> def am_i_the_main_thread():
...     global result
...     result = thread_utils.is_current_thread_main_thread()

>>> am_i_the_main_thread()
>>> result
True

>>> import threading
>>> thread = threading.Thread(target=am_i_the_main_thread)
>>> thread.start()
>>> thread.join()
>>> result
False
- pyutils.parallelize.thread_utils.periodically_invoke(period_sec: float, stop_after: int | None)[source]
Periodically invoke the decorated function on a background thread.
- Parameters:
period_sec (float) – the delay period in seconds between invocations
stop_after (int | None) – total number of invocations to make or, if None, call forever
- Returns:
a Thread object and an Event that, when signaled, will stop the invocations.
Note
It is possible to be invoked one time after the Event is set. This event can be used to stop infinite invocation style or finite invocation style decorations.
Usage:
from pyutils.parallelize import thread_utils

@thread_utils.periodically_invoke(period_sec=1.0, stop_after=3)
def hello(name: str) -> None:
    print(f"Hello, {name}")

@thread_utils.periodically_invoke(period_sec=0.5, stop_after=None)
def there(name: str, age: int) -> None:
    print(f"   ...there {name}, {age}")
Usage as a decorator doesn’t give you access to the returned stop event or thread object. To get those, wrap your periodic function manually:
from pyutils.parallelize import thread_utils

def periodic(m: str) -> None:
    print(m)

f = thread_utils.periodically_invoke(period_sec=5.0, stop_after=None)(periodic)
thread, event = f("testing")
...
event.set()
thread.join()
See also pyutils.state_tracker.