3 # © Copyright 2021-2023, Scott Gasch
5 """A simple utility to unpickle some code from the filesystem, run it,
6 pickle the results, and save them back on the filesystem. This file
7 helps :mod:`pyutils.parallelize.parallelize` and
8 :mod:`pyutils.parallelize.executors` implement the
9 :class:`pyutils.parallelize.executors.RemoteExecutor` that distributes
10 work to different machines when code is marked with the
11 `@parallelize(method=Method.REMOTE)` decorator.
14 Please don't unpickle (or run!) code you do not know! This
15 helper is designed to be run with your own code.
17 See details in :mod:`pyutils.parallelize.parallelize` for instructions
18 about how to set this up.
27 from typing import Optional
29 import cloudpickle # type: ignore
30 import psutil # type: ignore
32 from pyutils import argparse_utils, bootstrap, config
33 from pyutils.parallelize.thread_utils import background_thread
34 from pyutils.stopwatch import Timer
# Module-level logger.
# NOTE(review): keyed on __file__ rather than the conventional __name__ —
# confirm this is intentional (it affects logger-hierarchy configuration).
36 logger = logging.getLogger(__file__)
# Register this module's command-line arguments with the shared config
# machinery: the location of the pickled code bundle to run, where to
# write the pickled result, and a yes/no flag enabling the parent-ssh
# watchdog thread.
# NOTE(review): several lines are elided in this view (the add_argument
# calls' option names, required= flags, closing parens); the fragments
# below are kept verbatim.
38 cfg = config.add_commandline_args(
39 f"Remote Worker ({__file__})",
40 "Helper to run pickled code remotely and return results",
47 help="The location of the bundle of code to execute.",
54 help="The location where we should write the computation results.",
58 action=argparse_utils.ActionNoYes,
60 help="Should we watch for the cancellation of our parent ssh process?",
# Watchdog: walks this process's ancestor list looking for an ssh
# process; if none is found it assumes the controlling ssh connection
# (and thus the remote caller) is gone and terminates this process via
# SIGTERM and then SIGKILL.  Stops when terminate_event is set.
# NOTE(review): this view elides several lines of the function —
# presumably a @background_thread decorator (imported above), the
# polling loop around the ancestor scan, a saw-ssh flag, and sleep
# calls — TODO confirm against the full file.  Fragments kept verbatim.
65 def _watch_for_cancel(terminate_event: threading.Event) -> None:
66 logger.debug("Starting up background thread...")
# Handle on our own process, used to enumerate ancestors below.
67 p = psutil.Process(os.getpid())
70 ancestors = p.parents()
71 for ancestor in ancestors:
72 name = ancestor.name()
74 logger.debug("Ancestor process %s (pid=%d)", name, pid)
# Any ancestor whose name contains "ssh" counts as our controlling
# connection still being alive.
75 if "ssh" in name.lower():
80 "Did not see sshd in our ancestors list?! Committing suicide."
# Ask nicely first (SIGTERM), then force (SIGKILL).
83 os.kill(os.getpid(), signal.SIGTERM)
85 os.kill(os.getpid(), signal.SIGKILL)
# Cooperative shutdown requested by _cleanup_and_exit.
87 if terminate_event.is_set():
# Shut down the watchdog thread (if one was started) and exit the
# process.  When stop_event is provided, thread must be too — they are
# created together by _watch_for_cancel.
# NOTE(review): this view elides the exit-code parameter, the
# stop_event.set()/thread.join() body, and the final sys.exit(...) call
# — TODO confirm against the full file.  Fragments kept verbatim.
92 def _cleanup_and_exit(
93 thread: Optional[threading.Thread],
94 stop_event: Optional[threading.Event],
97 if stop_event is not None:
# A stop_event without its companion thread would be a logic error.
99 assert thread is not None
# Remote worker entry point (the `def main(...)` line itself is elided
# from this view; only the decorator and body fragments are visible).
# Pipeline: read the pickled (fun, args, kwargs) bundle from code_file,
# deserialize it with cloudpickle, invoke it, serialize the return
# value, and write it to result_file.  Each failure mode logs and bails
# out through _cleanup_and_exit with a distinct code — presumably
# 1=read, 2=deserialize, 3=serialize, 4=write, 0=success — so the
# caller can tell which stage failed; TODO confirm against the full
# file (try/except lines are elided here).
104 @bootstrap.initialize
105 """Remote worker entry point."""
# Required string arguments validated up front.
# NOTE(review): assert is stripped under -O; input validation via
# assert is fragile — flagging, not changing (doc-only edit).
108 in_file = config.config["code_file"]
109 assert in_file and isinstance(in_file, str)
110 out_file = config.config["result_file"]
111 assert out_file and isinstance(out_file, str)
# Optionally start the ssh-ancestor watchdog; it returns a (thread,
# stop_event) pair that _cleanup_and_exit uses to shut it down.
115 if config.config["watch_for_cancel"]:
116 thread, stop_event = _watch_for_cancel()
# --- Stage 1: read the serialized bundle from disk. ---
118 logger.debug("Reading %s.", in_file)
120 with open(in_file, "rb") as rb:
121 serialized = rb.read()
123 logger.exception("Problem reading %s; aborting.", in_file)
124 _cleanup_and_exit(thread, stop_event, 1)
# --- Stage 2: deserialize into a callable plus its arguments. ---
# NOTE(review): cloudpickle.loads executes arbitrary code from the
# file; the module docstring already warns to run only trusted bundles.
126 logger.debug("Deserializing %s", in_file)
128 fun, args, kwargs = cloudpickle.loads(serialized)
130 logger.exception("Problem deserializing %s. Aborting.", in_file)
131 _cleanup_and_exit(thread, stop_event, 2)
# --- Stage 3: run the user code, timing it (Timer context elided). ---
133 logger.debug("Invoking user-defined code...")
135 ret = fun(*args, **kwargs)
136 logger.debug("User code took %.1fs", t())
# --- Stage 4: serialize the result. ---
138 logger.debug("Serializing results")
140 serialized = cloudpickle.dumps(ret)
142 logger.exception("Could not serialize result (%s). Aborting.", type(ret))
143 _cleanup_and_exit(thread, stop_event, 3)
# --- Stage 5: write the result file, then normal exit (code 0). ---
145 logger.debug("Writing %s", out_file)
147 with open(out_file, "wb") as wb:
150 logger.exception("Error writing %s. Aborting.", out_file)
151 _cleanup_and_exit(thread, stop_event, 4)
152 _cleanup_and_exit(thread, stop_event, 0)
# Script entry guard; the call to main() on the following line is
# elided from this view.
155 if __name__ == "__main__":