Ugh, a bunch of things. @overrides. --lmodule. Chromecasts. etc...
[python_utils.git] / executors.py
index 0b4d80ed1b8b0b387b4b6a7363c13c268e265a5c..d5049a264317c2f764d2068e7108a65d858f7cb2 100644 (file)
@@ -17,6 +17,7 @@ import time
 from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
+from overrides import overrides
 
 from ansi import bg, fg, underline, reset
 import argparse_utils
@@ -121,6 +122,7 @@ class ThreadExecutor(BaseExecutor):
         self.histogram.add_item(duration)
         return result
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -135,6 +137,7 @@ class ThreadExecutor(BaseExecutor):
             *newargs,
             **kwargs)
 
+    @overrides
     def shutdown(self,
                  wait = True) -> None:
         logger.debug(f'Shutting down threadpool executor {self.title}')
@@ -163,6 +166,7 @@ class ProcessExecutor(BaseExecutor):
         self.adjust_task_count(-1)
         return result
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -181,6 +185,7 @@ class ProcessExecutor(BaseExecutor):
         )
         return result
 
+    @overrides
     def shutdown(self, wait=True) -> None:
         logger.debug(f'Shutting down processpool executor {self.title}')
         self._process_executor.shutdown(wait)
@@ -476,6 +481,7 @@ class RemoteExecutor(BaseExecutor):
                  policy: RemoteWorkerSelectionPolicy) -> None:
         super().__init__()
         self.workers = workers
+        self.policy = policy
         self.worker_count = 0
         for worker in self.workers:
             self.worker_count += worker.count
@@ -483,7 +489,6 @@ class RemoteExecutor(BaseExecutor):
             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
             logger.critical(msg)
             raise Exception(msg)
-        self.policy = policy
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
         self._helper_executor = fut.ThreadPoolExecutor(
@@ -493,7 +498,7 @@ class RemoteExecutor(BaseExecutor):
         self.status = RemoteExecutorStatus(self.worker_count)
         self.total_bundles_submitted = 0
         logger.debug(
-            f'Creating remote processpool with {self.worker_count} remote endpoints.'
+            f'Creating remote processpool with {self.worker_count} remote worker threads.'
         )
 
     def is_worker_available(self) -> bool:
@@ -813,6 +818,7 @@ class RemoteExecutor(BaseExecutor):
         # they will move the result_file to this machine and let
         # the original pick them up and unpickle them.
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -822,6 +828,7 @@ class RemoteExecutor(BaseExecutor):
         self.total_bundles_submitted += 1
         return self._helper_executor.submit(self.launch, bundle)
 
+    @overrides
     def shutdown(self, wait=True) -> None:
         self._helper_executor.shutdown(wait)
         logging.debug(f'Shutting down RemoteExecutor {self.title}')