More type annotations.
[python_utils.git] / parallelize.py
index 08220951a000e3ee9c24dcc908f54af1067ee265..cd3eff4e9c539c25d7d5cf3deb52a494bd2bd0f0 100644 (file)
@@ -2,14 +2,12 @@
 
 """A decorator to help with dead simple parallelization."""
 
 
 """A decorator to help with dead simple parallelization."""
 
+
+import atexit
 from enum import Enum
 import functools
 import typing
 
 from enum import Enum
 import functools
 import typing
 
-ps_count = 0
-thread_count = 0
-remote_count = 0
-
 
 class Method(Enum):
     THREAD = 1
 
 class Method(Enum):
     THREAD = 1
@@ -18,23 +16,21 @@ class Method(Enum):
 
 
 def parallelize(
 
 
 def parallelize(
-        _funct: typing.Optional[typing.Callable] = None,
-        *,
-        method: Method = Method.THREAD
+    _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
 ) -> typing.Callable:
     """Usage:
 
 ) -> typing.Callable:
     """Usage:
 
-    @parallelize    # defaults to thread-mode
-    def my_function(a, b, c) -> int:
-        ...do some slow / expensive work, e.g., an http request
+        @parallelize    # defaults to thread-mode
+        def my_function(a, b, c) -> int:
+            ...do some slow / expensive work, e.g., an http request
 
 
-    @parallelize(method=Method.PROCESS)
-    def my_other_function(d, e, f) -> str:
-        ...do more really expensice work, e.g., a network read
+        @parallelize(method=Method.PROCESS)
+        def my_other_function(d, e, f) -> str:
+            ...do more really expensice work, e.g., a network read
 
 
-    @parallelize(method=Method.REMOTE)
-    def my_other_other_function(g, h) -> int:
-        ...this work will be distributed to a remote machine pool
+        @parallelize(method=Method.REMOTE)
+        def my_other_other_function(g, h) -> int:
+            ...this work will be distributed to a remote machine pool
 
     This decorator will invoke the wrapped function on:
 
 
     This decorator will invoke the wrapped function on:
 
@@ -56,9 +52,14 @@ def parallelize(
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
+
+    Also note: there is a non trivial overhead of pickling code and
+    scp'ing it over the network when you use Method.REMOTE.  There's
+    a smaller but still considerable cost of creating a new process
+    and passing code to/from it when you use Method.PROCESS.
     """
     """
-    def wrapper(funct: typing.Callable):
 
 
+    def wrapper(funct: typing.Callable):
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
             import executors
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
             import executors
@@ -71,9 +72,7 @@ def parallelize(
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
-                newkwargs[kw] = smart_future.SmartFuture.resolve(
-                    kwargs[kw]
-                )
+                newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
 
             executor = None
             if method == Method.PROCESS:
 
             executor = None
             if method == Method.PROCESS:
@@ -83,6 +82,7 @@ def parallelize(
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
+            atexit.register(executors.DefaultExecutors().shutdown)
 
             future = executor.submit(funct, *newargs, **newkwargs)
 
 
             future = executor.submit(funct, *newargs, **newkwargs)