#!/usr/bin/env python3

from __future__ import annotations

import datetime
import glob
import logging
import os
import pickle
import random
import sys
import warnings
from abc import ABC, abstractmethod
from types import SimpleNamespace
from typing import Any, List, NamedTuple, Optional, Set, Tuple

import numpy as np
from sklearn.model_selection import train_test_split  # type: ignore
from sklearn.preprocessing import MinMaxScaler  # type: ignore

import argparse_utils
import config
import executors
import parallelize as par
from ansi import bold, reset
from decorator_utils import timed

logger = logging.getLogger(__file__)

parser = config.add_commandline_args(
    f"ML Model Trainer ({__file__})",
    "Arguments related to training an ML model",
)
parser.add_argument(
    "--ml_trainer_quiet",
    action="store_true",
    help="Don't prompt the user for anything.",
)
parser.add_argument(
    "--ml_trainer_delete",
    action="store_true",
    help="Delete invalid/incomplete features files in addition to warning.",
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
    "--ml_trainer_dry_run",
    action="store_true",
    help="Do not write a new model, just report efficacy.",
)
group.add_argument(
    "--ml_trainer_persist_threshold",
    type=argparse_utils.valid_percentage,
    metavar="0..100",
    help="Persist the model if the test set score is >= this threshold.",
)


class InputSpec(SimpleNamespace):
    file_glob: str
    feature_count: int
    features_to_skip: Set[str]
    key_value_delimiter: str
    training_parameters: List
    label: str
    basename: str
    dry_run: Optional[bool]
    quiet: Optional[bool]
    persist_percentage_threshold: Optional[float]
    delete_bad_inputs: Optional[bool]

    @staticmethod
    def populate_from_config() -> InputSpec:
        return InputSpec(
            dry_run=config.config["ml_trainer_dry_run"],
            quiet=config.config["ml_trainer_quiet"],
            persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
            delete_bad_inputs=config.config["ml_trainer_delete"],
        )


class OutputSpec(NamedTuple):
    model_filename: Optional[str]
    model_info_filename: Optional[str]
    scaler_filename: Optional[str]
    training_score: np.float64
    test_score: np.float64


class TrainingBlueprint(ABC):
    def __init__(self):
        self.y_train = None
        self.y_test = None
        self.X_test_scaled = None
        self.X_train_scaled = None
        self.file_done_count = 0
        self.total_file_count = 0
        self.spec = None

    def train(self, spec: InputSpec) -> OutputSpec:
        import smart_future

        random.seed()
        self.spec = spec

        X_, y_ = self.read_input_files()
        num_examples = len(y_)

        # Every example's features
        X = np.array(X_)

        # Every example's label
        y = np.array(y_)

        print("Doing random test/train split...")
        X_train, X_test, self.y_train, self.y_test = self.test_train_split(
            X,
            y,
        )

        print("Scaling training data...")
        scaler, self.X_train_scaled, self.X_test_scaled = self.scale_data(
            X_train,
            X_test,
        )

        print("Training model(s)...")
        models = []
        modelid_to_params = {}
        for params in self.spec.training_parameters:
            model = self.train_model(params, self.X_train_scaled, self.y_train)
            models.append(model)
            modelid_to_params[model.get_id()] = str(params)

        best_model = None
        best_score: Optional[np.float64] = None
        best_test_score: Optional[np.float64] = None
        best_training_score: Optional[np.float64] = None
        best_params = None
        for model in smart_future.wait_any(models):
            params = modelid_to_params[model.get_id()]
            if isinstance(model, smart_future.SmartFuture):
                model = model._resolve()
            if model is not None:
                training_score, test_score = self.evaluate_model(
                    model,
                    self.X_train_scaled,
                    self.y_train,
                    self.X_test_scaled,
                    self.y_test,
                )
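                # Blend the two scores into one number for model selection,
                # weighting the held-out test set score 20x more heavily than
                # the (optimistic) training set score.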
                score = (training_score + test_score * 20) / 21
                if not self.spec.quiet:
                    print(
                        f"{bold()}{params}{reset()}: "
                        f"Training set score={training_score:.2f}%, "
                        f"test set score={test_score:.2f}%",
                        file=sys.stderr,
                    )
                if best_score is None or score > best_score:
                    best_score = score
                    best_test_score = test_score
                    best_training_score = training_score
                    best_model = model
                    best_params = params
                    if not self.spec.quiet:
                        print(f"New best score {best_score:.2f}% with params {params}")

        if not self.spec.quiet:
            executors.DefaultExecutors().shutdown()
            msg = f"Done training; best test set score was: {best_test_score:.1f}%"
            print(msg)
            logger.info(msg)

        assert best_training_score is not None
        assert best_test_score is not None
        assert best_params is not None
        (
            scaler_filename,
            model_filename,
            model_info_filename,
        ) = self.maybe_persist_scaler_and_model(
            best_training_score,
            best_test_score,
            best_params,
            num_examples,
            scaler,
            best_model,
        )
        return OutputSpec(
            model_filename=model_filename,
            model_info_filename=model_info_filename,
            scaler_filename=scaler_filename,
            training_score=best_training_score,
            test_score=best_test_score,
        )

    @par.parallelize(method=par.Method.THREAD)
    def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
        # All features
        X = []

        # The label
        y = []

        for filename in files:
            wrote_label = False
            with open(filename, "r") as f:
                lines = f.readlines()

            # This example's features
            x = []
            for line in lines:
                # We expect lines in features files to be of the form:
                #
                # key: value
                line = line.strip()
                try:
                    (key, value) = line.split(self.spec.key_value_delimiter)
                except Exception:
                    logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped")
                    continue

                key = key.strip()
                value = value.strip()
                if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
                    logger.debug(f"Skipping feature {key}")
                    continue

                value = self.normalize_feature(value)
                if key == self.spec.label:
                    y.append(value)
                    wrote_label = True
                else:
                    x.append(value)

            # Make sure we saw a label and the requisite number of features.
            if len(x) == self.spec.feature_count and wrote_label:
                X.append(x)
                self.file_done_count += 1
            else:
                if wrote_label:
                    y.pop()
                if self.spec.delete_bad_inputs:
                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. DELETING."
                    logger.warning(msg)
                    warnings.warn(msg)
                    os.remove(filename)
                else:
                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. Skipping."
                    logger.warning(msg)
                    warnings.warn(msg)
        return (X, y)

    def make_progress_graph(self) -> None:
        if not self.spec.quiet:
            from text_utils import progress_graph

            progress_graph(self.file_done_count, self.total_file_count)

    @timed
    def read_input_files(self):
        import list_utils
        import smart_future

        # All features
        X = []

        # The label
        y = []

        results = []
        all_files = glob.glob(self.spec.file_glob)
        self.total_file_count = len(all_files)
        for n, files in enumerate(list_utils.shard(all_files, 500)):
            file_list = list(files)
            results.append(self.read_files_from_list(file_list, n))

        for result in smart_future.wait_any(results, callback=self.make_progress_graph):
            result = result._resolve()
            for z in result[0]:
                X.append(z)
            for z in result[1]:
                y.append(z)
        if not self.spec.quiet:
            print(" " * 80 + "\n")
        return (X, y)

    def normalize_feature(self, value: str) -> Any:
        if value in ("False", "None"):
            ret = 0
        elif value == "True":
            ret = 255
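        # Fractional values are converted to integer percentages
        # (e.g. "0.25" becomes 25) so every feature ends up numeric.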
        elif isinstance(value, str) and "." in value:
            ret = round(float(value) * 100.0)
        else:
            ret = int(value)
        return ret

    def test_train_split(self, X, y) -> List:
        logger.debug("Performing test/train split")
        return train_test_split(
            X,
            y,
            random_state=random.randrange(0, 1000),
        )

    def scale_data(
        self, X_train: np.ndarray, X_test: np.ndarray
    ) -> Tuple[Any, np.ndarray, np.ndarray]:
        logger.debug("Scaling data")
        scaler = MinMaxScaler()
        scaler.fit(X_train)
        return (scaler, scaler.transform(X_train), scaler.transform(X_test))

    # Note: children should implement. Consider using @parallelize.
    @abstractmethod
    def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
        pass

    def evaluate_model(
        self,
        model: Any,
        X_train_scaled: np.ndarray,
        y_train: np.ndarray,
        X_test_scaled: np.ndarray,
        y_test: np.ndarray,
    ) -> Tuple[np.float64, np.float64]:
        logger.debug("Evaluating the model")
        training_score = model.score(X_train_scaled, y_train) * 100.0
        test_score = model.score(X_test_scaled, y_test) * 100.0
        logger.info(
            f"Model evaluation results: test_score={test_score:.5f}, "
            f"train_score={training_score:.5f}"
        )
        return (training_score, test_score)

    def maybe_persist_scaler_and_model(
        self,
        training_score: np.float64,
        test_score: np.float64,
        params: str,
        num_examples: int,
        scaler: Any,
        model: Any,
    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        if not self.spec.dry_run:
            import datetime_utils
            import input_utils
            import string_utils

            now: datetime.datetime = datetime_utils.now_pacific()
            info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
Model params: {params}
Training examples: {num_examples}
Training set score: {training_score:.2f}%
Testing set score: {test_score:.2f}%"""
            print(f"\n{info}\n")
            if (
                self.spec.persist_percentage_threshold is not None
                and test_score > self.spec.persist_percentage_threshold
            ) or (
                not self.spec.quiet
                and input_utils.yn_response("Write the model? [y,n]: ") == "y"
            ):
                scaler_filename = f"{self.spec.basename}_scaler.sav"
                with open(scaler_filename, "wb") as fb:
                    pickle.dump(scaler, fb)
                msg = f"Wrote {scaler_filename}"
                print(msg)
                logger.info(msg)

                model_filename = f"{self.spec.basename}_model.sav"
                with open(model_filename, "wb") as fb:
                    pickle.dump(model, fb)
                msg = f"Wrote {model_filename}"
                print(msg)
                logger.info(msg)

                model_info_filename = f"{self.spec.basename}_model_info.txt"
                with open(model_info_filename, "w") as f:
                    f.write(info)
                msg = f"Wrote {model_info_filename}:"
                print(msg)
                logger.info(msg)
                print(string_utils.indent(info, 2))
                logger.info(info)
                return (scaler_filename, model_filename, model_info_filename)
        return (None, None, None)
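

# ---------------------------------------------------------------------------
# Illustrative sketch only: one way a child class might implement the
# abstract train_model() hook above.  The estimator (KNeighborsClassifier),
# the "n_neighbors" training parameter, and the usage values below are
# assumptions invented for this example, not part of this module's API.
# Because train() calls .get_id() on whatever train_model() returns,
# decorating it with @par.parallelize (so it returns a SmartFuture, just as
# read_files_from_list does) is the simplest way to satisfy that contract
# while also training the candidate models concurrently.
# ---------------------------------------------------------------------------
class ExampleKNNTrainer(TrainingBlueprint):
    """Hypothetical example subclass; trains k-nearest-neighbors classifiers."""

    @par.parallelize(method=par.Method.THREAD)
    def train_model(
        self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray
    ) -> Any:
        from sklearn.neighbors import KNeighborsClassifier  # type: ignore

        # Each entry in spec.training_parameters is assumed (for this example)
        # to be a dict holding one candidate model's hyperparameters.
        model = KNeighborsClassifier(n_neighbors=parameters["n_neighbors"])
        model.fit(X_train_scaled, y_train)
        return model


# Hypothetical usage (all values made up for illustration):
#
#     spec = InputSpec.populate_from_config()
#     spec.file_glob = "features/*.txt"
#     spec.feature_count = 12
#     spec.features_to_skip = set()
#     spec.key_value_delimiter = ":"
#     spec.training_parameters = [{"n_neighbors": k} for k in (3, 5, 7)]
#     spec.label = "label"
#     spec.basename = "knn"
#     ExampleKNNTrainer().train(spec)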