#!/usr/bin/env python3
-from __future__ import annotations
+"""This is a blueprint for training sklearn ML models."""
-from abc import ABC, abstractmethod
+from __future__ import annotations
import datetime
import glob
import logging
import pickle
import random
import sys
-from types import SimpleNamespace
-from typing import Any, List, NamedTuple, Optional, Set, Tuple
import warnings
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from types import SimpleNamespace
+from typing import Any, List, Optional, Set, Tuple
import numpy as np
from sklearn.model_selection import train_test_split # type:ignore
from sklearn.preprocessing import MinMaxScaler # type: ignore
-from ansi import bold, reset
import argparse_utils
import config
-from decorator_utils import timed
import executors
import parallelize as par
+from ansi import bold, reset
+from decorator_utils import timed
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
- f"ML Model Trainer ({__file__})", "Arguments related to training an ML model"
+ f"ML Model Trainer ({__file__})",
+ "Arguments related to training an ML model",
)
parser.add_argument(
"--ml_trainer_quiet",
class InputSpec(SimpleNamespace):
+ """A collection of info needed to train the model provided by the
+ caller."""
+
file_glob: str
feature_count: int
features_to_skip: Set[str]
)
-class OutputSpec(NamedTuple):
- model_filename: Optional[str]
- model_info_filename: Optional[str]
- scaler_filename: Optional[str]
- training_score: float
- test_score: float
+@dataclass
+class OutputSpec:
+ """Info about the results of training returned to the caller."""
+
+ model_filename: Optional[str] = None
+ model_info_filename: Optional[str] = None
+ scaler_filename: Optional[str] = None
+ training_score: np.float64 = np.float64(0.0)
+ test_score: np.float64 = np.float64(0.0)
class TrainingBlueprint(ABC):
+ """The blueprint for doing the actual training."""
+
def __init__(self):
self.y_train = None
self.y_test = None
y = np.array(y_)
print("Doing random test/train split...")
- X_train, X_test, self.y_train, self.y_test = self.test_train_split(
+ X_train, X_test, self.y_train, self.y_test = TrainingBlueprint.test_train_split(
X,
y,
)
print("Scaling training data...")
- scaler, self.X_train_scaled, self.X_test_scaled = self.scale_data(
+ scaler, self.X_train_scaled, self.X_test_scaled = TrainingBlueprint.scale_data(
X_train,
X_test,
)
modelid_to_params[model.get_id()] = str(params)
best_model = None
- best_score = None
- best_test_score = None
- best_training_score = None
+ best_score: Optional[np.float64] = None
+ best_test_score: Optional[np.float64] = None
+ best_training_score: Optional[np.float64] = None
best_params = None
for model in smart_future.wait_any(models):
params = modelid_to_params[model.get_id()]
if isinstance(model, smart_future.SmartFuture):
model = model._resolve()
if model is not None:
- training_score, test_score = self.evaluate_model(
+ training_score, test_score = TrainingBlueprint.evaluate_model(
model,
self.X_train_scaled,
self.y_train,
print(msg)
logger.info(msg)
+ assert best_training_score is not None
+ assert best_test_score is not None
+ assert best_params is not None
(
scaler_filename,
model_filename,
)
@par.parallelize(method=par.Method.THREAD)
- def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
+ def read_files_from_list(self, files: List[str]) -> Tuple[List, List]:
# All features
X = []
try:
(key, value) = line.split(self.spec.key_value_delimiter)
except Exception:
- logger.debug(
- f"WARNING: bad line in file {filename} '{line}', skipped"
- )
+ logger.debug("WARNING: bad line in file %s '%s', skipped", filename, line)
continue
key = key.strip()
value = value.strip()
- if (
- self.spec.features_to_skip is not None
- and key in self.spec.features_to_skip
- ):
- logger.debug(f"Skipping feature {key}")
+ if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
+ logger.debug("Skipping feature %s", key)
continue
- value = self.normalize_feature(value)
+ value = TrainingBlueprint.normalize_feature(value)
if key == self.spec.label:
y.append(value)
results = []
all_files = glob.glob(self.spec.file_glob)
self.total_file_count = len(all_files)
- for n, files in enumerate(list_utils.shard(all_files, 500)):
+ for files in list_utils.shard(all_files, 500):
file_list = list(files)
- results.append(self.read_files_from_list(file_list, n))
+ results.append(self.read_files_from_list(file_list))
for result in smart_future.wait_any(results, callback=self.make_progress_graph):
result = result._resolve()
print(" " * 80 + "\n")
return (X, y)
- def normalize_feature(self, value: str) -> Any:
+ @staticmethod
+ def normalize_feature(value: str) -> Any:
if value in ("False", "None"):
ret = 0
elif value == "True":
ret = int(value)
return ret
- def test_train_split(self, X, y) -> List:
+ @staticmethod
+ def test_train_split(X, y) -> List:
logger.debug("Performing test/train split")
return train_test_split(
X,
random_state=random.randrange(0, 1000),
)
- def scale_data(
- self, X_train: np.ndarray, X_test: np.ndarray
- ) -> Tuple[Any, np.ndarray, np.ndarray]:
+ @staticmethod
+ def scale_data(X_train: np.ndarray, X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
logger.debug("Scaling data")
scaler = MinMaxScaler()
scaler.fit(X_train)
# Note: children should implement. Consider using @parallelize.
@abstractmethod
- def train_model(
- self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray
- ) -> Any:
+ def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
pass
+ @staticmethod
def evaluate_model(
- self,
model: Any,
X_train_scaled: np.ndarray,
y_train: np.ndarray,
training_score = model.score(X_train_scaled, y_train) * 100.0
test_score = model.score(X_test_scaled, y_test) * 100.0
logger.info(
- f"Model evaluation results: test_score={test_score:.5f}, "
- f"train_score={training_score:.5f}"
+ "Model evaluation results: test_score=%.5f, train_score=%.5f",
+ test_score,
+ training_score,
)
return (training_score, test_score)
self.spec.persist_percentage_threshold is not None
and test_score > self.spec.persist_percentage_threshold
) or (
- not self.spec.quiet
- and input_utils.yn_response("Write the model? [y,n]: ") == "y"
+ not self.spec.quiet and input_utils.yn_response("Write the model? [y,n]: ") == "y"
):
scaler_filename = f"{self.spec.basename}_scaler.sav"
- with open(scaler_filename, "wb") as f:
- pickle.dump(scaler, f)
+ with open(scaler_filename, "wb") as fb:
+ pickle.dump(scaler, fb)
msg = f"Wrote {scaler_filename}"
print(msg)
logger.info(msg)
model_filename = f"{self.spec.basename}_model.sav"
- with open(model_filename, "wb") as f:
- pickle.dump(model, f)
+ with open(model_filename, "wb") as fb:
+ pickle.dump(model, fb)
msg = f"Wrote {model_filename}"
print(msg)
logger.info(msg)