More cleanup, yey!
[python_utils.git] / parallelize.py
index 08220951a000e3ee9c24dcc908f54af1067ee265..b2a1cedb988d98d78cf7654e3f65d4dfaa8beff6 100644 (file)
@@ -2,39 +2,37 @@
 
 """A decorator to help with dead simple parallelization."""
 
 
 """A decorator to help with dead simple parallelization."""
 
-from enum import Enum
+
+import atexit
 import functools
 import typing
 import functools
 import typing
-
-ps_count = 0
-thread_count = 0
-remote_count = 0
+from enum import Enum
 
 
 class Method(Enum):
 
 
 class Method(Enum):
+    """How should we parallelize; by threads, processes or remote workers?"""
+
     THREAD = 1
     PROCESS = 2
     REMOTE = 3
 
 
 def parallelize(
     THREAD = 1
     PROCESS = 2
     REMOTE = 3
 
 
 def parallelize(
-        _funct: typing.Optional[typing.Callable] = None,
-        *,
-        method: Method = Method.THREAD
+    _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
 ) -> typing.Callable:
     """Usage:
 
 ) -> typing.Callable:
     """Usage:
 
-    @parallelize    # defaults to thread-mode
-    def my_function(a, b, c) -> int:
-        ...do some slow / expensive work, e.g., an http request
+        @parallelize    # defaults to thread-mode
+        def my_function(a, b, c) -> int:
+            ...do some slow / expensive work, e.g., an http request
 
 
-    @parallelize(method=Method.PROCESS)
-    def my_other_function(d, e, f) -> str:
-        ...do more really expensice work, e.g., a network read
+        @parallelize(method=Method.PROCESS)
+        def my_other_function(d, e, f) -> str:
+            ...do more really expensice work, e.g., a network read
 
 
-    @parallelize(method=Method.REMOTE)
-    def my_other_other_function(g, h) -> int:
-        ...this work will be distributed to a remote machine pool
+        @parallelize(method=Method.REMOTE)
+        def my_other_other_function(g, h) -> int:
+            ...this work will be distributed to a remote machine pool
 
     This decorator will invoke the wrapped function on:
 
 
     This decorator will invoke the wrapped function on:
 
@@ -56,9 +54,14 @@ def parallelize(
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
+
+    Also note: there is a non trivial overhead of pickling code and
+    scp'ing it over the network when you use Method.REMOTE.  There's
+    a smaller but still considerable cost of creating a new process
+    and passing code to/from it when you use Method.PROCESS.
     """
     """
-    def wrapper(funct: typing.Callable):
 
 
+    def wrapper(funct: typing.Callable):
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
             import executors
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
             import executors
@@ -71,9 +74,7 @@ def parallelize(
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
-                newkwargs[kw] = smart_future.SmartFuture.resolve(
-                    kwargs[kw]
-                )
+                newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
 
             executor = None
             if method == Method.PROCESS:
 
             executor = None
             if method == Method.PROCESS:
@@ -83,6 +84,7 @@ def parallelize(
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
+            atexit.register(executors.DefaultExecutors().shutdown)
 
             future = executor.submit(funct, *newargs, **newkwargs)
 
 
             future = executor.submit(funct, *newargs, **newkwargs)