More mypy cleanup... ugh.
[python_utils.git] / executors.py
index 20aa9d2e325e5b5e54a77836f6117b91d99c36be..e95ed716043b4962cd939b6d25885fd87826466a 100644 (file)
@@ -74,22 +74,47 @@ def make_cloud_pickle(fun, *args, **kwargs):
 class BaseExecutor(ABC):
     def __init__(self, *, title=''):
         self.title = title
-        self.task_count = 0
         self.histogram = hist.SimpleHistogram(
             hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
         )
+        self.task_count = 0
 
     @abstractmethod
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
         pass
 
     @abstractmethod
-    def shutdown(self, wait: bool = True) -> None:
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
         pass
 
+    def shutdown_if_idle(self, *, quiet: bool = False) -> bool:
+        """Shutdown the executor and return True if the executor is idle
+        (i.e. there are no pending or active tasks).  Return False
+        otherwise.  Note: this should only be called by the launcher
+        process.
+
+        """
+        if self.task_count == 0:
+            self.shutdown(wait=True, quiet=quiet)
+            return True
+        return False
+
     def adjust_task_count(self, delta: int) -> None:
+        """Change the task count.  Note: do not call this method from a
+        worker, it should only be called by the launcher process /
+        thread / machine.
+
+        """
         self.task_count += delta
-        logger.debug(f'Executor current task count is {self.task_count}')
+        logger.debug(f'Adjusted task count by {delta} to {self.task_count}')
+
+    def get_task_count(self) -> int:
+        """Change the task count.  Note: do not call this method from a
+        worker, it should only be called by the launcher process /
+        thread / machine.
+
+        """
+        return self.task_count
 
 
 class ThreadExecutor(BaseExecutor):
@@ -104,34 +129,39 @@ class ThreadExecutor(BaseExecutor):
         self._thread_pool_executor = fut.ThreadPoolExecutor(
             max_workers=workers, thread_name_prefix="thread_executor_helper"
         )
+        self.already_shutdown = False
 
+    # This is run on a different thread; do not adjust task count here.
     def run_local_bundle(self, fun, *args, **kwargs):
         logger.debug(f"Running local bundle at {fun.__name__}")
-        start = time.time()
         result = fun(*args, **kwargs)
-        end = time.time()
-        self.adjust_task_count(-1)
-        duration = end - start
-        logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
-        self.histogram.add_item(duration)
         return result
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
         self.adjust_task_count(+1)
         newargs = []
         newargs.append(function)
         for arg in args:
             newargs.append(arg)
-        return self._thread_pool_executor.submit(
+        start = time.time()
+        result = self._thread_pool_executor.submit(
             self.run_local_bundle, *newargs, **kwargs
         )
+        result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+        result.add_done_callback(lambda _: self.adjust_task_count(-1))
+        return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
-        logger.debug(f'Shutting down threadpool executor {self.title}')
-        print(self.histogram)
-        self._thread_pool_executor.shutdown(wait)
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        if not self.already_shutdown:
+            logger.debug(f'Shutting down threadpool executor {self.title}')
+            self._thread_pool_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
 
 
 class ProcessExecutor(BaseExecutor):
@@ -146,28 +176,35 @@ class ProcessExecutor(BaseExecutor):
         self._process_executor = fut.ProcessPoolExecutor(
             max_workers=workers,
         )
+        self.already_shutdown = False
 
+    # This is run in another process; do not adjust task count here.
     def run_cloud_pickle(self, pickle):
         fun, args, kwargs = cloudpickle.loads(pickle)
         logger.debug(f"Running pickled bundle at {fun.__name__}")
         result = fun(*args, **kwargs)
-        self.adjust_task_count(-1)
         return result
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
         start = time.time()
         self.adjust_task_count(+1)
         pickle = make_cloud_pickle(function, *args, **kwargs)
         result = self._process_executor.submit(self.run_cloud_pickle, pickle)
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+        result.add_done_callback(lambda _: self.adjust_task_count(-1))
         return result
 
     @overrides
-    def shutdown(self, wait=True) -> None:
-        logger.debug(f'Shutting down processpool executor {self.title}')
-        self._process_executor.shutdown(wait)
-        print(self.histogram)
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        if not self.already_shutdown:
+            logger.debug(f'Shutting down processpool executor {self.title}')
+            self._process_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
 
     def __getstate__(self):
         state = self.__dict__.copy()
@@ -521,6 +558,7 @@ class RemoteExecutor(BaseExecutor):
             self.heartbeat_thread,
             self.heartbeat_stop_event,
         ) = self.run_periodic_heartbeat()
+        self.already_shutdown = False
 
     @background_thread
     def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
@@ -1063,18 +1101,23 @@ class RemoteExecutor(BaseExecutor):
 
     @overrides
     def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
+        if self.already_shutdown:
+            raise Exception('Submitted work after shutdown.')
         pickle = make_cloud_pickle(function, *args, **kwargs)
         bundle = self.create_original_bundle(pickle, function.__name__)
         self.total_bundles_submitted += 1
         return self._helper_executor.submit(self.launch, bundle)
 
     @overrides
-    def shutdown(self, wait=True) -> None:
-        logging.debug(f'Shutting down RemoteExecutor {self.title}')
-        self.heartbeat_stop_event.set()
-        self.heartbeat_thread.join()
-        self._helper_executor.shutdown(wait)
-        print(self.histogram)
+    def shutdown(self, *, wait: bool = True, quiet: bool = False) -> None:
+        if not self.already_shutdown:
+            logging.debug(f'Shutting down RemoteExecutor {self.title}')
+            self.heartbeat_stop_event.set()
+            self.heartbeat_thread.join()
+            self._helper_executor.shutdown(wait)
+            if not quiet:
+                print(self.histogram.__repr__(label_formatter='%ds'))
+            self.already_shutdown = True
 
 
 @singleton
@@ -1172,11 +1215,11 @@ class DefaultExecutors(object):
 
     def shutdown(self) -> None:
         if self.thread_executor is not None:
-            self.thread_executor.shutdown()
+            self.thread_executor.shutdown(wait=True, quiet=True)
             self.thread_executor = None
         if self.process_executor is not None:
-            self.process_executor.shutdown()
+            self.process_executor.shutdown(wait=True, quiet=True)
             self.process_executor = None
         if self.remote_executor is not None:
-            self.remote_executor.shutdown()
+            self.remote_executor.shutdown(wait=True, quiet=True)
             self.remote_executor = None