# © Copyright 2021-2022, Scott Gasch
"""A decorator to help with dead simple parallelization.  When decorated
functions are invoked they execute on a background thread, process or
remote machine depending on the style of decoration::

    @parallelize  # defaults to thread-mode
    def my_function(a, b, c) -> int:
        ...do some slow / expensive work, e.g., an http request

    @parallelize(method=Method.PROCESS)
    def my_other_function(d, e, f) -> str:
        ...do more really expensive work, e.g., a network read

    @parallelize(method=Method.REMOTE)
    def my_other_other_function(g, h) -> int:
        ...this work will be distributed to a remote machine pool
This will just work out of the box with `Method.THREAD` (the default)
and `Method.PROCESS` but in order to use `Method.REMOTE` you need to
do some setup work first:
1. You need to hook into :mod:`pyutils.config` so that this code
   can see commandline arguments.
2. You need to create and configure a pool of worker machines.
   All of these machines should run the same version of Python,
   ideally in a virtual environment (venv) with the same
   Python dependencies installed.  Different versions of code
   or of the interpreter itself can cause issues with running
   code remotely.
3. You need an account that can ssh into any / all of these
   machines non-interactively and run Python in the aforementioned
   virtual environment.  This likely means setting up ssh with
   key-based authentication.
4. You need to tell this parallelization framework about the pool
   of machines where it can dispatch work by creating a JSON based
   configuration file.  The location of this file defaults to
   :file:`.remote_worker_records` in your home directory but can
   be overridden via the `--remote_worker_records_file`
   commandline argument.  An example JSON configuration `can be
   found here <https://wannabe.guru.org/gitweb/?p=pyutils.git;a=blob_plain;f=examples/parallelize_config/.remote_worker_records;hb=HEAD>`_.
5. Finally, you will also need to tell the
   :class:`executors.RemoteExecutor` how to invoke the
   :file:`remote_worker.py` helper on remote machines by passing
   its path on the remote worker machines via the
   `--remote_worker_helper_path` commandline flag.
"""

import atexit
import enum
import functools
import typing
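For step 4, the example linked above is the authoritative reference for
the worker records file.  Purely as an illustration of the general shape
(the field names below are guesses for illustration, not a documented
schema; consult the linked example for the real format), such a file
might look something like:

```json
{
    "remote_worker_records": [
        {
            "username": "worker_account",
            "machine": "machine1.example.com",
            "weight": 16,
            "count": 2
        }
    ]
}
```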
class Method(enum.Enum):
    """How should we parallelize; by threads, processes or remote workers?"""

    THREAD = 1
    PROCESS = 2
    REMOTE = 3
def parallelize(
    _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
) -> typing.Callable:
    """A decorator that makes multi-threading, multi-processing, and
    remote machine parallelism simple in Python.  For example::
        @parallelize  # defaults to thread-mode
        def my_function(a, b, c) -> int:
            ...do some slow / expensive work, e.g., an http request

        @parallelize(method=Method.PROCESS)
        def my_other_function(d, e, f) -> str:
            ...do more really expensive work, e.g., a network read

        @parallelize(method=Method.REMOTE)
        def my_other_other_function(g, h) -> int:
            ...this work will be distributed to a remote machine pool
    This decorator will invoke the wrapped function on::

        Method.THREAD (default): a background thread
        Method.PROCESS: a background process
        Method.REMOTE: a process on a remote host
    The wrapped function returns immediately with a value that is
    wrapped in a :class:`SmartFuture`.  This value will block if it is
    either read directly (via a call to :meth:`_resolve`) or
    indirectly (by using the result in an expression, printing it,
    hashing it, passing it as a function argument, etc...).  See
    comments on :class:`SmartFuture` for details.  The value can be
    safely stored (without hashing) or passed as an argument without
    causing it to block waiting on a result.  There are some
    convenience methods for dealing with collections of
    :class:`SmartFuture` objects defined in :file:`smart_future.py`,
    namely :meth:`smart_future.wait_any` and :meth:`smart_future.wait_all`.
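    As a rough analogue of the mechanism described above, here is a
    minimal sketch using only the standard library (it is NOT the
    pyutils implementation): a decorator submits the wrapped call to a
    thread pool and immediately returns a future.  Unlike
    :class:`SmartFuture`, a plain `concurrent.futures.Future` must be
    resolved explicitly with `.result()`:

```python
import concurrent.futures
import functools

_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def threaded(funct):
    """Run calls to funct on a background thread; return a Future."""

    @functools.wraps(funct)
    def inner(*args, **kwargs):
        # Submit and return immediately; the caller blocks only when
        # it asks the future for its result.
        return _pool.submit(funct, *args, **kwargs)

    return inner


@threaded
def slow_add(a, b):
    return a + b


future = slow_add(2, 3)  # returns immediately, does not block
print(future.result())   # blocks until the answer is ready -> 5
```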
    .. note::
        You may stack @parallelize decorations and it will "work".
        That said, having multiple layers of :code:`Method.PROCESS` or
        :code:`Method.REMOTE` will prove to be problematic because each
        process in the stack will use its own independent pool, which
        may overload your machine with processes or your network with
        remote processes beyond the control mechanisms built into one
        instance of the pool.
    .. warning::
        There is non-trivial overhead to pickling code and copying it
        over the network when you use :code:`Method.REMOTE`.  There's
        a smaller but still considerable cost to creating a new process
        and passing code to / from it when you use
        :code:`Method.PROCESS`.
    """
    def wrapper(funct: typing.Callable):
        @functools.wraps(funct)
        def inner_wrapper(*args, **kwargs):
            from pyutils.parallelize import executors, smart_future

            # Look for as of yet unresolved arguments in _funct's
            # argument list and resolve them now.
            newargs = []
            for arg in args:
                newargs.append(smart_future.SmartFuture.resolve(arg))
            newkwargs = {}
            for kw in kwargs:
                newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
            executor = None
            if method == Method.PROCESS:
                executor = executors.DefaultExecutors().process_pool()
            elif method == Method.THREAD:
                executor = executors.DefaultExecutors().thread_pool()
            elif method == Method.REMOTE:
                executor = executors.DefaultExecutors().remote_pool()
            assert executor is not None
            atexit.register(executors.DefaultExecutors().shutdown)
            future = executor.submit(funct, *newargs, **newkwargs)

            # Wrap the future that's returned in a SmartFuture object
            # so that callers do not need to call .result(), they can
            # just use it as normal.
            return smart_future.SmartFuture(future)

        return inner_wrapper

    if _funct is None:
        return wrapper
    return wrapper(_funct)