Update docs.
[pyutils.git] / src / pyutils / parallelize / parallelize.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A decorator to help with simple parallelization.  When decorated
6 functions are invoked they execute on a background thread, process or
7 remote machine depending on the style of decoration::
8
9     from pyutils.parallelize import parallelize as p
10
11     @p.parallelize    # defaults to thread-mode
12     def my_function(a, b, c) -> int:
13         ...do some slow / expensive work, e.g., an http request
14
15     # Run with background subprocess
16     @p.parallelize(method=Method.PROCESS)
17     def my_other_function(d, e, f) -> str:
18         ...do more really expensive work, e.g., a network read
19
20     # Run in a helper process on another machine.
21     @p.parallelize(method=Method.REMOTE)
22     def my_other_other_function(g, h) -> int:
23         ...this work will be distributed to a remote machine pool
24
25 This will just work out of the box with `Method.THREAD` (the default)
26 and `Method.PROCESS` but in order to use `Method.REMOTE` you need to
27 do some setup work:
28
29     1. To use `@parallelize(method=Method.REMOTE)` with your code you
30        need to hook your code into :mod:`pyutils.config` to enable
31        commandline flags from `pyutil` files.  You can do this by
32        either wrapping your main entry point with the
33        :meth:`pyutils.bootstrap.initialize` decorator or just calling
34        `config.parse()` early in your program.  See instructions in
35        :mod:`pyutils.bootstrap` and :mod:`pyutils.config` for
36        more information.
37
38     2. You need to create and configure a pool of worker machines.
39        All of these machines should run the same version of Python,
40        ideally in a virtual environment (venv) with the same
41        Python dependencies installed.  See: https://docs.python.org/3/library/venv.html
42
43        .. warning::
44
45            Different versions of code, libraries, or of the interpreter
46            itself can cause issues with running cloudpicked code.
47
48     3. You need an account that can ssh into any / all of these pool
49        machines non-interactively to perform tasks such as copying
50        code to the worker machine and running Python in the
51        aforementioned virtual environment.  This likely means setting
52        up `ssh` / `scp` with key-based authentication.
53        See: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys-2
54
55     4. You need to tell this parallelization framework about the pool
56        of machines you created by editing a JSON-based configuration
57        file.  The location of this file defaults to
58        :file:`.remote_worker_records` in your home directory but can
59        be overridden via the `--remote_worker_records_file`
60        commandline argument.  An example JSON configuration `can be
61        found under examples
62        <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
63
64     5. Finally, you will also need tell the
65        :class:`pyutils.parallelize.executors.RemoteExecutor` how to
66        invoke the :file:`remote_worker.py` on remote machines by
67        passing its path on remote worker machines in your setup via
68        the `--remote_worker_helper_path` commandline flag (or,
69        honestly, if you made it this far, just update the default in
70        this code -- find `executors.py` under `site-packages` in your
71        virtual environment and update the default value of the
72        `--remote_worker_helper_path` flag)
73
74     If you're trying to set this up and struggling, email me at
75     [email protected].  I'm happy to help.
76
77     What you get back when you call a decorated message is a
78     :class:`pyutils.parallelize.smart_future.SmartFuture`.  This class
79     attempts to transparently wrap a normal Python :class:`Future`
80     (see
81     https://docs.python.org/3/library/concurrent.futures.html#future-objects).
82     If your code just uses the result of a `parallelized` method it
83     will block waiting on the result of the method as soon as it uses
84     the result in a manner that requires its value to be known
85     (e.g. using it in an expression, calling a method on it, hashing
86     it, etc...)  But you can do operations that do not require the
87     value to be known (e.g. storing it in a list, storing it as a
88     value in a dict, etc...) without blocking.
89
90     There are two helper methods in
91     :mod:`pyutils.parallelize.smart_future` to help deal with these
92     :class:`SmartFuture` objects called
93     :meth:`pyutils.parallelize.smart_future.wait_all` and
94     :meth:`pyutils.parallelize.smart_future.wait_any`.  These, when
95     given a collection of :class:`SmartFuture` objects,
96     will block until one (any) or all (all) are finished and yield the
97     finished objects to the caller.  Callers can be confident that any
98     objects returned from these methods will not block when accessed.
99     See documentation in :mod:`pyutils.parallelize.smart_future` for
100     more details.
101
102 """
103
104 import atexit
105 import functools
106 import typing
107 from enum import Enum
108
109
110 class Method(Enum):
111     """How should we parallelize; by threads, processes or remote workers?"""
112
113     THREAD = 1
114     PROCESS = 2
115     REMOTE = 3
116
117
118 def parallelize(
119     _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
120 ) -> typing.Callable:
121     """This is a decorator that was created to make multi-threading,
122     multi-processing and remote machine parallelism simple in Python.
123
124     Sample usage::
125
126         from pyutils.parallelize import parallelize as p
127
128         @p.parallelize    # defaults to thread-mode
129         def my_function(a, b, c) -> int:
130             ...do some slow / expensive work, e.g., an http request
131
132         @p.parallelize(method=Method.PROCESS)
133         def my_other_function(d, e, f) -> str:
134             ...do more really expensive work, e.g., a network read
135
136         @p.parallelize(method=Method.REMOTE)
137         def my_other_other_function(g, h) -> int:
138             ...this work will be distributed to a remote machine pool
139
140     This decorator will invoke the wrapped function on:
141
142         - `Method.THREAD` (default): a background thread
143         - `Method.PROCESS`: a background process
144         - `Method.REMOTE`: a process on a remote host
145
146     The wrapped function returns immediately with a value that is
147     wrapped in a
148     :class:`pyutils.parallelize.smart_future.SmartFuture`.  This value
149     will block if it is either read directly (via a call to
150     :meth:`_resolve`) or indirectly (by using the result in an
151     expression, printing it, hashing it, passing it a function
152     argument, etc...).  See comments on :class:`SmartFuture` for
153     details.  The value can be safely stored (without hashing) or
154     passed as an argument without causing it to block waiting on a
155     result.  There are some convenience methods for dealing with
156     collections of :class:`SmartFuture` objects defined in
157     :file:`smart_future.py`, namely
158     :meth:`pyutils.parallelize.smart_future.wait_any` and
159     :meth:`pyutils.parallelize.smart_future.wait_all`.
160
161     .. warning::
162         You may stack @parallelized methods and it will "work".
163         That said, having multiple layers of :code:`Method.PROCESS` or
164         :code:`Method.REMOTE` will prove to be problematic because each
165         process in the stack will use its own independent pool which will
166         likely overload your machine with processes or your network with
167         remote processes beyond the control mechanisms built into one
168         instance of the pool.  Be careful.
169
170     .. warning::
171         There is non-trivial overhead of pickling code and
172         copying it over the network when you use :code:`Method.REMOTE`.  There's
173         a smaller but still considerable cost of creating a new process
174         and passing code to/from it when you use :code:`Method.PROCESS`.
175
176     """
177
178     def wrapper(funct: typing.Callable):
179         @functools.wraps(funct)
180         def inner_wrapper(*args, **kwargs):
181             from pyutils.parallelize import executors, smart_future
182
183             # Look for as of yet unresolved arguments in _funct's
184             # argument list and resolve them now.
185             newargs = []
186             for arg in args:
187                 newargs.append(smart_future.SmartFuture.resolve(arg))
188             newkwargs = {}
189             for kw in kwargs:
190                 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
191
192             executor = None
193             if method == Method.PROCESS:
194                 executor = executors.DefaultExecutors().process_pool()
195             elif method == Method.THREAD:
196                 executor = executors.DefaultExecutors().thread_pool()
197             elif method == Method.REMOTE:
198                 executor = executors.DefaultExecutors().remote_pool()
199             assert executor is not None
200             atexit.register(executors.DefaultExecutors().shutdown)
201
202             future = executor.submit(funct, *newargs, **newkwargs)
203
204             # Wrap the future that's returned in a SmartFuture object
205             # so that callers do not need to call .result(), they can
206             # just use is as normal.
207             return smart_future.SmartFuture(future)
208
209         return inner_wrapper
210
211     if _funct is None:
212         return wrapper
213     else:
214         return wrapper(_funct)