Since this thing is on the innerwebs I suppose it should have a copyright notice.
[python_utils.git] / executors.py
index 1077667147051e427b5fad045774e32c55d1a142..e07933f454909d5a543e340320a578aab528d9ad 100644
@@ -1,6 +1,8 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
+# © Copyright 2021-2022, Scott Gasch
+
 """Defines three executors: a thread executor for doing work using a
 threadpool, a process executor for doing work in other processes on
 the same machine and a remote executor for farming out work to other
@@ -31,6 +33,7 @@ from overrides import overrides
 import argparse_utils
 import config
 import histogram as hist
+import string_utils
 from ansi import bg, fg, reset, underline
 from decorator_utils import singleton
 from exec_utils import cmd_in_background, cmd_with_timeout, run_silently
@@ -619,8 +622,8 @@ class RemoteExecutor(BaseExecutor):
         num_idle_workers = self.worker_count - self.task_count
         now = time.time()
         if (
-            num_done > 2
-            and num_idle_workers > 1
+            num_done >= 2
+            and num_idle_workers > 0
             and (self.last_backup is None or (now - self.last_backup > 9.0))
             and self.backup_lock.acquire(blocking=False)
         ):
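
The relaxed condition above means a backup bundle can now be considered as soon as two bundles have finished and at least one worker is idle, still throttled to roughly one backup every nine seconds and still gated on a non-blocking acquisition of backup_lock. A minimal sketch of the heuristic in isolation; the helper name and standalone signature are illustrative, not part of the file:

    import time
    from typing import Optional

    def should_consider_backup(num_done: int,
                               num_idle_workers: int,
                               last_backup: Optional[float]) -> bool:
        """Mirror of the relaxed check: at least two bundles finished,
        at least one idle worker, and no backup started in the last ~9
        seconds.  The real code additionally requires a successful
        backup_lock.acquire(blocking=False)."""
        now = time.time()
        return (
            num_done >= 2
            and num_idle_workers > 0
            and (last_backup is None or (now - last_backup > 9.0))
        )
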
@@ -753,6 +756,7 @@ class RemoteExecutor(BaseExecutor):
 
     def launch(self, bundle: BundleDetails, override_avoid_machine=None) -> Any:
         """Find a worker for bundle or block until one is available."""
+
         self.adjust_task_count(+1)
         uuid = bundle.uuid
         hostname = bundle.hostname
@@ -800,13 +804,10 @@ class RemoteExecutor(BaseExecutor):
                     )
                     return self.emergency_retry_nasty_bundle(bundle)
                 else:
-                    # Expected(?).  We're a backup and our bundle is
-                    # cancelled before we even got started.  Something
-                    # went bad in process_work_result (I acutually don't
-                    # see what?) but probably not worth worrying
-                    # about.  Let the original thread worry about
-                    # either finding the results or complaining about
-                    # it.
+                    # We're a backup and our bundle is cancelled
+                    # before we even got started.  Do nothing and let
+                    # the original bundle's thread worry about either
+                    # finding the results or complaining about it.
                     return None
 
         # Send input code / data to worker machine if it's not local.
@@ -821,8 +822,9 @@ class RemoteExecutor(BaseExecutor):
             except Exception as e:
                 self.release_worker(bundle)
                 if is_original:
-                    # Weird.  We tried to copy the code to the worker and it failed...
-                    # And we're the original bundle.  We have to retry.
+                    # Weird.  We tried to copy the code to the worker
+                    # and it failed...  And we're the original bundle.
+                    # We have to retry.
                     logger.exception(e)
                     logger.error(
                         "%s: Failed to send instructions to the worker machine?! "
@@ -835,7 +837,7 @@ class RemoteExecutor(BaseExecutor):
                     # This is actually expected; we're a backup.
                     # There's a race condition where someone else
                     # already finished the work and removed the source
-                    # code file before we could copy it.  No biggie.
+                    # code_file before we could copy it.  Ignore.
                     logger.warning(
                         '%s: Failed to send instructions to the worker machine... '
                         'We\'re a backup and this may be caused by the original (or '
@@ -953,6 +955,7 @@ class RemoteExecutor(BaseExecutor):
                         else:
                             break
 
+                    # Clean up remote /tmp files.
                     run_silently(
                         f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
                     )
@@ -965,7 +968,8 @@ class RemoteExecutor(BaseExecutor):
         # original is also the only job that may delete result_file
         # from disk.  Note that the original may have been cancelled
         # if one of the backups finished first; it still must read the
-        # result from disk.
+        # result from disk.  It still does that here with is_cancelled
+        # set.
         if is_original:
             logger.debug("%s: Unpickling %s.", bundle, result_file)
             try:
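
As the comment above says, only the original bundle reads result_file back from disk (even if it was cancelled because a backup won the race), and only the original may delete it. A rough sketch of that read-back step, assuming results are pickled into result_file as the log message suggests; this is illustrative, not the file's exact code:

    import os
    import pickle

    def read_result_file(result_file: str):
        # Only the original bundle does this; backups merely move the
        # file into place and never unpickle or delete it.
        with open(result_file, 'rb') as rb:
            result = pickle.load(rb)
        os.remove(result_file)  # the original owns cleanup of result_file
        return result
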
@@ -1015,9 +1019,7 @@ class RemoteExecutor(BaseExecutor):
         return result
 
     def create_original_bundle(self, pickle, fname: str):
-        from string_utils import generate_uuid
-
-        uuid = generate_uuid(omit_dashes=True)
+        uuid = string_utils.generate_uuid(omit_dashes=True)
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
@@ -1051,6 +1053,7 @@ class RemoteExecutor(BaseExecutor):
         return bundle
 
     def create_backup_bundle(self, src_bundle: BundleDetails):
+        assert self.status.lock.locked()
         assert src_bundle.backup_bundles is not None
         n = len(src_bundle.backup_bundles)
         uuid = src_bundle.uuid + f'_backup#{n}'
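
The new assertion makes the locking contract explicit: create_backup_bundle may only be called while self.status.lock is held. Note that threading.Lock.locked() only reports that some thread holds the lock, so this is a cheap sanity check rather than an ownership proof. A tiny illustration of the pattern; the class below is a stand-in, not the executor's real status object:

    import threading

    class _Status:
        """Stand-in for the executor's shared status object."""
        def __init__(self) -> None:
            self.lock = threading.Lock()

    def create_backup(status: _Status) -> None:
        # Guard against callers that forgot to take the lock first.
        assert status.lock.locked()
        # ... mutate shared bookkeeping safely here ...

    status = _Status()
    with status.lock:
        create_backup(status)   # OK: the lock is held for the whole call
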
@@ -1092,7 +1095,8 @@ class RemoteExecutor(BaseExecutor):
 
         # Results from backups don't matter; if they finish first
         # they will move the result_file to this machine and let
-        # the original pick them up and unpickle them.
+        # the original pick them up and unpickle them (and return
+        # a result).
 
     def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
         is_original = bundle.src_bundle is None
@@ -1191,7 +1195,7 @@ class DefaultExecutors(object):
                         username='scott',
                         machine='cheetah.house',
                         weight=24,
-                        count=6,
+                        count=5,
                     ),
                 )
             if self.ping('meerkat.cabin'):
@@ -1211,7 +1215,7 @@ class DefaultExecutors(object):
                         username='scott',
                         machine='wannabe.house',
                         weight=14,
-                        count=8,
+                        count=2,
                     ),
                 )
             if self.ping('puma.cabin'):
@@ -1221,7 +1225,7 @@ class DefaultExecutors(object):
                         username='scott',
                         machine='puma.cabin',
                         weight=24,
-                        count=6,
+                        count=5,
                     ),
                 )
             if self.ping('backup.house'):
@@ -1239,7 +1243,7 @@ class DefaultExecutors(object):
             for record in pool:
                 if record.machine == platform.node() and record.count > 1:
                     logger.info('Reducing workload for %s.', record.machine)
-                    record.count = 1
+                    record.count = max(int(record.count / 2), 1)
 
             policy = WeightedRandomRemoteWorkerSelectionPolicy()
             policy.register_worker_pool(pool)
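
Finally, instead of pinning the pool entry for the local machine to a single worker, its count is now halved with a floor of one, which keeps some local capacity while still steering most work to remote machines. A quick illustration of the new arithmetic, with example counts:

    # Halve a worker count but never drop below one.
    for count in (24, 8, 6, 5, 2):
        print(count, '->', max(int(count / 2), 1))
    # prints: 24 -> 12, 8 -> 4, 6 -> 3, 5 -> 2, 2 -> 1
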