Make remote workers die if no longer needed; cleanups in executors.
author Scott Gasch <[email protected]>
Wed, 17 Nov 2021 22:40:34 +0000 (14:40 -0800)
committer Scott Gasch <[email protected]>
Wed, 17 Nov 2021 22:40:34 +0000 (14:40 -0800)
executors.py
remote_worker.py

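The gist of the remote_worker.py half of this change, lifted out of diff form: the worker is always launched over ssh (see the cmd built in executors.py below), so sshd should appear somewhere in its process ancestry. A background thread now polls that ancestry and SIGTERMs the process once sshd disappears, i.e. once the launching executor or its ssh session is gone. A minimal, self-contained sketch of that idea (watch_for_orphaning and poll_seconds are illustrative names, not from the patch; psutil is the same dependency the real code uses):

    import os
    import signal
    import threading
    import time

    import psutil

    def watch_for_orphaning(stop: threading.Event, poll_seconds: float = 5.0) -> None:
        # If the sshd that launched us is gone, the executor driving us is
        # gone too; send ourselves SIGTERM instead of running forever.
        me = psutil.Process(os.getpid())
        while not stop.is_set():
            if not any('ssh' in p.name().lower() for p in me.parents()):
                os.kill(os.getpid(), signal.SIGTERM)
                return
            time.sleep(poll_seconds)
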
index fe8d9d0d8e749b0aa85609d04c3444e35b6e89d3..a0273264339ef3b158c1370f457f110842decff1 100644 (file)
@@ -22,7 +22,7 @@ from overrides import overrides
 from ansi import bg, fg, underline, reset
 import argparse_utils
 import config
-from exec_utils import run_silently, cmd_in_background
+from exec_utils import run_silently, cmd_in_background, cmd_with_timeout
 from decorator_utils import singleton
 import histogram as hist
 
@@ -524,6 +524,27 @@ class RemoteExecutor(BaseExecutor):
         self.status = RemoteExecutorStatus(self.worker_count)
         self.total_bundles_submitted = 0
 
+    def bundle_prefix(self, bundle: BundleDetails) -> str:
+        colorz = [
+            fg('violet red'),
+            fg('red'),
+            fg('orange'),
+            fg('peach orange'),
+            fg('yellow'),
+            fg('marigold yellow'),
+            fg('green yellow'),
+            fg('tea green'),
+            fg('cornflower blue'),
+            fg('turquoise blue'),
+            fg('tropical blue'),
+            fg('lavender purple'),
+            fg('medium purple'),
+        ]
+        c = colorz[int(bundle.uuid[-2:], 16) % len(colorz)]
+        fname = bundle.fname if bundle.fname is not None else 'nofname'
+        machine = bundle.machine if bundle.machine is not None else 'nomachine'
+        return f'{c}{bundle.uuid[-8:]}/{fname}/{machine}{reset()}'
+
     def is_worker_available(self) -> bool:
         return self.policy.is_worker_available()
 
@@ -621,7 +642,7 @@ class RemoteExecutor(BaseExecutor):
         username = bundle.username = worker.username
         fname = bundle.fname
         self.status.record_acquire_worker(worker, uuid)
-        logger.debug(f'{uuid}/{fname}: Running bundle on {worker}...')
+        logger.debug(f'{self.bundle_prefix(bundle)}: Running bundle on {worker}...')
 
         # Before we do any work, make sure the bundle is still viable.
         if self.check_if_cancelled(bundle):
@@ -630,7 +651,7 @@ class RemoteExecutor(BaseExecutor):
             except Exception as e:
                 logger.exception(e)
                 logger.error(
-                    f'{uuid}/{fname}: bundle says it\'s cancelled upfront but no results?!'
+                    f'{self.bundle_prefix(bundle)}: bundle says it\'s cancelled upfront but no results?!'
                 )
                 assert bundle.worker is not None
                 self.status.record_release_worker(
@@ -662,14 +683,14 @@ class RemoteExecutor(BaseExecutor):
             try:
                 cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
-                logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.")
+                logger.info(f"{self.bundle_prefix(bundle)}: Copying work to {worker} via {cmd}.")
                 run_silently(cmd)
                 xfer_latency = time.time() - start_ts
-                logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency:.1f}s.")
+                logger.info(f"{self.bundle_prefix(bundle)}: Copying done to {worker} in {xfer_latency:.1f}s.")
             except Exception as e:
                 logger.exception(e)
                 logger.error(
-                    f'{uuid}/{fname}: failed to send instructions to worker machine?!?'
+                    f'{self.bundle_prefix(bundle)}: failed to send instructions to worker machine?!?'
                 )
                 assert bundle.worker is not None
                 self.status.record_release_worker(
@@ -697,10 +718,10 @@ class RemoteExecutor(BaseExecutor):
                f'"source py39-venv/bin/activate &&'
                f' /home/scott/lib/python_modules/remote_worker.py'
                f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
-        logger.debug(f'{uuid}/{fname}: Executing {cmd} in the background to kick off work...')
+        logger.debug(f'{self.bundle_prefix(bundle)}: Executing {cmd} in the background to kick off work...')
         p = cmd_in_background(cmd, silent=True)
         bundle.pid = pid = p.pid
-        logger.debug(f'{uuid}/{fname}: Local ssh process pid={pid}; remote worker is {machine}.')
+        logger.debug(f'{self.bundle_prefix(bundle)}: Local ssh process pid={pid}; remote worker is {machine}.')
         return self.wait_for_process(p, bundle, 0)
 
     def wait_for_process(self, p: subprocess.Popen, bundle: BundleDetails, depth: int) -> Any:
@@ -732,12 +753,12 @@ class RemoteExecutor(BaseExecutor):
                 self.heartbeat()
                 if self.check_if_cancelled(bundle):
                     logger.info(
-                        f'{uuid}/{fname}: another worker finished bundle, checking it out...'
+                        f'{self.bundle_prefix(bundle)}: another worker finished bundle, checking it out...'
                     )
                     break
             else:
                 logger.info(
-                    f"{uuid}/{fname}: pid {pid} ({machine}) our ssh finished, checking it out..."
+                    f"{self.bundle_prefix(bundle)}: pid {pid} ({machine}) our ssh finished, checking it out..."
                 )
                 p = None
                 break
@@ -759,10 +780,10 @@ class RemoteExecutor(BaseExecutor):
         # Otherwise, time for an emergency reschedule.
         except Exception as e:
             logger.exception(e)
-            logger.error(f'{uuid}/{fname}: Something unexpected just happened...')
+            logger.error(f'{self.bundle_prefix(bundle)}: Something unexpected just happened...')
             if p is not None:
                 logger.warning(
-                    f"{uuid}/{fname}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
+                    f"{self.bundle_prefix(bundle)}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
                 )
                 return self.wait_for_process(p, bundle, depth + 1)
             else:
@@ -795,7 +816,7 @@ class RemoteExecutor(BaseExecutor):
                 if bundle.hostname not in bundle.machine:
                     cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
-                        f"{uuid}/{fname}: Fetching results from {username}@{machine} via {cmd}"
+                        f"{self.bundle_prefix(bundle)}: Fetching results from {username}@{machine} via {cmd}"
                     )
 
                     # If either of these throw they are handled in
@@ -813,7 +834,7 @@ class RemoteExecutor(BaseExecutor):
         # if one of the backups finished first; it still must read the
         # result from disk.
         if is_original:
-            logger.debug(f"{uuid}/{fname}: Unpickling {result_file}.")
+            logger.debug(f"{self.bundle_prefix(bundle)}: Unpickling {result_file}.")
             try:
                 with open(f'{result_file}', 'rb') as rb:
                     serialized = rb.read()
@@ -845,7 +866,7 @@ class RemoteExecutor(BaseExecutor):
             if bundle.backup_bundles is not None:
                 for backup in bundle.backup_bundles:
                     logger.debug(
-                        f'{uuid}/{fname}: Notifying backup {backup.uuid} that it\'s cancelled'
+                        f'{self.bundle_prefix(bundle)}: Notifying backup {backup.uuid} that it\'s cancelled'
                     )
                     backup.is_cancelled.set()
 
@@ -860,7 +881,7 @@ class RemoteExecutor(BaseExecutor):
             # Tell the original to stop if we finished first.
             if not was_cancelled:
                 logger.debug(
-                    f'{uuid}/{fname}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
+                    f'{self.bundle_prefix(bundle)}: Notifying original {bundle.src_bundle.uuid} we beat them to it.'
                 )
                 bundle.src_bundle.is_cancelled.set()
 
@@ -906,7 +927,7 @@ class RemoteExecutor(BaseExecutor):
             failure_count = 0,
         )
         self.status.record_bundle_details(bundle)
-        logger.debug(f'{uuid}/{fname}: Created original bundle')
+        logger.debug(f'{self.bundle_prefix(bundle)}: Created an original bundle')
         return bundle
 
     def create_backup_bundle(self, src_bundle: BundleDetails):
@@ -937,7 +958,7 @@ class RemoteExecutor(BaseExecutor):
         )
         src_bundle.backup_bundles.append(backup_bundle)
         self.status.record_bundle_details_already_locked(backup_bundle)
-        logger.debug(f'{uuid}/{src_bundle.fname}: Created backup bundle')
+        logger.debug(f'{self.bundle_prefix(backup_bundle)}: Created a backup bundle')
         return backup_bundle
 
     def schedule_backup_for_bundle(self,
@@ -968,18 +989,18 @@ class RemoteExecutor(BaseExecutor):
 
         if bundle.failure_count > retry_limit:
             logger.error(
-                f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.'
+                f'{self.bundle_prefix(bundle)}: Tried this bundle too many times already ({retry_limit}x); giving up.'
             )
             if is_original:
                 raise RemoteExecutorException(
-                    f'{uuid}: This bundle can\'t be completed despite several backups and retries'
+                    f'{self.bundle_prefix(bundle)}: This bundle can\'t be completed despite several backups and retries'
                 )
             else:
-                logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.')
+                logger.error(f'{self.bundle_prefix(bundle)}: At least it\'s only a backup; better luck with the others.')
             return None
         else:
             logger.warning(
-                f'>>> Emergency rescheduling {uuid} because of unexected errors (wtf?!) <<<'
+                f'>>> Emergency rescheduling {self.bundle_prefix(bundle)} because of unexpected errors (wtf?!) <<<'
             )
             return self.launch(bundle, avoid_last_machine)
 
@@ -1009,12 +1030,14 @@ class DefaultExecutors(object):
 
     def ping(self, host) -> bool:
         logger.debug(f'RUN> ping -c 1 {host}')
-        command = ['ping', '-c', '1', host]
-        return subprocess.call(
-            command,
-            stdout=subprocess.DEVNULL,
-            stderr=subprocess.DEVNULL,
-        ) == 0
+        try:
+            x = cmd_with_timeout(
+                f'ping -c 1 {host} >/dev/null 2>/dev/null',
+                timeout_seconds=1.0
+            )
+            return x == 0
+        except Exception:
+            return False
 
     def thread_pool(self) -> ThreadExecutor:
         if self.thread_executor is None:
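
The new ping() above hands the work to cmd_with_timeout from exec_utils so a wedged ping can no longer stall executor setup; any failure (non-zero exit, timeout, exception) is treated as "host unavailable". A roughly equivalent standard-library sketch, assuming cmd_with_timeout behaves like a shell invocation that raises on timeout and otherwise returns the exit status (only its call site is visible in this patch):

    import subprocess

    def ping(host: str) -> bool:
        try:
            proc = subprocess.run(
                ['ping', '-c', '1', host],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=1.0,   # bound the wait; the old subprocess.call had no limit
            )
            return proc.returncode == 0
        except Exception:      # TimeoutExpired, missing binary, etc.
            return False
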
index 84f8d56fa33318b507f3f11618c663861718182b..bf8de6c66a36767ac267cfdd2bffe38317cbace0 100755 (executable)
@@ -6,6 +6,7 @@ results.
 
 import logging
 import os
+import platform
 import signal
 import threading
 import sys
@@ -50,16 +51,23 @@ cfg.add_argument(
 
 @background_thread
 def watch_for_cancel(terminate_event: threading.Event) -> None:
+    if platform.node() == 'VIDEO-COMPUTER':
+        logger.warning('Background thread not supported on this host, sorry.')
+        return
+    logger.debug('Starting up background thread...')
     p = psutil.Process(os.getpid())
     while True:
         saw_sshd = False
         ancestors = p.parents()
         for ancestor in ancestors:
             name = ancestor.name()
+            pid = ancestor.pid
+            logger.debug(f'Ancestor process {name} (pid={pid})')
             if 'ssh' in name.lower():
                 saw_sshd = True
                 break
         if not saw_sshd:
+            logger.error('Did not see sshd in our ancestor list?!  Terminating.')
             os.system('pstree')
             os.kill(os.getpid(), signal.SIGTERM)
             time.sleep(5.0)
@@ -75,6 +83,8 @@ def main() -> None:
     in_file = config.config['code_file']
     out_file = config.config['result_file']
 
+    (thread, stop_thread) = watch_for_cancel()
+
     logger.debug(f'Reading {in_file}.')
     try:
         with open(in_file, 'rb') as rb:
@@ -82,6 +92,7 @@ def main() -> None:
     except Exception as e:
         logger.exception(e)
         logger.critical(f'Problem reading {in_file}.  Aborting.')
+        stop_thread.set()
         sys.exit(-1)
 
     logger.debug(f'Deserializing {in_file}.')
@@ -90,6 +101,7 @@ def main() -> None:
     except Exception as e:
         logger.exception(e)
         logger.critical(f'Problem deserializing {in_file}.  Aborting.')
+        stop_thread.set()
         sys.exit(-1)
 
     logger.debug('Invoking user code...')
@@ -104,6 +116,7 @@ def main() -> None:
     except Exception as e:
         logger.exception(e)
         logger.critical(f'Could not serialize result ({type(ret)}).  Aborting.')
+        stop_thread.set()
         sys.exit(-1)
 
     logger.debug(f'Writing {out_file}.')
@@ -113,8 +126,12 @@ def main() -> None:
     except Exception as e:
         logger.exception(e)
         logger.critical(f'Error writing {out_file}.  Aborting.')
+        stop_thread.set()
         sys.exit(-1)
 
+    stop_thread.set()
+    thread.join()
+
 
 if __name__ == '__main__':
     main()
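
For context on the stop_thread plumbing added to main() above: judging only from how it is used here, the @background_thread decorator hands back a (thread, stop_event) pair when the decorated function is called, which is why every early sys.exit() path now sets the event and the happy path joins the thread. A guess at the decorator's shape (the real implementation elsewhere in this codebase may differ):

    import functools
    import threading
    from typing import Any, Callable, Tuple

    def background_thread(
        func: Callable[..., None]
    ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
        # Calling the decorated function starts it on a daemon thread and
        # returns (thread, stop_event); the event is passed to the function
        # as its final positional argument.
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Tuple[threading.Thread, threading.Event]:
            stop_event = threading.Event()
            thread = threading.Thread(
                target=func, args=(*args, stop_event), kwargs=kwargs, daemon=True
            )
            thread.start()
            return thread, stop_event
        return wrapper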