X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=cdbb811a6acd2cb9af12fefe49c61b65adba5ad0;hb=5f75cf834725ac26b289cc5f157af0cb71cd5f0e;hp=92c5b3464154259fe4753702ebbd98fc84db11c9;hpb=b29be4f1750fd20bd2eada88e751dfae85817882;p=python_utils.git diff --git a/executors.py b/executors.py index 92c5b34..cdbb811 100644 --- a/executors.py +++ b/executors.py @@ -745,10 +745,6 @@ class RemoteExecutor(BaseExecutor): xfer_latency = time.time() - start_ts logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.") except Exception as e: - logger.exception(e) - logger.error( - f'{bundle}: failed to send instructions to worker machine?!?' - ) assert bundle.worker is not None self.status.record_release_worker( bundle.worker, @@ -760,19 +756,30 @@ class RemoteExecutor(BaseExecutor): if is_original: # Weird. We tried to copy the code to the worker and it failed... # And we're the original bundle. We have to retry. + logger.exception(e) + logger.error( + f'{bundle}: Failed to send instructions to the worker machine?! ' + + 'This is not expected; we\'re the original bundle so this shouldn\'t ' + + 'be a race condition. Attempting an emergency retry...' + ) return self.emergency_retry_nasty_bundle(bundle) else: # This is actually expected; we're a backup. # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. + logger.warning( + f'{bundle}: Failed to send instructions to the worker machine... ' + + 'We\'re a backup and this may be caused by the original (or some ' + + 'other backup) already finishing this work. Ignoring this.' + ) return None # Kick off the work. Note that if this fails we let # wait_for_process deal with it. self.status.record_processing_began(uuid) cmd = (f'{SSH} {bundle.username}@{bundle.machine} ' - f'"source py39-venv/bin/activate &&' + f'"source py38-venv/bin/activate &&' f' /home/scott/lib/python_modules/remote_worker.py' f' --code_file {bundle.code_file} --result_file {bundle.result_file}"') logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...') @@ -889,7 +896,7 @@ class RemoteExecutor(BaseExecutor): if is_original: logger.debug(f"{bundle}: Unpickling {result_file}.") try: - with open(f'{result_file}', 'rb') as rb: + with open(result_file, 'rb') as rb: serialized = rb.read() result = cloudpickle.loads(serialized) except Exception as e: @@ -1112,30 +1119,10 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'cheetah.house', - weight = 14, + weight = 25, count = 6, ), ) - if self.ping('video.house'): - logger.info('Found video.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'video.house', - weight = 1, - count = 4, - ), - ) - if self.ping('gorilla.house'): - logger.info('Found gorilla.house') - pool.append( - RemoteWorkerRecord( - username = 'scott', - machine = 'gorilla.house', - weight = 2, - count = 4, - ), - ) if self.ping('meerkat.cabin'): logger.info('Found meerkat.cabin') pool.append( @@ -1146,16 +1133,16 @@ class DefaultExecutors(object): count = 2, ), ) - if self.ping('kiosk.house'): - logger.info('Found kiosk.house') - pool.append( - RemoteWorkerRecord( - username = 'pi', - machine = 'kiosk.house', - weight = 1, - count = 2, - ), - ) + # if self.ping('kiosk.house'): + # logger.info('Found kiosk.house') + # pool.append( + # RemoteWorkerRecord( + # username = 'pi', + # machine = 'kiosk.house', + # weight = 1, + # count = 2, + # ), + # ) if self.ping('hero.house'): logger.info('Found hero.house') pool.append( @@ -1172,18 +1159,18 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'puma.cabin', - weight = 12, + weight = 25, count = 6, ), ) - if self.ping('puma.house'): - logger.info('Found puma.house') + if self.ping('backup.house'): + logger.info('Found backup.house') pool.append( RemoteWorkerRecord( username = 'scott', - machine = 'puma.house', - weight = 12, - count = 6, + machine = 'backup.house', + weight = 3, + count = 2, ), )