Small bugfixes; also, add a new machine to the remote executor pool.
[python_utils.git] / executors.py
index 3cb0a916c080128e63a23600db76c07e93956ec9..6723bb9e2df4f1990f9780f15a813e6a4b3f0548 100644 (file)
@@ -17,6 +17,7 @@ import time
 from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
+from overrides import overrides
 
 from ansi import bg, fg, underline, reset
 import argparse_utils
@@ -121,6 +122,7 @@ class ThreadExecutor(BaseExecutor):
         self.histogram.add_item(duration)
         return result
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -135,6 +137,7 @@ class ThreadExecutor(BaseExecutor):
             *newargs,
             **kwargs)
 
+    @overrides
     def shutdown(self,
                  wait = True) -> None:
         logger.debug(f'Shutting down threadpool executor {self.title}')
@@ -163,6 +166,7 @@ class ProcessExecutor(BaseExecutor):
         self.adjust_task_count(-1)
         return result
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -181,6 +185,7 @@ class ProcessExecutor(BaseExecutor):
         )
         return result
 
+    @overrides
     def shutdown(self, wait=True) -> None:
         logger.debug(f'Shutting down processpool executor {self.title}')
         self._process_executor.shutdown(wait)
@@ -813,6 +818,7 @@ class RemoteExecutor(BaseExecutor):
         # they will move the result_file to this machine and let
         # the original pick them up and unpickle them.
 
+    @overrides
     def submit(self,
                function: Callable,
                *args,
@@ -822,6 +828,7 @@ class RemoteExecutor(BaseExecutor):
         self.total_bundles_submitted += 1
         return self._helper_executor.submit(self.launch, bundle)
 
+    @overrides
     def shutdown(self, wait=True) -> None:
         self._helper_executor.shutdown(wait)
         logging.debug(f'Shutting down RemoteExecutor {self.title}')
@@ -836,6 +843,7 @@ class DefaultExecutors(object):
         self.remote_executor: Optional[RemoteExecutor] = None
 
     def ping(self, host) -> bool:
+        logger.debug(f'RUN> ping -c 1 {host}')
         command = ['ping', '-c', '1', host]
         return subprocess.call(
             command,
@@ -854,9 +862,11 @@ class DefaultExecutors(object):
         return self.process_executor
 
     def remote_pool(self) -> RemoteExecutor:
+        logger.info('Looking for some helper machines...')
         if self.remote_executor is None:
             pool: List[RemoteWorkerRecord] = []
             if self.ping('cheetah.house'):
+                logger.info('Found cheetah.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username = 'scott',
@@ -866,6 +876,7 @@ class DefaultExecutors(object):
                     ),
                 )
             if self.ping('video.house'):
+                logger.info('Found video.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username = 'scott',
@@ -875,6 +886,7 @@ class DefaultExecutors(object):
                     ),
                 )
             if self.ping('wannabe.house'):
+                logger.info('Found wannabe.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username = 'scott',
@@ -884,6 +896,7 @@ class DefaultExecutors(object):
                     ),
                 )
             if self.ping('meerkat.cabin'):
+                logger.info('Found meerkat.cabin')
                 pool.append(
                     RemoteWorkerRecord(
                         username = 'scott',
@@ -893,6 +906,7 @@ class DefaultExecutors(object):
                     ),
                 )
             if self.ping('backup.house'):
+                logger.info('Found backup.house')
                 pool.append(
                     RemoteWorkerRecord(
                         username = 'scott',
@@ -901,7 +915,18 @@ class DefaultExecutors(object):
                         count = 4,
                     ),
                 )
+            if self.ping('kiosk.house'):
+                logger.info('Found kiosk.house')
+                pool.append(
+                    RemoteWorkerRecord(
+                        username = 'pi',
+                        machine = 'kiosk.house',
+                        weight = 1,
+                        count = 2,
+                    ),
+                )
             if self.ping('puma.cabin'):
+                logger.info('Found puma.cabin')
                 pool.append(
                     RemoteWorkerRecord(
                         username = 'scott',
@@ -910,6 +935,13 @@ class DefaultExecutors(object):
                         count = 4,
                     ),
                 )
+
+            # The controller machine has a lot to do; go easy on it.
+            for record in pool:
+                if record.machine == platform.node() and record.count > 1:
+                    logger.info(f'Reducing workload for {record.machine}.')
+                    record.count = 1
+
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
             policy.register_worker_pool(pool)
             self.remote_executor = RemoteExecutor(pool, policy)