Make parallelize remember to shutdown the default executors atexit.
authorScott <[email protected]>
Mon, 24 Jan 2022 22:51:17 +0000 (14:51 -0800)
committerScott <[email protected]>
Mon, 24 Jan 2022 22:51:17 +0000 (14:51 -0800)
parallelize.py

index d9c202faf00d56cf4cfc43e36c821998c249c7c4..698a7eca130d2ffc64a9619c89bdc2be5eb34ba9 100644 (file)
@@ -2,6 +2,7 @@
 
 """A decorator to help with dead simple parallelization."""
 
 
 """A decorator to help with dead simple parallelization."""
 
+import atexit
 from enum import Enum
 import functools
 import typing
 from enum import Enum
 import functools
 import typing
@@ -14,23 +15,21 @@ class Method(Enum):
 
 
 def parallelize(
 
 
 def parallelize(
-        _funct: typing.Optional[typing.Callable] = None,
-        *,
-        method: Method = Method.THREAD
+    _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
 ) -> typing.Callable:
     """Usage:
 
 ) -> typing.Callable:
     """Usage:
 
-    @parallelize    # defaults to thread-mode
-    def my_function(a, b, c) -> int:
-        ...do some slow / expensive work, e.g., an http request
+        @parallelize    # defaults to thread-mode
+        def my_function(a, b, c) -> int:
+            ...do some slow / expensive work, e.g., an http request
 
 
-    @parallelize(method=Method.PROCESS)
-    def my_other_function(d, e, f) -> str:
-        ...do more really expensice work, e.g., a network read
+        @parallelize(method=Method.PROCESS)
+        def my_other_function(d, e, f) -> str:
+            ...do more really expensice work, e.g., a network read
 
 
-    @parallelize(method=Method.REMOTE)
-    def my_other_other_function(g, h) -> int:
-        ...this work will be distributed to a remote machine pool
+        @parallelize(method=Method.REMOTE)
+        def my_other_other_function(g, h) -> int:
+            ...this work will be distributed to a remote machine pool
 
     This decorator will invoke the wrapped function on:
 
 
     This decorator will invoke the wrapped function on:
 
@@ -52,9 +51,14 @@ def parallelize(
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
+
+    Also note: there is a non trivial overhead of pickling code and
+    scp'ing it over the network when you use Method.REMOTE.  There's
+    a smaller but still considerable cost of creating a new process
+    and passing code to/from it when you use Method.PROCESS.
     """
     """
-    def wrapper(funct: typing.Callable):
 
 
+    def wrapper(funct: typing.Callable):
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
             import executors
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
             import executors
@@ -67,9 +71,7 @@ def parallelize(
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
-                newkwargs[kw] = smart_future.SmartFuture.resolve(
-                    kwargs[kw]
-                )
+                newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
 
             executor = None
             if method == Method.PROCESS:
 
             executor = None
             if method == Method.PROCESS:
@@ -79,6 +81,7 @@ def parallelize(
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
+            atexit.register(executors.DefaultExecutors().shutdown)
 
             future = executor.submit(funct, *newargs, **newkwargs)
 
 
             future = executor.submit(funct, *newargs, **newkwargs)