More work to improve documentation generated by sphinx. Also fixes
[pyutils.git] / src / pyutils / parallelize / parallelize.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A decorator to help with dead simple parallelization.  When decorated
6 functions are invoked they execute on a background thread, process or
7 remote machine depending on the style of decoration::
8
9     @parallelize    # defaults to thread-mode
10     def my_function(a, b, c) -> int:
11         ...do some slow / expensive work, e.g., an http request
12
13     @parallelize(method=Method.PROCESS)
14     def my_other_function(d, e, f) -> str:
15         ...do more really expensive work, e.g., a network read
16
17     @parallelize(method=Method.REMOTE)
18     def my_other_other_function(g, h) -> int:
19         ...this work will be distributed to a remote machine pool
20
21 This will just work out of the box with `Method.THREAD` (the default)
22 and `Method.PROCESS` but in otder to use `Method.REMOTE` you need to
23 do some setup work:
24
25     1. To use this stuff you need to hook into :mod:`pyutils.config`
26        so that this code can see commandline arguments.
27
28     2. You need to create and configure a pool of worker machines.
29        All of these machines should run the same version of Python,
30        ideally in a virtual environment (venv) with the same
31        Python dependencies installed.  Different versions of code
32        or of the interpreter itself can cause issues with running
33        cloudpicked code.
34
35     3. You need an account that can ssh into any / all of these
36        machines non-interactively and run Python in the aforementioned
37        virtual environment.  This likely means setting up ssh with
38        key-based authentication.
39
40     4. You need to tell this parallelization framework about the pool
41        of machines where it can dispatch work by creating a JSON based
42        configuration file.  The location of this file defaults to
43        :file:`.remote_worker_records` in your home directory but can
44        be overridden via the `--remote_worker_records_file`
45        commandline argument.  An example JSON configuration `can be
46        found under examples
47        <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
48
49     5. Finally, you will also need tell the
50        :class:`executors.RemoteExecutor` how to invoke the
51        :file:`remote_worker.py` on remote machines by passing its path
52        on remote worker machines in your setup via the
53        `--remote_worker_helper_path` commandline flag.
54
55 """
56
57 import atexit
58 import functools
59 import typing
60 from enum import Enum
61
62
63 class Method(Enum):
64     """How should we parallelize; by threads, processes or remote workers?"""
65
66     THREAD = 1
67     PROCESS = 2
68     REMOTE = 3
69
70
71 def parallelize(
72     _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
73 ) -> typing.Callable:
74     """This is a decorator that was created to make multi-threading,
75     multi-processing and remote machine parallelism simple in Python.
76
77     Sample usage::
78
79         @parallelize    # defaults to thread-mode
80         def my_function(a, b, c) -> int:
81             ...do some slow / expensive work, e.g., an http request
82
83         @parallelize(method=Method.PROCESS)
84         def my_other_function(d, e, f) -> str:
85             ...do more really expensive work, e.g., a network read
86
87         @parallelize(method=Method.REMOTE)
88         def my_other_other_function(g, h) -> int:
89             ...this work will be distributed to a remote machine pool
90
91     This decorator will invoke the wrapped function on::
92
93         Method.THREAD (default): a background thread
94         Method.PROCESS: a background process
95         Method.REMOTE: a process on a remote host
96
97     The wrapped function returns immediately with a value that is
98     wrapped in a :class:`SmartFuture`.  This value will block if it is
99     either read directly (via a call to :meth:`_resolve`) or
100     indirectly (by using the result in an expression, printing it,
101     hashing it, passing it a function argument, etc...).  See comments
102     on :class:`SmartFuture` for details.  The value can be safely
103     stored (without hashing) or passed as an argument without causing
104     it to block waiting on a result.  There are some convenience
105     methods for dealing with collections of :class:`SmartFuture`
106     objects defined in :file:`smart_future.py`, namely
107     :meth:`smart_future.wait_any` and :meth:`smart_future.wait_all`.
108
109     .. warning::
110         You may stack @parallelized methods and it will "work".
111         That said, having multiple layers of :code:`Method.PROCESS` or
112         :code:`Method.REMOTE` will prove to be problematic because each process in
113         the stack will use its own independent pool which may overload
114         your machine with processes or your network with remote processes
115         beyond the control mechanisms built into one instance of the pool.
116         Be careful.
117
118     .. warning::
119         There is non-trivial overhead of pickling code and
120         copying it over the network when you use :code:`Method.REMOTE`.  There's
121         a smaller but still considerable cost of creating a new process
122         and passing code to/from it when you use :code:`Method.PROCESS`.
123
124     """
125
126     def wrapper(funct: typing.Callable):
127         @functools.wraps(funct)
128         def inner_wrapper(*args, **kwargs):
129             from pyutils.parallelize import executors, smart_future
130
131             # Look for as of yet unresolved arguments in _funct's
132             # argument list and resolve them now.
133             newargs = []
134             for arg in args:
135                 newargs.append(smart_future.SmartFuture.resolve(arg))
136             newkwargs = {}
137             for kw in kwargs:
138                 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
139
140             executor = None
141             if method == Method.PROCESS:
142                 executor = executors.DefaultExecutors().process_pool()
143             elif method == Method.THREAD:
144                 executor = executors.DefaultExecutors().thread_pool()
145             elif method == Method.REMOTE:
146                 executor = executors.DefaultExecutors().remote_pool()
147             assert executor is not None
148             atexit.register(executors.DefaultExecutors().shutdown)
149
150             future = executor.submit(funct, *newargs, **newkwargs)
151
152             # Wrap the future that's returned in a SmartFuture object
153             # so that callers do not need to call .result(), they can
154             # just use is as normal.
155             return smart_future.SmartFuture(future)
156
157         return inner_wrapper
158
159     if _funct is None:
160         return wrapper
161     else:
162         return wrapper(_funct)