import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set
+import warnings
import cloudpickle # type: ignore
from overrides import overrides
worker.count -= 1
logger.debug(f'Selected worker {worker}')
return worker
- logger.warning("Couldn't find a worker; go fish.")
+ msg = 'Unexpectedly could not find a worker, retrying...'
+ logger.warning(msg)
return None
if x >= len(self.workers):
x = 0
if x == self.index:
- logger.warning("Couldn't find a worker; go fish.")
+ msg = 'Unexpectedly could not find a worker, retrying...'
+ logger.warning(msg)
return None
# There's a race condition where someone else
# already finished the work and removed the source
# code file before we could copy it. No biggie.
- logger.warning(
- f'{bundle}: Failed to send instructions to the worker machine... ' +
- 'We\'re a backup and this may be caused by the original (or some ' +
- 'other backup) already finishing this work. Ignoring this.'
- )
+ msg = f'{bundle}: Failed to send instructions to the worker machine... '
+ msg += 'We\'re a backup and this may be caused by the original (or some '
+ msg += 'other backup) already finishing this work. Ignoring this.'
+ logger.warning(msg)
return None
# Kick off the work. Note that if this fails we let
logger.exception(e)
logger.error(f'{bundle}: Something unexpected just happened...')
if p is not None:
- logger.warning(
- f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
- )
+ msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
+ logger.warning(msg)
return self.wait_for_process(p, bundle, depth + 1)
else:
self.status.record_release_worker(
def create_original_bundle(self, pickle, fname: str):
from string_utils import generate_uuid
- uuid = generate_uuid(as_hex=True)
+ uuid = generate_uuid(omit_dashes=True)
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
return None
else:
- logger.warning(
- f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
- )
+ msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
+ logger.warning(msg)
+ warnings.warn(msg)
return self.launch(bundle, avoid_last_machine)
@overrides