3 # © Copyright 2021-2022, Scott Gasch
5 """A simple utility to unpickle some code from the filesystem, run it,
6 pickle the results, and save them back on the filesystem. This file
7 helps :mod:`pyutils.parallelize.parallelize` and
8 :mod:`pyutils.parallelize.executors` implement the
9 :class:`pyutils.parallelize.executors.RemoteExecutor` that distributes
10 work to different machines when code is marked with the
11 `@parallelize(method=Method.REMOTE)` decorator.
.. warning::
    Please don't unpickle (or run!) code you do not know!  This
    helper is designed to be run with your own code.
17 See details in :mod:`pyutils.parallelize.parallelize` for instructions
18 about how to set this up.
27 from typing import Optional
29 import cloudpickle # type: ignore
30 import psutil # type: ignore
32 from pyutils import argparse_utils, bootstrap, config
33 from pyutils.parallelize.thread_utils import background_thread
34 from pyutils.stopwatch import Timer
logger = logging.getLogger(__file__)

# Register this helper's command-line arguments with the shared pyutils
# config machinery.  NOTE(review): several argument-definition lines were
# lost in extraction; the flag names/options below are reconstructed from
# the surviving help strings and the config.config[...] lookups in _main —
# verify against upstream before relying on them.
cfg = config.add_commandline_args(
    f"Remote Worker ({__file__})",
    "Helper to run pickled code remotely and return results",
)
cfg.add_argument(
    '--code_file',
    type=str,
    required=True,
    metavar='FILENAME',
    help='The location of the bundle of code to execute.',
)
cfg.add_argument(
    '--result_file',
    type=str,
    required=True,
    metavar='FILENAME',
    help='The location where we should write the computation results.',
)
cfg.add_argument(
    '--watch_for_cancel',
    action=argparse_utils.ActionNoYes,
    default=True,
    help='Should we watch for the cancellation of our parent ssh process?',
)
# NOTE(review): the decorator and several interior lines (loop header,
# saw-sshd bookkeeping, sleeps) were lost in extraction and are
# reconstructed here.  The @background_thread decorator is grounded by the
# import on this file's import block and by _main calling
# `thread, stop_event = _watch_for_cancel()` with no arguments and
# receiving a (Thread, Event) pair — confirm against upstream.
@background_thread
def _watch_for_cancel(terminate_event: threading.Event) -> None:
    """Background watchdog: die if our parent ssh session disappears.

    Periodically walks this process' ancestor list looking for an ssh
    process.  If none is found, the submitting side has presumably
    disconnected / cancelled us, so we terminate this worker rather than
    let orphaned user code run forever.

    Args:
        terminate_event: set by the main thread (via _cleanup_and_exit)
            to ask this watchdog to exit cleanly.
    """
    logger.debug('Starting up background thread...')
    p = psutil.Process(os.getpid())
    while True:
        saw_sshd = False
        ancestors = p.parents()
        for ancestor in ancestors:
            name = ancestor.name()
            pid = ancestor.pid
            logger.debug('Ancestor process %s (pid=%d)', name, pid)
            # Any ancestor whose name mentions ssh counts as our tether
            # back to the submitting machine.
            if 'ssh' in name.lower():
                saw_sshd = True
                break
        if not saw_sshd:
            logger.error(
                'Did not see sshd in our ancestors list?! Committing suicide.'
            )
            # Ask nicely first (SIGTERM), then force the issue (SIGKILL).
            os.kill(os.getpid(), signal.SIGTERM)
            time.sleep(5.0)
            os.kill(os.getpid(), signal.SIGKILL)
            sys.exit(-1)
        if terminate_event.is_set():
            return
        time.sleep(1.0)
def _cleanup_and_exit(
    thread: Optional[threading.Thread],
    stop_event: Optional[threading.Event],
    exit_code: int,
) -> None:
    """Shut down the cancellation watchdog (if any) and exit the process.

    Args:
        thread: the watchdog thread started by _watch_for_cancel, or None
            when no watchdog was started.
        stop_event: the event used to ask that watchdog to stop, or None.
        exit_code: process exit status to report via sys.exit; the remote
            executor uses distinct codes to distinguish failure modes.
    """
    if stop_event is not None:
        # Signal the watchdog to wind down, then wait for it so we don't
        # exit while it is mid-scan of our ancestor processes.
        stop_event.set()
        assert thread is not None
        thread.join()
    sys.exit(exit_code)
# NOTE(review): several interior lines were lost in extraction (the `try:`
# headers, `logger.exception(e)` calls, the thread/stop_event
# initialization, and the Timer context manager) and are reconstructed
# here — verify against upstream.
@bootstrap.initialize
def _main() -> None:
    """Remote worker entry point.

    Reads a cloudpickle-serialized (function, args, kwargs) bundle from
    --code_file, invokes it, and writes the pickled result to
    --result_file.  Each failure mode exits with a distinct status code
    (1=read, 2=deserialize, 3=serialize, 4=write, 0=success) so the
    remote executor can tell what went wrong.
    """
    in_file = config.config['code_file']
    assert in_file and type(in_file) == str
    out_file = config.config['result_file']
    assert out_file and type(out_file) == str

    # Optionally start a watchdog that kills this worker if the parent
    # ssh connection goes away (i.e. the submitting side cancelled us).
    thread = None
    stop_event = None
    if config.config['watch_for_cancel']:
        thread, stop_event = _watch_for_cancel()

    logger.debug('Reading %s.', in_file)
    try:
        with open(in_file, 'rb') as rb:
            serialized = rb.read()
    except Exception as e:
        logger.exception(e)
        logger.critical('Problem reading %s. Aborting.', in_file)
        _cleanup_and_exit(thread, stop_event, 1)

    logger.debug('Deserializing %s', in_file)
    try:
        # WARNING: unpickling executes arbitrary code; this helper is
        # only meant to run bundles you created yourself (see module
        # docstring).
        fun, args, kwargs = cloudpickle.loads(serialized)
    except Exception as e:
        logger.exception(e)
        logger.critical('Problem deserializing %s. Aborting.', in_file)
        _cleanup_and_exit(thread, stop_event, 2)

    logger.debug('Invoking user-defined code...')
    with Timer() as t:
        ret = fun(*args, **kwargs)
    logger.debug('User code took %.1fs', t())

    logger.debug('Serializing results')
    try:
        serialized = cloudpickle.dumps(ret)
    except Exception as e:
        logger.exception(e)
        logger.critical('Could not serialize result (%s). Aborting.', type(ret))
        _cleanup_and_exit(thread, stop_event, 3)

    logger.debug('Writing %s', out_file)
    try:
        with open(out_file, 'wb') as wb:
            wb.write(serialized)
    except Exception as e:
        logger.exception(e)
        logger.critical('Error writing %s. Aborting.', out_file)
        _cleanup_and_exit(thread, stop_event, 4)
    _cleanup_and_exit(thread, stop_event, 0)
# Script entry point.  NOTE(review): the call line falls past the visible
# extract; reconstructed as the conventional guard-body.
if __name__ == '__main__':
    _main()