3 # © Copyright 2021-2022, Scott Gasch
5 """A decorator to help with simple parallelization. When decorated
6 functions are invoked they execute on a background thread, process or
7 remote machine depending on the style of decoration::
9 from pyutils.parallelize import parallelize as p
11 @p.parallelize # defaults to thread-mode
12 def my_function(a, b, c) -> int:
13 ...do some slow / expensive work, e.g., an http request
15 @p.parallelize(method=Method.PROCESS)
16 def my_other_function(d, e, f) -> str:
17 ...do more really expensive work, e.g., a network read
19 @p.parallelize(method=Method.REMOTE)
20 def my_other_other_function(g, h) -> int:
21 ...this work will be distributed to a remote machine pool
23 This will just work out of the box with `Method.THREAD` (the default)
24 and `Method.PROCESS` but in order to use `Method.REMOTE` you need to
27 1. To use `@parallelize(method=Method.REMOTE)` with your code you
28 need to hook your code into :mod:`pyutils.config` to enable
29 commandline flags from `pyutil` files. You can do this by
30 either wrapping your main entry point with the
31 :meth:`pyutils.bootstrap.initialize` decorator or just calling
32 `config.parse()` early in your program. See instructions in
33 :mod:`pyutils.bootstrap` and :mod:`pyutils.config` for
36 2. You need to create and configure a pool of worker machines.
37 All of these machines should run the same version of Python,
38 ideally in a virtual environment (venv) with the same
39 Python dependencies installed. See: https://docs.python.org/3/library/venv.html
43 Different versions of code, libraries, or of the interpreter
44 itself can cause issues with running cloudpicked code.
46 3. You need an account that can ssh into any / all of these pool
47 machines non-interactively to perform tasks such as copying
48 code to the worker machine and running Python in the
49 aforementioned virtual environment. This likely means setting
50 up `ssh` / `scp` with key-based authentication.
51 See: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys-2
53 4. You need to tell this parallelization framework about the pool
54 of machines you created by editing a JSON-based configuration
55 file. The location of this file defaults to
56 :file:`.remote_worker_records` in your home directory but can
57 be overridden via the `--remote_worker_records_file`
58 commandline argument. An example JSON configuration `can be
60 <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
62 5. Finally, you will also need to tell the
63 :class:`pyutils.parallelize.executors.RemoteExecutor` how to
64 invoke the :file:`remote_worker.py` on remote machines by
65 passing its path on remote worker machines in your setup via
66 the `--remote_worker_helper_path` commandline flag (or,
67 honestly, if you made it this far, just update the default in
68 this code -- find `executors.py` under `site-packages` in your
69 virtual environment and update the default value of the
70 `--remote_worker_helper_path` flag)
72 If you're trying to set this up and struggling, email me at
84 """How should we parallelize; by threads, processes or remote workers?"""
92 _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
94 """This is a decorator that was created to make multi-threading,
95 multi-processing and remote machine parallelism simple in Python.
99 from pyutils.parallelize import parallelize as p
101 @p.parallelize # defaults to thread-mode
102 def my_function(a, b, c) -> int:
103 ...do some slow / expensive work, e.g., an http request
105 @p.parallelize(method=Method.PROCESS)
106 def my_other_function(d, e, f) -> str:
107 ...do more really expensive work, e.g., a network read
109 @p.parallelize(method=Method.REMOTE)
110 def my_other_other_function(g, h) -> int:
111 ...this work will be distributed to a remote machine pool
113 This decorator will invoke the wrapped function on:
115 - `Method.THREAD` (default): a background thread
116 - `Method.PROCESS`: a background process
117 - `Method.REMOTE`: a process on a remote host
119 The wrapped function returns immediately with a value that is
121 :class:`pyutils.parallelize.smart_future.SmartFuture`. This value
122 will block if it is either read directly (via a call to
123 :meth:`_resolve`) or indirectly (by using the result in an
124 expression, printing it, hashing it, passing it as a function
125 argument, etc...). See comments on :class:`SmartFuture` for
126 details. The value can be safely stored (without hashing) or
127 passed as an argument without causing it to block waiting on a
128 result. There are some convenience methods for dealing with
129 collections of :class:`SmartFuture` objects defined in
130 :file:`smart_future.py`, namely
131 :meth:`pyutils.parallelize.smart_future.wait_any` and
132 :meth:`pyutils.parallelize.smart_future.wait_all`.
135 You may stack @parallelized methods and it will "work".
136 That said, having multiple layers of :code:`Method.PROCESS` or
137 :code:`Method.REMOTE` will prove to be problematic because each
138 process in the stack will use its own independent pool which will
139 likely overload your machine with processes or your network with
140 remote processes beyond the control mechanisms built into one
141 instance of the pool. Be careful.
144 There is non-trivial overhead of pickling code and
145 copying it over the network when you use :code:`Method.REMOTE`. There's
146 a smaller but still considerable cost of creating a new process
147 and passing code to/from it when you use :code:`Method.PROCESS`.
151 def wrapper(funct: typing.Callable):
152     # Wrap `funct` so that each invocation is submitted to the executor
153     # pool selected by the enclosing `method` argument instead of
154     # running inline.
152     @functools.wraps(funct)
153     def inner_wrapper(*args, **kwargs):
154         # Imported lazily so that simply importing this module does not
155         # drag in the executor / SmartFuture machinery.
154         from pyutils.parallelize import executors, smart_future

156         # Look for as of yet unresolved arguments in _funct's
157         # argument list and resolve them now.
158         # NOTE(review): the initialization of newargs/newkwargs and the
159         # loop headers over args/kwargs appear to be elided from this
160         # excerpt; the two lines below are presumably the loop bodies --
161         # confirm against the full file.
160             newargs.append(smart_future.SmartFuture.resolve(arg))
163             newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])

165         # Pick the executor pool matching the decoration style
166         # (thread / process / remote).
166         if method == Method.PROCESS:
167             executor = executors.DefaultExecutors().process_pool()
168         elif method == Method.THREAD:
169             executor = executors.DefaultExecutors().thread_pool()
170         elif method == Method.REMOTE:
171             executor = executors.DefaultExecutors().remote_pool()
172         assert executor is not None
173         # Ensure the shared pools are shut down at interpreter exit.
173         atexit.register(executors.DefaultExecutors().shutdown)

175         future = executor.submit(funct, *newargs, **newkwargs)

177         # Wrap the future that's returned in a SmartFuture object
178         # so that callers do not need to call .result(), they can
179         # just use it as normal.
180         return smart_future.SmartFuture(future)

# NOTE(review): the `if _funct is None:` branch (the decorator-with-
# arguments path that returns `wrapper` itself) appears to be missing
# from this excerpt; only the bare-decorator path is visible here.
187 return wrapper(_funct)