0004f17d8a8bbab65e47dc9e6fa448cb5e83a618
[pyutils.git] / src / pyutils / parallelize / parallelize.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A decorator to help with simple parallelization.  When decorated
6 functions are invoked they execute on a background thread, process or
7 remote machine depending on the style of decoration::
8
9     from pyutils.parallelize import parallelize as p
10
11     @p.parallelize    # defaults to thread-mode
12     def my_function(a, b, c) -> int:
13         ...do some slow / expensive work, e.g., an http request
14
15     # Run with background subprocess
16     @p.parallelize(method=Method.PROCESS)
17     def my_other_function(d, e, f) -> str:
18         ...do more really expensive work, e.g., a network read
19
20     # Run in a helper process on another machine.
21     @p.parallelize(method=Method.REMOTE)
22     def my_other_other_function(g, h) -> int:
23         ...this work will be distributed to a remote machine pool
24
25 This will just work out of the box with `Method.THREAD` (the default)
26 and `Method.PROCESS` but in order to use `Method.REMOTE` you need to
27 do some setup work:
28
29     1. To use `@parallelize(method=Method.REMOTE)` with your code you
30        need to hook your code into :mod:`pyutils.config` to enable
31        commandline flags from `pyutil` files.  You can do this by
32        either wrapping your main entry point with the
33        :meth:`pyutils.bootstrap.initialize` decorator or just calling
34        `config.parse()` early in your program.  See instructions in
35        :mod:`pyutils.bootstrap` and :mod:`pyutils.config` for
36        more information.
37
38     2. You need to create and configure a pool of worker machines.
39        All of these machines should run the same version of Python,
40        ideally in a virtual environment (venv) with the same
41        Python dependencies installed.  See: https://docs.python.org/3/library/venv.html
42
43        .. warning::
44
45            Different versions of code, libraries, or of the interpreter
46            itself can cause issues with running cloudpicked code.
47
48     3. You need an account that can ssh into any / all of these pool
49        machines non-interactively to perform tasks such as copying
50        code to the worker machine and running Python in the
51        aforementioned virtual environment.  This likely means setting
52        up `ssh` / `scp` with key-based authentication.
53        See: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys-2
54
55     4. You need to tell this parallelization framework about the pool
56        of machines you created by editing a JSON-based configuration
57        file.  The location of this file defaults to
58        :file:`.remote_worker_records` in your home directory but can
59        be overridden via the `--remote_worker_records_file`
60        commandline argument.  An example JSON configuration `can be
61        found under examples
62        <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
63
64     5. Finally, you will also need tell the
65        :class:`pyutils.parallelize.executors.RemoteExecutor` how to
66        invoke the :file:`remote_worker.py` on remote machines by
67        passing its path on remote worker machines in your setup via
68        the `--remote_worker_helper_path` commandline flag (or,
69        honestly, if you made it this far, just update the default in
70        this code -- find `executors.py` under `site-packages` in your
71        virtual environment and update the default value of the
72        `--remote_worker_helper_path` flag)
73
74     If you're trying to set this up and struggling, email me at
75     [email protected].  I'm happy to help.
76
77     What you get back when you call a decorated function (using
78     threads, processes or a remote worker) is a
79     :class:`pyutils.parallelize.smart_future.SmartFuture`.  This class
80     attempts to transparently wrap a normal Python :class:`Future`
81     (see
82     https://docs.python.org/3/library/concurrent.futures.html#future-objects).
83     If your code just uses the result of a `parallelized` method it
84     will block waiting on the result of the wrapped function as soon
85     as it uses that result in a manner that requires its value to be
86     known (e.g. using it in an expression, calling a method on it,
87     passing it into a method, hashing it / using it as a dict key,
88     etc...)  But you can do operations that do not require the value
89     to be known (e.g. storing it in a list, storing it as a value in a
90     dict, etc...) safely without blocking.
91
92     There are two helper methods in
93     :mod:`pyutils.parallelize.smart_future` to help deal with these
94     :class:`SmartFuture` objects called
95     :meth:`pyutils.parallelize.smart_future.wait_all` and
96     :meth:`pyutils.parallelize.smart_future.wait_any`.  These, when
97     given a collection of :class:`SmartFuture` objects,
98     will block until one (any) or all (all) are finished and yield the
99     finished objects to the caller.  Callers can be confident that any
100     objects returned from these methods will not block when accessed.
101     See documentation in :mod:`pyutils.parallelize.smart_future` for
102     more details.
103
104 """
105
106 import atexit
107 import functools
108 import typing
109 from enum import Enum
110
111
112 class Method(Enum):
113     """How should we parallelize; by threads, processes or remote workers?"""
114
115     THREAD = 1
116     PROCESS = 2
117     REMOTE = 3
118
119
120 def parallelize(
121     _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
122 ) -> typing.Callable:
123     """This is a decorator that was created to make multi-threading,
124     multi-processing and remote machine parallelism simple in Python.
125
126     Sample usage::
127
128         from pyutils.parallelize import parallelize as p
129
130         @p.parallelize    # defaults to thread-mode
131         def my_function(a, b, c) -> int:
132             ...do some slow / expensive work, e.g., an http request
133
134         # Run with background subprocess
135         @p.parallelize(method=Method.PROCESS)
136         def my_other_function(d, e, f) -> str:
137             ...do more really expensive work, e.g., a network read
138
139         # Run in a helper process on another machine.
140         @p.parallelize(method=Method.REMOTE)
141         def my_other_other_function(g, h) -> int:
142             ...this work will be distributed to a remote machine pool
143
144     This decorator will invoke the wrapped function on:
145
146         - `Method.THREAD` (default): a background thread
147         - `Method.PROCESS`: a background process
148         - `Method.REMOTE`: a process on a remote host
149
150     The wrapped function returns immediately with a value that is
151     wrapped in a
152     :class:`pyutils.parallelize.smart_future.SmartFuture`.  This value
153     will block if it is either read directly (via a call to
154     :meth:`_resolve`) or indirectly (by using the result in an
155     expression, printing it, hashing it, passing it a function
156     argument, etc...).  See comments on :class:`SmartFuture` for
157     details.  The value can be safely stored (without hashing) or
158     passed as an argument without causing it to block waiting on a
159     result.  There are some convenience methods for dealing with
160     collections of :class:`SmartFuture` objects defined in
161     :file:`smart_future.py`, namely
162     :meth:`pyutils.parallelize.smart_future.wait_any` and
163     :meth:`pyutils.parallelize.smart_future.wait_all`.
164
165     .. warning::
166         You may stack @parallelized methods and it will "work".
167         That said, having multiple layers of :code:`Method.PROCESS` or
168         :code:`Method.REMOTE` will prove to be problematic because each
169         process in the stack will use its own independent pool which will
170         likely overload your machine with processes or your network with
171         remote processes beyond the control mechanisms built into one
172         instance of the pool.  Be careful.
173
174     .. warning::
175         There is non-trivial overhead of pickling code and
176         copying it over the network when you use :code:`Method.REMOTE`.  There's
177         a smaller but still considerable cost of creating a new process
178         and passing code to/from it when you use :code:`Method.PROCESS`.
179
180     """
181
182     def wrapper(funct: typing.Callable):
183         @functools.wraps(funct)
184         def inner_wrapper(*args, **kwargs):
185             from pyutils.parallelize import executors, smart_future
186
187             # Look for as of yet unresolved arguments in _funct's
188             # argument list and resolve them now.
189             newargs = []
190             for arg in args:
191                 newargs.append(smart_future.SmartFuture.resolve(arg))
192             newkwargs = {}
193             for kw in kwargs:
194                 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
195
196             executor = None
197             if method == Method.PROCESS:
198                 executor = executors.DefaultExecutors().process_pool()
199             elif method == Method.THREAD:
200                 executor = executors.DefaultExecutors().thread_pool()
201             elif method == Method.REMOTE:
202                 executor = executors.DefaultExecutors().remote_pool()
203             assert executor is not None
204             atexit.register(executors.DefaultExecutors().shutdown)
205
206             future = executor.submit(funct, *newargs, **newkwargs)
207
208             # Wrap the future that's returned in a SmartFuture object
209             # so that callers do not need to call .result(), they can
210             # just use is as normal.
211             return smart_future.SmartFuture(future)
212
213         return inner_wrapper
214
215     if _funct is None:
216         return wrapper
217     else:
218         return wrapper(_funct)