Change settings in flake8 and black.
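Note: the flake8 and black settings files themselves are not part of this diff;
the reformatting below is consistent with an enforced line length of 100 columns.
A minimal sketch of the kind of configuration the title describes (file names and
values are assumptions inferred from the diff, not taken from this commit):

    # setup.cfg (hypothetical)
    [flake8]
    max-line-length = 100

    # pyproject.toml (hypothetical)
    [tool.black]
    line-length = 100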
[python_utils.git] / ml / model_trainer.py
index 79ce7062b5b4a05616cddcf7b3d35d59cfbef007..12ccb3c6c0508e61081d6791b12f9a8f5a5571f2 100644
@@ -2,7 +2,6 @@
 
 from __future__ import annotations
 
-from abc import ABC, abstractmethod
 import datetime
 import glob
 import logging
@@ -10,35 +9,37 @@ import os
 import pickle
 import random
 import sys
+import warnings
+from abc import ABC, abstractmethod
 from types import SimpleNamespace
 from typing import Any, List, NamedTuple, Optional, Set, Tuple
-import warnings
 
 import numpy as np
 from sklearn.model_selection import train_test_split  # type:ignore
 from sklearn.preprocessing import MinMaxScaler  # type: ignore
 
-from ansi import bold, reset
 import argparse_utils
 import config
-from decorator_utils import timed
+import executors
 import parallelize as par
+from ansi import bold, reset
+from decorator_utils import timed
 
 logger = logging.getLogger(__file__)
 
 parser = config.add_commandline_args(
     f"ML Model Trainer ({__file__})",
 
 logger = logging.getLogger(__file__)
 
 parser = config.add_commandline_args(
     f"ML Model Trainer ({__file__})",
-    "Arguments related to training an ML model"
+    "Arguments related to training an ML model",
 )
 parser.add_argument(
     "--ml_trainer_quiet",
     action="store_true",
-    help="Don't prompt the user for anything."
+    help="Don't prompt the user for anything.",
 )
 parser.add_argument(
     "--ml_trainer_delete",
     action="store_true",
-    help="Delete invalid/incomplete features files in addition to warning."
+    help="Delete invalid/incomplete features files in addition to warning.",
 )
 group = parser.add_mutually_exclusive_group()
 group.add_argument(
@@ -70,10 +71,10 @@ class InputSpec(SimpleNamespace):
     @staticmethod
     def populate_from_config() -> InputSpec:
         return InputSpec(
-            dry_run = config.config["ml_trainer_dry_run"],
-            quiet = config.config["ml_trainer_quiet"],
-            persist_percentage_threshold = config.config["ml_trainer_persist_threshold"],
-            delete_bad_inputs = config.config["ml_trainer_delete"],
+            dry_run=config.config["ml_trainer_dry_run"],
+            quiet=config.config["ml_trainer_quiet"],
+            persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
+            delete_bad_inputs=config.config["ml_trainer_delete"],
         )
 
 
@@ -81,8 +82,8 @@ class OutputSpec(NamedTuple):
     model_filename: Optional[str]
     model_info_filename: Optional[str]
     scaler_filename: Optional[str]
-    training_score: float
-    test_score: float
+    training_score: np.float64
+    test_score: np.float64
 
 
 class TrainingBlueprint(ABC):
@@ -126,18 +127,14 @@ class TrainingBlueprint(ABC):
         models = []
         modelid_to_params = {}
         for params in self.spec.training_parameters:
-            model = self.train_model(
-                params,
-                self.X_train_scaled,
-                self.y_train
-            )
+            model = self.train_model(params, self.X_train_scaled, self.y_train)
             models.append(model)
             modelid_to_params[model.get_id()] = str(params)
 
         best_model = None
-        best_score = None
-        best_test_score = None
-        best_training_score = None
+        best_score: Optional[np.float64] = None
+        best_test_score: Optional[np.float64] = None
+        best_training_score: Optional[np.float64] = None
         best_params = None
         for model in smart_future.wait_any(models):
             params = modelid_to_params[model.get_id()]
@@ -166,38 +163,39 @@ class TrainingBlueprint(ABC):
                     best_model = model
                     best_params = params
                     if not self.spec.quiet:
-                        print(
-                            f"New best score {best_score:.2f}% with params {params}"
-                        )
+                        print(f"New best score {best_score:.2f}% with params {params}")
 
         if not self.spec.quiet:
+            executors.DefaultExecutors().shutdown()
             msg = f"Done training; best test set score was: {best_test_score:.1f}%"
             print(msg)
             logger.info(msg)
             msg = f"Done training; best test set score was: {best_test_score:.1f}%"
             print(msg)
             logger.info(msg)
-        scaler_filename, model_filename, model_info_filename = (
-            self.maybe_persist_scaler_and_model(
-                best_training_score,
-                best_test_score,
-                best_params,
-                num_examples,
-                scaler,
-                best_model,
-            )
+
+        assert best_training_score is not None
+        assert best_test_score is not None
+        assert best_params is not None
+        (
+            scaler_filename,
+            model_filename,
+            model_info_filename,
+        ) = self.maybe_persist_scaler_and_model(
+            best_training_score,
+            best_test_score,
+            best_params,
+            num_examples,
+            scaler,
+            best_model,
         )
         return OutputSpec(
-            model_filename = model_filename,
-            model_info_filename = model_info_filename,
-            scaler_filename = scaler_filename,
-            training_score = best_training_score,
-            test_score = best_test_score,
+            model_filename=model_filename,
+            model_info_filename=model_info_filename,
+            scaler_filename=scaler_filename,
+            training_score=best_training_score,
+            test_score=best_test_score,
         )
 
     @par.parallelize(method=par.Method.THREAD)
-    def read_files_from_list(
-            self,
-            files: List[str],
-            n: int
-    ) -> Tuple[List, List]:
+    def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
         # All features
         X = []
 
@@ -225,8 +223,7 @@ class TrainingBlueprint(ABC):
 
                 key = key.strip()
                 value = value.strip()
-                if (self.spec.features_to_skip is not None
-                        and key in self.spec.features_to_skip):
+                if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
                     logger.debug(f"Skipping feature {key}")
                     continue
 
                     logger.debug(f"Skipping feature {key}")
                     continue
 
@@ -260,10 +257,8 @@ class TrainingBlueprint(ABC):
     def make_progress_graph(self) -> None:
         if not self.spec.quiet:
             from text_utils import progress_graph
-            progress_graph(
-                self.file_done_count,
-                self.total_file_count
-            )
+
+            progress_graph(self.file_done_count, self.total_file_count)
 
     @timed
     def read_input_files(self):
@@ -312,9 +307,9 @@ class TrainingBlueprint(ABC):
             random_state=random.randrange(0, 1000),
         )
 
-    def scale_data(self,
-                   X_train: np.ndarray,
-                   X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
+    def scale_data(
+        self, X_train: np.ndarray, X_test: np.ndarray
+    ) -> Tuple[Any, np.ndarray, np.ndarray]:
         logger.debug("Scaling data")
         scaler = MinMaxScaler()
         scaler.fit(X_train)
         logger.debug("Scaling data")
         scaler = MinMaxScaler()
         scaler.fit(X_train)
@@ -322,19 +317,17 @@ class TrainingBlueprint(ABC):
 
     # Note: children should implement.  Consider using @parallelize.
     @abstractmethod
-    def train_model(self,
-                    parameters,
-                    X_train_scaled: np.ndarray,
-                    y_train: np.ndarray) -> Any:
+    def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
         pass
 
     def evaluate_model(
-            self,
-            model: Any,
-            X_train_scaled: np.ndarray,
-            y_train: np.ndarray,
-            X_test_scaled: np.ndarray,
-            y_test: np.ndarray) -> Tuple[np.float64, np.float64]:
+        self,
+        model: Any,
+        X_train_scaled: np.ndarray,
+        y_train: np.ndarray,
+        X_test_scaled: np.ndarray,
+        y_test: np.ndarray,
+    ) -> Tuple[np.float64, np.float64]:
         logger.debug("Evaluating the model")
         training_score = model.score(X_train_scaled, y_train) * 100.0
         test_score = model.score(X_test_scaled, y_test) * 100.0
         logger.debug("Evaluating the model")
         training_score = model.score(X_train_scaled, y_train) * 100.0
         test_score = model.score(X_test_scaled, y_test) * 100.0
@@ -345,13 +338,14 @@ class TrainingBlueprint(ABC):
         return (training_score, test_score)
 
     def maybe_persist_scaler_and_model(
-            self,
-            training_score: np.float64,
-            test_score: np.float64,
-            params: str,
-            num_examples: int,
-            scaler: Any,
-            model: Any) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+        self,
+        training_score: np.float64,
+        test_score: np.float64,
+        params: str,
+        num_examples: int,
+        scaler: Any,
+        model: Any,
+    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
         if not self.spec.dry_run:
             import datetime_utils
             import input_utils
@@ -365,21 +359,20 @@ Training set score: {training_score:.2f}%
 Testing set score: {test_score:.2f}%"""
             print(f'\n{info}\n')
             if (
-                    (self.spec.persist_percentage_threshold is not None and
-                     test_score > self.spec.persist_percentage_threshold)
-                    or
-                    (not self.spec.quiet
-                     and input_utils.yn_response("Write the model? [y,n]: ") == "y")
+                self.spec.persist_percentage_threshold is not None
+                and test_score > self.spec.persist_percentage_threshold
+            ) or (
+                not self.spec.quiet and input_utils.yn_response("Write the model? [y,n]: ") == "y"
             ):
                 scaler_filename = f"{self.spec.basename}_scaler.sav"
-                with open(scaler_filename, "wb") as f:
-                    pickle.dump(scaler, f)
+                with open(scaler_filename, "wb") as fb:
+                    pickle.dump(scaler, fb)
                 msg = f"Wrote {scaler_filename}"
                 print(msg)
                 logger.info(msg)
                 model_filename = f"{self.spec.basename}_model.sav"
                 msg = f"Wrote {scaler_filename}"
                 print(msg)
                 logger.info(msg)
                 model_filename = f"{self.spec.basename}_model.sav"
-                with open(model_filename, "wb") as f:
-                    pickle.dump(model, f)
+                with open(model_filename, "wb") as fb:
+                    pickle.dump(model, fb)
                 msg = f"Wrote {model_filename}"
                 print(msg)
                 logger.info(msg)
                 msg = f"Wrote {model_filename}"
                 print(msg)
                 logger.info(msg)
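With settings like those sketched above, most of this churn is what rerunning the
tools would produce, e.g. (hypothetical invocations, assuming a 100-column limit):

    black --line-length 100 ml/model_trainer.py
    flake8 --max-line-length 100 ml/model_trainer.py

Black itself does not reorder imports, so the regrouped import block at the top of
the file is consistent with a separate import-sorting pass rather than black alone.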