pyutils.parallelize package

This package contains code related to parallelization, including some utilities (thread_utils.py) and a framework for simple parallelization (everything else).

Submodules

pyutils.parallelize.deferred_operand module

This is the base class of pyutils.parallelize.smart_future.SmartFuture, which is a piece of the simple parallelization framework.

This base class essentially tries to have every Python __dunder__ method defined with a reasonable default implementation so that, when the object is used in a manner that requires its value to be known, it calls DeferredOperand.resolve() and either gets the requisite value or blocks until the data necessary to resolve the value is ready. This is meant to enable more transparent Future objects that can just be used directly.

See pyutils.parallelize.smart_future.SmartFuture for more information.
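
To make the idea concrete, here is a minimal hypothetical sketch of the pattern (the names Deferred and _resolve are illustrative, not the library's API):

from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar

T = TypeVar('T')

class Deferred(ABC, Generic[T]):
    """Illustrative sketch only: any use of the value forces resolution."""

    @abstractmethod
    def _resolve(self) -> T:
        """Block until the underlying value is known, then return it."""

    def __add__(self, other: Any) -> Any:
        return self._resolve() + other     # arithmetic forces resolution

    def __eq__(self, other: Any) -> bool:
        return self._resolve() == other    # comparison forces resolution

    def __str__(self) -> str:
        return str(self._resolve())        # printing forces resolution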

class pyutils.parallelize.deferred_operand.DeferredOperand(local_attributes: Set[str] = None)[source]

Bases: ABC, Generic[T]

A wrapper around an operand whose value is deferred until it is needed (i.e. accessed). See the subclass pyutils.parallelize.smart_future.SmartFuture for an example usage and/or a more useful pattern.

Parameters:

local_attributes (Set[str]) – because this class attempts to act as a transparent wrapper around a normal Future, it needs to be able to differentiate between attempts to set a property of it/its subclasses or the wrapped object. The local_attributes argument is a set of names that, if we intercept a set operation for, refer to an attribute in the wrapper and not the wrapped class.
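
For example, interception of attribute writes might look something like this hedged sketch (illustrative only, not the library's exact code):

def __setattr__(self, name, value):
    # 'local_attributes' lives on the wrapper itself; consult it directly
    # via __dict__ to avoid recursing back into __setattr__.
    if name in (self.__dict__.get('local_attributes') or set()):
        object.__setattr__(self, name, value)                # wrapper attribute
    else:
        setattr(DeferredOperand.resolve(self), name, value)  # wrapped value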

static resolve(x: Any) Any[source]

When this object is used in a manner that requires it to know its value, this method is called to either return the value or block until it can do so.

Parameters:

x (Any) – the object whose value is required

Returns:

The value of x… immediately if possible, eventually if not possible.

Return type:

Any

pyutils.parallelize.executors module

This module defines a BaseExecutor interface and three implementations:

The ThreadExecutor is used to dispatch work to background threads in the same Python process for parallelized work. Of course, until the Global Interpreter Lock (GIL) bottleneck is resolved, this is not terribly useful for compute-bound code. But it’s good for work that is mostly I/O bound.

The ProcessExecutor is used to dispatch work to other processes on the same machine and is more useful for compute-bound workloads.

The RemoteExecutor is used in conjunction with ssh, the cloudpickle dependency, and the remote_worker.py file to dispatch work to a set of remote worker machines on your network. You can configure this pool via a JSON configuration file, an example of which can be found in examples. If this file is not present when you attempt to use multiple-machine parallelization, you will get an error.

Finally, this file defines a DefaultExecutors pool that contains a pre-created and ready instance of each of the three executors discussed. It has the added benefit of being automatically cleaned up at process termination time.

See instructions in pyutils.parallelize.parallelize for setting up and using the framework.
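
For example, a quick sketch of the common pattern using the documented BaseExecutor interface (the square function is just an illustration):

from pyutils.parallelize import executors

def square(x: int) -> int:
    return x * x

pool = executors.ThreadExecutor(max_workers=4)
try:
    future = pool.submit(square, 7)    # returns a concurrent.futures.Future
    print(future.result())             # blocks until done; prints 49
finally:
    pool.shutdown(wait=True, quiet=True)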

class pyutils.parallelize.executors.BaseExecutor(*, title='')[source]

Bases: ABC

The base executor interface definition: the interface implemented by ProcessExecutor, RemoteExecutor, and ThreadExecutor.

Parameters:

title – the name of this executor.

adjust_task_count(delta: int) None[source]

Change the task count. Note: do not call this method from a worker; it should only be called by the launcher process / thread / machine.

Parameters:

delta (int) – the delta value by which to adjust task count.

Return type:

None

get_task_count() int[source]

Get the executor’s current task count. Note: do not call this method from a worker; it should only be called by the launcher process / thread / machine.

Returns:

The executor’s current task count.

Return type:

int

abstract shutdown(*, wait: bool = True, quiet: bool = False) None[source]

Shutdown the executor.

Parameters:
  • wait (bool) – wait for the shutdown to complete before returning?

  • quiet (bool) – keep it quiet, please.

Return type:

None

shutdown_if_idle(*, quiet: bool = False) bool[source]

Shutdown the executor if it is idle (i.e. there are no pending or active tasks) and return True. Return False otherwise. Note: this should only be called by the launcher process.

Parameters:

quiet (bool) – keep it quiet, please.

Returns:

True if the executor could be shut down because it has no pending work, False otherwise.

Return type:

bool

abstract submit(function: Callable, *args, **kwargs) Future[source]

Submit work for the executor to do.

Parameters:
  • function (Callable) – the Callable to be executed.

  • *args – the arguments to function

  • **kwargs – the keyword arguments to function

Returns:

A concurrent Future representing the result of the work.

Return type:

Future

class pyutils.parallelize.executors.BundleDetails(pickled_code: bytes, uuid: str, function_name: str, worker: RemoteWorkerRecord | None, username: str | None, machine: str | None, controller: str, code_file: str, result_file: str, pid: int, start_ts: float, end_ts: float, slower_than_local_p95: bool, slower_than_global_p95: bool, src_bundle: BundleDetails | None, is_cancelled: Event, was_cancelled: bool, backup_bundles: List[BundleDetails] | None, failure_count: int)[source]

Bases: object

All info necessary to define some unit of work that needs to be done, where it is being run, its state, whether it is an original bundle or a backup bundle, how many times it has failed, etc…

Parameters:
  • pickled_code (bytes) –

  • uuid (str) –

  • function_name (str) –

  • worker (RemoteWorkerRecord | None) –

  • username (str | None) –

  • machine (str | None) –

  • controller (str) –

  • code_file (str) –

  • result_file (str) –

  • pid (int) –

  • start_ts (float) –

  • end_ts (float) –

  • slower_than_local_p95 (bool) –

  • slower_than_global_p95 (bool) –

  • src_bundle (BundleDetails | None) –

  • is_cancelled (Event) –

  • was_cancelled (bool) –

  • backup_bundles (List[BundleDetails] | None) –

  • failure_count (int) –

backup_bundles: List[BundleDetails] | None

If we’ve created backups of this bundle, this is the list of them

code_file: str

A unique filename to hold the work to be done

controller: str

The controller machine

end_ts: float

Ending time

failure_count: int

How many times has this bundle failed already?

function_name: str

The name of the function we pickled

is_cancelled: Event

An event that can be signaled to indicate this bundle is cancelled. This is set when another copy (backup or original) of this work has completed successfully elsewhere.

machine: str | None

The remote machine running this bundle or None if none (yet)

pickled_code: bytes

The code to run, cloud pickled

pid: int

The process id of the local subprocess watching the ssh connection to the remote machine

result_file: str

Where the results should be placed / read from

slower_than_global_p95: bool

Currently slower than 95% of other bundles globally

slower_than_local_p95: bool

Currently slower than 95% of other bundles on the remote host

src_bundle: BundleDetails | None

If this is a backup bundle, this points to the original bundle that it’s backing up. None otherwise.

start_ts: float

Starting time

username: str | None

The remote username running this bundle or None if none (yet)

uuid: str

A unique identifier

was_cancelled: bool

True if this bundle was cancelled, False if it finished normally

worker: RemoteWorkerRecord | None

The remote worker running this bundle or None if none (yet)

pyutils.parallelize.executors.ConfigRemoteWorkerPoolProvider(json_remote_worker_pool: Dict[str, Any])[source]
Parameters:

json_remote_worker_pool (Dict[str, Any]) –

class pyutils.parallelize.executors.ProcessExecutor(max_workers=None)[source]

Bases: BaseExecutor

An executor which runs tasks in child processes.

See also ThreadExecutor and RemoteExecutor.

Parameters:

max_workers – the max number of worker processes to create.

shutdown(*, wait: bool = True, quiet: bool = False) None[source]

Shutdown the executor.

Parameters:
  • wait (bool) – wait for the shutdown to complete before returning?

  • quiet (bool) – keep it quiet, please.

Return type:

None

submit(function: Callable, *args, **kwargs) Future[source]
Raises:

Exception – executor is shutting down already.

Parameters:

function (Callable) –

Return type:

Future

class pyutils.parallelize.executors.RemoteExecutor(workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy)[source]

Bases: BaseExecutor

An executor that uses processes on remote machines to do work. To do so, it requires that a pool of remote workers be properly configured. See instructions in pyutils.parallelize.parallelize.

Each machine in a worker pool has a weight and a count. A weight captures the relative speed of a processor on that worker and a count captures the number of synchronous tasks the worker can accept (i.e. the number of cpus on the machine).

To dispatch work to a remote machine, this class pickles the code to be executed remotely using cloudpickle. For that to work, the remote machine should be running the same version of Python as this machine, ideally in a virtual environment with the same import libraries installed. Differences in operating system and/or processor architecture don’t seem to matter for most code, though.

Warning

Mismatches in Python version or in the version numbers of third-party libraries between machines can cause problems when trying to unpickle and run code remotely.

Work to be dispatched is represented in this code by creating a “bundle”. Each bundle is assigned to a remote worker based on heuristics captured in a RemoteWorkerSelectionPolicy. In general, it attempts to load all workers in the pool and maximize throughput. Once assigned to a remote worker, pickled code is copied to that worker via scp and a remote command is issued via ssh to execute a remote_worker.py process on the remote machine. This process unpickles the code, runs it, and produces a result which is then copied back to the local machine (again via scp) where it can be processed by local code.

You can and probably must override the path of remote_worker.py on your pool machines using the --remote_worker_helper_path commandline argument (or by just changing the default in code, see above in this file’s code).

During remote work execution, this local machine acts as a controller dispatching all work to the network, copying pickled tasks out, and copying results back in. It may also be a worker in the pool but do not underestimate the cost of being a controller – it takes some cpu and a lot of network bandwidth. The work dispatcher logic attempts to detect when a controller is also a worker and reduce its load.

Some redundancy and safety provisions are made when scheduling tasks to the worker pool; e.g. slower-than-expected tasks have redundant backup tasks created, especially if there are otherwise idle workers. If a task fails repeatedly, the dispatcher considers it poisoned and gives up on it.

Warning

This executor probably only makes sense to use with computationally expensive tasks such as jobs that will execute for ~30 seconds or longer.

The network overhead and latency of copying work from the controller (local) machine to the remote workers and copying results back again is relatively high. Especially at startup, the network can become a bottleneck. Future versions of this code may attempt to split the responsibility of being a controller (distributing work to pool machines).

Instructions for how to set this up are provided in pyutils.parallelize.parallelize.

See also ProcessExecutor and ThreadExecutor.

Parameters:
  • workers (List[RemoteWorkerRecord]) – the pool of remote worker records

  • policy (RemoteWorkerSelectionPolicy) – the policy used to assign bundles to workers

Raises:

PyUtilsException – unable to find a place to schedule work.

shutdown(*, wait: bool = True, quiet: bool = False) None[source]

Shutdown the executor.

Parameters:
  • wait (bool) – wait for the shutdown to complete before returning?

  • quiet (bool) – keep it quiet, please.

Return type:

None

submit(function: Callable, *args, **kwargs) Future[source]

Submit work to be done. This is the user entry point of this class.

Raises:

Exception – executor is already shutting down.

Parameters:

function (Callable) –

Return type:

Future

class pyutils.parallelize.executors.RemoteExecutorStatus(total_worker_count: int)[source]

Bases: object

A status ‘scoreboard’ for a remote executor tracking various metrics and able to render a periodic dump of global state.

Parameters:

total_worker_count (int) – number of workers in the pool

periodic_dump(total_bundles_submitted: int) None[source]
Parameters:

total_bundles_submitted (int) –

Return type:

None

record_acquire_worker(worker: RemoteWorkerRecord, uuid: str) None[source]

Record that bundle with uuid is assigned to a particular worker.

Parameters:
  • worker (RemoteWorkerRecord) – the record of the worker to which uuid is assigned

  • uuid (str) – the uuid of a bundle that has been assigned to a worker

Return type:

None

record_acquire_worker_already_locked(worker: RemoteWorkerRecord, uuid: str) None[source]

Same as above but an entry point that doesn’t acquire the lock for codepaths where it’s already held.

Parameters:
  • worker (RemoteWorkerRecord) – the record of the worker to which uuid is assigned

  • uuid (str) – the uuid of a bundle that has been assigned to a worker

Return type:

None

record_bundle_details(details: BundleDetails) None[source]

Register the details about a bundle of work.

Parameters:

details (BundleDetails) –

Return type:

None

record_bundle_details_already_locked(details: BundleDetails) None[source]

Same as above but for codepaths that already hold the lock.

Parameters:

details (BundleDetails) –

Return type:

None

record_processing_began(uuid: str)[source]

Record when work on a bundle begins.

Parameters:

uuid (str) –

record_release_worker(worker: RemoteWorkerRecord, uuid: str, was_cancelled: bool) None[source]

Record that a bundle has released a worker.

Parameters:
  • worker (RemoteWorkerRecord) –

  • uuid (str) –

  • was_cancelled (bool) –

Return type:

None

record_release_worker_already_locked(worker: RemoteWorkerRecord, uuid: str, was_cancelled: bool) None[source]

Same as above but for codepaths that already hold the lock.

Parameters:
  • worker (RemoteWorkerRecord) –

  • uuid (str) –

  • was_cancelled (bool) –

Return type:

None

total_idle() int[source]

How many idle workers are there currently?

Return type:

int

total_in_flight() int[source]

How many bundles are in flight currently?

Return type:

int

class pyutils.parallelize.executors.RemoteWorkerPoolProvider[source]

Bases: object

abstract get_remote_workers() List[RemoteWorkerRecord][source]
Return type:

List[RemoteWorkerRecord]

class pyutils.parallelize.executors.RemoteWorkerRecord(username: str, machine: str, weight: int, count: int)[source]

Bases: object

A record of info about a remote worker.

Parameters:
  • username (str) –

  • machine (str) –

  • weight (int) –

  • count (int) –

count: int

If this machine is selected, what is the maximum number of tasks that it can handle?

machine: str

Machine address / name.

username: str

Username we can ssh into on this machine to run work.

weight: int

Relative probability for the weighted policy to select this machine for scheduling work.

class pyutils.parallelize.executors.RemoteWorkerSelectionPolicy[source]

Bases: ABC

An interface definition of a policy for selecting a remote worker.

abstract acquire_worker(machine_to_avoid: str | None = None) RemoteWorkerRecord | None[source]
Parameters:

machine_to_avoid (str | None) –

Return type:

RemoteWorkerRecord | None

abstract is_worker_available() bool[source]
Return type:

bool

register_worker_pool(workers: List[RemoteWorkerRecord])[source]
Parameters:

workers (List[RemoteWorkerRecord]) –

class pyutils.parallelize.executors.RoundRobinRemoteWorkerSelectionPolicy[source]

Bases: RemoteWorkerSelectionPolicy

A remote worker selector that just round robins.

acquire_worker(machine_to_avoid: str | None = None) RemoteWorkerRecord | None[source]
Parameters:

machine_to_avoid (str | None) –

Return type:

RemoteWorkerRecord | None

is_worker_available() bool[source]
Return type:

bool

class pyutils.parallelize.executors.ThreadExecutor(max_workers: int | None = None)[source]

Bases: BaseExecutor

A threadpool executor. This executor uses Python threads to schedule tasks. Note that, at least as of python3.10, because of the global lock in the interpreter itself, these do not parallelize very well so this class is useful mostly for non-CPU intensive tasks.

See also ProcessExecutor and RemoteExecutor.

Parameters:

max_workers (Optional[int]) – maximum number of threads to create in the pool.

shutdown(*, wait: bool = True, quiet: bool = False) None[source]

Shutdown the executor.

Parameters:
  • wait (bool) – wait for the shutdown to complete before returning?

  • quiet (bool) – keep it quiet, please.

Return type:

None

submit(function: Callable, *args, **kwargs) Future[source]
Raises:

Exception – executor is shutting down already.

Parameters:

function (Callable) –

Return type:

Future

class pyutils.parallelize.executors.WeightedRandomRemoteWorkerSelectionPolicy[source]

Bases: RemoteWorkerSelectionPolicy

A remote worker selector that uses weighted RNG.
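
The selection logic is presumably something like this hedged sketch (pick_weighted is a hypothetical name; the real entry point is acquire_worker):

import random
from typing import List, Optional

from pyutils.parallelize.executors import RemoteWorkerRecord

def pick_weighted(
    workers: List[RemoteWorkerRecord],
    machine_to_avoid: Optional[str] = None,
) -> Optional[RemoteWorkerRecord]:
    # Illustrative only: choose with probability proportional to each
    # worker's documented 'weight' field, skipping machine_to_avoid.
    candidates = [w for w in workers if w.machine != machine_to_avoid]
    if not candidates:
        return None
    return random.choices(
        candidates, weights=[w.weight for w in candidates], k=1
    )[0]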

acquire_worker(machine_to_avoid: str | None = None) RemoteWorkerRecord | None[source]
Parameters:

machine_to_avoid (str | None) –

Return type:

RemoteWorkerRecord | None

is_worker_available() bool[source]
Return type:

bool

pyutils.parallelize.executors.get_remote_workers_filename() str[source]
Return type:

str

pyutils.parallelize.parallelize module

A decorator to help with simple parallelization. When decorated functions are invoked they execute on a background thread, process or remote machine depending on the style of decoration:

from pyutils.parallelize import parallelize as p

@p.parallelize    # defaults to thread-mode
def my_function(a, b, c) -> int:
    ...  # do some slow / expensive work, e.g., an http request

# Run in a background subprocess
@p.parallelize(method=Method.PROCESS)
def my_other_function(d, e, f) -> str:
    ...  # do more really expensive work, e.g., a network read

# Run in a helper process on another machine.
@p.parallelize(method=Method.REMOTE)
def my_other_other_function(g, h) -> int:
    ...  # this work will be distributed to a remote machine pool

This will just work out of the box with Method.THREAD (the default) and Method.PROCESS but in order to use Method.REMOTE you need to do some setup work:

  1. To use @parallelize(method=Method.REMOTE) with your code you need to hook your code into pyutils.config to enable commandline flags from pyutil files. You can do this by either wrapping your main entry point with the pyutils.bootstrap.initialize() decorator or just calling config.parse() early in your program. See instructions in pyutils.bootstrap and pyutils.config for more information.

  2. You need to create and configure a pool of worker machines. All of these machines should run the same version of Python, ideally in a virtual environment (venv) with the same Python dependencies installed. See: https://docs.python.org/3/library/venv.html

    Warning

    Different versions of code, libraries, or of the interpreter itself can cause issues with running cloudpickled code.

  3. You need an account that can ssh into any / all of these pool machines non-interactively to perform tasks such as copying code to the worker machine and running Python in the aforementioned virtual environment. This likely means setting up ssh / scp with key-based authentication. See: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys-2

  4. You need to tell this parallelization framework about the pool of machines you created by editing a JSON-based configuration file. The location of this file defaults to .remote_worker_records in your home directory but can be overridden via the --remote_worker_records_file commandline argument. An example JSON configuration can be found under examples (a hypothetical illustration also appears after this list).

  5. Finally, you will also need to tell the pyutils.parallelize.executors.RemoteExecutor how to invoke remote_worker.py on remote machines by passing its path on remote worker machines in your setup via the --remote_worker_helper_path commandline flag (or, honestly, if you made it this far, just update the default in this code: find executors.py under site-packages in your virtual environment and update the default value of the --remote_worker_helper_path flag)
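
As a purely hypothetical illustration of step 4 (the authoritative schema is the example JSON file shipped under examples; the top-level key and values below are invented), a pool definition carries the documented RemoteWorkerRecord fields and can be handed to ConfigRemoteWorkerPoolProvider:

from pyutils.parallelize.executors import ConfigRemoteWorkerPoolProvider

# Hypothetical pool definition; consult the example under examples for
# the real schema expected by the framework.
pool = {
    "remote_worker_records": [
        {"username": "worker", "machine": "machine1", "weight": 24, "count": 8},
        {"username": "worker", "machine": "machine2", "weight": 10, "count": 4},
    ]
}
provider = ConfigRemoteWorkerPoolProvider(pool)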

If you’re trying to set this up and struggling, email me at scott.gasch@gmail.com. I’m happy to help.

What you get back when you call a decorated function (using threads, processes or a remote worker) is a pyutils.parallelize.smart_future.SmartFuture. This class attempts to transparently wrap a normal Python Future (see https://docs.python.org/3/library/concurrent.futures.html#future-objects). If your code just uses the result of a parallelized method it will block waiting on the result of the wrapped function as soon as it uses that result in a manner that requires its value to be known (e.g. using it in an expression, calling a method on it, passing it into a method, hashing it / using it as a dict key, etc…) But you can do operations that do not require the value to be known (e.g. storing it in a list, storing it as a value in a dict, etc…) safely without blocking.

There are two helper methods in pyutils.parallelize.smart_future to help deal with these SmartFuture objects called pyutils.parallelize.smart_future.wait_all() and pyutils.parallelize.smart_future.wait_any(). These, when given a collection of SmartFuture objects, will block until one (any) or all (all) are finished and yield the finished objects to the caller. Callers can be confident that any objects returned from these methods will not block when accessed. See documentation in pyutils.parallelize.smart_future for more details.
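
A brief sketch of both helpers (double is an illustrative function):

from pyutils.parallelize import parallelize as p
from pyutils.parallelize import smart_future

@p.parallelize
def double(x: int) -> int:
    return x * 2

futures = [double(n) for n in range(5)]      # returns immediately
for done in smart_future.wait_any(futures):  # yields futures as they finish
    print(done)                              # resolving here will not block

more = [double(n) for n in range(5)]
smart_future.wait_all(more)                  # blocks until all are finished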

class pyutils.parallelize.parallelize.Method(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

How should we parallelize; by threads, processes or remote workers?

PROCESS = 2
REMOTE = 3
THREAD = 1
pyutils.parallelize.parallelize.parallelize(_funct: Callable | None = None, *, method: Method = Method.THREAD) Callable[source]

This is a decorator that was created to make multi-threading, multi-processing and remote machine parallelism simple in Python.

Sample usage:

from pyutils.parallelize import parallelize as p

@p.parallelize    # defaults to thread-mode
def my_function(a, b, c) -> int:
    ...  # do some slow / expensive work, e.g., an http request

# Run in a background subprocess
@p.parallelize(method=Method.PROCESS)
def my_other_function(d, e, f) -> str:
    ...  # do more really expensive work, e.g., a network read

# Run in a helper process on another machine.
@p.parallelize(method=Method.REMOTE)
def my_other_other_function(g, h) -> int:
    ...  # this work will be distributed to a remote machine pool

This decorator will invoke the wrapped function on:

  • Method.THREAD (default): a background thread

  • Method.PROCESS: a background process

  • Method.REMOTE: a process on a remote host

The wrapped function returns immediately with a value that is wrapped in a pyutils.parallelize.smart_future.SmartFuture. This value will block if it is either read directly (via a call to _resolve()) or indirectly (by using the result in an expression, printing it, hashing it, passing it as a function argument, etc.). See comments on SmartFuture for details. The value can be safely stored (without hashing) or passed as an argument without causing it to block waiting on a result. There are some convenience methods for dealing with collections of SmartFuture objects defined in smart_future.py, namely pyutils.parallelize.smart_future.wait_any() and pyutils.parallelize.smart_future.wait_all().

Warning

You may stack @parallelize decorations and it will “work”. That said, having multiple layers of Method.PROCESS or Method.REMOTE will prove problematic because each process in the stack will use its own independent pool, which will likely overload your machine with processes or your network with remote processes beyond the control mechanisms built into one instance of the pool. Be careful.

Warning

There is non-trivial overhead of pickling code and copying it over the network when you use Method.REMOTE. There’s a smaller but still considerable cost of creating a new process and passing code to/from it when you use Method.PROCESS.

Parameters:
  • _funct (Callable | None) –

  • method (Method) –

Return type:

Callable

pyutils.parallelize.selectable_event module

An object that adheres to the threading.Event interface (https://docs.python.org/3/library/threading.html#event-objects) that, unlike threading.Event, can be used with select() allowing for efficient waiting on many events. Idea stolen from https://lat.sk/2015/02/multiple-event-waiting-python-3/.

>>> events = []
>>> for n in range(10):
...     events.append(SelectableEvent())
>>> t = wait_for_multiple(events, timeout=0.5)
>>> t is None
True
>>> import random
>>> e = random.choice(events)
>>> e.set()
>>> t = wait_for_multiple(events, timeout=0.5)
>>> t == e
True
class pyutils.parallelize.selectable_event.SelectableEvent[source]

Bases: object

Create and store two file descriptors connected by a pipe. We’ll use data written to the pipe (and available on the read file descriptor) to indicate that the event is signaled. The event is created in a “not signaled” state initially.

>>> e = SelectableEvent()
>>> e.is_set()
False
clear()[source]

Make the event not set. Note: like threading.Event, this method must be called to reset the event state; simply calling wait() doesn’t change the state of the event.

The lock is to ensure that we do not have a race condition between the is_set check and the subsequent read.

fileno()[source]
Returns:

The file descriptor number of the read side of the pipe, which allows this event object to be used with select.select().

is_set()[source]
Returns:

True if the event is signaled and False otherwise.

>>> e = SelectableEvent()
>>> e.is_set()
False
>>> e.set()
>>> e.is_set()
True
set()[source]

Signal the event. Wake any waiters. The lock is to ensure that we do not accidentally write more than one byte to the pipe because of a race condition between the call to is_set() and the call to write.

>>> e = SelectableEvent()
>>> e.is_set()
False
>>> e.set()
>>> e.is_set()
True
>>> e.set()
>>> e.set()
>>> e.set()
>>> rfds, _, _ = select.select([e._read_fd], [], [], 0)
>>> e._read_fd in rfds
True
>>> e.wait(0)
True
>>> e.clear()
>>> rfds, _, _ = select.select([e._read_fd], [], [], 0)
>>> e._read_fd in rfds
False
wait(timeout: float | None = None) bool[source]

Use select to check if a byte is ready to read from the read side of the pipe thus indicating the event is signaled.

Parameters:

timeout (float | None) – number of seconds to wait, at max; defaults to no time limit (wait forever).

Returns:

True if the event is signaled on exit and False otherwise.

Return type:

bool

>>> e = SelectableEvent()
>>> e.wait(1.0)
False
>>> e.set()
>>> e.wait()
True
pyutils.parallelize.selectable_event.wait_for_multiple(events: Iterable[SelectableEvent], *, timeout: float | None = None) SelectableEvent | None[source]

A helper function that, given a list of SelectableEvent, will wait until one becomes triggered with an optional timeout.

Parameters:
  • events (Iterable[SelectableEvent]) – the list of events to wait for

  • timeout (float | None) – an optional max number of seconds to wait; the default is to wait forever.

Returns:

A reference to the event that has become signaled.

Return type:

SelectableEvent | None

pyutils.parallelize.smart_future module

A Future that can be treated as a substitute for the result that it contains and will not block until it is used. At that point, if the underlying value is not yet available, it will block until the internal result actually becomes available.

Results from parallelize.parallelize are returned wrapped in SmartFuture instances.

Also contains some utility code for waiting for one/many futures.

class pyutils.parallelize.smart_future.SmartFuture(wrapped_future: Future)[source]

Bases: DeferredOperand

This is a SmartFuture, a class that wraps a normal Future and can then be used, mostly, like a normal (non-Future) identifier of the type of that SmartFuture’s result.

Using a SmartFuture in expressions will block and wait until the result of the deferred operation is known.

Parameters:

wrapped_future (fut.Future) – a normal Python concurrent.Future object that we are wrapping.

get_id() int[source]
Returns:

A unique identifier for this instance.

Return type:

int

is_ready() bool[source]
Returns:

True if the wrapped future is ready without blocking, False otherwise.

Return type:

bool
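
For example, is_ready() allows polling without forcing a blocking resolve (slow_double is an illustrative function):

import time

from pyutils.parallelize import parallelize as p

@p.parallelize
def slow_double(x: int) -> int:
    time.sleep(0.1)
    return x * 2

f = slow_double(21)
while not f.is_ready():   # poll; does not resolve or block
    time.sleep(0.01)
print(f + 0)              # value is ready, so this resolves instantly: 42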

pyutils.parallelize.smart_future.wait_all(futures: List[SmartFuture], *, log_exceptions: bool = True) None[source]

Wait for all of the SmartFutures in the collection to finish before returning.

Parameters:
  • futures (List[SmartFuture]) – A collection of futures that we’re waiting for

  • log_exceptions (bool) – Should we log (warning + exception) any underlying exceptions raised during future processing or silently ignore them?

Returns:

Only when all futures in the input list are ready. Blocks until such time.

Return type:

None

pyutils.parallelize.smart_future.wait_any(futures: List[SmartFuture], *, callback: Callable = None, log_exceptions: bool = True, timeout: float = None)[source]

Await the completion of any of a collection of SmartFutures and invoke callback each time one completes, repeatedly, until they are all finished.

Parameters:
  • futures (List[SmartFuture]) – A collection of SmartFutures to wait on

  • callback (Callable) – An optional callback to invoke whenever one of the futures completes

  • log_exceptions (bool) – Should we log (warning + exception) any underlying exceptions raised during future processing or silently ignore them?

  • timeout (float) – invoke callback with a periodicity of timeout while awaiting futures

Returns:

A SmartFuture from the futures list with a result available without blocking.

pyutils.parallelize.thread_utils module

Utilities for dealing with threads + threading.

class pyutils.parallelize.thread_utils.ThreadWithReturnValue(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)[source]

Bases: Thread, Runnable

A thread whose return value is plumbed back out as the return value of join(). Use like a normal thread:

import threading

from pyutils.parallelize import thread_utils

def thread_entry_point(args):
    # do something interesting...
    return result

if __name__ == "__main__":
    thread = thread_utils.ThreadWithReturnValue(
        target=thread_entry_point,
        args=(whatever)
    )
    thread.start()
    result = thread.join()
    print(f"thread finished and returned {result}")

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

join(*args) Any[source]

Wait until the thread terminates and return the value it terminated with as the result of join.

Like normal join(), this blocks the calling thread until the thread whose join() is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be joined many times.

Raises:

RuntimeError – an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join a thread before it has been started; attempts to do so raise the same exception.

Return type:

Any

run() None[source]

Create a little wrapper around invoking the real thread entry point so we can pay attention to its return value.

Return type:

None

pyutils.parallelize.thread_utils.background_thread(_funct: Callable[[...], Any] | None) Callable[[...], Tuple[Thread, Event]][source]

A function decorator to create a background thread.

Parameters:

_funct (Callable[[...], Any] | None) – The function being wrapped such that it is invoked on a background thread.

Return type:

Callable[[…], Tuple[Thread, Event]]

Example usage:

import threading
import time

from pyutils.parallelize import thread_utils

@thread_utils.background_thread
def random(a: int, b: str, stop_event: threading.Event) -> None:
    while True:
        print(f"Hi there {b}: {a}!")
        time.sleep(10.0)
        if stop_event.is_set():
            return

def main() -> None:
    (thread, event) = random(22, "dude")
    print("back!")
    time.sleep(30.0)
    event.set()
    thread.join()

Warning

In addition to any other arguments the function has, it must take a stop_event as the last unnamed argument which it should periodically check. If the event is set, it means the thread has been requested to terminate ASAP.

pyutils.parallelize.thread_utils.current_thread_id() str[source]
Returns:

A string composed of the parent process’ id, the current process’ id and the current thread name that can be used as a unique identifier for the current thread. The former two are numbers (pids) whereas the latter is the thread’s name, assigned at thread creation time.

Return type:

str

>>> from pyutils.parallelize import thread_utils
>>> ret = thread_utils.current_thread_id()
>>> ret  
'76891/84444/MainThread:'
>>> (ppid, pid, tid) = ret.split('/')
>>> ppid.isnumeric()
True
>>> pid.isnumeric()
True
pyutils.parallelize.thread_utils.is_current_thread_main_thread() bool[source]
Returns:

True if the current (calling) thread is the process’ main thread and False otherwise.

Return type:

bool

>>> from pyutils.parallelize import thread_utils
>>> thread_utils.is_current_thread_main_thread()
True
>>> result = None
>>> def am_i_the_main_thread():
...     global result
...     result = thread_utils.is_current_thread_main_thread()
>>> am_i_the_main_thread()
>>> result
True
>>> import threading
>>> thread = threading.Thread(target=am_i_the_main_thread)
>>> thread.start()
>>> thread.join()
>>> result
False
pyutils.parallelize.thread_utils.periodically_invoke(period_sec: float, stop_after: int | None)[source]

Periodically invoke the decorated function on a background thread.

Parameters:
  • period_sec (float) – the delay period in seconds between invocations

  • stop_after (int | None) – total number of invocations to make or, if None, call forever

Returns:

a Thread object and an Event that, when signaled, will stop the invocations.

Note

It is possible for the decorated function to be invoked one more time after the Event is set. The event can be used to stop both infinite (stop_after=None) and finite (stop_after=n) invocation styles.

Usage:

from pyutils.parallelize import thread_utils

@thread_utils.periodically_invoke(period_sec=1.0, stop_after=3)
def hello(name: str) -> None:
    print(f"Hello, {name}")

@thread_utils.periodically_invoke(period_sec=0.5, stop_after=None)
def there(name: str, age: int) -> None:
    print(f"   ...there {name}, {age}")

Usage as a decorator doesn’t give you access to the returned stop event or thread object. To get those, wrap your periodic function manually:

from pyutils.parallelize import thread_utils

def periodic(m: str) -> None:
    print(m)

f = thread_utils.periodically_invoke(period_sec=5.0, stop_after=None)(periodic)
thread, event = f("testing")
...
event.set()
thread.join()

See also pyutils.state_tracker.

Module contents