More documentation improvements.
[pyutils.git] / src / pyutils / parallelize / parallelize.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A decorator to help with simple parallelization.  When decorated
6 functions are invoked they execute on a background thread, process or
7 remote machine depending on the style of decoration::
8
9     from pyutils.parallelize import parallelize as p
10
11     @p.parallelize    # defaults to thread-mode
12     def my_function(a, b, c) -> int:
13         ...do some slow / expensive work, e.g., an http request
14
15     @p.parallelize(method=Method.PROCESS)
16     def my_other_function(d, e, f) -> str:
17         ...do more really expensive work, e.g., a network read
18
19     @p.parallelize(method=Method.REMOTE)
20     def my_other_other_function(g, h) -> int:
21         ...this work will be distributed to a remote machine pool
22
23 This will just work out of the box with `Method.THREAD` (the default)
24 and `Method.PROCESS` but in order to use `Method.REMOTE` you need to
25 do some setup work:
26
27     1. To use `@parallelize(method=Method.REMOTE)` with your code you
28        need to hook your code into :mod:`pyutils.config` to enable
29        commandline flags from `pyutil` files.  You can do this by
30        either wrapping your main entry point with the
31        :meth:`pyutils.bootstrap.initialize` decorator or just calling
32        `config.parse()` early in your program.  See instructions in
33        :mod:`pyutils.bootstrap` and :mod:`pyutils.config` for
34        more information.
35
36     2. You need to create and configure a pool of worker machines.
37        All of these machines should run the same version of Python,
38        ideally in a virtual environment (venv) with the same
39        Python dependencies installed.  See: https://docs.python.org/3/library/venv.html
40
41        .. warning::
42
43            Different versions of code, libraries, or of the interpreter
44            itself can cause issues with running cloudpicked code.
45
46     3. You need an account that can ssh into any / all of these pool
47        machines non-interactively to perform tasks such as copying
48        code to the worker machine and running Python in the
49        aforementioned virtual environment.  This likely means setting
50        up `ssh` / `scp` with key-based authentication.
51        See: https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys-2
52
53     4. You need to tell this parallelization framework about the pool
54        of machines you created by editing a JSON-based configuration
55        file.  The location of this file defaults to
56        :file:`.remote_worker_records` in your home directory but can
57        be overridden via the `--remote_worker_records_file`
58        commandline argument.  An example JSON configuration `can be
59        found under examples
60        <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
61
62     5. Finally, you will also need tell the
63        :class:`pyutils.parallelize.executors.RemoteExecutor` how to
64        invoke the :file:`remote_worker.py` on remote machines by
65        passing its path on remote worker machines in your setup via
66        the `--remote_worker_helper_path` commandline flag (or,
67        honestly, if you made it this far, just update the default in
68        this code -- find `executors.py` under `site-packages` in your
69        virtual environment and update the default value of the
70        `--remote_worker_helper_path` flag)
71
72     If you're trying to set this up and struggling, email me at
73     [email protected].  I'm happy to help.
74
75 """
76
77 import atexit
78 import functools
79 import typing
80 from enum import Enum
81
82
83 class Method(Enum):
84     """How should we parallelize; by threads, processes or remote workers?"""
85
86     THREAD = 1
87     PROCESS = 2
88     REMOTE = 3
89
90
91 def parallelize(
92     _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
93 ) -> typing.Callable:
94     """This is a decorator that was created to make multi-threading,
95     multi-processing and remote machine parallelism simple in Python.
96
97     Sample usage::
98
99         from pyutils.parallelize import parallelize as p
100
101         @p.parallelize    # defaults to thread-mode
102         def my_function(a, b, c) -> int:
103             ...do some slow / expensive work, e.g., an http request
104
105         @p.parallelize(method=Method.PROCESS)
106         def my_other_function(d, e, f) -> str:
107             ...do more really expensive work, e.g., a network read
108
109         @p.parallelize(method=Method.REMOTE)
110         def my_other_other_function(g, h) -> int:
111             ...this work will be distributed to a remote machine pool
112
113     This decorator will invoke the wrapped function on:
114
115         - `Method.THREAD` (default): a background thread
116         - `Method.PROCESS`: a background process
117         - `Method.REMOTE`: a process on a remote host
118
119     The wrapped function returns immediately with a value that is
120     wrapped in a
121     :class:`pyutils.parallelize.smart_future.SmartFuture`.  This value
122     will block if it is either read directly (via a call to
123     :meth:`_resolve`) or indirectly (by using the result in an
124     expression, printing it, hashing it, passing it a function
125     argument, etc...).  See comments on :class:`SmartFuture` for
126     details.  The value can be safely stored (without hashing) or
127     passed as an argument without causing it to block waiting on a
128     result.  There are some convenience methods for dealing with
129     collections of :class:`SmartFuture` objects defined in
130     :file:`smart_future.py`, namely
131     :meth:`pyutils.parallelize.smart_future.wait_any` and
132     :meth:`pyutils.parallelize.smart_future.wait_all`.
133
134     .. warning::
135         You may stack @parallelized methods and it will "work".
136         That said, having multiple layers of :code:`Method.PROCESS` or
137         :code:`Method.REMOTE` will prove to be problematic because each
138         process in the stack will use its own independent pool which will
139         likely overload your machine with processes or your network with
140         remote processes beyond the control mechanisms built into one
141         instance of the pool.  Be careful.
142
143     .. warning::
144         There is non-trivial overhead of pickling code and
145         copying it over the network when you use :code:`Method.REMOTE`.  There's
146         a smaller but still considerable cost of creating a new process
147         and passing code to/from it when you use :code:`Method.PROCESS`.
148
149     """
150
151     def wrapper(funct: typing.Callable):
152         @functools.wraps(funct)
153         def inner_wrapper(*args, **kwargs):
154             from pyutils.parallelize import executors, smart_future
155
156             # Look for as of yet unresolved arguments in _funct's
157             # argument list and resolve them now.
158             newargs = []
159             for arg in args:
160                 newargs.append(smart_future.SmartFuture.resolve(arg))
161             newkwargs = {}
162             for kw in kwargs:
163                 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
164
165             executor = None
166             if method == Method.PROCESS:
167                 executor = executors.DefaultExecutors().process_pool()
168             elif method == Method.THREAD:
169                 executor = executors.DefaultExecutors().thread_pool()
170             elif method == Method.REMOTE:
171                 executor = executors.DefaultExecutors().remote_pool()
172             assert executor is not None
173             atexit.register(executors.DefaultExecutors().shutdown)
174
175             future = executor.submit(funct, *newargs, **newkwargs)
176
177             # Wrap the future that's returned in a SmartFuture object
178             # so that callers do not need to call .result(), they can
179             # just use is as normal.
180             return smart_future.SmartFuture(future)
181
182         return inner_wrapper
183
184     if _funct is None:
185         return wrapper
186     else:
187         return wrapper(_funct)