# © Copyright 2021-2022, Scott Gasch
-"""A simple utility to unpickle some code, run it, and pickle the
-results. Please don't unpickle (or run!) code you do not know.
+"""A simple utility to unpickle some code from the filesystem, run it,
+pickle the results, and save them back on the filesystem. This file
+helps :mod:`pyutils.parallelize.parallelize` and
+:mod:`pyutils.parallelize.executors` implement the
+:class:`pyutils.parallelize.executors.RemoteExecutor` that distributes
+work to different machines when code is marked with the
+`@parallelize(method=Method.REMOTE)` decorator.
+
+.. warning::
+ Please don't unpickle (or run!) code you do not know! This
+ helper is designed to be run with your own code.
+
+See details in :mod:`pyutils.parallelize.parallelize` for instructions
+about how to set this up.
"""
import logging
@background_thread
-def watch_for_cancel(terminate_event: threading.Event) -> None:
+def _watch_for_cancel(terminate_event: threading.Event) -> None:
logger.debug('Starting up background thread...')
p = psutil.Process(os.getpid())
while True:
time.sleep(1.0)
-def cleanup_and_exit(
+def _cleanup_and_exit(
thread: Optional[threading.Thread],
- stop_thread: Optional[threading.Event],
+ stop_event: Optional[threading.Event],
exit_code: int,
) -> None:
- if stop_thread is not None:
- stop_thread.set()
+ if stop_event is not None:
+ stop_event.set()
assert thread is not None
thread.join()
sys.exit(exit_code)
@bootstrap.initialize
def main() -> None:
+ """Remote worker entry point."""
+
in_file = config.config['code_file']
+ assert in_file and type(in_file) == str
out_file = config.config['result_file']
+ assert out_file and type(out_file) == str
thread = None
- stop_thread = None
+ stop_event = None
if config.config['watch_for_cancel']:
- (thread, stop_thread) = watch_for_cancel()
+ thread, stop_event = _watch_for_cancel()
logger.debug('Reading %s.', in_file)
try:
except Exception as e:
logger.exception(e)
logger.critical('Problem reading %s. Aborting.', in_file)
- cleanup_and_exit(thread, stop_thread, 1)
+ _cleanup_and_exit(thread, stop_event, 1)
logger.debug('Deserializing %s', in_file)
try:
except Exception as e:
logger.exception(e)
logger.critical('Problem deserializing %s. Aborting.', in_file)
- cleanup_and_exit(thread, stop_thread, 2)
+ _cleanup_and_exit(thread, stop_event, 2)
- logger.debug('Invoking user code...')
+ logger.debug('Invoking user-defined code...')
with Timer() as t:
ret = fun(*args, **kwargs)
logger.debug('User code took %.1fs', t())
except Exception as e:
logger.exception(e)
logger.critical('Could not serialize result (%s). Aborting.', type(ret))
- cleanup_and_exit(thread, stop_thread, 3)
+ _cleanup_and_exit(thread, stop_event, 3)
logger.debug('Writing %s', out_file)
try:
except Exception as e:
logger.exception(e)
logger.critical('Error writing %s. Aborting.', out_file)
- cleanup_and_exit(thread, stop_thread, 4)
- cleanup_and_exit(thread, stop_thread, 0)
+ _cleanup_and_exit(thread, stop_event, 4)
+ _cleanup_and_exit(thread, stop_event, 0)
if __name__ == '__main__':