import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
-from dataclasses import dataclass, fields
+from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
from overrides import overrides
-import pyutils.typez.histogram as hist
-from pyutils import argparse_utils, config, math_utils, persistent, string_utils
+import pyutils.types.histogram as hist
+from pyutils import (
+ argparse_utils,
+ config,
+ dataclass_utils,
+ math_utils,
+ persistent,
+ string_utils,
+)
from pyutils.ansi import bg, fg, reset, underline
from pyutils.decorator_utils import singleton
from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
from pyutils.parallelize.thread_utils import background_thread
+from pyutils.types import type_utils
logger = logging.getLogger(__name__)
type=str,
metavar='PATH_TO_REMOTE_WORKER_PY',
help='Path to remote_worker.py on remote machines',
- default='source py39-venv/bin/activate && /home/scott/lib/release/pyutils/src/pyutils/remote_worker.py',
+ default=f'source py39-venv/bin/activate && {os.environ["HOME"]}/pyutils/src/pyutils/remote_worker.py',
)
machine: Optional[str]
"""The remote machine running this bundle or None if none (yet)"""
- hostname: str
+ controller: str
"""The controller machine"""
code_file: str
total_finished = len(self.finished_bundle_timings)
total_in_flight = self.total_in_flight()
ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
- qall = None
+ qall_median = None
+ qall_p95 = None
if len(self.finished_bundle_timings) > 1:
qall_median = self.finished_bundle_timings.get_median()
qall_p95 = self.finished_bundle_timings.get_percentile(95)
if details is not None:
details.slower_than_local_p95 = False
- if qall is not None:
- if sec > qall[1]:
+ if qall_p95 is not None:
+ if sec > qall_p95:
ret += f'{bg("red")}>∀p95{reset()} '
if details is not None:
details.slower_than_global_p95 = True
self.adjust_task_count(+1)
uuid = bundle.uuid
- hostname = bundle.hostname
+ controller = bundle.controller
avoid_machine = override_avoid_machine
is_original = bundle.src_bundle is None
return None
# Send input code / data to worker machine if it's not local.
- if hostname not in machine:
+ if controller not in machine:
try:
cmd = (
f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
bundle.end_ts = time.time()
if not was_cancelled:
assert bundle.machine is not None
- if bundle.hostname not in bundle.machine:
+ if bundle.controller not in bundle.machine:
cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
logger.info(
"%s: Fetching results back from %s@%s via %s",
worker=None,
username=None,
machine=None,
- hostname=platform.node(),
+ controller=platform.node(),
code_file=code_file,
result_file=result_file,
pid=0,
worker=None,
username=None,
machine=None,
- hostname=src_bundle.hostname,
+ controller=src_bundle.controller,
code_file=src_bundle.code_file,
result_file=src_bundle.result_file,
pid=0,
raise RemoteExecutorException(
f'{bundle}: This bundle can\'t be completed despite several backups and retries',
)
- else:
- logger.error(
- '%s: At least it\'s only a backup; better luck with the others.',
- bundle,
- )
+ logger.error(
+ '%s: At least it\'s only a backup; better luck with the others.',
+ bundle,
+ )
return None
else:
msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
self.remote_worker_pool = []
for record in json_remote_worker_pool['remote_worker_records']:
self.remote_worker_pool.append(
- self.dataclassFromDict(RemoteWorkerRecord, record)
+ dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
)
assert len(self.remote_worker_pool) > 0
- @staticmethod
- def dataclassFromDict(clsName, argDict: Dict[str, Any]) -> Any:
- fieldSet = {f.name for f in fields(clsName) if f.init}
- filteredArgDict = {k: v for k, v in argDict.items() if k in fieldSet}
- return clsName(**filteredArgDict)
-
@overrides
def get_remote_workers(self) -> List[RemoteWorkerRecord]:
return self.remote_worker_pool
@staticmethod
@overrides
def get_filename() -> str:
- return config.config['remote_worker_records_file']
+ return type_utils.unwrap_optional(config.config['remote_worker_records_file'])
@staticmethod
@overrides