import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
-from dataclasses import dataclass, fields
+from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
from overrides import overrides
-import pyutils.typez.histogram as hist
-from pyutils import argparse_utils, config, math_utils, persistent, string_utils
+import pyutils.types.histogram as hist
+from pyutils import (
+ argparse_utils,
+ config,
+ dataclass_utils,
+ math_utils,
+ persistent,
+ string_utils,
+)
from pyutils.ansi import bg, fg, reset, underline
from pyutils.decorator_utils import singleton
from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
from pyutils.parallelize.thread_utils import background_thread
+from pyutils.types import type_utils
logger = logging.getLogger(__name__)
type=str,
metavar='PATH_TO_REMOTE_WORKER_PY',
help='Path to remote_worker.py on remote machines',
- default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
+ default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
)
machine: Optional[str]
"""The remote machine running this bundle or None if none (yet)"""
- hostname: str
+ controller: str
"""The controller machine"""
code_file: str
total_finished = len(self.finished_bundle_timings)
total_in_flight = self.total_in_flight()
ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
- qall = None
+ qall_median = None
+ qall_p95 = None
if len(self.finished_bundle_timings) > 1:
qall_median = self.finished_bundle_timings.get_median()
qall_p95 = self.finished_bundle_timings.get_percentile(95)
if details is not None:
details.slower_than_local_p95 = False
- if qall is not None:
- if sec > qall[1]:
+ if qall_p95 is not None:
+ if sec > qall_p95:
ret += f'{bg("red")}>∀p95{reset()} '
if details is not None:
details.slower_than_global_p95 = True
class RemoteWorkerSelectionPolicy(ABC):
- """A policy for selecting a remote worker base class."""
+ """An interface definition of a policy for selecting a remote worker."""
def __init__(self):
self.workers: Optional[List[RemoteWorkerRecord]] = None
class RemoteExecutor(BaseExecutor):
- """An executor that uses processes on remote machines to do work. This
- works by creating "bundles" of work with pickled code in each to be
- executed. Each bundle is assigned a remote worker based on some policy
- heuristics. Once assigned to a remote worker, a local subprocess is
- created. It copies the pickled code to the remote machine via ssh/scp
- and then starts up work on the remote machine again using ssh to invoke
- the :file:`remote_worker.py` (`--remote_worker_helper_path`). When
- the work is complete, the local subprocess copies the results back to
- the local machine via ssh/scp.
-
- So there is essentially one "controller" machine (which may also be
- in the remote executor pool and therefore do task work in addition to
- controlling) and N worker machines. This code runs on the controller
- whereas on the worker machines we invoke pickled user code via a
- shim in :file:`remote_worker.py`.
-
- Some redundancy and safety provisions are made; e.g. slower than
- expected tasks have redundant backups created and if a task fails
- repeatedly we consider it poisoned and give up on it.
+ """An executor that uses processes on remote machines to do work.
+ To do so, it requires that a pool of remote workers be properly
+ configured. See instructions in
+ :class:`pyutils.parallelize.parallelize`.
+
+ Each machine in a worker pool has a *weight* and a *count*. A
+ *weight* captures the relative speed of a processor on that worker
+ and a *count* captures the number of synchronous tasks the worker
+ can accept (i.e. the number of cpus on the machine).
+
+ To dispatch work to a remote machine, this class pickles the code
+ to be executed remotely using `cloudpickle`. For that to work,
+ the remote machine should be running the same version of Python as
+ this machine, ideally in a virtual environment with the same
+ import libraries installed. Differences in operating system
+ and/or processor architecture don't seem to matter for most code,
+ though.
+
+ .. warning::
+
+ Mismatches in Python version or in the version numbers of
+ third-party libraries between machines can cause problems
+ when trying to unpickle and run code remotely.
+
+ Work to be dispatched is represented in this code by creating a
+ "bundle". Each bundle is assigned to a remote worker based on
+ heuristics captured in a :class:`RemoteWorkerSelectionPolicy`. In
+ general, it attempts to load all workers in the pool and maximize
+ throughput. Once assigned to a remote worker, pickled code is
+ copied to that worker via `scp` and a remote command is issued via
+ `ssh` to execute a :file:`remote_worker.py` process on the remote
+ machine. This process unpickles the code, runs it, and produces a
+ result which is then copied back to the local machine (again via
+ `scp`) where it can be processed by local code.
+
+ You can and probably must override the path of
+ :file:`remote_worker.py` on your pool machines using the
+ `--remote_worker_helper_path` commandline argument (or by just
+ changing the default in code, see above in this file's code).
+
+ During remote work execution, this local machine acts as a
+ controller dispatching all work to the network, copying pickled
+ tasks out, and copying results back in. It may also be a worker
+ in the pool but do not underestimate the cost of being a
+ controller -- it takes some cpu and a lot of network bandwidth.
+ The work dispatcher logic attempts to detect when a controller is
+ also a worker and reduce its load.
+
+ Some redundancy and safety provisions are made when scheduling
+ tasks to the worker pool; e.g. slower than expected tasks have
+ redundant backup tasks created, especially if there are otherwise
+ idle workers. If a task fails repeatedly, the dispatcher considers
+ it poisoned and gives up on it.
.. warning::
- The network overhead / latency of copying work from the
- controller machine to the remote workers is relatively high.
This executor probably only makes sense to use with
computationally expensive tasks such as jobs that will execute
for ~30 seconds or longer.
+ The network overhead and latency of copying work from the
+ controller (local) machine to the remote workers and copying
+ results back again is relatively high. Especially at startup,
+ the network can become a bottleneck. Future versions of this
+ code may attempt to split the responsibility of being a
+ controller (distributing work to pool machines).
+
Instructions for how to set this up are provided in
:class:`pyutils.parallelize.parallelize`.
See also :class:`ProcessExecutor` and :class:`ThreadExecutor`.
+
"""
def __init__(
self.adjust_task_count(+1)
uuid = bundle.uuid
- hostname = bundle.hostname
+ controller = bundle.controller
avoid_machine = override_avoid_machine
is_original = bundle.src_bundle is None
return None
# Send input code / data to worker machine if it's not local.
- if hostname not in machine:
+ if controller not in machine:
try:
cmd = (
f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
bundle.end_ts = time.time()
if not was_cancelled:
assert bundle.machine is not None
- if bundle.hostname not in bundle.machine:
+ if bundle.controller not in bundle.machine:
cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
logger.info(
"%s: Fetching results back from %s@%s via %s",
worker=None,
username=None,
machine=None,
- hostname=platform.node(),
+ controller=platform.node(),
code_file=code_file,
result_file=result_file,
pid=0,
worker=None,
username=None,
machine=None,
- hostname=src_bundle.hostname,
+ controller=src_bundle.controller,
code_file=src_bundle.code_file,
result_file=src_bundle.result_file,
pid=0,
raise RemoteExecutorException(
f'{bundle}: This bundle can\'t be completed despite several backups and retries',
)
- else:
- logger.error(
- '%s: At least it\'s only a backup; better luck with the others.',
- bundle,
- )
+ logger.error(
+ '%s: At least it\'s only a backup; better luck with the others.',
+ bundle,
+ )
return None
else:
msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
self.remote_worker_pool = []
for record in json_remote_worker_pool['remote_worker_records']:
self.remote_worker_pool.append(
- self.dataclassFromDict(RemoteWorkerRecord, record)
+ dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
)
assert len(self.remote_worker_pool) > 0
- @staticmethod
- def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
- fieldSet = {f.name for f in fields(clsName) if f.init}
- filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
- return clsName(**filteredArgDict)
-
@overrides
def get_remote_workers(self) -> List[RemoteWorkerRecord]:
return self.remote_worker_pool
@staticmethod
@overrides
def get_filename() -> str:
- return config.config['remote_worker_records_file']
+ return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
@staticmethod
@overrides