Ignore ANTLR generated files.
[python_utils.git] / parallelize.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A decorator to help with dead simple parallelization."""
6
7
8 import atexit
9 import functools
10 import typing
11 from enum import Enum
12
13
14 class Method(Enum):
15     """How should we parallelize; by threads, processes or remote workers?"""
16
17     THREAD = 1
18     PROCESS = 2
19     REMOTE = 3
20
21
22 def parallelize(
23     _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
24 ) -> typing.Callable:
25     """Usage:
26
27         @parallelize    # defaults to thread-mode
28         def my_function(a, b, c) -> int:
29             ...do some slow / expensive work, e.g., an http request
30
31         @parallelize(method=Method.PROCESS)
32         def my_other_function(d, e, f) -> str:
33             ...do more really expensice work, e.g., a network read
34
35         @parallelize(method=Method.REMOTE)
36         def my_other_other_function(g, h) -> int:
37             ...this work will be distributed to a remote machine pool
38
39     This decorator will invoke the wrapped function on:
40
41         Method.THREAD (default): a background thread
42         Method.PROCESS: a background process
43         Method.REMOTE: a process on a remote host
44
45     The wrapped function returns immediately with a value that is
46     wrapped in a SmartFuture.  This value will block if it is either
47     read directly (via a call to result._resolve) or indirectly (by
48     using the result in an expression, printing it, hashing it,
49     passing it a function argument, etc...).  See comments on the
50     SmartFuture class for details.
51
52     Note: you may stack @parallelized methods and it will "work".
53     That said, having multiple layers of Method.PROCESS or
54     Method.REMOTE may prove to be problematic because each process in
55     the stack will use its own independent pool which may overload
56     your machine with processes or your network with remote processes
57     beyond the control mechanisms built into one instance of the pool.
58     Be careful.
59
60     Also note: there is a non trivial overhead of pickling code and
61     scp'ing it over the network when you use Method.REMOTE.  There's
62     a smaller but still considerable cost of creating a new process
63     and passing code to/from it when you use Method.PROCESS.
64     """
65
66     def wrapper(funct: typing.Callable):
67         @functools.wraps(funct)
68         def inner_wrapper(*args, **kwargs):
69             import executors
70             import smart_future
71
72             # Look for as of yet unresolved arguments in _funct's
73             # argument list and resolve them now.
74             newargs = []
75             for arg in args:
76                 newargs.append(smart_future.SmartFuture.resolve(arg))
77             newkwargs = {}
78             for kw in kwargs:
79                 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
80
81             executor = None
82             if method == Method.PROCESS:
83                 executor = executors.DefaultExecutors().process_pool()
84             elif method == Method.THREAD:
85                 executor = executors.DefaultExecutors().thread_pool()
86             elif method == Method.REMOTE:
87                 executor = executors.DefaultExecutors().remote_pool()
88             assert executor is not None
89             atexit.register(executors.DefaultExecutors().shutdown)
90
91             future = executor.submit(funct, *newargs, **newkwargs)
92
93             # Wrap the future that's returned in a SmartFuture object
94             # so that callers do not need to call .result(), they can
95             # just use is as normal.
96             return smart_future.SmartFuture(future)
97
98         return inner_wrapper
99
100     if _funct is None:
101         return wrapper
102     else:
103         return wrapper(_funct)