Make rate_limited work and add doctest.
[python_utils.git] / executors.py
index c11bd546cc3b1b0afcefff8c3215aab2707db147..cdbb811a6acd2cb9af12fefe49c61b65adba5ad0 100644 (file)
@@ -745,10 +745,6 @@ class RemoteExecutor(BaseExecutor):
                 xfer_latency = time.time() - start_ts
                 logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.")
             except Exception as e:
-                logger.exception(e)
-                logger.error(
-                    f'{bundle}: failed to send instructions to worker machine?!?'
-                )
                 assert bundle.worker is not None
                 self.status.record_release_worker(
                     bundle.worker,
@@ -760,19 +756,30 @@ class RemoteExecutor(BaseExecutor):
                 if is_original:
                     # Weird.  We tried to copy the code to the worker and it failed...
                     # And we're the original bundle.  We have to retry.
+                    logger.exception(e)
+                    logger.error(
+                        f'{bundle}: Failed to send instructions to the worker machine?! ' +
+                        'This is not expected; we\'re the original bundle so this shouldn\'t ' +
+                        'be a race condition.  Attempting an emergency retry...'
+                    )
                     return self.emergency_retry_nasty_bundle(bundle)
                 else:
                     # This is actually expected; we're a backup.
                     # There's a race condition where someone else
                     # already finished the work and removed the source
                     # code file before we could copy it.  No biggie.
+                    logger.warning(
+                        f'{bundle}: Failed to send instructions to the worker machine... ' +
+                        'We\'re a backup and this may be caused by the original (or some ' +
+                        'other backup) already finishing this work.  Ignoring this.'
+                    )
                     return None
 
         # Kick off the work.  Note that if this fails we let
         # wait_for_process deal with it.
         self.status.record_processing_began(uuid)
         cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
-               f'"source py39-venv/bin/activate &&'
+               f'"source py38-venv/bin/activate &&'
                f' /home/scott/lib/python_modules/remote_worker.py'
                f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
         logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
@@ -889,7 +896,7 @@ class RemoteExecutor(BaseExecutor):
         if is_original:
             logger.debug(f"{bundle}: Unpickling {result_file}.")
             try:
-                with open(f'{result_file}', 'rb') as rb:
+                with open(result_file, 'rb') as rb:
                     serialized = rb.read()
                 result = cloudpickle.loads(serialized)
             except Exception as e:
@@ -1112,28 +1119,8 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username = 'scott',
                         machine = 'cheetah.house',
-                        weight = 14,
-                        count = 4,
-                    ),
-                )
-            if self.ping('video.house'):
-                logger.info('Found video.house')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'scott',
-                        machine = 'video.house',
-                        weight = 1,
-                        count = 4,
-                    ),
-                )
-            if self.ping('wannabe.house'):
-                logger.info('Found wannabe.house')
-                pool.append(
-                    RemoteWorkerRecord(
-                        username = 'scott',
-                        machine = 'wannabe.house',
-                        weight = 2,
-                        count = 4,
+                        weight = 25,
+                        count = 6,
                     ),
                 )
             if self.ping('meerkat.cabin'):
@@ -1146,14 +1133,24 @@ class DefaultExecutors(object):
                         count = 2,
                     ),
                 )
-            if self.ping('kiosk.house'):
-                logger.info('Found kiosk.house')
+            # if self.ping('kiosk.house'):
+            #     logger.info('Found kiosk.house')
+            #     pool.append(
+            #         RemoteWorkerRecord(
+            #             username = 'pi',
+            #             machine = 'kiosk.house',
+            #             weight = 1,
+            #             count = 2,
+            #         ),
+            #     )
+            if self.ping('hero.house'):
+                logger.info('Found hero.house')
                 pool.append(
                     RemoteWorkerRecord(
-                        username = 'pi',
-                        machine = 'kiosk.house',
-                        weight = 1,
-                        count = 2,
+                        username = 'scott',
+                        machine = 'hero.house',
+                        weight = 30,
+                        count = 10,
                     ),
                 )
             if self.ping('puma.cabin'):
@@ -1162,8 +1159,18 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username = 'scott',
                         machine = 'puma.cabin',
-                        weight = 12,
-                        count = 4,
+                        weight = 25,
+                        count = 6,
+                    ),
+                )
+            if self.ping('backup.house'):
+                logger.info('Found backup.house')
+                pool.append(
+                    RemoteWorkerRecord(
+                        username = 'scott',
+                        machine = 'backup.house',
+                        weight = 3,
+                        count = 2,
                     ),
                 )