Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / parallelize.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A decorator to help with dead simple parallelization."""
6
7
8 import atexit
9 import functools
10 import typing
11 from enum import Enum
12
13
14 class Method(Enum):
15     """How should we parallelize; by threads, processes or remote workers?"""
16
17     THREAD = 1
18     PROCESS = 2
19     REMOTE = 3
20
21
22 def parallelize(
23     _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
24 ) -> typing.Callable:
25     """This is a decorator that was created to make multi-threading,
26     multi-processing and remote machine parallelism simple in python.
27
28     Sample usage::
29
30         @parallelize    # defaults to thread-mode
31         def my_function(a, b, c) -> int:
32             ...do some slow / expensive work, e.g., an http request
33
34         @parallelize(method=Method.PROCESS)
35         def my_other_function(d, e, f) -> str:
36             ...do more really expensive work, e.g., a network read
37
38         @parallelize(method=Method.REMOTE)
39         def my_other_other_function(g, h) -> int:
40             ...this work will be distributed to a remote machine pool
41
42     This decorator will invoke the wrapped function on::
43
44         Method.THREAD (default): a background thread
45         Method.PROCESS: a background process
46         Method.REMOTE: a process on a remote host
47
48     The wrapped function returns immediately with a value that is
49     wrapped in a :class:`SmartFuture`.  This value will block if it is
50     either read directly (via a call to :meth:`_resolve`) or indirectly
51     (by using the result in an expression, printing it, hashing it,
52     passing it a function argument, etc...).  See comments on
53     :class:`SmartFuture` for details.
54
55     .. warning::
56         You may stack @parallelized methods and it will "work".
57         That said, having multiple layers of :code:`Method.PROCESS` or
58         :code:`Method.REMOTE` will prove to be problematic because each process in
59         the stack will use its own independent pool which may overload
60         your machine with processes or your network with remote processes
61         beyond the control mechanisms built into one instance of the pool.
62         Be careful.
63
64     .. note::
65         There is non-trivial overhead of pickling code and
66         copying it over the network when you use :code:`Method.REMOTE`.  There's
67         a smaller but still considerable cost of creating a new process
68         and passing code to/from it when you use :code:`Method.PROCESS`.
69     """
70
71     def wrapper(funct: typing.Callable):
72         @functools.wraps(funct)
73         def inner_wrapper(*args, **kwargs):
74             import executors
75             import smart_future
76
77             # Look for as of yet unresolved arguments in _funct's
78             # argument list and resolve them now.
79             newargs = []
80             for arg in args:
81                 newargs.append(smart_future.SmartFuture.resolve(arg))
82             newkwargs = {}
83             for kw in kwargs:
84                 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
85
86             executor = None
87             if method == Method.PROCESS:
88                 executor = executors.DefaultExecutors().process_pool()
89             elif method == Method.THREAD:
90                 executor = executors.DefaultExecutors().thread_pool()
91             elif method == Method.REMOTE:
92                 executor = executors.DefaultExecutors().remote_pool()
93             assert executor is not None
94             atexit.register(executors.DefaultExecutors().shutdown)
95
96             future = executor.submit(funct, *newargs, **newkwargs)
97
98             # Wrap the future that's returned in a SmartFuture object
99             # so that callers do not need to call .result(), they can
100             # just use is as normal.
101             return smart_future.SmartFuture(future)
102
103         return inner_wrapper
104
105     if _funct is None:
106         return wrapper
107     else:
108         return wrapper(_funct)