results.
"""
+import logging
import os
import platform
import signal
-import sys
import threading
+import sys
import time
import cloudpickle # type: ignore
import psutil # type: ignore
+import argparse_utils
import bootstrap
import config
from thread_utils import background_thread
+logger = logging.getLogger(__file__)
+
cfg = config.add_commandline_args(
f"Remote Worker ({__file__})",
"Helper to run pickled code remotely and return results",
metavar='FILENAME',
help='The location where we should write the computation results.'
)
+cfg.add_argument(
+ '--watch_for_cancel',
+ action=argparse_utils.ActionNoYes,
+ default=False,
+ help='Should we watch for the cancellation of our parent ssh process?'
+)
@background_thread
def watch_for_cancel(terminate_event: threading.Event) -> None:
+ if platform.node() == 'VIDEO-COMPUTER':
+ logger.warning('Background thread not allowed on retarded computers, sorry.')
+ return
+ logger.debug('Starting up background thread...')
p = psutil.Process(os.getpid())
while True:
saw_sshd = False
ancestors = p.parents()
for ancestor in ancestors:
name = ancestor.name()
- if 'ssh' in name or 'Ssh' in name:
+ pid = ancestor.pid
+ logger.debug(f'Ancestor process {name} (pid={pid})')
+ if 'ssh' in name.lower():
saw_sshd = True
break
-
if not saw_sshd:
+ logger.error('Did not see sshd in our ancestors list?! Committing suicide.')
os.system('pstree')
os.kill(os.getpid(), signal.SIGTERM)
+ time.sleep(5.0)
+ os.kill(os.getpid(), signal.SIGKILL)
+ sys.exit(-1)
if terminate_event.is_set():
return
time.sleep(1.0)
@bootstrap.initialize
def main() -> None:
- hostname = platform.node()
-
- # Windows-Linux is retarded.
- if hostname != 'VIDEO-COMPUTER':
- (thread, terminate_event) = watch_for_cancel()
-
in_file = config.config['code_file']
out_file = config.config['result_file']
- with open(in_file, 'rb') as rb:
- serialized = rb.read()
-
- fun, args, kwargs = cloudpickle.loads(serialized)
+ (thread, stop_thread) = watch_for_cancel()
+
+ logger.debug(f'Reading {in_file}.')
+ try:
+ with open(in_file, 'rb') as rb:
+ serialized = rb.read()
+ except Exception as e:
+ logger.exception(e)
+ logger.critical(f'Problem reading {in_file}. Aborting.')
+ stop_thread.set()
+ sys.exit(-1)
+
+ logger.debug(f'Deserializing {in_file}.')
+ try:
+ fun, args, kwargs = cloudpickle.loads(serialized)
+ except Exception as e:
+ logger.exception(e)
+ logger.critical(f'Problem deserializing {in_file}. Aborting.')
+ stop_thread.set()
+ sys.exit(-1)
+
+ logger.debug('Invoking user code...')
+ start = time.time()
ret = fun(*args, **kwargs)
-
- serialized = cloudpickle.dumps(ret)
- with open(out_file, 'wb') as wb:
- wb.write(serialized)
-
- # Windows-Linux is retarded.
- if hostname != 'VIDEO-COMPUTER':
- terminate_event.set()
- thread.join()
- sys.exit(0)
+ end = time.time()
+ logger.debug(f'User code took {end - start:.1f}s')
+
+ logger.debug('Serializing results')
+ try:
+ serialized = cloudpickle.dumps(ret)
+ except Exception as e:
+ logger.exception(e)
+ logger.critical(f'Could not serialize result ({type(ret)}). Aborting.')
+ stop_thread.set()
+ sys.exit(-1)
+
+ logger.debug(f'Writing {out_file}.')
+ try:
+ with open(out_file, 'wb') as wb:
+ wb.write(serialized)
+ except Exception as e:
+ logger.exception(e)
+ logger.critical(f'Error writing {out_file}. Aborting.')
+ stop_thread.set()
+ sys.exit(-1)
+
+ stop_thread.set()
+ thread.join()
if __name__ == '__main__':