X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=ml%2Fmodel_trainer.py;h=12ccb3c6c0508e61081d6791b12f9a8f5a5571f2;hb=713a609bd19d491de03debf8a4a6ddf2540b13dc;hp=f9e132e18aa20ecf2461db55257b6037a0c13a4e;hpb=bef486c8c06e8d743a98b89910658a615acc8bbc;p=python_utils.git

diff --git a/ml/model_trainer.py b/ml/model_trainer.py
index f9e132e..12ccb3c 100644
--- a/ml/model_trainer.py
+++ b/ml/model_trainer.py
@@ -2,7 +2,6 @@

 from __future__ import annotations

-from abc import ABC, abstractmethod
 import datetime
 import glob
 import logging
@@ -10,6 +9,8 @@ import os
 import pickle
 import random
 import sys
+import warnings
+from abc import ABC, abstractmethod
 from types import SimpleNamespace
 from typing import Any, List, NamedTuple, Optional, Set, Tuple

@@ -17,27 +18,28 @@ import numpy as np
 from sklearn.model_selection import train_test_split  # type:ignore
 from sklearn.preprocessing import MinMaxScaler  # type: ignore

-from ansi import bold, reset
 import argparse_utils
 import config
-from decorator_utils import timed
+import executors
 import parallelize as par
+from ansi import bold, reset
+from decorator_utils import timed

 logger = logging.getLogger(__file__)

 parser = config.add_commandline_args(
     f"ML Model Trainer ({__file__})",
-    "Arguments related to training an ML model"
+    "Arguments related to training an ML model",
 )
 parser.add_argument(
     "--ml_trainer_quiet",
     action="store_true",
-    help="Don't prompt the user for anything."
+    help="Don't prompt the user for anything.",
 )
 parser.add_argument(
     "--ml_trainer_delete",
     action="store_true",
-    help="Delete invalid/incomplete features files in addition to warning."
+    help="Delete invalid/incomplete features files in addition to warning.",
 )
 group = parser.add_mutually_exclusive_group()
 group.add_argument(
@@ -69,10 +71,10 @@ class InputSpec(SimpleNamespace):
     @staticmethod
     def populate_from_config() -> InputSpec:
         return InputSpec(
-            dry_run = config.config["ml_trainer_dry_run"],
-            quiet = config.config["ml_trainer_quiet"],
-            persist_percentage_threshold = config.config["ml_trainer_persist_threshold"],
-            delete_bad_inputs = config.config["ml_trainer_delete"],
+            dry_run=config.config["ml_trainer_dry_run"],
+            quiet=config.config["ml_trainer_quiet"],
+            persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
+            delete_bad_inputs=config.config["ml_trainer_delete"],
         )


@@ -80,8 +82,8 @@ class OutputSpec(NamedTuple):
     model_filename: Optional[str]
     model_info_filename: Optional[str]
     scaler_filename: Optional[str]
-    training_score: float
-    test_score: float
+    training_score: np.float64
+    test_score: np.float64


 class TrainingBlueprint(ABC):
@@ -125,18 +127,14 @@ class TrainingBlueprint(ABC):
         models = []
         modelid_to_params = {}
         for params in self.spec.training_parameters:
-            model = self.train_model(
-                params,
-                self.X_train_scaled,
-                self.y_train
-            )
+            model = self.train_model(params, self.X_train_scaled, self.y_train)
             models.append(model)
             modelid_to_params[model.get_id()] = str(params)

         best_model = None
-        best_score = None
-        best_test_score = None
-        best_training_score = None
+        best_score: Optional[np.float64] = None
+        best_test_score: Optional[np.float64] = None
+        best_training_score: Optional[np.float64] = None
         best_params = None
         for model in smart_future.wait_any(models):
             params = modelid_to_params[model.get_id()]
@@ -165,38 +163,39 @@ class TrainingBlueprint(ABC):
                     best_model = model
                     best_params = params
                     if not self.spec.quiet:
-                        print(
-                            f"New best score {best_score:.2f}% with params {params}"
-                        )
+                        print(f"New best score {best_score:.2f}% with params {params}")

         if not self.spec.quiet:
+            executors.DefaultExecutors().shutdown()
             msg = f"Done training; best test set score was: {best_test_score:.1f}%"
             print(msg)
             logger.info(msg)
-        scaler_filename, model_filename, model_info_filename = (
-            self.maybe_persist_scaler_and_model(
-                best_training_score,
-                best_test_score,
-                best_params,
-                num_examples,
-                scaler,
-                best_model,
-            )
+
+        assert best_training_score is not None
+        assert best_test_score is not None
+        assert best_params is not None
+        (
+            scaler_filename,
+            model_filename,
+            model_info_filename,
+        ) = self.maybe_persist_scaler_and_model(
+            best_training_score,
+            best_test_score,
+            best_params,
+            num_examples,
+            scaler,
+            best_model,
         )
         return OutputSpec(
-            model_filename = model_filename,
-            model_info_filename = model_info_filename,
-            scaler_filename = scaler_filename,
-            training_score = best_training_score,
-            test_score = best_test_score,
+            model_filename=model_filename,
+            model_info_filename=model_info_filename,
+            scaler_filename=scaler_filename,
+            training_score=best_training_score,
+            test_score=best_test_score,
         )

     @par.parallelize(method=par.Method.THREAD)
-    def read_files_from_list(
-        self,
-        files: List[str],
-        n: int
-    ) -> Tuple[List, List]:
+    def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:

         # All features
         X = []
@@ -224,8 +223,7 @@ class TrainingBlueprint(ABC):
                 key = key.strip()
                 value = value.strip()

-                if (self.spec.features_to_skip is not None
-                        and key in self.spec.features_to_skip):
+                if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
                     logger.debug(f"Skipping feature {key}")
                     continue

@@ -246,23 +244,21 @@ class TrainingBlueprint(ABC):
                     y.pop()

                 if self.spec.delete_bad_inputs:
-                    msg = f"WARNING: {filename}: missing features or label. DELETING."
-                    print(msg, file=sys.stderr)
+                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. DELETING."
                     logger.warning(msg)
+                    warnings.warn(msg)
                     os.remove(filename)
                 else:
-                    msg = f"WARNING: {filename}: missing features or label. Skipped."
-                    print(msg, file=sys.stderr)
+                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. Skipping."
                     logger.warning(msg)
+                    warnings.warn(msg)
         return (X, y)

     def make_progress_graph(self) -> None:
         if not self.spec.quiet:
             from text_utils import progress_graph
-            progress_graph(
-                self.file_done_count,
-                self.total_file_count
-            )
+
+            progress_graph(self.file_done_count, self.total_file_count)

     @timed
     def read_input_files(self):
@@ -311,9 +307,9 @@ class TrainingBlueprint(ABC):
             random_state=random.randrange(0, 1000),
         )

-    def scale_data(self,
-                   X_train: np.ndarray,
-                   X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
+    def scale_data(
+        self, X_train: np.ndarray, X_test: np.ndarray
+    ) -> Tuple[Any, np.ndarray, np.ndarray]:
         logger.debug("Scaling data")
         scaler = MinMaxScaler()
         scaler.fit(X_train)
@@ -321,19 +317,17 @@ class TrainingBlueprint(ABC):

     # Note: children should implement. Consider using @parallelize.
     @abstractmethod
-    def train_model(self,
-                    parameters,
-                    X_train_scaled: np.ndarray,
-                    y_train: np.ndarray) -> Any:
+    def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
         pass

     def evaluate_model(
-            self,
-            model: Any,
-            X_train_scaled: np.ndarray,
-            y_train: np.ndarray,
-            X_test_scaled: np.ndarray,
-            y_test: np.ndarray) -> Tuple[np.float64, np.float64]:
+        self,
+        model: Any,
+        X_train_scaled: np.ndarray,
+        y_train: np.ndarray,
+        X_test_scaled: np.ndarray,
+        y_test: np.ndarray,
+    ) -> Tuple[np.float64, np.float64]:
         logger.debug("Evaluating the model")
         training_score = model.score(X_train_scaled, y_train) * 100.0
         test_score = model.score(X_test_scaled, y_test) * 100.0
@@ -344,44 +338,45 @@ class TrainingBlueprint(ABC):
         return (training_score, test_score)

     def maybe_persist_scaler_and_model(
-            self,
-            training_score: np.float64,
-            test_score: np.float64,
-            params: str,
-            num_examples: int,
-            scaler: Any,
-            model: Any) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+        self,
+        training_score: np.float64,
+        test_score: np.float64,
+        params: str,
+        num_examples: int,
+        scaler: Any,
+        model: Any,
+    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
         if not self.spec.dry_run:
             import datetime_utils
             import input_utils
             import string_utils
+
+            now: datetime.datetime = datetime_utils.now_pacific()
+            info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
+Model params: {params}
+Training examples: {num_examples}
+Training set score: {training_score:.2f}%
+Testing set score: {test_score:.2f}%"""
+            print(f'\n{info}\n')
             if (
-                (self.spec.persist_percentage_threshold is not None and
-                 test_score > self.spec.persist_percentage_threshold)
-                or
-                (not self.spec.quiet
-                 and input_utils.yn_response("Write the model? [y,n]: ") == "y")
+                self.spec.persist_percentage_threshold is not None
+                and test_score > self.spec.persist_percentage_threshold
+            ) or (
+                not self.spec.quiet and input_utils.yn_response("Write the model? [y,n]: ") == "y"
             ):
                 scaler_filename = f"{self.spec.basename}_scaler.sav"
-                with open(scaler_filename, "wb") as f:
-                    pickle.dump(scaler, f)
+                with open(scaler_filename, "wb") as fb:
+                    pickle.dump(scaler, fb)
                 msg = f"Wrote {scaler_filename}"
                 print(msg)
                 logger.info(msg)
                 model_filename = f"{self.spec.basename}_model.sav"
-                with open(model_filename, "wb") as f:
-                    pickle.dump(model, f)
+                with open(model_filename, "wb") as fb:
+                    pickle.dump(model, fb)
                 msg = f"Wrote {model_filename}"
                 print(msg)
                 logger.info(msg)
                 model_info_filename = f"{self.spec.basename}_model_info.txt"
-                now: datetime.datetime = datetime_utils.now_pacific()
-                info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
-Model params: {params}
-Training examples: {num_examples}
-Training set score: {training_score:.2f}%
-Testing set score: {test_score:.2f}%"""
                 with open(model_info_filename, "w") as f:
                     f.write(info)
                 msg = f"Wrote {model_info_filename}:"
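
For orientation only, a minimal sketch of how a concrete trainer might subclass TrainingBlueprint after this change: it fills in the abstract train_model hook shown above and wraps it in @par.parallelize, as the "children should implement" comment suggests and as this diff already does for read_files_from_list. This is not code from the commit; the KNeighborsRegressor choice, the integer interpretation of parameters, and the ml.model_trainer import path are illustrative assumptions.

from typing import Any

import numpy as np
from sklearn.neighbors import KNeighborsRegressor  # type: ignore

import parallelize as par
from ml.model_trainer import TrainingBlueprint  # import path assumed


class ExampleTrainer(TrainingBlueprint):
    # Hypothetical child class: trains one scikit-learn model per parameter value.
    @par.parallelize(method=par.Method.THREAD)
    def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
        # 'parameters' is assumed to be an int (number of neighbors) here;
        # real subclasses define spec.training_parameters however they like.
        model = KNeighborsRegressor(n_neighbors=parameters)
        model.fit(X_train_scaled, y_train)
        return model

Because the method is parallelized, each call returns a future; the base class's training loop collects results with smart_future.wait_any(models) and uses model.get_id() to map each finished model back to its parameters, which is presumably why the commit also adds the executors.DefaultExecutors().shutdown() call once training is done.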