More type annotations.
diff --git a/ml/model_trainer.py b/ml/model_trainer.py
index ab3059f855388d06b8077a359897bb07ef5b2bc9..213a1814cff5e98507e30c19e17669ab123886ce 100644
--- a/ml/model_trainer.py
+++ b/ml/model_trainer.py
@@ -12,6 +12,7 @@ import random
 import sys
 from types import SimpleNamespace
 from typing import Any, List, NamedTuple, Optional, Set, Tuple
+import warnings
 
 import numpy as np
 from sklearn.model_selection import train_test_split  # type:ignore
@@ -21,23 +22,23 @@ from ansi import bold, reset
 import argparse_utils
 import config
 from decorator_utils import timed
+import executors
 import parallelize as par
 
 logger = logging.getLogger(__file__)
 
 parser = config.add_commandline_args(
-    f"ML Model Trainer ({__file__})",
-    "Arguments related to training an ML model"
+    f"ML Model Trainer ({__file__})", "Arguments related to training an ML model"
 )
 parser.add_argument(
     "--ml_trainer_quiet",
     action="store_true",
-    help="Don't prompt the user for anything."
+    help="Don't prompt the user for anything.",
 )
 parser.add_argument(
     "--ml_trainer_delete",
     action="store_true",
-    help="Delete invalid/incomplete features files in addition to warning."
+    help="Delete invalid/incomplete features files in addition to warning.",
 )
 group = parser.add_mutually_exclusive_group()
 group.add_argument(
@@ -69,10 +70,10 @@ class InputSpec(SimpleNamespace):
     @staticmethod
     def populate_from_config() -> InputSpec:
         return InputSpec(
-            dry_run = config.config["ml_trainer_dry_run"],
-            quiet = config.config["ml_trainer_quiet"],
-            persist_percentage_threshold = config.config["ml_trainer_persist_threshold"],
-            delete_bad_inputs = config.config["ml_trainer_delete"],
+            dry_run=config.config["ml_trainer_dry_run"],
+            quiet=config.config["ml_trainer_quiet"],
+            persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
+            delete_bad_inputs=config.config["ml_trainer_delete"],
         )
 
 
@@ -125,11 +126,7 @@ class TrainingBlueprint(ABC):
         models = []
         modelid_to_params = {}
         for params in self.spec.training_parameters:
-            model = self.train_model(
-                params,
-                self.X_train_scaled,
-                self.y_train
-            )
+            model = self.train_model(params, self.X_train_scaled, self.y_train)
             models.append(model)
             modelid_to_params[model.get_id()] = str(params)
 
@@ -165,38 +162,36 @@ class TrainingBlueprint(ABC):
                     best_model = model
                     best_params = params
                     if not self.spec.quiet:
-                        print(
-                            f"New best score {best_score:.2f}% with params {params}"
-                        )
+                        print(f"New best score {best_score:.2f}% with params {params}")
 
         if not self.spec.quiet:
+            executors.DefaultExecutors().shutdown()
             msg = f"Done training; best test set score was: {best_test_score:.1f}%"
             print(msg)
             logger.info(msg)
             msg = f"Done training; best test set score was: {best_test_score:.1f}%"
             print(msg)
             logger.info(msg)
-        scaler_filename, model_filename, model_info_filename = (
-            self.maybe_persist_scaler_and_model(
-                best_training_score,
-                best_test_score,
-                best_params,
-                num_examples,
-                scaler,
-                best_model,
-            )
+
+        (
+            scaler_filename,
+            model_filename,
+            model_info_filename,
+        ) = self.maybe_persist_scaler_and_model(
+            best_training_score,
+            best_test_score,
+            best_params,
+            num_examples,
+            scaler,
+            best_model,
         )
         return OutputSpec(
-            model_filename = model_filename,
-            model_info_filename = model_info_filename,
-            scaler_filename = scaler_filename,
-            training_score = best_training_score,
-            test_score = best_test_score,
+            model_filename=model_filename,
+            model_info_filename=model_info_filename,
+            scaler_filename=scaler_filename,
+            training_score=best_training_score,
+            test_score=best_test_score,
         )
 
     @par.parallelize(method=par.Method.THREAD)
-    def read_files_from_list(
-            self,
-            files: List[str],
-            n: int
-    ) -> Tuple[List, List]:
+    def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
         # All features
         X = []
 
@@ -218,14 +213,18 @@ class TrainingBlueprint(ABC):
                 line = line.strip()
                 try:
                     (key, value) = line.split(self.spec.key_value_delimiter)
-                except Exception as e:
-                    logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped")
+                except Exception:
+                    logger.debug(
+                        f"WARNING: bad line in file {filename} '{line}', skipped"
+                    )
                     continue
 
                 key = key.strip()
                 value = value.strip()
-                if (self.spec.features_to_skip is not None
-                        and key in self.spec.features_to_skip):
+                if (
+                    self.spec.features_to_skip is not None
+                    and key in self.spec.features_to_skip
+                ):
                     logger.debug(f"Skipping feature {key}")
                     continue
 
                     logger.debug(f"Skipping feature {key}")
                     continue
 
@@ -246,23 +245,21 @@ class TrainingBlueprint(ABC):
                     y.pop()
 
                 if self.spec.delete_bad_inputs:
-                    msg = f"WARNING: {filename}: missing features or label.  DELETING."
-                    print(msg, file=sys.stderr)
+                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}.  DELETING."
                     logger.warning(msg)
+                    warnings.warn(msg)
                     os.remove(filename)
                 else:
-                    msg = f"WARNING: {filename}: missing features or label.  Skipped."
-                    print(msg, file=sys.stderr)
+                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}.  Skipping."
                     logger.warning(msg)
