Making remote training work better.
[python_utils.git] / parallelize.py
1 #!/usr/bin/env python3
2
3 """A decorator to help with dead simple parallelization."""
4
5 from enum import Enum
6 import functools
7 import typing
8
9
10 class Method(Enum):
11     THREAD = 1
12     PROCESS = 2
13     REMOTE = 3
14
15
16 def parallelize(
17         _funct: typing.Optional[typing.Callable] = None,
18         *,
19         method: Method = Method.THREAD
20 ) -> typing.Callable:
21     """Usage:
22
23     @parallelize    # defaults to thread-mode
24     def my_function(a, b, c) -> int:
25         ...do some slow / expensive work, e.g., an http request
26
27     @parallelize(method=Method.PROCESS)
28     def my_other_function(d, e, f) -> str:
29         ...do more really expensice work, e.g., a network read
30
31     @parallelize(method=Method.REMOTE)
32     def my_other_other_function(g, h) -> int:
33         ...this work will be distributed to a remote machine pool
34
35     This decorator will invoke the wrapped function on:
36
37         Method.THREAD (default): a background thread
38         Method.PROCESS: a background process
39         Method.REMOTE: a process on a remote host
40
41     The wrapped function returns immediately with a value that is
42     wrapped in a SmartFuture.  This value will block if it is either
43     read directly (via a call to result._resolve) or indirectly (by
44     using the result in an expression, printing it, hashing it,
45     passing it a function argument, etc...).  See comments on the
46     SmartFuture class for details.
47
48     Note: you may stack @parallelized methods and it will "work".
49     That said, having multiple layers of Method.PROCESS or
50     Method.REMOTE may prove to be problematic because each process in
51     the stack will use its own independent pool which may overload
52     your machine with processes or your network with remote processes
53     beyond the control mechanisms built into one instance of the pool.
54     Be careful.
55     """
56     def wrapper(funct: typing.Callable):
57
58         @functools.wraps(funct)
59         def inner_wrapper(*args, **kwargs):
60             import executors
61             import smart_future
62
63             # Look for as of yet unresolved arguments in _funct's
64             # argument list and resolve them now.
65             newargs = []
66             for arg in args:
67                 newargs.append(smart_future.SmartFuture.resolve(arg))
68             newkwargs = {}
69             for kw in kwargs:
70                 newkwargs[kw] = smart_future.SmartFuture.resolve(
71                     kwargs[kw]
72                 )
73
74             executor = None
75             if method == Method.PROCESS:
76                 executor = executors.DefaultExecutors().process_pool()
77             elif method == Method.THREAD:
78                 executor = executors.DefaultExecutors().thread_pool()
79             elif method == Method.REMOTE:
80                 executor = executors.DefaultExecutors().remote_pool()
81             assert executor is not None
82
83             future = executor.submit(funct, *newargs, **newkwargs)
84
85             # Wrap the future that's returned in a SmartFuture object
86             # so that callers do not need to call .result(), they can
87             # just use is as normal.
88             return smart_future.SmartFuture(future)
89
90         return inner_wrapper
91
92     if _funct is None:
93         return wrapper
94     else:
95         return wrapper(_funct)