X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=cdbb811a6acd2cb9af12fefe49c61b65adba5ad0;hb=e8671a716da868332d3ac1f66d4d2f7f8d33fc28;hp=c11bd546cc3b1b0afcefff8c3215aab2707db147;hpb=ed8fa2b10b0177b15b7423263bdd390efde2f0c8;p=python_utils.git diff --git a/executors.py b/executors.py index c11bd54..cdbb811 100644 --- a/executors.py +++ b/executors.py @@ -745,10 +745,6 @@ class RemoteExecutor(BaseExecutor): xfer_latency = time.time() - start_ts logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.") except Exception as e: - logger.exception(e) - logger.error( - f'{bundle}: failed to send instructions to worker machine?!?' - ) assert bundle.worker is not None self.status.record_release_worker( bundle.worker, @@ -760,19 +756,30 @@ class RemoteExecutor(BaseExecutor): if is_original: # Weird. We tried to copy the code to the worker and it failed... # And we're the original bundle. We have to retry. + logger.exception(e) + logger.error( + f'{bundle}: Failed to send instructions to the worker machine?! ' + + 'This is not expected; we\'re the original bundle so this shouldn\'t ' + + 'be a race condition. Attempting an emergency retry...' + ) return self.emergency_retry_nasty_bundle(bundle) else: # This is actually expected; we're a backup. # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. + logger.warning( + f'{bundle}: Failed to send instructions to the worker machine... ' + + 'We\'re a backup and this may be caused by the original (or some ' + + 'other backup) already finishing this work. Ignoring this.' + ) return None # Kick off the work. Note that if this fails we let # wait_for_process deal with it. self.status.record_processing_began(uuid) cmd = (f'{SSH} {bundle.username}@{bundle.machine} ' - f'"source py39-venv/bin/activate &&' + f'"source py38-venv/bin/activate &&' f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...') @@ -889,7 +896,7 @@ class RemoteExecutor(BaseExecutor): if is_original: logger.debug(f"{bundle}: Unpickling {result_file}.") try: - with open(f'{result_file}', 'rb') as rb: + with open(result_file, 'rb') as rb: serialized = rb.read() result = cloudpickle.loads(serialized) except Exception as e: @@ -1112,28 +1119,8 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'cheetah.house', - weight = 14, - count = 4, - ), - ) - if self.ping('video.house'): - logger.info('Found video.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'video.house', - weight = 1, - count = 4, - ), - ) - if self.ping('wannabe.house'): - logger.info('Found wannabe.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'wannabe.house', - weight = 2, - count = 4, + weight = 25, + count = 6, ), ) if self.ping('meerkat.cabin'): @@ -1146,14 +1133,24 @@ class DefaultExecutors(object): count = 2, ), ) - if self.ping('kiosk.house'): - logger.info('Found kiosk.house') + # if self.ping('kiosk.house'): + # logger.info('Found kiosk.house') + # pool.append( + # RemoteWorkerRecord( + # username = 'pi', + # machine = 'kiosk.house', + # weight = 1, + # count = 2, + # ), + # ) + if self.ping('hero.house'): + logger.info('Found hero.house') pool.append( RemoteWorkerRecord( - username = 'pi', - machine = 'kiosk.house', - weight = 1, - count = 2, + username = 'scott', + machine = 'hero.house', + weight = 30, + count = 10, ), ) if self.ping('puma.cabin'): @@ -1162,8 +1159,18 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'puma.cabin', - weight = 12, - count = 4, + weight = 25, + count = 6, + ), + ) + if self.ping('backup.house'): + logger.info('Found backup.house') + pool.append( + RemoteWorkerRecord( + username = 'scott', + machine = 'backup.house', + weight = 3, + count = 2, ), )