+                    warnings.warn(msg)
         return (X, y)
 
     def make_progress_graph(self) -> None:
         if not self.spec.quiet:
             from text_utils import progress_graph
-            progress_graph(
-                self.file_done_count,
-                self.total_file_count
-            )
+
+            progress_graph(self.file_done_count, self.total_file_count)
 
     @timed
     def read_input_files(self):
@@ -311,9 +308,9 @@ class TrainingBlueprint(ABC):
             random_state=random.randrange(0, 1000),
         )
 
-    def scale_data(self,
-                   X_train: np.ndarray,
-                   X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
+    def scale_data(
+        self, X_train: np.ndarray, X_test: np.ndarray
+    ) -> Tuple[Any, np.ndarray, np.ndarray]:
         logger.debug("Scaling data")
         scaler = MinMaxScaler()
         scaler.fit(X_train)
         logger.debug("Scaling data")
         scaler = MinMaxScaler()
         scaler.fit(X_train)
@@ -321,19 +318,19 @@ class TrainingBlueprint(ABC):
 
     # Note: children should implement.  Consider using @parallelize.
     @abstractmethod
-    def train_model(self,
-                    parameters,
-                    X_train_scaled: np.ndarray,
-                    y_train: np.ndarray) -> Any:
+    def train_model(
+        self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray
+    ) -> Any:
         pass
 
     def evaluate_model(
-            self,
-            model: Any,
-            X_train_scaled: np.ndarray,
-            y_train: np.ndarray,
-            X_test_scaled: np.ndarray,
-            y_test: np.ndarray) -> Tuple[np.float64, np.float64]:
+        self,
+        model: Any,
+        X_train_scaled: np.ndarray,
+        y_train: np.ndarray,
+        X_test_scaled: np.ndarray,
+        y_test: np.ndarray,
+    ) -> Tuple[np.float64, np.float64]:
         logger.debug("Evaluating the model")
         training_score = model.score(X_train_scaled, y_train) * 100.0
         test_score = model.score(X_test_scaled, y_test) * 100.0
         logger.debug("Evaluating the model")
         training_score = model.score(X_train_scaled, y_train) * 100.0
         test_score = model.score(X_test_scaled, y_test) * 100.0
@@ -344,24 +341,33 @@ class TrainingBlueprint(ABC):
         return (training_score, test_score)
 
     def maybe_persist_scaler_and_model(
-            self,
-            training_score: np.float64,
-            test_score: np.float64,
-            params: str,
-            num_examples: int,
-            scaler: Any,
-            model: Any) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+        self,
+        training_score: np.float64,
+        test_score: np.float64,
+        params: str,
+        num_examples: int,
+        scaler: Any,
+        model: Any,
+    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
         if not self.spec.dry_run:
             import datetime_utils
             import input_utils
             import string_utils
 
+            now: datetime.datetime = datetime_utils.now_pacific()
+            info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
+Model params: {params}
+Training examples: {num_examples}
+Training set score: {training_score:.2f}%
+Testing set score: {test_score:.2f}%"""
+            print(f'\n{info}\n')
             if (
-                    (self.spec.persist_percentage_threshold is not None and
-                     test_score > self.spec.persist_percentage_threshold)
-                    or
-                    (not self.spec.quiet
-                     and input_utils.yn_response("Write the model? [y,n]: ") == "y")
+                self.spec.persist_percentage_threshold is not None
+                and test_score > self.spec.persist_percentage_threshold
+            ) or (
+                not self.spec.quiet
+                and input_utils.yn_response("Write the model? [y,n]: ") == "y"
             ):
                 scaler_filename = f"{self.spec.basename}_scaler.sav"
                 with open(scaler_filename, "wb") as f:
@@ -376,12 +382,6 @@ class TrainingBlueprint(ABC):
                 print(msg)
                 logger.info(msg)
                 model_info_filename = f"{self.spec.basename}_model_info.txt"
-                now: datetime.datetime = datetime_utils.now_pst()
-                info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
-Model params: {params}
-Training examples: {num_examples}
-Training set score: {training_score:.2f}%
-Testing set score: {test_score:.2f}%"""
                 with open(model_info_filename, "w") as f:
                     f.write(info)
                 msg = f"Wrote {model_info_filename}:"