X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=ml%2Fmodel_trainer.py;h=07f7b99292c9c9a3c3ac3a685c40ca59ee1b9582;hb=903843730a9916105352c729e94136a755b5e529;hp=12ccb3c6c0508e61081d6791b12f9a8f5a5571f2;hpb=713a609bd19d491de03debf8a4a6ddf2540b13dc;p=python_utils.git

diff --git a/ml/model_trainer.py b/ml/model_trainer.py
index 12ccb3c..07f7b99 100644
--- a/ml/model_trainer.py
+++ b/ml/model_trainer.py
@@ -1,7 +1,10 @@
 #!/usr/bin/env python3
 
-from __future__ import annotations
+# © Copyright 2021-2022, Scott Gasch
+
+"""This is a blueprint for training sklearn ML models."""
 
+from __future__ import annotations
 import datetime
 import glob
 import logging
@@ -9,10 +12,12 @@ import os
 import pickle
 import random
 import sys
+import time
 import warnings
 from abc import ABC, abstractmethod
+from dataclasses import dataclass
 from types import SimpleNamespace
-from typing import Any, List, NamedTuple, Optional, Set, Tuple
+from typing import Any, List, Optional, Set, Tuple
 
 import numpy as np
 from sklearn.model_selection import train_test_split  # type:ignore
@@ -25,7 +30,7 @@ import parallelize as par
 from ansi import bold, reset
 from decorator_utils import timed
 
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
     f"ML Model Trainer ({__file__})",
@@ -56,6 +61,9 @@ group.add_argument(
 
 
 class InputSpec(SimpleNamespace):
+    """A collection of info needed to train the model provided by the
+    caller."""
+
     file_glob: str
     feature_count: int
     features_to_skip: Set[str]
@@ -78,15 +86,20 @@ class InputSpec(SimpleNamespace):
         )
 
 
-class OutputSpec(NamedTuple):
-    model_filename: Optional[str]
-    model_info_filename: Optional[str]
-    scaler_filename: Optional[str]
-    training_score: np.float64
-    test_score: np.float64
+@dataclass
+class OutputSpec:
+    """Info about the results of training returned to the caller."""
+
+    model_filename: Optional[str] = None
+    model_info_filename: Optional[str] = None
+    scaler_filename: Optional[str] = None
+    training_score: np.float64 = np.float64(0.0)
+    test_score: np.float64 = np.float64(0.0)
 
 
 class TrainingBlueprint(ABC):
+    """The blueprint for doing the actual training."""
+
     def __init__(self):
         self.y_train = None
         self.y_test = None
@@ -112,13 +125,13 @@ class TrainingBlueprint(ABC):
         y = np.array(y_)
 
         print("Doing random test/train split...")
