3 # © Copyright 2021-2022, Scott Gasch
5 """A simple utility to unpickle some code, run it, and pickle the
6 results. Please don't unpickle (or run!) code you do not know.
8 This script is used by code in parallelize, namely the
9 :class:`RemoteExecutor`, to schedule work on a remote machine.
10 The code in :file:`parallelize.py` uses a user-defined configuration
11 to schedule work this way. See that file for setup instructions.
20 from typing import Optional
22 import cloudpickle # type: ignore
23 import psutil # type: ignore
25 from pyutils import argparse_utils, bootstrap, config
26 from pyutils.parallelize.thread_utils import background_thread
27 from pyutils.stopwatch import Timer
# Module-level logger.  NOTE(review): keyed on __file__ (a path) rather than
# the conventional __name__ -- presumably intentional in this project; confirm.
29 logger = logging.getLogger(__file__)
# Register this script's command-line flags with the shared config machinery.
# NOTE(review): several add_argument(...) header lines are not visible in this
# excerpt; only the help/action fragments below survived extraction.
31 cfg = config.add_commandline_args(
32 f"Remote Worker ({__file__})",
33 "Helper to run pickled code remotely and return results",
# --code_file: path of the pickled (function, args, kwargs) bundle to run.
40 help='The location of the bundle of code to execute.',
# --result_file: path where the pickled return value will be written.
47 help='The location where we should write the computation results.',
# Boolean --[no]watch_for_cancel flag (argparse_utils yes/no action).
51 action=argparse_utils.ActionNoYes,
53 help='Should we watch for the cancellation of our parent ssh process?',
58 def watch_for_cancel(terminate_event: threading.Event) -> None:
# Background watcher: periodically inspect our process ancestry looking for
# the ssh session that launched this worker; if no ssh ancestor is found,
# kill this process so orphaned remote jobs don't linger after the caller
# disconnects.  NOTE(review): this excerpt is missing several lines (the
# @background_thread decorator referenced by the import, the polling loop
# header, the `pid` assignment, and any sleep/break logic) -- the comments
# below describe only the code that is visible.
59 logger.debug('Starting up background thread...')
# Handle to our own process so we can walk its ancestor chain.
60 p = psutil.Process(os.getpid())
# psutil.Process.parents() returns every ancestor, nearest first.
63 ancestors = p.parents()
64 for ancestor in ancestors:
65 name = ancestor.name()
# NOTE(review): logs a `pid` variable whose assignment is not visible in this
# excerpt (presumably ancestor.pid) -- verify against the full source.
67 logger.debug('Ancestor process %s (pid=%d)', name, pid)
# Case-insensitive substring match: any ancestor containing 'ssh' counts.
# NOTE(review): the log message below says 'sshd' but this test matches
# plain 'ssh' -- minor message/logic mismatch worth confirming.
68 if 'ssh' in name.lower():
# No ssh ancestor found: assume the controlling session is gone.
73 'Did not see sshd in our ancestors list?! Committing suicide.'
# Ask politely first (SIGTERM)...
76 os.kill(os.getpid(), signal.SIGTERM)
# ...then force termination in case SIGTERM was ignored (SIGKILL).
78 os.kill(os.getpid(), signal.SIGKILL)
# The main thread has asked the watcher to shut down cleanly.
80 if terminate_event.is_set():
# NOTE(review): fragment of cleanup_and_exit()'s signature and body; the
# `def` line and the exit-code parameter/sys.exit call are not visible in
# this excerpt.  Both parameters are Optional because the watcher thread is
# only started when --watch_for_cancel is enabled (see main).
86 thread: Optional[threading.Thread],
87 stop_thread: Optional[threading.Event],
# If a watcher was started, signal it to stop and (presumably, on lines not
# visible here) join it before exiting.
90 if stop_thread is not None:
# A stop event without a thread would be a programming error.
92 assert thread is not None
# NOTE(review): fragment of main(); the `def` line, any decorator, every
# `try:` header, and some log calls are not visible in this excerpt -- the
# comments below describe only the visible statements.  The flow is:
# read bundle -> unpickle -> run -> pickle result -> write -> exit, with a
# distinct non-zero exit code for each failure mode.
# Paths come from the command-line flags registered at module scope.
99 in_file = config.config['code_file']
# NOTE(review): isinstance(in_file, str) would be more idiomatic than
# type(...) == str, and assert is stripped under `python -O` -- consider
# raising instead.  Preserved as-is.
100 assert in_file and type(in_file) == str
101 out_file = config.config['result_file']
102 assert out_file and type(out_file) == str
# Optionally start the background watcher that kills this worker if the
# parent ssh session disappears.
106 if config.config['watch_for_cancel']:
107 (thread, stop_thread) = watch_for_cancel()
109 logger.debug('Reading %s.', in_file)
# Read the raw pickled bundle from disk (inside a try: not visible here).
111 with open(in_file, 'rb') as rb:
112 serialized = rb.read()
113 except Exception as e:
115 logger.critical('Problem reading %s. Aborting.', in_file)
# Exit code 1: failed to read the input bundle.
116 cleanup_and_exit(thread, stop_thread, 1)
118 logger.debug('Deserializing %s', in_file)
# SECURITY: cloudpickle.loads can execute arbitrary code from the bundle;
# only run bundles from trusted sources (see the module docstring warning).
120 fun, args, kwargs = cloudpickle.loads(serialized)
121 except Exception as e:
123 logger.critical('Problem deserializing %s. Aborting.', in_file)
# Exit code 2: failed to unpickle the bundle.
124 cleanup_and_exit(thread, stop_thread, 2)
126 logger.debug('Invoking user code...')
# Run the user's function; `t` is presumably a stopwatch.Timer opened as a
# context manager on a line not visible in this excerpt -- confirm.
128 ret = fun(*args, **kwargs)
129 logger.debug('User code took %.1fs', t())
131 logger.debug('Serializing results')
133 serialized = cloudpickle.dumps(ret)
134 except Exception as e:
136 logger.critical('Could not serialize result (%s). Aborting.', type(ret))
# Exit code 3: the result was not picklable.
137 cleanup_and_exit(thread, stop_thread, 3)
139 logger.debug('Writing %s', out_file)
# Write the pickled result (the wb.write(serialized) line is not visible).
141 with open(out_file, 'wb') as wb:
143 except Exception as e:
145 logger.critical('Error writing %s. Aborting.', out_file)
# Exit code 4: failed to write the results file.
146 cleanup_and_exit(thread, stop_thread, 4)
# Success path: stop the watcher (if any) and exit 0.
147 cleanup_and_exit(thread, stop_thread, 0)
150 if __name__ == '__main__':