3 """A decorator to help with dead simple parallelization."""
13 """How should we parallelize; by threads, processes or remote workers?"""
21 _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
25 @parallelize # defaults to thread-mode
26 def my_function(a, b, c) -> int:
27 ...do some slow / expensive work, e.g., an http request
29 @parallelize(method=Method.PROCESS)
30 def my_other_function(d, e, f) -> str:
31 ...do more really expensive work, e.g., a network read
33 @parallelize(method=Method.REMOTE)
34 def my_other_other_function(g, h) -> int:
35 ...this work will be distributed to a remote machine pool
37 This decorator will invoke the wrapped function on:
39 Method.THREAD (default): a background thread
40 Method.PROCESS: a background process
41 Method.REMOTE: a process on a remote host
43 The wrapped function returns immediately with a value that is
44 wrapped in a SmartFuture. This value will block if it is either
45 read directly (via a call to result._resolve) or indirectly (by
46 using the result in an expression, printing it, hashing it,
47 passing it as a function argument, etc.). See comments on the
48 SmartFuture class for details.
50 Note: you may stack @parallelized methods and it will "work".
51 That said, having multiple layers of Method.PROCESS or
52 Method.REMOTE may prove to be problematic because each process in
53 the stack will use its own independent pool which may overload
54 your machine with processes or your network with remote processes
55 beyond the control mechanisms built into one instance of the pool.
58 Also note: there is a non-trivial overhead of pickling code and
59 scp'ing it over the network when you use Method.REMOTE. There's
60 a smaller but still considerable cost of creating a new process
61 and passing code to/from it when you use Method.PROCESS.
64 def wrapper(funct: typing.Callable):
65 @functools.wraps(funct)
66 def inner_wrapper(*args, **kwargs):
70 # Look for as-yet-unresolved arguments in funct's
71 # argument list and resolve them now.
74 newargs.append(smart_future.SmartFuture.resolve(arg))
77 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
80 if method == Method.PROCESS:
81 executor = executors.DefaultExecutors().process_pool()
82 elif method == Method.THREAD:
83 executor = executors.DefaultExecutors().thread_pool()
84 elif method == Method.REMOTE:
85 executor = executors.DefaultExecutors().remote_pool()
86 assert executor is not None
87 atexit.register(executors.DefaultExecutors().shutdown)
89 future = executor.submit(funct, *newargs, **newkwargs)
91 # Wrap the future that's returned in a SmartFuture object
92 # so that callers do not need to call .result(), they can
93 # just use it as normal.
94 return smart_future.SmartFuture(future)
101 return wrapper(_funct)