Migration from old pyutilz package name (which, in turn, came from
[pyutils.git] / src / pyutils / parallelize / parallelize.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A decorator to help with dead simple parallelization."""
6
7
8 import atexit
9 import functools
10 import typing
11 from enum import Enum
12
13
14 class Method(Enum):
15     """How should we parallelize; by threads, processes or remote workers?"""
16
17     THREAD = 1
18     PROCESS = 2
19     REMOTE = 3
20
21
22 def parallelize(
23     _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
24 ) -> typing.Callable:
25     """This is a decorator that was created to make multi-threading,
26     multi-processing and remote machine parallelism simple in python.
27
28     Sample usage::
29
30         @parallelize    # defaults to thread-mode
31         def my_function(a, b, c) -> int:
32             ...do some slow / expensive work, e.g., an http request
33
34         @parallelize(method=Method.PROCESS)
35         def my_other_function(d, e, f) -> str:
36             ...do more really expensive work, e.g., a network read
37
38         @parallelize(method=Method.REMOTE)
39         def my_other_other_function(g, h) -> int:
40             ...this work will be distributed to a remote machine pool
41
42     This decorator will invoke the wrapped function on::
43
44         Method.THREAD (default): a background thread
45         Method.PROCESS: a background process
46         Method.REMOTE: a process on a remote host
47
48     The wrapped function returns immediately with a value that is
49     wrapped in a :class:`SmartFuture`.  This value will block if it is
50     either read directly (via a call to :meth:`_resolve`) or indirectly
51     (by using the result in an expression, printing it, hashing it,
52     passing it a function argument, etc...).  See comments on
53     :class:`SmartFuture` for details.
54
55     .. warning::
56         You may stack @parallelized methods and it will "work".
57         That said, having multiple layers of :code:`Method.PROCESS` or
58         :code:`Method.REMOTE` will prove to be problematic because each process in
59         the stack will use its own independent pool which may overload
60         your machine with processes or your network with remote processes
61         beyond the control mechanisms built into one instance of the pool.
62         Be careful.
63
64     .. note::
65         There is non-trivial overhead of pickling code and
66         copying it over the network when you use :code:`Method.REMOTE`.  There's
67         a smaller but still considerable cost of creating a new process
68         and passing code to/from it when you use :code:`Method.PROCESS`.
69     """
70
71     def wrapper(funct: typing.Callable):
72         @functools.wraps(funct)
73         def inner_wrapper(*args, **kwargs):
74             from pyutils.parallelize import executors, smart_future
75
76             # Look for as of yet unresolved arguments in _funct's
77             # argument list and resolve them now.
78             newargs = []
79             for arg in args:
80                 newargs.append(smart_future.SmartFuture.resolve(arg))
81             newkwargs = {}
82             for kw in kwargs:
83                 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
84
85             executor = None
86             if method == Method.PROCESS:
87                 executor = executors.DefaultExecutors().process_pool()
88             elif method == Method.THREAD:
89                 executor = executors.DefaultExecutors().thread_pool()
90             elif method == Method.REMOTE:
91                 executor = executors.DefaultExecutors().remote_pool()
92             assert executor is not None
93             atexit.register(executors.DefaultExecutors().shutdown)
94
95             future = executor.submit(funct, *newargs, **newkwargs)
96
97             # Wrap the future that's returned in a SmartFuture object
98             # so that callers do not need to call .result(), they can
99             # just use is as normal.
100             return smart_future.SmartFuture(future)
101
102         return inner_wrapper
103
104     if _funct is None:
105         return wrapper
106     else:
107         return wrapper(_funct)