3 # © Copyright 2021-2022, Scott Gasch
5 """A decorator to help with dead simple parallelization. See usage
8 This will just work with `Method.THREAD` and `Method.PROCESS` but to
9 use `Method.REMOTE` you need to do some setup work. You need to
10 configure a pool of workers. Each worker should run the same version
11 of Python, ideally in identically configured virtual environments.
12 And you need to be able to ssh into each machine using key-based
13 authentication (i.e. non-interactively) and run Python. List machines
14 in the location set by `--remote_worker_records_file` (see
15 :file:`executors.py` for the flag, and an example JSON file under examples).
27 """How should we parallelize: by threads, processes, or remote workers?"""
35 _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
37 """This is a decorator that was created to make multi-threading,
38 multi-processing and remote machine parallelism simple in python.
42 @parallelize # defaults to thread-mode
43 def my_function(a, b, c) -> int:
44 ...do some slow / expensive work, e.g., an http request
46 @parallelize(method=Method.PROCESS)
47 def my_other_function(d, e, f) -> str:
48 ...do more really expensive work, e.g., a network read
50 @parallelize(method=Method.REMOTE)
51 def my_other_other_function(g, h) -> int:
52 ...this work will be distributed to a remote machine pool
54 This decorator will invoke the wrapped function on::
56 Method.THREAD (default): a background thread
57 Method.PROCESS: a background process
58 Method.REMOTE: a process on a remote host
60 The wrapped function returns immediately with a value that is
61 wrapped in a :class:`SmartFuture`. This value will block if it is
62 either read directly (via a call to :meth:`_resolve`) or indirectly
63 (by using the result in an expression, printing it, hashing it,
64 passing it as a function argument, etc.). See comments on
65 :class:`SmartFuture` for details.
68 You may stack @parallelized methods and it will "work".
69 That said, having multiple layers of :code:`Method.PROCESS` or
70 :code:`Method.REMOTE` will prove to be problematic because each process in
71 the stack will use its own independent pool which may overload
72 your machine with processes or your network with remote processes
73 beyond the control mechanisms built into one instance of the pool.
77 There is a non-trivial overhead to pickling code and
78 copying it over the network when you use :code:`Method.REMOTE`. There's
79 a smaller but still considerable cost of creating a new process
80 and passing code to/from it when you use :code:`Method.PROCESS`.
83 def wrapper(funct: typing.Callable):
84 @functools.wraps(funct)
85 def inner_wrapper(*args, **kwargs):
86 from pyutils.parallelize import executors, smart_future
88 # Look for as-yet-unresolved arguments in funct's
89 # argument list and resolve them now.
92 newargs.append(smart_future.SmartFuture.resolve(arg))
95 newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
98 if method == Method.PROCESS:
99 executor = executors.DefaultExecutors().process_pool()
100 elif method == Method.THREAD:
101 executor = executors.DefaultExecutors().thread_pool()
102 elif method == Method.REMOTE:
103 executor = executors.DefaultExecutors().remote_pool()
104 assert executor is not None
105 atexit.register(executors.DefaultExecutors().shutdown)
107 future = executor.submit(funct, *newargs, **newkwargs)
109 # Wrap the future that's returned in a SmartFuture object
110 # so that callers do not need to call .result(), they can
111 # just use it as normal.
112 return smart_future.SmartFuture(future)
119 return wrapper(_funct)