3 """A simple utility to unpickle some code, run it, and pickle the
15 import cloudpickle # type: ignore
16 import psutil # type: ignore
21 from thread_utils import background_thread
# Module-level logger. Note that it is keyed on this file's path (__file__),
# not on the module name.
24 logger = logging.getLogger(__file__)
# Register this program's command-line flags through the project's `config`
# module; the first string is the group title, the second its description.
# NOTE(review): the individual add_argument(...) calls are only partially
# visible in this excerpt -- just their `help=` / `action=` keywords appear.
26 cfg = config.add_commandline_args(
27 f"Remote Worker ({__file__})",
28 "Helper to run pickled code remotely and return results",
# Help text for the flag naming the input bundle.
# presumably surfaced later as config.config['code_file'] -- verify.
35 help='The location of the bundle of code to execute.'
# Help text for the flag naming the output location.
# presumably surfaced later as config.config['result_file'] -- verify.
42 help='The location where we should write the computation results.'
# ActionNoYes comes from the project's argparse_utils module.
# NOTE(review): presumably provides a paired yes/no boolean flag -- confirm.
46 action=argparse_utils.ActionNoYes,
# Help text for the flag that enables the parent-ssh watchdog.
48 help='Should we watch for the cancellation of our parent ssh process?'
# Watchdog: inspects this process's ancestor processes looking for one whose
# name contains "ssh", and signals this process to die when the error path
# below is reached.
# NOTE(review): this excerpt omits several lines of the function (any loop
# header, the assignment of `pid`, and the condition guarding the kill path),
# so the comments here describe only the visible statements.
# NOTE(review): the caller invokes this with no arguments and unpacks a
# 2-tuple, so it is presumably wrapped by the imported `background_thread`
# decorator, which would supply `terminate_event` -- confirm.
53 def watch_for_cancel(terminate_event: threading.Event) -> None:
# Opt-out for one specific host, identified by its node name.
54 if platform.node() == 'VIDEO-COMPUTER':
55 logger.warning('Background thread not allowed on retarded computers, sorry.')
57 logger.debug('Starting up background thread...')
# psutil handle on the current process; used to enumerate its ancestors.
58 p = psutil.Process(os.getpid())
# Walk the ancestors of this process, logging each one.
61 ancestors = p.parents()
62 for ancestor in ancestors:
63 name = ancestor.name()
# `pid` is assigned on a line that is not visible in this excerpt.
65 logger.debug(f'Ancestor process {name} (pid={pid})')
# Case-insensitive substring match, so both "ssh" and "sshd" qualify.
66 if 'ssh' in name.lower():
# Error path: per the message, no ssh ancestor was seen.
# NOTE(review): the condition guarding this path is not visible here.
70 logger.error('Did not see sshd in our ancestors list?! Committing suicide.')
# Ask this process to terminate (SIGTERM) first...
72 os.kill(os.getpid(), signal.SIGTERM)
# ...then force the issue with SIGKILL, which cannot be caught or ignored.
74 os.kill(os.getpid(), signal.SIGKILL)
# Cooperative shutdown check on the event passed in by the caller.
# NOTE(review): the action taken when the event is set is not visible here.
76 if terminate_event.is_set():
# Worker pipeline: read a pickled bundle, unpickle it, invoke it, pickle the
# return value, and write that to the result file.
# NOTE(review): the enclosing function header, the `try:` lines, and whatever
# follows each "Aborting." log call are not visible in this excerpt.

# Input and output paths come from the parsed configuration.
83 in_file = config.config['code_file']
84 out_file = config.config['result_file']
# Optionally start the parent-ssh watchdog.
# NOTE(review): called with no arguments although the def takes
# `terminate_event`, and a 2-tuple is unpacked from the result -- presumably
# the `background_thread` decorator changes the call signature; confirm.
86 if config.config['watch_for_cancel']:
87 (thread, stop_thread) = watch_for_cancel()
# Step 1: read the serialized bundle as raw bytes.
89 logger.debug(f'Reading {in_file}.')
91 with open(in_file, 'rb') as rb:
92 serialized = rb.read()
93 except Exception as e:
95 logger.critical(f'Problem reading {in_file}. Aborting.')
# Step 2: the bundle must unpickle to a (callable, args, kwargs) triple.
# NOTE: unpickling can execute arbitrary code; the input file must be trusted.
99 logger.debug(f'Deserializing {in_file}.')
101 fun, args, kwargs = cloudpickle.loads(serialized)
102 except Exception as e:
104 logger.critical(f'Problem deserializing {in_file}. Aborting.')
# Step 3: run the user-supplied callable with its positional/keyword args.
108 logger.debug('Invoking user code...')
110 ret = fun(*args, **kwargs)
# `start` and `end` are assigned on lines not visible in this excerpt.
112 logger.debug(f'User code took {end - start:.1f}s')
# Step 4: pickle the return value; `serialized` is reused for the output bytes.
114 logger.debug('Serializing results')
116 serialized = cloudpickle.dumps(ret)
117 except Exception as e:
119 logger.critical(f'Could not serialize result ({type(ret)}). Aborting.')
# Step 5: write the result file in binary mode.
# NOTE(review): the write call inside the `with` is not visible here.
123 logger.debug(f'Writing {out_file}.')
125 with open(out_file, 'wb') as wb:
127 except Exception as e:
129 logger.critical(f'Error writing {out_file}. Aborting.')
137 if __name__ == '__main__':