Change settings in flake8 and black.
[python_utils.git] / ml / model_trainer.py
index acd721868a2a9e04de0da364b8d37dcc268b4fee..12ccb3c6c0508e61081d6791b12f9a8f5a5571f2 100644 (file)
@@ -2,7 +2,6 @@
 
 from __future__ import annotations
 
-from abc import ABC, abstractmethod
 import datetime
 import glob
 import logging
@@ -10,6 +9,8 @@ import os
 import pickle
 import random
 import sys
+import warnings
+from abc import ABC, abstractmethod
 from types import SimpleNamespace
 from typing import Any, List, NamedTuple, Optional, Set, Tuple
 
@@ -17,27 +18,28 @@ import numpy as np
 from sklearn.model_selection import train_test_split  # type:ignore
 from sklearn.preprocessing import MinMaxScaler  # type: ignore
 
-from ansi import bold, reset
 import argparse_utils
 import config
-from decorator_utils import timed
+import executors
 import parallelize as par
+from ansi import bold, reset
+from decorator_utils import timed
 
 logger = logging.getLogger(__file__)
 
 parser = config.add_commandline_args(
     f"ML Model Trainer ({__file__})",
-    "Arguments related to training an ML model"
+    "Arguments related to training an ML model",
 )
 parser.add_argument(
     "--ml_trainer_quiet",
     action="store_true",
-    help="Don't prompt the user for anything."
+    help="Don't prompt the user for anything.",
 )
 parser.add_argument(
     "--ml_trainer_delete",
     action="store_true",
-    help="Delete invalid/incomplete features files in addition to warning."
+    help="Delete invalid/incomplete features files in addition to warning.",
 )
 group = parser.add_mutually_exclusive_group()
 group.add_argument(
@@ -69,10 +71,10 @@ class InputSpec(SimpleNamespace):
     @staticmethod
     def populate_from_config() -> InputSpec:
         return InputSpec(
-            dry_run = config.config["ml_trainer_dry_run"],
-            quiet = config.config["ml_trainer_quiet"],
-            persist_percentage_threshold = config.config["ml_trainer_persist_threshold"],
-            delete_bad_inputs = config.config["ml_trainer_delete"],
+            dry_run=config.config["ml_trainer_dry_run"],
+            quiet=config.config["ml_trainer_quiet"],
+            persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
+            delete_bad_inputs=config.config["ml_trainer_delete"],
         )
 
 
@@ -80,8 +82,8 @@ class OutputSpec(NamedTuple):
     model_filename: Optional[str]
     model_info_filename: Optional[str]
     scaler_filename: Optional[str]
-    training_score: float
-    test_score: float
+    training_score: np.float64
+    test_score: np.float64
 
 
 class TrainingBlueprint(ABC):
@@ -125,18 +127,14 @@ class TrainingBlueprint(ABC):
         models = []
         modelid_to_params = {}
         for params in self.spec.training_parameters:
-            model = self.train_model(
-                params,
-                self.X_train_scaled,
-                self.y_train
-            )
+            model = self.train_model(params, self.X_train_scaled, self.y_train)
             models.append(model)
             modelid_to_params[model.get_id()] = str(params)
 
         best_model = None
-        best_score = None
-        best_test_score = None
-        best_training_score = None
+        best_score: Optional[np.float64] = None
+        best_test_score: Optional[np.float64] = None
+        best_training_score: Optional[np.float64] = None
         best_params = None
         for model in smart_future.wait_any(models):
             params = modelid_to_params[model.get_id()]
@@ -165,38 +163,39 @@ class TrainingBlueprint(ABC):
                     best_model = model
                     best_params = params
                     if not self.spec.quiet:
-                        print(
-                            f"New best score {best_score:.2f}% with params {params}"
-                        )
+                        print(f"New best score {best_score:.2f}% with params {params}")
 
         if not self.spec.quiet:
+            executors.DefaultExecutors().shutdown()
             msg = f"Done training; best test set score was: {best_test_score:.1f}%"
             print(msg)
             logger.info(msg)
-        scaler_filename, model_filename, model_info_filename = (
-            self.maybe_persist_scaler_and_model(
-                best_training_score,
-                best_test_score,
-                best_params,
-                num_examples,
-                scaler,
-                best_model,
-            )
+
+        assert best_training_score is not None
+        assert best_test_score is not None
+        assert best_params is not None
+        (
+            scaler_filename,
+            model_filename,
+            model_info_filename,
+        ) = self.maybe_persist_scaler_and_model(
+            best_training_score,
+            best_test_score,
+            best_params,
+            num_examples,
+            scaler,
+            best_model,
         )
         return OutputSpec(
-            model_filename = model_filename,
-            model_info_filename = model_info_filename,
-            scaler_filename = scaler_filename,
-            training_score = best_training_score,
-            test_score = best_test_score,
+            model_filename=model_filename,
+            model_info_filename=model_info_filename,
+            scaler_filename=scaler_filename,
+            training_score=best_training_score,
+            test_score=best_test_score,
         )
 
     @par.parallelize(method=par.Method.THREAD)
-    def read_files_from_list(
-            self,
-            files: List[str],
-            n: int
-    ) -> Tuple[List, List]:
+    def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
         # All features
         X = []
 
@@ -224,8 +223,7 @@ class TrainingBlueprint(ABC):
 
                 key = key.strip()
                 value = value.strip()
-                if (self.spec.features_to_skip is not None
-                        and key in self.spec.features_to_skip):
+                if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
                     logger.debug(f"Skipping feature {key}")
                     continue
 
@@ -247,22 +245,20 @@ class TrainingBlueprint(ABC):
 
                 if self.spec.delete_bad_inputs:
                     msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}.  DELETING."
-                    print(msg, file=sys.stderr)
                     logger.warning(msg)
+                    warnings.warn(msg)
                     os.remove(filename)
                 else:
                     msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}.  Skipping."
