3 # © Copyright 2021-2022, Scott Gasch
5 """A decorator to help with simple parallelization. When decorated
6 functions are invoked they execute on a background thread, process or
7 remote machine depending on the style of decoration::
9 from pyutils.parallelize import parallelize as p
11 @p.parallelize # defaults to thread-mode
12 def my_function(a, b, c) -> int:
13 ...do some slow / expensive work, e.g., an http request
15 # Run with background subprocess
16 @p.parallelize(method=Method.PROCESS)
17 def my_other_function(d, e, f) -> str:
18 ...do more really expensive work, e.g., a network read
20 # Run in a helper process on another machine.
21 @p.parallelize(method=Method.REMOTE)
22 def my_other_other_function(g, h) -> int:
23 ...this work will be distributed to a remote machine pool
25 This will just work out of the box with `Method.THREAD` (the default)
26 and `Method.PROCESS` but in order to use `Method.REMOTE` you need to
29 1. To use `@parallelize(method=Method.REMOTE)` with your code you
30 need to hook your code into :mod:`pyutils.config` to enable
31 commandline flags from `pyutils` files. You can do this by
32 either wrapping your main entry point with the
33 :meth:`pyutils.bootstrap.initialize` decorator or just calling
34 `config.parse()` early in your program. See instructions in
35 :mod:`pyutils.bootstrap` and :mod:`pyutils.config` for
38 2. You need to create and configure a pool of worker machines.
39 All of these machines should run the same version of Python,
40 ideally in a virtual environment (venv) with the same
41 Python dependencies installed. See: https://docs.python.org/3/library/venv.html
45 Different versions of code, libraries, or of the interpreter
46 itself can cause issues with running cloudpickled code.
48 3. You need an account that can ssh into any / all of these pool
49 machines non-interactively to perform tasks such as copying
50 code to the worker machine and running Python in the
51 aforementioned virtual environment. This likely means setting
52 up `ssh` / `scp` with key-based authentication.
53 See: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys-2
55 4. You need to tell this parallelization framework about the pool
56 of machines you created by editing a JSON-based configuration
57 file. The location of this file defaults to
58 :file:`.remote_worker_records` in your home directory but can
59 be overridden via the `--remote_worker_records_file`
60 commandline argument. An example JSON configuration `can be
62 <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
64 5. Finally, you will also need to tell the
65 :class:`pyutils.parallelize.executors.RemoteExecutor` how to
66 invoke the :file:`remote_worker.py` on remote machines by
67 passing its path on remote worker machines in your setup via
68 the `--remote_worker_helper_path` commandline flag (or,
69 honestly, if you made it this far, just update the default in
70 this code -- find `executors.py` under `site-packages` in your
71 virtual environment and update the default value of the
72 `--remote_worker_helper_path` flag)
74 If you're trying to set this up and struggling, email me at
77 What you get back when you call a decorated function (using
78 threads, processes or a remote worker) is a
79 :class:`pyutils.parallelize.smart_future.SmartFuture`. This class
80 attempts to transparently wrap a normal Python :class:`Future`
82 https://docs.python.org/3/library/concurrent.futures.html#future-objects).
83 If your code just uses the result of a `parallelized` method it
84 will block waiting on the result of the wrapped function as soon
85 as it uses that result in a manner that requires its value to be
86 known (e.g. using it in an expression, calling a method on it,
87 passing it into a method, hashing it / using it as a dict key,
88 etc...) But you can do operations that do not require the value
89 to be known (e.g. storing it in a list, storing it as a value in a
90 dict, etc...) safely without blocking.
92 There are two helper methods in
93 :mod:`pyutils.parallelize.smart_future` to help deal with these
94 :class:`SmartFuture` objects called
95 :meth:`pyutils.parallelize.smart_future.wait_all` and
96 :meth:`pyutils.parallelize.smart_future.wait_any`. These, when
97 given a collection of :class:`SmartFuture` objects,
98 will block until one (any) or all (all) are finished and yield the
99 finished objects to the caller. Callers can be confident that any
100 objects returned from these methods will not block when accessed.
101 See documentation in :mod:`pyutils.parallelize.smart_future` for
109 from enum import Enum
113 """How should we parallelize; by threads, processes or remote workers?"""
121 _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
122 ) -> typing.Callable:
123 """This is a decorator that was created to make multi-threading,
124 multi-processing and remote machine parallelism simple in Python.
128 from pyutils.parallelize import parallelize as p
130 @p.parallelize # defaults to thread-mode
131 def my_function(a, b, c) -> int:
132 ...do some slow / expensive work, e.g., an http request
134 # Run with background subprocess
135 @p.parallelize(method=Method.PROCESS)
136 def my_other_function(d, e, f) -> str:
137 ...do more really expensive work, e.g., a network read
139 # Run in a helper process on another machine.
140 @p.parallelize(method=Method.REMOTE)
141 def my_other_other_function(g, h) -> int:
142 ...this work will be distributed to a remote machine pool
144 This decorator will invoke the wrapped function on:
146 - `Method.THREAD` (default): a background thread
147 - `Method.PROCESS`: a background process
148 - `Method.REMOTE`: a process on a remote host
150 The wrapped function returns immediately with a value that is
152 :class:`pyutils.parallelize.smart_future.SmartFuture`. This value
153 will block if it is either read directly (via a call to
154 :meth:`_resolve`) or indirectly (by using the result in an
155 expression, printing it, hashing it, passing it as a function
156 argument, etc...). See comments on :class:`SmartFuture` for
157 details. The value can be safely stored (without hashing) or
158 passed as an argument without causing it to block waiting on a
159 result. There are some convenience methods for dealing with
160 collections of :class:`SmartFuture` objects defined in
161 :file:`smart_future.py`, namely
162 :meth:`pyutils.parallelize.smart_future.wait_any` and
163 :meth:`pyutils.parallelize.smart_future.wait_all`.
166 You may stack @parallelized methods and it will "work".
167 That said, having multiple layers of :code:`Method.PROCESS` or
168 :code:`Method.REMOTE` will prove to be problematic because each
169 process in the stack will use its own independent pool which will
170 likely overload your machine with processes or your network with
171 remote processes beyond the control mechanisms built into one
172 instance of the pool. Be careful.
175 There is non-trivial overhead of pickling code and
176 copying it over the network when you use :code:`Method.REMOTE`. There's
177 a smaller but still considerable cost of creating a new process
178 and passing code to/from it when you use :code:`Method.PROCESS`.
182 def wrapper(funct: typing.Callable):
183 @functools.wraps(funct)
184 def inner_wrapper(*args, **kwargs):
185 from pyutils.parallelize import executors, smart_future
187 # Look for as of yet unresolved arguments in _funct's
188 # argument list and resolve them now.
191 newargs.append(smart_future.SmartFuture.resolve(arg))
194 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
197 if method == Method.PROCESS:
198 executor = executors.DefaultExecutors().process_pool()
199 elif method == Method.THREAD:
200 executor = executors.DefaultExecutors().thread_pool()
201 elif method == Method.REMOTE:
202 executor = executors.DefaultExecutors().remote_pool()
203 assert executor is not None
204 atexit.register(executors.DefaultExecutors().shutdown)
206 future = executor.submit(funct, *newargs, **newkwargs)
208 # Wrap the future that's returned in a SmartFuture object
209 # so that callers do not need to call .result(), they can
210 # just use it as normal.
211 return smart_future.SmartFuture(future)
218 return wrapper(_funct)