Ran black code formatter on everything.
[python_utils.git] / parallelize.py
index 334dc4e992c56aebcad796d2f6696bd63b14653e..98f883c5716ae8fa3d67101b1d38d20dece7a0b6 100644 (file)
@@ -2,17 +2,11 @@
 
 """A decorator to help with dead simple parallelization."""
 
 
 """A decorator to help with dead simple parallelization."""
 
+import atexit
 from enum import Enum
 import functools
 import typing
 
 from enum import Enum
 import functools
 import typing
 
-import executors
-import smart_future
-
-ps_count = 0
-thread_count = 0
-remote_count = 0
-
 
 class Method(Enum):
     THREAD = 1
 
 class Method(Enum):
     THREAD = 1
@@ -21,23 +15,23 @@ class Method(Enum):
 
 
 def parallelize(
 
 
 def parallelize(
-        _funct: typing.Optional[typing.Callable] = None,
-        *,
-        method: Method = Method.THREAD
+    _funct: typing.Optional[typing.Callable] = None,
+    *,
+    method: Method = Method.THREAD
 ) -> typing.Callable:
     """Usage:
 
 ) -> typing.Callable:
     """Usage:
 
-    @parallelize    # defaults to thread-mode
-    def my_function(a, b, c) -> int:
-        ...do some slow / expensive work, e.g., an http request
+        @parallelize    # defaults to thread-mode
+        def my_function(a, b, c) -> int:
+            ...do some slow / expensive work, e.g., an http request
 
 
-    @parallelize(method=Method.PROCESS)
-    def my_other_function(d, e, f) -> str:
-        ...do more really expensice work, e.g., a network read
+        @parallelize(method=Method.PROCESS)
+        def my_other_function(d, e, f) -> str:
+            ...do more really expensice work, e.g., a network read
 
 
-    @parallelize(method=Method.REMOTE)
-    def my_other_other_function(g, h) -> int:
-        ...this work will be distributed to a remote machine pool
+        @parallelize(method=Method.REMOTE)
+        def my_other_other_function(g, h) -> int:
+            ...this work will be distributed to a remote machine pool
 
     This decorator will invoke the wrapped function on:
 
 
     This decorator will invoke the wrapped function on:
 
@@ -59,11 +53,19 @@ def parallelize(
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
+
+    Also note: there is a non trivial overhead of pickling code and
+    scp'ing it over the network when you use Method.REMOTE.  There's
+    a smaller but still considerable cost of creating a new process
+    and passing code to/from it when you use Method.PROCESS.
     """
     """
-    def wrapper(funct: typing.Callable):
 
 
+    def wrapper(funct: typing.Callable):
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
+            import executors
+            import smart_future
+
             # Look for as of yet unresolved arguments in _funct's
             # argument list and resolve them now.
             newargs = []
             # Look for as of yet unresolved arguments in _funct's
             # argument list and resolve them now.
             newargs = []
@@ -71,9 +73,7 @@ def parallelize(
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
-                newkwargs[kw] = smart_future.SmartFuture.resolve(
-                    kwargs[kw]
-                )
+                newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
 
             executor = None
             if method == Method.PROCESS:
 
             executor = None
             if method == Method.PROCESS:
@@ -83,6 +83,7 @@ def parallelize(
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
+            atexit.register(executors.DefaultExecutors().shutdown)
 
             future = executor.submit(funct, *newargs, **newkwargs)
 
 
             future = executor.submit(funct, *newargs, **newkwargs)