Remove hardcoded properties list from DeferredOperand; use a c'tor
[pyutils.git] / src / pyutils / remote_worker.py
index 54f562da181d8d21b9891c8b99cfcc8e2dcd43fe..af0968ca93f3b06ba362c3110cd12640aab0da0a 100755 (executable)
@@ -2,8 +2,20 @@
 
 # © Copyright 2021-2022, Scott Gasch
 
-"""A simple utility to unpickle some code, run it, and pickle the
-results.  Please don't unpickle (or run!) code you do not know.
+"""A simple utility to unpickle some code from the filesystem, run it,
+pickle the results, and save them back on the filesystem.  This file
+helps :mod:`pyutils.parallelize.parallelize` and
+:mod:`pyutils.parallelize.executors` implement the
+:class:`pyutils.parallelize.executors.RemoteExecutor` that distributes
+work to different machines when code is marked with the
+`@parallelize(method=Method.REMOTE)` decorator.
+
+.. warning::
+    Please don't unpickle (or run!) code you do not know!  This
+    helper is designed to be run with your own code.
+
+See details in :mod:`pyutils.parallelize.parallelize` for instructions
+about how to set this up.
 """
 
 import logging
@@ -50,7 +62,7 @@ cfg.add_argument(
 
 
 @background_thread
-def watch_for_cancel(terminate_event: threading.Event) -> None:
+def _watch_for_cancel(terminate_event: threading.Event) -> None:
     logger.debug('Starting up background thread...')
     p = psutil.Process(os.getpid())
     while True:
@@ -77,13 +89,13 @@ def watch_for_cancel(terminate_event: threading.Event) -> None:
         time.sleep(1.0)
 
 
-def cleanup_and_exit(
+def _cleanup_and_exit(
     thread: Optional[threading.Thread],
-    stop_thread: Optional[threading.Event],
+    stop_event: Optional[threading.Event],
     exit_code: int,
 ) -> None:
-    if stop_thread is not None:
-        stop_thread.set()
+    if stop_event is not None:
+        stop_event.set()
         assert thread is not None
         thread.join()
     sys.exit(exit_code)
@@ -91,15 +103,17 @@ def cleanup_and_exit(
 
 @bootstrap.initialize
 def main() -> None:
+    """Remote worker entry point."""
+
     in_file = config.config['code_file']
     assert in_file and type(in_file) == str
     out_file = config.config['result_file']
     assert out_file and type(out_file) == str
 
     thread = None
-    stop_thread = None
+    stop_event = None
     if config.config['watch_for_cancel']:
-        (thread, stop_thread) = watch_for_cancel()
+        thread, stop_event = _watch_for_cancel()
 
     logger.debug('Reading %s.', in_file)
     try:
@@ -108,7 +122,7 @@ def main() -> None:
     except Exception as e:
         logger.exception(e)
         logger.critical('Problem reading %s. Aborting.', in_file)
-        cleanup_and_exit(thread, stop_thread, 1)
+        _cleanup_and_exit(thread, stop_event, 1)
 
     logger.debug('Deserializing %s', in_file)
     try:
@@ -116,9 +130,9 @@ def main() -> None:
     except Exception as e:
         logger.exception(e)
         logger.critical('Problem deserializing %s. Aborting.', in_file)
-        cleanup_and_exit(thread, stop_thread, 2)
+        _cleanup_and_exit(thread, stop_event, 2)
 
-    logger.debug('Invoking user code...')
+    logger.debug('Invoking user-defined code...')
     with Timer() as t:
         ret = fun(*args, **kwargs)
     logger.debug('User code took %.1fs', t())
@@ -129,7 +143,7 @@ def main() -> None:
     except Exception as e:
         logger.exception(e)
         logger.critical('Could not serialize result (%s). Aborting.', type(ret))
-        cleanup_and_exit(thread, stop_thread, 3)
+        _cleanup_and_exit(thread, stop_event, 3)
 
     logger.debug('Writing %s', out_file)
     try:
@@ -138,8 +152,8 @@ def main() -> None:
     except Exception as e:
         logger.exception(e)
         logger.critical('Error writing %s. Aborting.', out_file)
-        cleanup_and_exit(thread, stop_thread, 4)
-    cleanup_and_exit(thread, stop_thread, 0)
+        _cleanup_and_exit(thread, stop_event, 4)
+    _cleanup_and_exit(thread, stop_event, 0)
 
 
 if __name__ == '__main__':