Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / parallelize.py
index 334dc4e992c56aebcad796d2f6696bd63b14653e..77d7649fe956a1542c1a60ce99ca96365ef59ce0 100644 (file)
@@ -1,43 +1,40 @@
 #!/usr/bin/env python3
 
 #!/usr/bin/env python3
 
+# © Copyright 2021-2022, Scott Gasch
+
 """A decorator to help with dead simple parallelization."""
 
 """A decorator to help with dead simple parallelization."""
 
-from enum import Enum
+
+import atexit
 import functools
 import typing
 import functools
 import typing
-
-import executors
-import smart_future
-
-ps_count = 0
-thread_count = 0
-remote_count = 0
+from enum import Enum
 
 
 class Method(Enum):
 
 
 class Method(Enum):
+    """How should we parallelize; by threads, processes or remote workers?"""
+
     THREAD = 1
     PROCESS = 2
     REMOTE = 3
 
 
 def parallelize(
     THREAD = 1
     PROCESS = 2
     REMOTE = 3
 
 
 def parallelize(
-        _funct: typing.Optional[typing.Callable] = None,
-        *,
-        method: Method = Method.THREAD
+    _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD
 ) -> typing.Callable:
     """Usage:
 
 ) -> typing.Callable:
     """Usage:
 
-    @parallelize    # defaults to thread-mode
-    def my_function(a, b, c) -> int:
-        ...do some slow / expensive work, e.g., an http request
+        @parallelize    # defaults to thread-mode
+        def my_function(a, b, c) -> int:
+            ...do some slow / expensive work, e.g., an http request
 
 
-    @parallelize(method=Method.PROCESS)
-    def my_other_function(d, e, f) -> str:
-        ...do more really expensice work, e.g., a network read
+        @parallelize(method=Method.PROCESS)
+        def my_other_function(d, e, f) -> str:
+            ...do more really expensice work, e.g., a network read
 
 
-    @parallelize(method=Method.REMOTE)
-    def my_other_other_function(g, h) -> int:
-        ...this work will be distributed to a remote machine pool
+        @parallelize(method=Method.REMOTE)
+        def my_other_other_function(g, h) -> int:
+            ...this work will be distributed to a remote machine pool
 
     This decorator will invoke the wrapped function on:
 
 
     This decorator will invoke the wrapped function on:
 
@@ -59,11 +56,19 @@ def parallelize(
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
     your machine with processes or your network with remote processes
     beyond the control mechanisms built into one instance of the pool.
     Be careful.
+
+    Also note: there is a non trivial overhead of pickling code and
+    scp'ing it over the network when you use Method.REMOTE.  There's
+    a smaller but still considerable cost of creating a new process
+    and passing code to/from it when you use Method.PROCESS.
     """
     """
-    def wrapper(funct: typing.Callable):
 
 
+    def wrapper(funct: typing.Callable):
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
         @functools.wraps(funct)
         def inner_wrapper(*args, **kwargs):
+            import executors
+            import smart_future
+
             # Look for as of yet unresolved arguments in _funct's
             # argument list and resolve them now.
             newargs = []
             # Look for as of yet unresolved arguments in _funct's
             # argument list and resolve them now.
             newargs = []
@@ -71,9 +76,7 @@ def parallelize(
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
                 newargs.append(smart_future.SmartFuture.resolve(arg))
             newkwargs = {}
             for kw in kwargs:
-                newkwargs[kw] = smart_future.SmartFuture.resolve(
-                    kwargs[kw]
-                )
+                newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw])
 
             executor = None
             if method == Method.PROCESS:
 
             executor = None
             if method == Method.PROCESS:
@@ -83,6 +86,7 @@ def parallelize(
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
             elif method == Method.REMOTE:
                 executor = executors.DefaultExecutors().remote_pool()
             assert executor is not None
+            atexit.register(executors.DefaultExecutors().shutdown)
 
             future = executor.submit(funct, *newargs, **newkwargs)
 
 
             future = executor.submit(funct, *newargs, **newkwargs)