"""A decorator to help with dead simple parallelization."""
+import atexit
from enum import Enum
import functools
import typing
def parallelize(
- _funct: typing.Optional[typing.Callable] = None,
- *,
- method: Method = Method.THREAD
+ _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
) -> typing.Callable:
"""Usage:
- @parallelize # defaults to thread-mode
- def my_function(a, b, c) -> int:
- ...do some slow / expensive work, e.g., an http request
+ @parallelize # defaults to thread-mode
+ def my_function(a, b, c) -> int:
+ ...do some slow / expensive work, e.g., an http request
- @parallelize(method=Method.PROCESS)
- def my_other_function(d, e, f) -> str:
- ...do more really expensice work, e.g., a network read
+ @parallelize(method=Method.PROCESS)
+ def my_other_function(d, e, f) -> str:
+ ...do more really expensice work, e.g., a network read
- @parallelize(method=Method.REMOTE)
- def my_other_other_function(g, h) -> int:
- ...this work will be distributed to a remote machine pool
+ @parallelize(method=Method.REMOTE)
+ def my_other_other_function(g, h) -> int:
+ ...this work will be distributed to a remote machine pool
This decorator will invoke the wrapped function on:
your machine with processes or your network with remote processes
beyond the control mechanisms built into one instance of the pool.
Be careful.
+
+ Also note: there is a non trivial overhead of pickling code and
+ scp'ing it over the network when you use Method.REMOTE. There's
+ a smaller but still considerable cost of creating a new process
+ and passing code to/from it when you use Method.PROCESS.
"""
- def wrapper(funct: typing.Callable):
+ def wrapper(funct: typing.Callable):
@functools.wraps(funct)
def inner_wrapper(*args, **kwargs):
import executors
newargs.append(smart_future.SmartFuture.resolve(arg))
newkwargs = {}
for kw in kwargs:
- newkwargs[kw] = smart_future.SmartFuture.resolve(
- kwargs[kw]
- )
+ newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
executor = None
if method == Method.PROCESS:
elif method == Method.REMOTE:
executor = executors.DefaultExecutors().remote_pool()
assert executor is not None
+ atexit.register(executors.DefaultExecutors().shutdown)
future = executor.submit(funct, *newargs, **newkwargs)