#!/usr/bin/env python3

# © Copyright 2021-2022, Scott Gasch

"""This is a blueprint for training sklearn ML models."""

from __future__ import annotations

import datetime
import glob
import logging
import os
import pickle
import random
import sys
import time
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, List, Optional, Set, Tuple

import numpy as np
from sklearn.model_selection import train_test_split  # type: ignore
from sklearn.preprocessing import MinMaxScaler  # type: ignore

import argparse_utils
import config
import executors
import parallelize as par
from ansi import bold, reset
from decorator_utils import timed

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"ML Model Trainer ({__file__})",
    "Arguments related to training an ML model",
)
parser.add_argument(
    "--ml_trainer_quiet",
    action="store_true",
    help="Don't prompt the user for anything.",
)
parser.add_argument(
    "--ml_trainer_delete",
    action="store_true",
    help="Delete invalid/incomplete features files in addition to warning.",
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
    "--ml_trainer_dry_run",
    action="store_true",
    help="Do not write a new model, just report efficacy.",
)
group.add_argument(
    "--ml_trainer_persist_threshold",
    type=argparse_utils.valid_percentage,
    metavar='0..100',
    help="Persist the model if the test set score is >= this threshold.",
)


class InputSpec(SimpleNamespace):
    """A collection of info needed to train the model provided by the caller."""

    file_glob: str
    feature_count: int
    features_to_skip: Set[str]
    key_value_delimiter: str
    training_parameters: List
    label: str
    basename: str
    dry_run: Optional[bool]
    quiet: Optional[bool]
    persist_percentage_threshold: Optional[float]
    delete_bad_inputs: Optional[bool]

    @staticmethod
    def populate_from_config() -> InputSpec:
        return InputSpec(
            dry_run=config.config["ml_trainer_dry_run"],
            quiet=config.config["ml_trainer_quiet"],
            persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
            delete_bad_inputs=config.config["ml_trainer_delete"],
        )


@dataclass
class OutputSpec:
    """Info about the results of training returned to the caller."""

    model_filename: Optional[str] = None
    model_info_filename: Optional[str] = None
    scaler_filename: Optional[str] = None
    training_score: np.float64 = np.float64(0.0)
    test_score: np.float64 = np.float64(0.0)


class TrainingBlueprint(ABC):
    """The blueprint for doing the actual training."""

    def __init__(self):
        self.y_train = None
        self.y_test = None
        self.X_test_scaled = None
        self.X_train_scaled = None
        self.file_done_count = 0
        self.total_file_count = 0
        self.spec = None

    def train(self, spec: InputSpec) -> OutputSpec:
        import smart_future

        random.seed()
        self.spec = spec

        X_, y_ = self.read_input_files()
        num_examples = len(y_)

        # Every example's features
        X = np.array(X_)

        # Every example's label
        y = np.array(y_)

        print("Doing random test/train split...")
        X_train, X_test, self.y_train, self.y_test = TrainingBlueprint.test_train_split(
            X,
            y,
        )

        print("Scaling training data...")
        scaler, self.X_train_scaled, self.X_test_scaled = TrainingBlueprint.scale_data(
            X_train,
            X_test,
        )

        print("Training model(s)...")
        models = []
        modelid_to_params = {}
        for params in self.spec.training_parameters:
            model = self.train_model(params, self.X_train_scaled, self.y_train)
            models.append(model)
            modelid_to_params[model.get_id()] = str(params)

        all_models = {}
        best_model = None
        best_score: Optional[np.float64] = None
        best_test_score: Optional[np.float64] = None
        best_training_score: Optional[np.float64] = None
        best_params = None
        for model in smart_future.wait_any(models):
            params = modelid_to_params[model.get_id()]
            if isinstance(model, smart_future.SmartFuture):
                model = model._resolve()
            if model is not None:
                training_score, test_score = TrainingBlueprint.evaluate_model(
                    model,
                    self.X_train_scaled,
                    self.y_train,
                    self.X_test_scaled,
                    self.y_test,
                )
                # Combined score: a weighted average that counts the test set
                # score 20x more heavily than the training set score.
                score = (training_score + test_score * 20) / 21
                all_models[params] = (score, training_score, test_score)
                if not self.spec.quiet:
                    print(
                        f"{bold()}{params}{reset()}: "
                        f"Training set score={training_score:.2f}%, "
                        f"test set score={test_score:.2f}%",
                        file=sys.stderr,
                    )
                if best_score is None or score > best_score:
                    best_score = score
                    best_test_score = test_score
                    best_training_score = training_score
                    best_model = model
                    best_params = params
                    if not self.spec.quiet:
                        print(f"New best score {best_score:.2f}% with params {params}")

        executors.DefaultExecutors().shutdown()
        assert best_training_score is not None
        assert best_test_score is not None
        assert best_params is not None

        if not self.spec.quiet:
            time.sleep(1.0)
            print('Done training...')
            for params in all_models:
                msg = f'{bold()}{params}{reset()}: score={all_models[params][0]:.2f}% '
                msg += f'({all_models[params][2]:.2f}% test, '
                msg += f'{all_models[params][1]:.2f}% train)'
                if params == best_params:
                    msg += f'{bold()} <-- winner{reset()}'
                print(msg)

        (
            scaler_filename,
            model_filename,
            model_info_filename,
        ) = self.maybe_persist_scaler_and_model(
            best_training_score,
            best_test_score,
            best_params,
            num_examples,
            scaler,
            best_model,
        )
        return OutputSpec(
            model_filename=model_filename,
            model_info_filename=model_info_filename,
            scaler_filename=scaler_filename,
            training_score=best_training_score,
            test_score=best_test_score,
        )

    @par.parallelize(method=par.Method.THREAD)
    def read_files_from_list(self, files: List[str]) -> Tuple[List, List]:
        # All features
        X = []

        # The label
        y = []

        for filename in files:
            wrote_label = False
            with open(filename, "r") as f:
                lines = f.readlines()

            # This example's features
            x = []
            for line in lines:
                # We expect lines in features files to be of the form:
                #
                # key: value
                line = line.strip()
                try:
                    (key, value) = line.split(self.spec.key_value_delimiter)
                except Exception:
                    logger.debug("WARNING: bad line in file %s '%s', skipped", filename, line)
                    continue

                key = key.strip()
                value = value.strip()
                if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
                    logger.debug("Skipping feature %s", key)
                    continue

                value = TrainingBlueprint.normalize_feature(value)

                if key == self.spec.label:
                    y.append(value)
                    wrote_label = True
                else:
                    x.append(value)

            # Make sure we saw a label and the requisite number of features.
            if len(x) == self.spec.feature_count and wrote_label:
                X.append(x)
                self.file_done_count += 1
            else:
                if wrote_label:
                    y.pop()

                if self.spec.delete_bad_inputs:
                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. DELETING."
                    logger.warning(msg)
                    warnings.warn(msg)
                    os.remove(filename)
                else:
                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. Skipping."
                    logger.warning(msg)
                    warnings.warn(msg)
        return (X, y)

    def make_progress_graph(self) -> None:
        if not self.spec.quiet:
            from text_utils import bar_graph

            bar_graph(self.file_done_count, self.total_file_count)

    @timed
    def read_input_files(self):
        import list_utils
        import smart_future

        # All features
        X = []

        # The label
        y = []

        results = []
        all_files = glob.glob(self.spec.file_glob)
        self.total_file_count = len(all_files)
        for files in list_utils.shard(all_files, 500):
            file_list = list(files)
            results.append(self.read_files_from_list(file_list))

        for result in smart_future.wait_any(results, callback=self.make_progress_graph):
            result = result._resolve()
            for z in result[0]:
                X.append(z)
            for z in result[1]:
                y.append(z)
        if not self.spec.quiet:
            print(" " * 80 + "\n")
        return (X, y)

    # Map raw string feature values onto ints: False/None -> 0, True -> 255,
    # floats are scaled by 100 and rounded, anything else is parsed as an int.
    @staticmethod
    def normalize_feature(value: str) -> Any:
        if value in ("False", "None"):
            ret = 0
        elif value == "True":
            ret = 255
        elif isinstance(value, str) and "." in value:
            ret = round(float(value) * 100.0)
        else:
            ret = int(value)
        return ret

    @staticmethod
    def test_train_split(X, y) -> List:
        logger.debug("Performing test/train split")
        return train_test_split(
            X,
            y,
            random_state=random.randrange(0, 1000),
        )

    @staticmethod
    def scale_data(X_train: np.ndarray, X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
        logger.debug("Scaling data")
        scaler = MinMaxScaler()
        scaler.fit(X_train)
        return (scaler, scaler.transform(X_train), scaler.transform(X_test))

    # Note: children should implement.  Consider using @parallelize.
    @abstractmethod
    def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
        pass

    @staticmethod
    def evaluate_model(
        model: Any,
        X_train_scaled: np.ndarray,
        y_train: np.ndarray,
        X_test_scaled: np.ndarray,
        y_test: np.ndarray,
    ) -> Tuple[np.float64, np.float64]:
        logger.debug("Evaluating the model")
        training_score = model.score(X_train_scaled, y_train) * 100.0
        test_score = model.score(X_test_scaled, y_test) * 100.0
        logger.info(
            "Model evaluation results: test_score=%.5f, train_score=%.5f",
            test_score,
            training_score,
        )
        return (training_score, test_score)

    def maybe_persist_scaler_and_model(
        self,
        training_score: np.float64,
        test_score: np.float64,
        params: str,
        num_examples: int,
        scaler: Any,
        model: Any,
    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        if not self.spec.dry_run:
            import datetime_utils
            import input_utils
            import string_utils

            now: datetime.datetime = datetime_utils.now_pacific()
            info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
Model params: {params}
Training examples: {num_examples}
Training set score: {training_score:.2f}%
Testing set score: {test_score:.2f}%"""
            print(f'\n{info}\n')
            if (
                self.spec.persist_percentage_threshold is not None
                and test_score > self.spec.persist_percentage_threshold
            ) or (
                not self.spec.quiet
                and input_utils.yn_response("Write the model? [y,n]: ") == "y"
            ):
                scaler_filename = f"{self.spec.basename}_scaler.sav"
                with open(scaler_filename, "wb") as fb:
                    pickle.dump(scaler, fb)
                msg = f"Wrote {scaler_filename}"
                print(msg)
                logger.info(msg)

                model_filename = f"{self.spec.basename}_model.sav"
                with open(model_filename, "wb") as fb:
                    pickle.dump(model, fb)
                msg = f"Wrote {model_filename}"
                print(msg)
                logger.info(msg)

                model_info_filename = f"{self.spec.basename}_model_info.txt"
                with open(model_info_filename, "w") as f:
                    f.write(info)
                msg = f"Wrote {model_info_filename}:"
                print(msg)
                logger.info(msg)
                print(string_utils.indent(info, 2))
                logger.info(info)
                return (scaler_filename, model_filename, model_info_filename)
        return (None, None, None)
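

# A minimal usage sketch (hypothetical; not part of this module): a caller
# subclasses TrainingBlueprint, implements train_model() for its estimator of
# choice, and then invokes train() with a populated InputSpec.  The
# KNeighborsClassifier, file glob, feature count, and parameter grid below are
# illustrative placeholders only, and the @par.parallelize method choice is up
# to the child class.
#
#     from sklearn.neighbors import KNeighborsClassifier  # type: ignore
#
#     class KNNTrainer(TrainingBlueprint):
#         @par.parallelize(method=par.Method.THREAD)
#         def train_model(self, parameters, X_train_scaled, y_train):
#             model = KNeighborsClassifier(n_neighbors=parameters["n_neighbors"])
#             model.fit(X_train_scaled, y_train)
#             return model
#
#     spec = InputSpec(
#         file_glob="/tmp/features_*.txt",   # hypothetical features files
#         feature_count=16,                  # hypothetical feature count
#         features_to_skip=set(),
#         key_value_delimiter=":",
#         training_parameters=[{"n_neighbors": k} for k in (3, 5, 7)],
#         label="label",
#         basename="knn",
#         dry_run=True,
#         quiet=True,
#         persist_percentage_threshold=None,
#         delete_bad_inputs=False,
#     )
#     output = KNNTrainer().train(spec)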