Bug and readability fix.
[pyutils.git] / src / pyutils / parallelize / executors.py
index fe00b5427c0876b04fba223241c2072ae6a1de4f..e8f8d40dd30b1eb1ca52c95d0ab282e43e17ea79 100644 (file)
@@ -49,18 +49,26 @@ import time
 import warnings
 from abc import ABC, abstractmethod
 from collections import defaultdict
-from dataclasses import dataclass, fields
+from dataclasses import dataclass
 from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
 from overrides import overrides
 
-import pyutils.typez.histogram as hist
-from pyutils import argparse_utils, config, math_utils, persistent, string_utils
+import pyutils.types.histogram as hist
+from pyutils import (
+    argparse_utils,
+    config,
+    dataclass_utils,
+    math_utils,
+    persistent,
+    string_utils,
+)
 from pyutils.ansi import bg, fg, reset, underline
 from pyutils.decorator_utils import singleton
 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
 from pyutils.parallelize.thread_utils import background_thread
+from pyutils.types import type_utils
 
 logger = logging.getLogger(__name__)
 
@@ -106,7 +114,7 @@ parser.add_argument(
     type=str,
     metavar='PATH_TO_REMOTE_WORKER_PY',
     help='Path to remote_worker.py on remote machines',
-    default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
+    default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
 )
 
 
@@ -385,7 +393,7 @@ class BundleDetails:
     machine: Optional[str]
     """The remote machine running this bundle or None if none (yet)"""
 
-    hostname: str
+    controller: str
     """The controller machine"""
 
     code_file: str
@@ -580,7 +588,8 @@ class RemoteExecutorStatus:
         total_finished = len(self.finished_bundle_timings)
         total_in_flight = self.total_in_flight()
         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
-        qall = None
+        qall_median = None
+        qall_p95 = None
         if len(self.finished_bundle_timings) > 1:
             qall_median = self.finished_bundle_timings.get_median()
             qall_p95 = self.finished_bundle_timings.get_percentile(95)
@@ -634,8 +643,8 @@ class RemoteExecutorStatus:
                             if details is not None:
                                 details.slower_than_local_p95 = False
 
-                    if qall is not None:
-                        if sec > qall[1]:
+                    if qall_p95 is not None:
+                        if sec > qall_p95:
                             ret += f'{bg("red")}>∀p95{reset()} '
                             if details is not None:
                                 details.slower_than_global_p95 = True
@@ -654,7 +663,7 @@ class RemoteExecutorStatus:
 
 
 class RemoteWorkerSelectionPolicy(ABC):
-    """A policy for selecting a remote worker base class."""
+    """An interface definition of a policy for selecting a remote worker."""
 
     def __init__(self):
         self.workers: Optional[List[RemoteWorkerRecord]] = None
@@ -754,38 +763,78 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
 
 
 class RemoteExecutor(BaseExecutor):
