X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=6ccd7b675c760315d05158d68dad0768cc0f0871;hb=f33ea8b6b0f7dd862ae24f49a7e834c2bbb3d4dd;hp=c11bd546cc3b1b0afcefff8c3215aab2707db147;hpb=ed8fa2b10b0177b15b7423263bdd390efde2f0c8;p=python_utils.git diff --git a/executors.py b/executors.py index c11bd54..6ccd7b6 100644 --- a/executors.py +++ b/executors.py @@ -15,6 +15,7 @@ import subprocess import threading import time from typing import Any, Callable, Dict, List, Optional, Set +import warnings import cloudpickle # type: ignore from overrides import overrides @@ -490,7 +491,8 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): worker.count -= 1 logger.debug(f'Selected worker {worker}') return worker - logger.warning("Couldn't find a worker; go fish.") + msg = 'Unexpectedly could not find a worker, retrying...' + logger.warning(msg) return None @@ -525,7 +527,8 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): if x >= len(self.workers): x = 0 if x == self.index: - logger.warning("Couldn't find a worker; go fish.") + msg = 'Unexpectedly could not find a worker, retrying...' + logger.warning(msg) return None @@ -745,10 +748,6 @@ class RemoteExecutor(BaseExecutor): xfer_latency = time.time() - start_ts logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.") except Exception as e: - logger.exception(e) - logger.error( - f'{bundle}: failed to send instructions to worker machine?!?' - ) assert bundle.worker is not None self.status.record_release_worker( bundle.worker, @@ -760,19 +759,29 @@ class RemoteExecutor(BaseExecutor): if is_original: # Weird. We tried to copy the code to the worker and it failed... # And we're the original bundle. We have to retry. + logger.exception(e) + logger.error( + f'{bundle}: Failed to send instructions to the worker machine?! ' + + 'This is not expected; we\'re the original bundle so this shouldn\'t ' + + 'be a race condition. Attempting an emergency retry...' + ) return self.emergency_retry_nasty_bundle(bundle) else: # This is actually expected; we're a backup. # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. + msg = f'{bundle}: Failed to send instructions to the worker machine... ' + msg += 'We\'re a backup and this may be caused by the original (or some ' + msg += 'other backup) already finishing this work. Ignoring this.' + logger.warning(msg) return None # Kick off the work. Note that if this fails we let # wait_for_process deal with it. self.status.record_processing_began(uuid) cmd = (f'{SSH} {bundle.username}@{bundle.machine} ' - f'"source py39-venv/bin/activate &&' + f'"source py38-venv/bin/activate &&' f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...') @@ -837,9 +846,8 @@ class RemoteExecutor(BaseExecutor): logger.exception(e) logger.error(f'{bundle}: Something unexpected just happened...') if p is not None: - logger.warning( - f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." - ) + msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." + logger.warning(msg) return self.wait_for_process(p, bundle, depth + 1) else: self.status.record_release_worker( @@ -889,7 +897,7 @@ class RemoteExecutor(BaseExecutor): if is_original: logger.debug(f"{bundle}: Unpickling {result_file}.") try: - with open(f'{result_file}', 'rb') as rb: + with open(result_file, 'rb') as rb: serialized = rb.read() result = cloudpickle.loads(serialized) except Exception as e: @@ -950,7 +958,7 @@ class RemoteExecutor(BaseExecutor): def create_original_bundle(self, pickle, fname: str): from string_utils import generate_uuid - uuid = generate_uuid(as_hex=True) + uuid = generate_uuid(omit_dashes=True) code_file = f'/tmp/{uuid}.code.bin' result_file = f'/tmp/{uuid}.result.bin' @@ -1052,9 +1060,9 @@ class RemoteExecutor(BaseExecutor): logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.') return None else: - logger.warning( - f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<' - ) + msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<' + logger.warning(msg) + warnings.warn(msg) return self.launch(bundle, avoid_last_machine) @overrides @@ -1112,28 +1120,8 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'cheetah.house', - weight = 14, - count = 4, - ), - ) - if self.ping('video.house'): - logger.info('Found video.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'video.house', - weight = 1, - count = 4, - ), - ) - if self.ping('wannabe.house'): - logger.info('Found wannabe.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'wannabe.house', - weight = 2, - count = 4, + weight = 25, + count = 6, ), ) if self.ping('meerkat.cabin'): @@ -1146,14 +1134,24 @@ class DefaultExecutors(object): count = 2, ), ) - if self.ping('kiosk.house'): - logger.info('Found kiosk.house') + # if self.ping('kiosk.house'): + # logger.info('Found kiosk.house') + # pool.append( + # RemoteWorkerRecord( + # username = 'pi', + # machine = 'kiosk.house', + # weight = 1, + # count = 2, + # ), + # ) + if self.ping('hero.house'): + logger.info('Found hero.house') pool.append( RemoteWorkerRecord( - username = 'pi', - machine = 'kiosk.house', - weight = 1, - count = 2, + username = 'scott', + machine = 'hero.house', + weight = 30, + count = 10, ), ) if self.ping('puma.cabin'): @@ -1162,8 +1160,18 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'puma.cabin', - weight = 12, - count = 4, + weight = 25, + count = 6, + ), + ) + if self.ping('backup.house'): + logger.info('Found backup.house') + pool.append( + RemoteWorkerRecord( + username = 'scott', + machine = 'backup.house', + weight = 3, + count = 2, ), )