#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
"""A decorator to help with dead simple parallelization."""
-from enum import Enum
+
+import atexit
import functools
import typing
-
-import executors
-import smart_future
-
-ps_count = 0
-thread_count = 0
-remote_count = 0
+from enum import Enum
class Method(Enum):
+ """How should we parallelize; by threads, processes or remote workers?"""
+
THREAD = 1
PROCESS = 2
REMOTE = 3
def parallelize(
- _funct: typing.Optional[typing.Callable] = None,
- *,
- method: Method = Method.THREAD
+ _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
) -> typing.Callable:
- """Usage:
+ """This is a decorator that was created to make multi-threading,
+ multi-processing and remote machine parallelism simple in python.
+
+ Sample usage::
- @parallelize # defaults to thread-mode
- def my_function(a, b, c) -> int:
- ...do some slow / expensive work, e.g., an http request
+ @parallelize # defaults to thread-mode
+ def my_function(a, b, c) -> int:
+ ...do some slow / expensive work, e.g., an http request
- @parallelize(method=Method.PROCESS)
- def my_other_function(d, e, f) -> str:
- ...do more really expensice work, e.g., a network read
+ @parallelize(method=Method.PROCESS)
+ def my_other_function(d, e, f) -> str:
+ ...do more really expensive work, e.g., a network read
- @parallelize(method=Method.REMOTE)
- def my_other_other_function(g, h) -> int:
- ...this work will be distributed to a remote machine pool
+ @parallelize(method=Method.REMOTE)
+ def my_other_other_function(g, h) -> int:
+ ...this work will be distributed to a remote machine pool
- This decorator will invoke the wrapped function on:
+ This decorator will invoke the wrapped function on::
Method.THREAD (default): a background thread
Method.PROCESS: a background process
Method.REMOTE: a process on a remote host
The wrapped function returns immediately with a value that is
- wrapped in a SmartFuture. This value will block if it is either
- read directly (via a call to result._resolve) or indirectly (by
- using the result in an expression, printing it, hashing it,
- passing it a function argument, etc...). See comments on the
- SmartFuture class for details.
-
- Note: you may stack @parallelized methods and it will "work".
- That said, having multiple layers of Method.PROCESS or
- Method.REMOTE may prove to be problematic because each process in
- the stack will use its own independent pool which may overload
- your machine with processes or your network with remote processes
- beyond the control mechanisms built into one instance of the pool.
- Be careful.
+ wrapped in a :class:`SmartFuture`. This value will block if it is
+ either read directly (via a call to :meth:`_resolve`) or indirectly
+ (by using the result in an expression, printing it, hashing it,
+ passing it a function argument, etc...). See comments on
+ :class:`SmartFuture` for details.
+
+ .. warning::
+ You may stack @parallelized methods and it will "work".
+ That said, having multiple layers of :code:`Method.PROCESS` or
+ :code:`Method.REMOTE` will prove to be problematic because each process in
+ the stack will use its own independent pool which may overload
+ your machine with processes or your network with remote processes
+ beyond the control mechanisms built into one instance of the pool.
+ Be careful.
+
+ .. note::
+ There is non-trivial overhead of pickling code and
+ copying it over the network when you use :code:`Method.REMOTE`. There's
+ a smaller but still considerable cost of creating a new process
+ and passing code to/from it when you use :code:`Method.PROCESS`.
"""
- def wrapper(funct: typing.Callable):
+ def wrapper(funct: typing.Callable):
@functools.wraps(funct)
def inner_wrapper(*args, **kwargs):
+ import executors
+ import smart_future
+
# Look for as of yet unresolved arguments in _funct's
# argument list and resolve them now.
newargs = []
newargs.append(smart_future.SmartFuture.resolve(arg))
newkwargs = {}
for kw in kwargs:
- newkwargs[kw] = smart_future.SmartFuture.resolve(
- kwargs[kw]
- )
+ newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
executor = None
if method == Method.PROCESS:
elif method == Method.REMOTE:
executor = executors.DefaultExecutors().remote_pool()
assert executor is not None
+ atexit.register(executors.DefaultExecutors().shutdown)
future = executor.submit(funct, *newargs, **newkwargs)