X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=parallelize.py;h=cd3eff4e9c539c25d7d5cf3deb52a494bd2bd0f0;hb=822454f580c1ff9eb207b8da46cdfae24e30cde1;hp=334dc4e992c56aebcad796d2f6696bd63b14653e;hpb=497fb9e21f45ec08e1486abaee6dfa7b20b8a691;p=python_utils.git diff --git a/parallelize.py b/parallelize.py index 334dc4e..cd3eff4 100644 --- a/parallelize.py +++ b/parallelize.py @@ -2,17 +2,12 @@ """A decorator to help with dead simple parallelization.""" + +import atexit from enum import Enum import functools import typing -import executors -import smart_future - -ps_count = 0 -thread_count = 0 -remote_count = 0 - class Method(Enum): THREAD = 1 @@ -21,23 +16,21 @@ class Method(Enum): def parallelize( - _funct: typing.Optional[typing.Callable] = None, - *, - method: Method = Method.THREAD + _funct: typing.Optional[typing.Callable] = None, *, method: Method = Method.THREAD ) -> typing.Callable: """Usage: - @parallelize # defaults to thread-mode - def my_function(a, b, c) -> int: - ...do some slow / expensive work, e.g., an http request + @parallelize # defaults to thread-mode + def my_function(a, b, c) -> int: + ...do some slow / expensive work, e.g., an http request - @parallelize(method=Method.PROCESS) - def my_other_function(d, e, f) -> str: - ...do more really expensice work, e.g., a network read + @parallelize(method=Method.PROCESS) + def my_other_function(d, e, f) -> str: + ...do more really expensice work, e.g., a network read - @parallelize(method=Method.REMOTE) - def my_other_other_function(g, h) -> int: - ...this work will be distributed to a remote machine pool + @parallelize(method=Method.REMOTE) + def my_other_other_function(g, h) -> int: + ...this work will be distributed to a remote machine pool This decorator will invoke the wrapped function on: @@ -59,11 +52,19 @@ def parallelize( your machine with processes or your network with remote processes beyond the control mechanisms built into one instance of the pool. Be careful. + + Also note: there is a non trivial overhead of pickling code and + scp'ing it over the network when you use Method.REMOTE. There's + a smaller but still considerable cost of creating a new process + and passing code to/from it when you use Method.PROCESS. """ - def wrapper(funct: typing.Callable): + def wrapper(funct: typing.Callable): @functools.wraps(funct) def inner_wrapper(*args, **kwargs): + import executors + import smart_future + # Look for as of yet unresolved arguments in _funct's # argument list and resolve them now. newargs = [] @@ -71,9 +72,7 @@ def parallelize( newargs.append(smart_future.SmartFuture.resolve(arg)) newkwargs = {} for kw in kwargs: - newkwargs[kw] = smart_future.SmartFuture.resolve( - kwargs[kw] - ) + newkwargs[kw] = smart_future.SmartFuture.resolve(kwargs[kw]) executor = None if method == Method.PROCESS: @@ -83,6 +82,7 @@ def parallelize( elif method == Method.REMOTE: executor = executors.DefaultExecutors().remote_pool() assert executor is not None + atexit.register(executors.DefaultExecutors().shutdown) future = executor.submit(funct, *newargs, **newkwargs)