Ran black code formatter on everything.
[python_utils.git] / parallelize.py
index 334dc4e992c56aebcad796d2f6696bd63b14653e..98f883c5716ae8fa3d67101b1d38d20dece7a0b6 100644 (file)
@@ -2,17 +2,11 @@
 
 """A decorator to help with dead simple parallelization."""
 
+import atexit
 from enum import Enum
 import functools
 import typing
 
-import executors
-import smart_future
-
-ps_count = 0
-thread_count = 0
-remote_count = 0
-
 
 class Method(Enum):
     THREAD = 1
@@ -21,23 +15,23 @@ class Method(Enum):
 
 
 def parallelize(
-        _funct: typing.Optional[typing.Callable] = None,
-        *,
-        method: Method = Method.THREAD
+    _funct: typing.Optional[typing.Callable] = None,
+    *,
+    method: Method = Method.THREAD
 ) -> typing.Callable:
     """Usage:
 
-    @parallelize    # defaults to thread-mode
-    def my_function(a, b, c) -> int:
-        ...do some slow / expensive work, e.g., an http request
+        @parallelize    # defaults to thread-mode
+        def my_function(a, b, c) -> int:
+            ...do some slow / expensive work, e.g., an http request
 
-    @parallelize(method=Method.PROCESS)
-    def my_other_function(d, e, f) -> str:
-        ...do more really expensice work, e.g., a network read
+        @parallelize(method=Method.PROCESS)
+        def my_other_function(d, e, f) -> str:
+            ...do more really expensice work, e.g., a network read
 
-    @parallelize(method=Method.REMOTE)
-    def my_other_other_function(g, h) -> int:
-        ...this work will be distributed to a remote machine pool
+        @parallelize(method=Method.REMOTE)
+        def my_other_other_function(g, h) -> int:
+            ...this work will be distributed to a remote machine pool
 
     This decorator will invoke the wrapped function on:
 
@@ -59,11 +53,19 @@ def parallelize(
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
+
+    Also note: there is a non trivial overhead of pickling code and
+    scp'ing it over the network when you use Method.REMOTE.  There's
+    a smaller but still considerable cost of creating a new process
+    and passing code to/from it when you use Method.PROCESS.
     """
-    def wrapper(funct: typing.Callable):
 
+    def wrapper(funct: typing.Callable):
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
+            import executors
+            import smart_future
+
             # Look for as of yet unresolved arguments in _funct's
             # argument list and resolve them now.
             newargs = []
@@ -71,9 +73,7 @@ def parallelize(
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
-                newkwargs[kw] = smart_future.SmartFuture.resolve(
-                    kwargs[kw]
-                )
+                newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
 
             executor = None
             if method == Method.PROCESS:
@@ -83,6 +83,7 @@ def parallelize(
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
+            atexit.register(executors.DefaultExecutors().shutdown)
 
             future = executor.submit(funct, *newargs, **newkwargs)