-        X_train, X_test, self.y_train, self.y_test = self.test_train_split(
+        X_train, X_test, self.y_train, self.y_test = TrainingBlueprint.test_train_split(
             X,
             y,
         )
 
         print("Scaling training data...")
-        scaler, self.X_train_scaled, self.X_test_scaled = self.scale_data(
+        scaler, self.X_train_scaled, self.X_test_scaled = TrainingBlueprint.scale_data(
             X_train,
             X_test,
         )
@@ -131,6 +144,7 @@ class TrainingBlueprint(ABC):
             models.append(model)
             modelid_to_params[model.get_id()] = str(params)
 
+        all_models = {}
         best_model = None
         best_score: Optional[np.float64] = None
         best_test_score: Optional[np.float64] = None
@@ -141,7 +155,7 @@ class TrainingBlueprint(ABC):
             if isinstance(model, smart_future.SmartFuture):
                 model = model._resolve()
             if model is not None:
-                training_score, test_score = self.evaluate_model(
+                training_score, test_score = TrainingBlueprint.evaluate_model(
                     model,
                     self.X_train_scaled,
                     self.y_train,
@@ -149,6 +163,7 @@ class TrainingBlueprint(ABC):
                     self.y_test,
                 )
                 score = (training_score + test_score * 20) / 21
+                all_models[params] = (score, training_score, test_score)
                 if not self.spec.quiet:
                     print(
                         f"{bold()}{params}{reset()}: "
@@ -165,15 +180,22 @@ class TrainingBlueprint(ABC):
                     if not self.spec.quiet:
                         print(f"New best score {best_score:.2f}% with params {params}")
 
-        if not self.spec.quiet:
-            executors.DefaultExecutors().shutdown()
-            msg = f"Done training; best test set score was: {best_test_score:.1f}%"
-            print(msg)
-            logger.info(msg)
-
+        executors.DefaultExecutors().shutdown()
         assert best_training_score is not None
         assert best_test_score is not None
         assert best_params is not None
+
+        if not self.spec.quiet:
+            time.sleep(1.0)
+            print('Done training...')
+            for params in all_models:
+                msg = f'{bold()}{params}{reset()}: score={all_models[params][0]:.2f}% '
+                msg += f'({all_models[params][2]:.2f}% test, '
+                msg += f'{all_models[params][1]:.2f}% train)'
+                if params == best_params:
+                    msg += f'{bold()} <-- winner{reset()}'
+                print(msg)
+
         (
             scaler_filename,
             model_filename,
@@ -195,7 +217,7 @@ class TrainingBlueprint(ABC):
         )
 
     @par.parallelize(method=par.Method.THREAD)
-    def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
+    def read_files_from_list(self, files: List[str]) -> Tuple[List, List]:
 
         # All features
         X = []
@@ -218,16 +240,16 @@ class TrainingBlueprint(ABC):
                 try:
                     (key, value) = line.split(self.spec.key_value_delimiter)
                 except Exception:
-                    logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped")
+                    logger.debug("WARNING: bad line in file %s '%s', skipped", filename, line)
                     continue
 
                 key = key.strip()
                 value = value.strip()
                 if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
-                    logger.debug(f"Skipping feature {key}")
+                    logger.debug("Skipping feature %s", key)
                     continue
 
-                value = self.normalize_feature(value)
+                value = TrainingBlueprint.normalize_feature(value)
 
                 if key == self.spec.label:
                     y.append(value)
@@ -256,9 +278,9 @@ class TrainingBlueprint(ABC):
 
     def make_progress_graph(self) -> None:
         if not self.spec.quiet:
-            from text_utils import progress_graph
+            from text_utils import bar_graph
 
-            progress_graph(self.file_done_count, self.total_file_count)
+            bar_graph(self.file_done_count, self.total_file_count)
 
     @timed
     def read_input_files(self):
@@ -274,9 +296,9 @@ class TrainingBlueprint(ABC):
         results = []
         all_files = glob.glob(self.spec.file_glob)
        self.total_file_count = len(all_files)
-        for n, files in enumerate(list_utils.shard(all_files, 500)):
+        for files in list_utils.shard(all_files, 500):
             file_list = list(files)
-            results.append(self.read_files_from_list(file_list, n))
+            results.append(self.read_files_from_list(file_list))
 
         for result in smart_future.wait_any(results, callback=self.make_progress_graph):
             result = result._resolve()
@@ -288,7 +310,8 @@ class TrainingBlueprint(ABC):
             print(" " * 80 + "\n")
         return (X, y)
 
-    def normalize_feature(self, value: str) -> Any:
+    @staticmethod
+    def normalize_feature(value: str) -> Any:
         if value in ("False", "None"):
             ret = 0
         elif value == "True":
@@ -299,7 +322,8 @@ class TrainingBlueprint(ABC):
             ret = int(value)
         return ret
 
-    def test_train_split(self, X, y) -> List:
+    @staticmethod
+    def test_train_split(X, y) -> List:
         logger.debug("Performing test/train split")
         return train_test_split(
             X,
@@ -307,9 +331,8 @@ class TrainingBlueprint(ABC):
             random_state=random.randrange(0, 1000),
         )
 
-    def scale_data(
-        self, X_train: np.ndarray, X_test: np.ndarray
-    ) -> Tuple[Any, np.ndarray, np.ndarray]:
+    @staticmethod
+    def scale_data(X_train: np.ndarray, X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
         logger.debug("Scaling data")
         scaler = MinMaxScaler()
         scaler.fit(X_train)
@@ -320,8 +343,8 @@ class TrainingBlueprint(ABC):
     def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
         pass
 
+    @staticmethod
     def evaluate_model(
-        self,
         model: Any,
         X_train_scaled: np.ndarray,
         y_train: np.ndarray,
@@ -332,8 +355,9 @@ class TrainingBlueprint(ABC):
         training_score = model.score(X_train_scaled, y_train) * 100.0
         test_score = model.score(X_test_scaled, y_test) * 100.0
         logger.info(
-            f"Model evaluation results: test_score={test_score:.5f}, "
-            f"train_score={training_score:.5f}"
+            "Model evaluation results: test_score=%.5f, train_score=%.5f",
+            test_score,
+            training_score,
         )
         return (training_score, test_score)
 