Initial revision
[python_utils.git] / parallelize.py
1 #!/usr/bin/env python3
2
3 """A decorator to help with dead simple parallelization."""
4
5 from enum import Enum
6 import functools
7 import typing
8
9 import executors
10 import smart_future
11
12 ps_count = 0
13 thread_count = 0
14 remote_count = 0
15
16
17 class Method(Enum):
18     THREAD = 1
19     PROCESS = 2
20     REMOTE = 3
21
22
23 def parallelize(
24         _funct: typing.Optional[typing.Callable] = None,
25         *,
26         method: Method = Method.THREAD
27 ) -> typing.Callable:
28     """Usage:
29
30     @parallelize    # defaults to thread-mode
31     def my_function(a, b, c) -> int:
32         ...do some slow / expensive work, e.g., an http request
33
34     @parallelize(method=Method.PROCESS)
35     def my_other_function(d, e, f) -> str:
36         ...do more really expensice work, e.g., a network read
37
38     @parallelize(method=Method.REMOTE)
39     def my_other_other_function(g, h) -> int:
40         ...this work will be distributed to a remote machine pool
41
42     This decorator will invoke the wrapped function on:
43
44         Method.THREAD (default): a background thread
45         Method.PROCESS: a background process
46         Method.REMOTE: a process on a remote host
47
48     The wrapped function returns immediately with a value that is
49     wrapped in a SmartFuture.  This value will block if it is either
50     read directly (via a call to result._resolve) or indirectly (by
51     using the result in an expression, printing it, hashing it,
52     passing it a function argument, etc...).  See comments on the
53     SmartFuture class for details.
54
55     Note: you may stack @parallelized methods and it will "work".
56     That said, having multiple layers of Method.PROCESS or
57     Method.REMOTE may prove to be problematic because each process in
58     the stack will use its own independent pool which may overload
59     your machine with processes or your network with remote processes
60     beyond the control mechanisms built into one instance of the pool.
61     Be careful.
62     """
63     def wrapper(funct: typing.Callable):
64
65         @functools.wraps(funct)
66         def inner_wrapper(*args, **kwargs):
67             # Look for as of yet unresolved arguments in _funct's
68             # argument list and resolve them now.
69             newargs = []
70             for arg in args:
71                 newargs.append(smart_future.SmartFuture.resolve(arg))
72             newkwargs = {}
73             for kw in kwargs:
74                 newkwargs[kw] = smart_future.SmartFuture.resolve(
75                     kwargs[kw]
76                 )
77
78             executor = None
79             if method == Method.PROCESS:
80                 executor = executors.DefaultExecutors().process_pool()
81             elif method == Method.THREAD:
82                 executor = executors.DefaultExecutors().thread_pool()
83             elif method == Method.REMOTE:
84                 executor = executors.DefaultExecutors().remote_pool()
85             assert executor is not None
86
87             future = executor.submit(funct, *newargs, **newkwargs)
88
89             # Wrap the future that's returned in a SmartFuture object
90             # so that callers do not need to call .result(), they can
91             # just use is as normal.
92             return smart_future.SmartFuture(future)
93
94         return inner_wrapper
95
96     if _funct is None:
97         return wrapper
98     else:
99         return wrapper(_funct)