3 # © Copyright 2021-2022, Scott Gasch
5 """A decorator to help with dead simple parallelization."""
15 """How should we parallelize; by threads, processes or remote workers?"""
23 _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
25 """This is a decorator that was created to make multi-threading,
26 multi-processing and remote machine parallelism simple in python.
30 @parallelize # defaults to thread-mode
31 def my_function(a, b, c) -> int:
32 ...do some slow / expensive work, e.g., an http request
34 @parallelize(method=Method.PROCESS)
35 def my_other_function(d, e, f) -> str:
36 ...do more really expensive work, e.g., a network read
38 @parallelize(method=Method.REMOTE)
39 def my_other_other_function(g, h) -> int:
40 ...this work will be distributed to a remote machine pool
42 This decorator will invoke the wrapped function on::
44 Method.THREAD (default): a background thread
45 Method.PROCESS: a background process
46 Method.REMOTE: a process on a remote host
48 The wrapped function returns immediately with a value that is
49 wrapped in a :class:`SmartFuture`. This value will block if it is
50 either read directly (via a call to :meth:`_resolve`) or indirectly
51 (by using the result in an expression, printing it, hashing it,
52 passing it as a function argument, etc...). See comments on
53 :class:`SmartFuture` for details.
56 You may stack @parallelized methods and it will "work".
57 That said, having multiple layers of :code:`Method.PROCESS` or
58 :code:`Method.REMOTE` will prove to be problematic because each process in
59 the stack will use its own independent pool which may overload
60 your machine with processes or your network with remote processes
61 beyond the control mechanisms built into one instance of the pool.
65 There is non-trivial overhead of pickling code and
66 copying it over the network when you use :code:`Method.REMOTE`. There's
67 a smaller but still considerable cost of creating a new process
68 and passing code to/from it when you use :code:`Method.PROCESS`.
71 def wrapper(funct: typing.Callable):
72 @functools.wraps(funct)
73 def inner_wrapper(*args, **kwargs):
74 from pyutils.parallelize import executors, smart_future
76 # Look for as of yet unresolved arguments in funct's
77 # argument list and resolve them now.
80 newargs.append(smart_future.SmartFuture.resolve(arg))
83 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
86 if method == Method.PROCESS:
87 executor = executors.DefaultExecutors().process_pool()
88 elif method == Method.THREAD:
89 executor = executors.DefaultExecutors().thread_pool()
90 elif method == Method.REMOTE:
91 executor = executors.DefaultExecutors().remote_pool()
92 assert executor is not None
93 atexit.register(executors.DefaultExecutors().shutdown)
95 future = executor.submit(funct, *newargs, **newkwargs)
97 # Wrap the future that's returned in a SmartFuture object
98 # so that callers do not need to call .result(), they can
99 # just use it as normal.
100 return smart_future.SmartFuture(future)
107 return wrapper(_funct)