Reduce the doctest lease duration...
[python_utils.git] / ml / model_trainer.py
index 6fc0da071e7dd6e2e026b8bbb7d7780c81015985..07f7b99292c9c9a3c3ac3a685c40ca59ee1b9582 100644 (file)
@@ -1,7 +1,10 @@
 #!/usr/bin/env python3
 
-from __future__ import annotations
+# © Copyright 2021-2022, Scott Gasch
+
+"""This is a blueprint for training sklearn ML models."""
 
+from __future__ import annotations
 import datetime
 import glob
 import logging
@@ -9,10 +12,12 @@ import os
 import pickle
 import random
 import sys
+import time
 import warnings
 from abc import ABC, abstractmethod
+from dataclasses import dataclass
 from types import SimpleNamespace
-from typing import Any, List, NamedTuple, Optional, Set, Tuple
+from typing import Any, List, Optional, Set, Tuple
 
 import numpy as np
 from sklearn.model_selection import train_test_split  # type:ignore
@@ -25,10 +30,11 @@ import parallelize as par
 from ansi import bold, reset
 from decorator_utils import timed
 
-logger = logging.getLogger(__file__)
+logger = logging.getLogger(__name__)
 
 parser = config.add_commandline_args(
-    f"ML Model Trainer ({__file__})", "Arguments related to training an ML model"
+    f"ML Model Trainer ({__file__})",
+    "Arguments related to training an ML model",
 )
 parser.add_argument(
     "--ml_trainer_quiet",
@@ -55,6 +61,9 @@ group.add_argument(
 
 
 class InputSpec(SimpleNamespace):
+    """A collection of info needed to train the model provided by the
+    caller."""
+
     file_glob: str
     feature_count: int
     features_to_skip: Set[str]
@@ -77,15 +86,20 @@ class InputSpec(SimpleNamespace):
         )
 
 
-class OutputSpec(NamedTuple):
-    model_filename: Optional[str]
-    model_info_filename: Optional[str]
-    scaler_filename: Optional[str]
-    training_score: np.float64
-    test_score: np.float64
+@dataclass
+class OutputSpec:
+    """Info about the results of training returned to the caller."""
+
+    model_filename: Optional[str] = None
+    model_info_filename: Optional[str] = None
+    scaler_filename: Optional[str] = None
+    training_score: np.float64 = np.float64(0.0)
+    test_score: np.float64 = np.float64(0.0)
 
 
 class TrainingBlueprint(ABC):
+    """The blueprint for doing the actual training."""
+
     def __init__(self):
         self.y_train = None
         self.y_test = None
@@ -111,13 +125,13 @@ class TrainingBlueprint(ABC):
         y = np.array(y_)
 
         print("Doing random test/train split...")
-        X_train, X_test, self.y_train, self.y_test = self.test_train_split(
+        X_train, X_test, self.y_train, self.y_test = TrainingBlueprint.test_train_split(
             X,
             y,
         )
 
         print("Scaling training data...")
-        scaler, self.X_train_scaled, self.X_test_scaled = self.scale_data(
+        scaler, self.X_train_scaled, self.X_test_scaled = TrainingBlueprint.scale_data(
             X_train,
             X_test,
         )
@@ -130,6 +144,7 @@ class TrainingBlueprint(ABC):
             models.append(model)
             modelid_to_params[model.get_id()] = str(params)
 
+        all_models = {}
         best_model = None
         best_score: Optional[np.float64] = None
         best_test_score: Optional[np.float64] = None
@@ -140,7 +155,7 @@ class TrainingBlueprint(ABC):
             if isinstance(model, smart_future.SmartFuture):
                 model = model._resolve()
             if model is not None:
-                training_score, test_score = self.evaluate_model(
+                training_score, test_score = TrainingBlueprint.evaluate_model(
                     model,
                     self.X_train_scaled,
                     self.y_train,
@@ -148,6 +163,7 @@ class TrainingBlueprint(ABC):
                     self.y_test,
                 )
                 score = (training_score + test_score * 20) / 21
+                all_models[params] = (score, training_score, test_score)
                 if not self.spec.quiet:
                     print(
                         f"{bold()}{params}{reset()}: "
@@ -164,15 +180,22 @@ class TrainingBlueprint(ABC):
                     if not self.spec.quiet:
                         print(f"New best score {best_score:.2f}% with params {params}")
 
+        executors.DefaultExecutors().shutdown()
+        assert best_training_score is not None
+        assert best_test_score is not None
+        assert best_params is not None
+
         if not self.spec.quiet:
-            executors.DefaultExecutors().shutdown()
-            msg = f"Done training; best test set score was: {best_test_score:.1f}%"
-            print(msg)
-            logger.info(msg)
-
-        assert best_training_score
-        assert best_test_score
-        assert best_params
+            time.sleep(1.0)
+            print('Done training...')
+            for params in all_models:
+                msg = f'{bold()}{params}{reset()}: score={all_models[params][0]:.2f}% '
+                msg += f'({all_models[params][2]:.2f}% test, '
+                msg += f'{all_models[params][1]:.2f}% train)'
+                if params == best_params:
+                    msg += f'{bold()} <-- winner{reset()}'
+                print(msg)
+
         (
             scaler_filename,
             model_filename,
@@ -194,7 +217,7 @@ class TrainingBlueprint(ABC):
         )
 
     @par.parallelize(method=par.Method.THREAD)
-    def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
+    def read_files_from_list(self, files: List[str]) -> Tuple[List, List]:
         # All features
         X = []
 
@@ -217,21 +240,16 @@ class TrainingBlueprint(ABC):
                 try:
                     (key, value) = line.split(self.spec.key_value_delimiter)
                 except Exception:
-                    logger.debug(
-                        f"WARNING: bad line in file {filename} '{line}', skipped"
-                    )
+                    logger.debug("WARNING: bad line in file %s '%s', skipped", filename, line)
                     continue
 
                 key = key.strip()
                 value = value.strip()
-                if (
-                    self.spec.features_to_skip is not None
-                    and key in self.spec.features_to_skip
-                ):
-                    logger.debug(f"Skipping feature {key}")
+                if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
+                    logger.debug("Skipping feature %s", key)
                     continue
 
-                value = self.normalize_feature(value)
+                value = TrainingBlueprint.normalize_feature(value)
 
                 if key == self.spec.label:
                     y.append(value)
@@ -260,9 +278,9 @@ class TrainingBlueprint(ABC):
 
     def make_progress_graph(self) -> None:
         if not self.spec.quiet:
-            from text_utils import progress_graph
+            from text_utils import bar_graph
 
-            progress_graph(self.file_done_count, self.total_file_count)
+            bar_graph(self.file_done_count, self.total_file_count)
 
     @timed
     def read_input_files(self):
@@ -278,9 +296,9 @@ class TrainingBlueprint(ABC):
         results = []
         all_files = glob.glob(self.spec.file_glob)
         self.total_file_count = len(all_files)
-        for n, files in enumerate(list_utils.shard(all_files, 500)):
+        for files in list_utils.shard(all_files, 500):
             file_list = list(files)
-            results.append(self.read_files_from_list(file_list, n))
+            results.append(self.read_files_from_list(file_list))
 
         for result in smart_future.wait_any(results, callback=self.make_progress_graph):
             result = result._resolve()
@@ -292,7 +310,8 @@ class TrainingBlueprint(ABC):
             print(" " * 80 + "\n")
         return (X, y)
 
-    def normalize_feature(self, value: str) -> Any:
+    @staticmethod
+    def normalize_feature(value: str) -> Any:
         if value in ("False", "None"):
             ret = 0
         elif value == "True":
@@ -303,7 +322,8 @@ class TrainingBlueprint(ABC):
             ret = int(value)
         return ret
 
-    def test_train_split(self, X, y) -> List:
+    @staticmethod
+    def test_train_split(X, y) -> List:
         logger.debug("Performing test/train split")
         return train_test_split(
             X,
@@ -311,9 +331,8 @@ class TrainingBlueprint(ABC):
             random_state=random.randrange(0, 1000),
         )
 
-    def scale_data(
-        self, X_train: np.ndarray, X_test: np.ndarray
-    ) -> Tuple[Any, np.ndarray, np.ndarray]:
+    @staticmethod
+    def scale_data(X_train: np.ndarray, X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
         logger.debug("Scaling data")
         scaler = MinMaxScaler()
         scaler.fit(X_train)
@@ -321,13 +340,11 @@ class TrainingBlueprint(ABC):
 
     # Note: children should implement.  Consider using @parallelize.
     @abstractmethod
-    def train_model(
-        self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray
-    ) -> Any:
+    def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
         pass
 
+    @staticmethod
     def evaluate_model(
-        self,
         model: Any,
         X_train_scaled: np.ndarray,
         y_train: np.ndarray,
@@ -338,8 +355,9 @@ class TrainingBlueprint(ABC):
         training_score = model.score(X_train_scaled, y_train) * 100.0
         test_score = model.score(X_test_scaled, y_test) * 100.0
         logger.info(
-            f"Model evaluation results: test_score={test_score:.5f}, "
-            f"train_score={training_score:.5f}"
+            "Model evaluation results: test_score=%.5f, train_score=%.5f",
+            test_score,
+            training_score,
         )
         return (training_score, test_score)
 
@@ -368,8 +386,7 @@ Testing set score: {test_score:.2f}%"""
                 self.spec.persist_percentage_threshold is not None
                 and test_score > self.spec.persist_percentage_threshold
             ) or (
-                not self.spec.quiet
-                and input_utils.yn_response("Write the model? [y,n]: ") == "y"
+                not self.spec.quiet and input_utils.yn_response("Write the model? [y,n]: ") == "y"
             ):
                 scaler_filename = f"{self.spec.basename}_scaler.sav"
                 with open(scaler_filename, "wb") as fb: