# © Copyright 2021-2022, Scott Gasch

"""A decorator to help with simple parallelization.  When decorated
functions are invoked they execute on a background thread, process or
remote machine depending on the style of decoration::

    from pyutils.parallelize import parallelize as p

    @p.parallelize    # defaults to thread-mode
    def my_function(a, b, c) -> int:
        ...do some slow / expensive work, e.g., an http request

    # Run with background subprocess
    @p.parallelize(method=Method.PROCESS)
    def my_other_function(d, e, f) -> str:
        ...do more really expensive work, e.g., a network read

    # Run in a helper process on another machine.
    @p.parallelize(method=Method.REMOTE)
    def my_other_other_function(g, h) -> int:
        ...this work will be distributed to a remote machine pool

This will just work out of the box with `Method.THREAD` (the default)
and `Method.PROCESS` but in order to use `Method.REMOTE` you need to
do some setup work first:

    1. To use `@parallelize(method=Method.REMOTE)` with your code you
       need to hook your code into :mod:`pyutils.config` to enable
       commandline flags from `pyutil` files.  You can do this by
       either wrapping your main entry point with the
       :meth:`pyutils.bootstrap.initialize` decorator or just calling
       `config.parse()` early in your program.  See instructions in
       :mod:`pyutils.bootstrap` and :mod:`pyutils.config` for
       more details.

    2. You need to create and configure a pool of worker machines.
       All of these machines should run the same version of Python,
       ideally in a virtual environment (venv) with the same
       Python dependencies installed.  See: https://docs.python.org/3/library/venv.html

       .. warning::

           Different versions of code, libraries, or of the interpreter
           itself can cause issues with running cloudpicked code.

    3. You need an account that can ssh into any / all of these pool
       machines non-interactively to perform tasks such as copying
       code to the worker machine and running Python in the
       aforementioned virtual environment.  This likely means setting
       up `ssh` / `scp` with key-based authentication.
       See: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys-2

    4. You need to tell this parallelization framework about the pool
       of machines you created by editing a JSON-based configuration
       file.  The location of this file defaults to
       :file:`.remote_worker_records` in your home directory but can
       be overridden via the `--remote_worker_records_file`
       commandline argument.  An example JSON configuration `can be
       found here
       <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.

    5. Finally, you will also need to tell the
       :class:`pyutils.parallelize.executors.RemoteExecutor` how to
       invoke the :file:`remote_worker.py` on remote machines by
       passing its path on remote worker machines in your setup via
       the `--remote_worker_helper_path` commandline flag (or,
       honestly, if you made it this far, just update the default in
       this code -- find `executors.py` under `site-packages` in your
       virtual environment and update the default value of the
       `--remote_worker_helper_path` flag)

    If you're trying to set this up and struggling, email me at
    scott.gasch@gmail.com.  I'm happy to help.

What you get back when you call a decorated function is a
:class:`pyutils.parallelize.smart_future.SmartFuture`.  This class
attempts to transparently wrap a normal Python :class:`Future` (see
https://docs.python.org/3/library/concurrent.futures.html#future-objects).
If your code just uses the result of a `parallelized` method it
will block waiting on the result of the method as soon as it uses
the result in a manner that requires its value to be known
(e.g. using it in an expression, calling a method on it, hashing
it, etc...)  But you can do operations that do not require the
value to be known (e.g. storing it in a list, storing it as a
value in a dict, etc...) without blocking.

There are two helper methods in
:mod:`pyutils.parallelize.smart_future` to help deal with these
:class:`SmartFuture` objects called
:meth:`pyutils.parallelize.smart_future.wait_all` and
:meth:`pyutils.parallelize.smart_future.wait_any`.  These, when
given a collection of :class:`SmartFuture` objects,
will block until one (any) or all (all) are finished and yield the
finished objects to the caller.  Callers can be confident that any
objects returned from these methods will not block when accessed.
See documentation in :mod:`pyutils.parallelize.smart_future` for
more details.
"""
import atexit
import functools
import typing

