6d31174424c2b415866a0b539a6cb4d64579a7fc
[pyutils.git] / src / pyutils / parallelize / parallelize.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A decorator to help with dead simple parallelization.  See usage
6 below.
7
8 This will just work with `Method.THREAD` and `Method.PROCESS` but to
9 use `Method.REMOTE` you need to do some setup work.  You need to
10 configure a pool of workers.  Each worker should run the same version
11 of Python, ideally in identically configured virtual environments.
12 And you need to be able to ssh into each machine using key-based
13 authentication (i.e. non-iteractively) and run python.  List machines
14 in the location set by `--remote_worker_records_file` (see
15 :file:executors.h for flag and an example JSON file under examples).
16
17 """
18
19
20 import atexit
21 import functools
22 import typing
23 from enum import Enum
24
25
26 class Method(Enum):
27     """How should we parallelize; by threads, processes or remote workers?"""
28
29     THREAD = 1
30     PROCESS = 2
31     REMOTE = 3
32
33
34 def parallelize(
35     _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
36 ) -> typing.Callable:
37     """This is a decorator that was created to make multi-threading,
38     multi-processing and remote machine parallelism simple in python.
39
40     Sample usage::
41
42         @parallelize    # defaults to thread-mode
43         def my_function(a, b, c) -> int:
44             ...do some slow / expensive work, e.g., an http request
45
46         @parallelize(method=Method.PROCESS)
47         def my_other_function(d, e, f) -> str:
48             ...do more really expensive work, e.g., a network read
49
50         @parallelize(method=Method.REMOTE)
51         def my_other_other_function(g, h) -> int:
52             ...this work will be distributed to a remote machine pool
53
54     This decorator will invoke the wrapped function on::
55
56         Method.THREAD (default): a background thread
57         Method.PROCESS: a background process
58         Method.REMOTE: a process on a remote host
59
60     The wrapped function returns immediately with a value that is
61     wrapped in a :class:`SmartFuture`.  This value will block if it is
62     either read directly (via a call to :meth:`_resolve`) or indirectly
63     (by using the result in an expression, printing it, hashing it,
64     passing it a function argument, etc...).  See comments on
65     :class:`SmartFuture` for details.
66
67     .. warning::
68         You may stack @parallelized methods and it will "work".
69         That said, having multiple layers of :code:`Method.PROCESS` or
70         :code:`Method.REMOTE` will prove to be problematic because each process in
71         the stack will use its own independent pool which may overload
72         your machine with processes or your network with remote processes
73         beyond the control mechanisms built into one instance of the pool.
74         Be careful.
75
76     .. note::
77         There is non-trivial overhead of pickling code and
78         copying it over the network when you use :code:`Method.REMOTE`.  There's
79         a smaller but still considerable cost of creating a new process
80         and passing code to/from it when you use :code:`Method.PROCESS`.
81     """
82
83     def wrapper(funct: typing.Callable):
84         @functools.wraps(funct)
85         def inner_wrapper(*args, **kwargs):
86             from pyutils.parallelize import executors, smart_future
87
88             # Look for as of yet unresolved arguments in _funct's
89             # argument list and resolve them now.
90             newargs = []
91             for arg in args:
92                 newargs.append(smart_future.SmartFuture.resolve(arg))
93             newkwargs = {}
94             for kw in kwargs:
95                 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
96
97             executor = None
98             if method == Method.PROCESS:
99                 executor = executors.DefaultExecutors().process_pool()
100             elif method == Method.THREAD:
101                 executor = executors.DefaultExecutors().thread_pool()
102             elif method == Method.REMOTE:
103                 executor = executors.DefaultExecutors().remote_pool()
104             assert executor is not None
105             atexit.register(executors.DefaultExecutors().shutdown)
106
107             future = executor.submit(funct, *newargs, **newkwargs)
108
109             # Wrap the future that's returned in a SmartFuture object
110             # so that callers do not need to call .result(), they can
111             # just use is as normal.
112             return smart_future.SmartFuture(future)
113
114         return inner_wrapper
115
116     if _funct is None:
117         return wrapper
118     else:
119         return wrapper(_funct)