-    """An executor that uses processes on remote machines to do work.  This
-    works by creating "bundles" of work with pickled code in each to be
-    executed.  Each bundle is assigned a remote worker based on some policy
-    heuristics.  Once assigned to a remote worker, a local subprocess is
-    created.  It copies the pickled code to the remote machine via ssh/scp
-    and then starts up work on the remote machine again using ssh to invoke
-    the :file:`remote_worker.py` (`--remote_worker_helper_path`).  When
-    the work is complete, the local subprocess copies the results back to
-    the local machine via ssh/scp.
-
-    So there is essentially one "controller" machine (which may also be
-    in the remote executor pool and therefore do task work in addition to
-    controlling) and N worker machines.  This code runs on the controller
-    whereas on the worker machines we invoke pickled user code via a
-    shim in :file:`remote_worker.py`.
-
-    Some redundancy and safety provisions are made; e.g. slower than
-    expected tasks have redundant backups created and if a task fails
-    repeatedly we consider it poisoned and give up on it.
+    """An executor that uses processes on remote machines to do work.
+    To do so, it requires that a pool of remote workers to be properly
+    configured.  See instructions in
+    :class:`pyutils.parallelize.parallelize`.
+
+    Each machine in a worker pool has a *weight* and a *count*.  A
+    *weight* captures the relative speed of a processor on that worker
+    and a *count* captures the number of synchronous tasks the worker
+    can accept (i.e. the number of cpus on the machine).
+
+    To dispatch work to a remote machine, this class pickles the code
+    to be executed remotely using `cloudpickle`.  For that to work,
+    the remote machine should be running the same version of Python as
+    this machine, ideally in a virtual environment with the same
+    import libraries installed.  Differences in operating system
+    and/or processor architecture don't seem to matter for most code,
+    though.
+
+    .. warning::
+
+        Mismatches in Python version or in the version numbers of
+        third-party libraries between machines can cause problems
+        when trying to unpickle and run code remotely.
+
+    Work to be dispatched is represented in this code by creating a
+    "bundle".  Each bundle is assigned to a remote worker based on
+    heuristics captured in a :class:`RemoteWorkerSelectionPolicy`.  In
+    general, it attempts to load all workers in the pool and maximize
+    throughput.  Once assigned to a remote worker, pickled code is
+    copied to that worker via `scp` and a remote command is issued via
+    `ssh` to execute a :file:`remote_worker.py` process on the remote
+    machine.  This process unpickles the code, runs it, and produces a
+    result which is then copied back to the local machine (again via
+    `scp`) where it can be processed by local code.
+
+    You can and probably must override the path of
+    :file:`remote_worker.py` on your pool machines using the
+    `--remote_worker_helper_path` commandline argument (or by just
+    changing the default in code, see above in this file's code).
+
+    During remote work execution, this local machine acts as a
+    controller dispatching all work to the network, copying pickled
+    tasks out, and copying results back in.  It may also be a worker
+    in the pool but do not underestimate the cost of being a
+    controller -- it takes some cpu and a lot of network bandwidth.
+    The work dispatcher logic attempts to detect when a controller is
+    also a worker and reduce its load.
+
+    Some redundancy and safety provisions are made when scheduling
+    tasks to the worker pool; e.g. slower than expected tasks have
+    redundant backups tasks created, especially if there are otherwise
+    idle workers.  If a task fails repeatedly, the dispatcher consider
+    it poisoned and give up on it.
 
     .. warning::
 
-        The network overhead / latency of copying work from the
-        controller machine to the remote workers is relatively high.
         This executor probably only makes sense to use with
         computationally expensive tasks such as jobs that will execute
         for ~30 seconds or longer.
 
+        The network overhead and latency of copying work from the
+        controller (local) machine to the remote workers and copying
+        results back again is relatively high.  Especially at startup,
+        the network can become a bottleneck.  Future versions of this
+        code may attempt to split the responsibility of being a
+        controller (distributing work to pool machines).
+
     Instructions for how to set this up are provided in
     :class:`pyutils.parallelize.parallelize`.
 
     See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
+
     """
 
     def __init__(
@@ -1019,7 +1068,7 @@ class RemoteExecutor(BaseExecutor):
 
         self.adjust_task_count(+1)
         uuid = bundle.uuid
-        hostname = bundle.hostname
+        controller = bundle.controller
         avoid_machine = override_avoid_machine
         is_original = bundle.src_bundle is None
 
@@ -1073,7 +1122,7 @@ class RemoteExecutor(BaseExecutor):
                     return None
 
         # Send input code / data to worker machine if it's not local.
-        if hostname not in machine:
+        if controller not in machine:
             try:
                 cmd = (
                     f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
@@ -1226,7 +1275,7 @@ class RemoteExecutor(BaseExecutor):
             bundle.end_ts = time.time()
             if not was_cancelled:
                 assert bundle.machine is not None
-                if bundle.hostname not in bundle.machine:
+                if bundle.controller not in bundle.machine:
                     cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
                         "%s: Fetching results back from %s@%s via %s",
@@ -1339,7 +1388,7 @@ class RemoteExecutor(BaseExecutor):
             worker=None,
             username=None,
             machine=None,
-            hostname=platform.node(),
+            controller=platform.node(),
             code_file=code_file,
             result_file=result_file,
             pid=0,
@@ -1373,7 +1422,7 @@ class RemoteExecutor(BaseExecutor):
             worker=None,
             username=None,
             machine=None,
-            hostname=src_bundle.hostname,
+            controller=src_bundle.controller,
             code_file=src_bundle.code_file,
             result_file=src_bundle.result_file,
             pid=0,
@@ -1437,11 +1486,10 @@ class RemoteExecutor(BaseExecutor):
                 raise RemoteExecutorException(
                     f'{bundle}: This bundle can\'t be completed despite several backups and retries',
                 )
-            else:
-                logger.error(
-                    '%s: At least it\'s only a backup; better luck with the others.',
-                    bundle,
-                )
+            logger.error(
+                '%s: At least it\'s only a backup; better luck with the others.',
+                bundle,
+            )
             return None
         else:
             msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
@@ -1487,16 +1535,10 @@ class ConfigRemoteWorkerPoolProvider(
         self.remote_worker_pool = []
         for record in json_remote_worker_pool['remote_worker_records']:
             self.remote_worker_pool.append(
-                self.dataclassFromDict(RemoteWorkerRecord, record)
+                dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
             )
         assert len(self.remote_worker_pool) > 0
 
-    @staticmethod
-    def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
-        fieldSet = {f.name for f in fields(clsName) if f.init}
-        filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
-        return clsName(**filteredArgDict)
-
     @overrides
     def get_remote_workers(self) -> List[RemoteWorkerRecord]:
         return self.remote_worker_pool
@@ -1508,7 +1550,7 @@ class ConfigRemoteWorkerPoolProvider(
     @staticmethod
     @overrides
     def get_filename() -> str:
-        return config.config['remote_worker_records_file']
+        return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
 
     @staticmethod
     @overrides