from enum import Enum
class Method(Enum):
    """How should we parallelize; by threads, processes or remote workers?"""

    # Run the decorated function on a background thread in this process.
    THREAD = 1
    # Run the decorated function in a child process on this machine.
    PROCESS = 2
    # Run the decorated function in a process on a remote worker machine.
    REMOTE = 3
def parallelize(
    _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
) -> typing.Callable:
    """This is a decorator that was created to make multi-threading,
    multi-processing and remote machine parallelism simple in Python.

    Sample usage::

        from pyutils.parallelize import parallelize as p

        @p.parallelize    # defaults to thread-mode
        def my_function(a, b, c) -> int:
            ...do some slow / expensive work, e.g., an http request

        @p.parallelize(method=Method.PROCESS)
        def my_other_function(d, e, f) -> str:
            ...do more really expensive work, e.g., a network read

        @p.parallelize(method=Method.REMOTE)
        def my_other_other_function(g, h) -> int:
            ...this work will be distributed to a remote machine pool

    This decorator will invoke the wrapped function on:

    - `Method.THREAD` (default): a background thread
    - `Method.PROCESS`: a background process
    - `Method.REMOTE`: a process on a remote host

    Args:
        _funct: the function being decorated.  `None` when the decorator
            is invoked with arguments (e.g. `@parallelize(method=...)`);
            in that case we return a partial that finishes the decoration.
        method: where to execute the wrapped function (keyword-only).

    Returns:
        A wrapped callable.  Invoking it returns immediately with a
        :class:`pyutils.parallelize.smart_future.SmartFuture` that
        transparently blocks only when its value is actually needed
        (used in an expression, hashed, printed, etc...).  It can be
        safely stored or passed around without blocking.  See
        :meth:`pyutils.parallelize.smart_future.wait_any` and
        :meth:`pyutils.parallelize.smart_future.wait_all` for dealing
        with collections of :class:`SmartFuture` objects.

    .. note::
        You may stack @parallelized methods and it will "work".
        That said, having multiple layers of :code:`Method.PROCESS` or
        :code:`Method.REMOTE` will prove to be problematic because each
        process in the stack will use its own independent pool which
        will likely overload your machine with processes or your
        network with remote processes beyond the control mechanisms
        built into one instance of the pool.  Be careful.

    .. warning::
        There is non-trivial overhead of pickling code and copying it
        over the network when you use :code:`Method.REMOTE`.  There's a
        smaller but still considerable cost of creating a new process
        and passing code to/from it when you use :code:`Method.PROCESS`.
    """

    def wrapper(funct: typing.Callable):
        @functools.wraps(funct)
        def inner_wrapper(*args, **kwargs):
            # Imported lazily to avoid a circular import at module load.
            from pyutils.parallelize import executors, smart_future

            # Look for as of yet unresolved arguments in _funct's
            # argument list and resolve them now so the wrapped
            # function never sees a SmartFuture.
            newargs = [smart_future.SmartFuture.resolve(arg) for arg in args]
            newkwargs = {
                kw: smart_future.SmartFuture.resolve(arg)
                for kw, arg in kwargs.items()
            }

            # Select the executor pool that matches the requested method.
            executor = None
            if method == Method.PROCESS:
                executor = executors.DefaultExecutors().process_pool()
            elif method == Method.THREAD:
                executor = executors.DefaultExecutors().thread_pool()
            elif method == Method.REMOTE:
                executor = executors.DefaultExecutors().remote_pool()
            assert executor is not None

            # Make sure the shared pools are torn down at interpreter exit.
            atexit.register(executors.DefaultExecutors().shutdown)

            future = executor.submit(funct, *newargs, **newkwargs)

            # Wrap the future that's returned in a SmartFuture object
            # so that callers do not need to call .result(), they can
            # just use it as normal.
            return smart_future.SmartFuture(future)

        return inner_wrapper

    # Support both decoration styles: bare `@parallelize` passes the
    # function directly; `@parallelize(method=...)` calls us with no
    # function, so return a partial that remembers the chosen method.
    if _funct is None:
        return functools.partial(parallelize, method=method)
    else:
        return wrapper(_funct)