import threading
import time
from typing import Any, Callable, Dict, List, Optional, Set
+import warnings
import cloudpickle # type: ignore
from overrides import overrides
help='Maximum number of failures before giving up on a bundle',
)
-RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
-SSH = 'ssh -oForwardX11=no'
-
+SSH = '/usr/bin/ssh -oForwardX11=no'
+SCP = '/usr/bin/scp'
def make_cloud_pickle(fun, *args, **kwargs):
logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
worker.count -= 1
logger.debug(f'Selected worker {worker}')
return worker
- logger.warning("Couldn't find a worker; go fish.")
+ msg = 'Unexpectedly could not find a worker, retrying...'
+ logger.warning(msg)
return None
if x >= len(self.workers):
x = 0
if x == self.index:
- logger.warning("Couldn't find a worker; go fish.")
+ msg = 'Unexpectedly could not find a worker, retrying...'
+ logger.warning(msg)
return None
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
+ cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
xfer_latency = time.time() - start_ts
logger.info(f"{bundle}: Copying done to {worker} in {xfer_latency:.1f}s.")
except Exception as e:
- logger.exception(e)
- logger.error(
- f'{bundle}: failed to send instructions to worker machine?!?'
- )
assert bundle.worker is not None
self.status.record_release_worker(
bundle.worker,
if is_original:
# Weird. We tried to copy the code to the worker and it failed...
# And we're the original bundle. We have to retry.
+ logger.exception(e)
+ logger.error(
+ f'{bundle}: Failed to send instructions to the worker machine?! ' +
+ 'This is not expected; we\'re the original bundle so this shouldn\'t ' +
+ 'be a race condition. Attempting an emergency retry...'
+ )
return self.emergency_retry_nasty_bundle(bundle)
else:
# This is actually expected; we're a backup.
# There's a race condition where someone else
# already finished the work and removed the source
# code file before we could copy it. No biggie.
+ msg = f'{bundle}: Failed to send instructions to the worker machine... '
+ msg += 'We\'re a backup and this may be caused by the original (or some '
+ msg += 'other backup) already finishing this work. Ignoring this.'
+ logger.warning(msg)
return None
# Kick off the work. Note that if this fails we let
# wait_for_process deal with it.
self.status.record_processing_began(uuid)
cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
- f'"source py39-venv/bin/activate &&'
+ f'"source py38-venv/bin/activate &&'
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
logger.exception(e)
logger.error(f'{bundle}: Something unexpected just happened...')
if p is not None:
- logger.warning(
- f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
- )
+ msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
+ logger.warning(msg)
return self.wait_for_process(p, bundle, depth + 1)
else:
self.status.record_release_worker(
if not was_cancelled:
assert bundle.machine is not None
if bundle.hostname not in bundle.machine:
- cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
+ cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
logger.info(
f"{bundle}: Fetching results from {username}@{machine} via {cmd}"
)
if is_original:
logger.debug(f"{bundle}: Unpickling {result_file}.")
try:
- with open(f'{result_file}', 'rb') as rb:
+ with open(result_file, 'rb') as rb:
serialized = rb.read()
result = cloudpickle.loads(serialized)
except Exception as e:
def create_original_bundle(self, pickle, fname: str):
from string_utils import generate_uuid
- uuid = generate_uuid(as_hex=True)
+ uuid = generate_uuid(omit_dashes=True)
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
return None
else:
- logger.warning(
- f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
- )
+ msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
+ logger.warning(msg)
+ warnings.warn(msg)
return self.launch(bundle, avoid_last_machine)
@overrides
RemoteWorkerRecord(
username = 'scott',
machine = 'cheetah.house',
- weight = 14,
- count = 4,
- ),
- )
- if self.ping('video.house'):
- logger.info('Found video.house')
- pool.append(
- RemoteWorkerRecord(
- username = 'scott',
- machine = 'video.house',
- weight = 1,
- count = 4,
- ),
- )
- if self.ping('wannabe.house'):
- logger.info('Found wannabe.house')
- pool.append(
- RemoteWorkerRecord(
- username = 'scott',
- machine = 'wannabe.house',
- weight = 2,
- count = 4,
+ weight = 25,
+ count = 6,
),
)
if self.ping('meerkat.cabin'):
count = 2,
),
)
- if self.ping('kiosk.house'):
- logger.info('Found kiosk.house')
+ # if self.ping('kiosk.house'):
+ # logger.info('Found kiosk.house')
+ # pool.append(
+ # RemoteWorkerRecord(
+ # username = 'pi',
+ # machine = 'kiosk.house',
+ # weight = 1,
+ # count = 2,
+ # ),
+ # )
+ if self.ping('hero.house'):
+ logger.info('Found hero.house')
pool.append(
RemoteWorkerRecord(
- username = 'pi',
- machine = 'kiosk.house',
- weight = 1,
- count = 2,
+ username = 'scott',
+ machine = 'hero.house',
+ weight = 30,
+ count = 10,
),
)
if self.ping('puma.cabin'):
RemoteWorkerRecord(
username = 'scott',
machine = 'puma.cabin',
- weight = 12,
- count = 4,
+ weight = 25,
+ count = 6,
+ ),
+ )
+ if self.ping('backup.house'):
+ logger.info('Found backup.house')
+ pool.append(
+ RemoteWorkerRecord(
+ username = 'scott',
+ machine = 'backup.house',
+ weight = 3,
+ count = 2,
),
)