-                    print(msg, file=sys.stderr)
                     logger.warning(msg)
+                    warnings.warn(msg)
         return (X, y)
 
     def make_progress_graph(self) -> None:
         if not self.spec.quiet:
             from text_utils import progress_graph
-            progress_graph(
-                self.file_done_count,
-                self.total_file_count
-            )
+
+            progress_graph(self.file_done_count, self.total_file_count)
 
     @timed
     def read_input_files(self):
@@ -311,9 +307,9 @@ class TrainingBlueprint(ABC):
             random_state=random.randrange(0, 1000),
         )
 
-    def scale_data(self,
-                   X_train: np.ndarray,
-                   X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
+    def scale_data(
+        self, X_train: np.ndarray, X_test: np.ndarray
+    ) -> Tuple[Any, np.ndarray, np.ndarray]:
         logger.debug("Scaling data")
         scaler = MinMaxScaler()
         scaler.fit(X_train)
@@ -321,19 +317,17 @@ class TrainingBlueprint(ABC):
 
     # Note: children should implement.  Consider using @parallelize.
     @abstractmethod
-    def train_model(self,
-                    parameters,
-                    X_train_scaled: np.ndarray,
-                    y_train: np.ndarray) -> Any:
+    def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
         pass
 
     def evaluate_model(
-            self,
-            model: Any,
-            X_train_scaled: np.ndarray,
-            y_train: np.ndarray,
-            X_test_scaled: np.ndarray,
-            y_test: np.ndarray) -> Tuple[np.float64, np.float64]:
+        self,
+        model: Any,
+        X_train_scaled: np.ndarray,
+        y_train: np.ndarray,
+        X_test_scaled: np.ndarray,
+        y_test: np.ndarray,
+    ) -> Tuple[np.float64, np.float64]:
         logger.debug("Evaluating the model")
         training_score = model.score(X_train_scaled, y_train) * 100.0
         test_score = model.score(X_test_scaled, y_test) * 100.0
@@ -344,13 +338,14 @@ class TrainingBlueprint(ABC):
         return (training_score, test_score)
 
     def maybe_persist_scaler_and_model(
-            self,
-            training_score: np.float64,
-            test_score: np.float64,
-            params: str,
-            num_examples: int,
-            scaler: Any,
-            model: Any) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+        self,
+        training_score: np.float64,
+        test_score: np.float64,
+        params: str,
+        num_examples: int,
+        scaler: Any,
+        model: Any,
+    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
         if not self.spec.dry_run:
             import datetime_utils
             import input_utils
@@ -364,21 +359,20 @@ Training set score: {training_score:.2f}%
 Testing set score: {test_score:.2f}%"""
             print(f'\n{info}\n')
             if (
-                    (self.spec.persist_percentage_threshold is not None and
-                     test_score > self.spec.persist_percentage_threshold)
-                    or
-                    (not self.spec.quiet
-                     and input_utils.yn_response("Write the model? [y,n]: ") == "y")
+                self.spec.persist_percentage_threshold is not None
+                and test_score > self.spec.persist_percentage_threshold
+            ) or (
+                not self.spec.quiet and input_utils.yn_response("Write the model? [y,n]: ") == "y"
             ):
                 scaler_filename = f"{self.spec.basename}_scaler.sav"
-                with open(scaler_filename, "wb") as f:
-                    pickle.dump(scaler, f)
+                with open(scaler_filename, "wb") as fb:
+                    pickle.dump(scaler, fb)
                 msg = f"Wrote {scaler_filename}"
                 print(msg)
                 logger.info(msg)
                 model_filename = f"{self.spec.basename}_model.sav"
-                with open(model_filename, "wb") as f:
-                    pickle.dump(model, f)
+                with open(model_filename, "wb") as fb:
+                    pickle.dump(model, fb)
                 msg = f"Wrote {model_filename}"
                 print(msg)
                 logger.info(msg)