import sys
from types import SimpleNamespace
from typing import Any, List, NamedTuple, Optional, Set, Tuple
+import warnings
import numpy as np
from sklearn.model_selection import train_test_split # type:ignore
import argparse_utils
import config
from decorator_utils import timed
+import executors
import parallelize as par
logger = logging.getLogger(__file__)
parser = config.add_commandline_args(
- f"ML Model Trainer ({__file__})",
- "Arguments related to training an ML model"
+ f"ML Model Trainer ({__file__})", "Arguments related to training an ML model"
)
parser.add_argument(
"--ml_trainer_quiet",
action="store_true",
- help="Don't prompt the user for anything."
+ help="Don't prompt the user for anything.",
)
parser.add_argument(
"--ml_trainer_delete",
action="store_true",
- help="Delete invalid/incomplete features files in addition to warning."
+ help="Delete invalid/incomplete features files in addition to warning.",
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
@staticmethod
def populate_from_config() -> InputSpec:
return InputSpec(
- dry_run = config.config["ml_trainer_dry_run"],
- quiet = config.config["ml_trainer_quiet"],
- persist_percentage_threshold = config.config["ml_trainer_persist_threshold"],
- delete_bad_inputs = config.config["ml_trainer_delete"],
+ dry_run=config.config["ml_trainer_dry_run"],
+ quiet=config.config["ml_trainer_quiet"],
+ persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
+ delete_bad_inputs=config.config["ml_trainer_delete"],
)
models = []
modelid_to_params = {}
for params in self.spec.training_parameters:
- model = self.train_model(
- params,
- self.X_train_scaled,
- self.y_train
- )
+ model = self.train_model(params, self.X_train_scaled, self.y_train)
models.append(model)
modelid_to_params[model.get_id()] = str(params)
best_model = model
best_params = params
if not self.spec.quiet:
- print(
- f"New best score {best_score:.2f}% with params {params}"
- )
+ print(f"New best score {best_score:.2f}% with params {params}")
if not self.spec.quiet:
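+ # Presumably shuts down the shared default executors so worker threads left over from the parallelized file reads don't keep the process alive after training.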
+ executors.DefaultExecutors().shutdown()
msg = f"Done training; best test set score was: {best_test_score:.1f}%"
print(msg)
logger.info(msg)
- scaler_filename, model_filename, model_info_filename = (
- self.maybe_persist_scaler_and_model(
- best_training_score,
- best_test_score,
- best_params,
- num_examples,
- scaler,
- best_model,
- )
+
+ (
+ scaler_filename,
+ model_filename,
+ model_info_filename,
+ ) = self.maybe_persist_scaler_and_model(
+ best_training_score,
+ best_test_score,
+ best_params,
+ num_examples,
+ scaler,
+ best_model,
)
return OutputSpec(
- model_filename = model_filename,
- model_info_filename = model_info_filename,
- scaler_filename = scaler_filename,
- training_score = best_training_score,
- test_score = best_test_score,
+ model_filename=model_filename,
+ model_info_filename=model_info_filename,
+ scaler_filename=scaler_filename,
+ training_score=best_training_score,
+ test_score=best_test_score,
)
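+ # Assumption: par.parallelize(method=par.Method.THREAD) dispatches each call to a background thread so chunks of the input file list can be read concurrently.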
@par.parallelize(method=par.Method.THREAD)
- def read_files_from_list(
- self,
- files: List[str],
- n: int
- ) -> Tuple[List, List]:
+ def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
# All features
X = []
line = line.strip()
try:
(key, value) = line.split(self.spec.key_value_delimiter)
- except Exception as e:
- logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped")
+ except Exception:
+ logger.debug(
+ f"WARNING: bad line in file {filename} '{line}', skipped"
+ )
continue
key = key.strip()
value = value.strip()
- if (self.spec.features_to_skip is not None
- and key in self.spec.features_to_skip):
+ if (
+ self.spec.features_to_skip is not None
+ and key in self.spec.features_to_skip
+ ):
logger.debug(f"Skipping feature {key}")
continue
y.pop()
if self.spec.delete_bad_inputs:
- msg = f"WARNING: {filename}: missing features or label. DELETING."
- print(msg, file=sys.stderr)
+ msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. DELETING."
logger.warning(msg)
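+ # Also raise a Python warning so callers can catch or filter the problem programmatically.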
+ warnings.warn(msg)
os.remove(filename)
else:
- msg = f"WARNING: {filename}: missing features or label. Skipped."
- print(msg, file=sys.stderr)
+ msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. Skipping."
logger.warning(msg)
+ warnings.warn(msg)
return (X, y)
def make_progress_graph(self) -> None:
if not self.spec.quiet:
from text_utils import progress_graph
- progress_graph(
- self.file_done_count,
- self.total_file_count
- )
+
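+ # text_utils.progress_graph presumably renders a console progress bar from the done/total file counts.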
+ progress_graph(self.file_done_count, self.total_file_count)
@timed
def read_input_files(self):
random_state=random.randrange(0, 1000),
)
- def scale_data(self,
- X_train: np.ndarray,
- X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
+ def scale_data(
+ self, X_train: np.ndarray, X_test: np.ndarray
+ ) -> Tuple[Any, np.ndarray, np.ndarray]:
logger.debug("Scaling data")
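+ # MinMaxScaler rescales each feature to [0, 1]; it is fit on the training data only, to avoid leaking test-set statistics.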
scaler = MinMaxScaler()
scaler.fit(X_train)
# Note: children should implement. Consider using @parallelize.
@abstractmethod
- def train_model(self,
- parameters,
- X_train_scaled: np.ndarray,
- y_train: np.ndarray) -> Any:
+ def train_model(
+ self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray
+ ) -> Any:
pass
def evaluate_model(
- self,
- model: Any,
- X_train_scaled: np.ndarray,
- y_train: np.ndarray,
- X_test_scaled: np.ndarray,
- y_test: np.ndarray) -> Tuple[np.float64, np.float64]:
+ self,
+ model: Any,
+ X_train_scaled: np.ndarray,
+ y_train: np.ndarray,
+ X_test_scaled: np.ndarray,
+ y_test: np.ndarray,
+ ) -> Tuple[np.float64, np.float64]:
logger.debug("Evaluating the model")
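+ # score() is sklearn's mean accuracy for classifiers (R^2 for regressors), expressed here as a percentage.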
training_score = model.score(X_train_scaled, y_train) * 100.0
test_score = model.score(X_test_scaled, y_test) * 100.0
return (training_score, test_score)
def maybe_persist_scaler_and_model(
- self,
- training_score: np.float64,
- test_score: np.float64,
- params: str,
- num_examples: int,
- scaler: Any,
- model: Any) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+ self,
+ training_score: np.float64,
+ test_score: np.float64,
+ params: str,
+ num_examples: int,
+ scaler: Any,
+ model: Any,
+ ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
if not self.spec.dry_run:
import datetime_utils
import input_utils
import string_utils
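+ # Build the run summary here and echo it to the console, whether or not the model ends up being persisted below.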
+ now: datetime.datetime = datetime_utils.now_pacific()
+ info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
+Model params: {params}
+Training examples: {num_examples}
+Training set score: {training_score:.2f}%
+Testing set score: {test_score:.2f}%"""
+ print(f"\n{info}\n")
if (
- (self.spec.persist_percentage_threshold is not None and
- test_score > self.spec.persist_percentage_threshold)
- or
- (not self.spec.quiet
- and input_utils.yn_response("Write the model? [y,n]: ") == "y")
+ self.spec.persist_percentage_threshold is not None
+ and test_score > self.spec.persist_percentage_threshold
+ ) or (
+ not self.spec.quiet
+ and input_utils.yn_response("Write the model? [y,n]: ") == "y"
):
scaler_filename = f"{self.spec.basename}_scaler.sav"
with open(scaler_filename, "wb") as f:
print(msg)
logger.info(msg)
model_info_filename = f"{self.spec.basename}_model_info.txt"
- now: datetime.datetime = datetime_utils.now_pacific()
- info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
-Model params: {params}
-Training examples: {num_examples}
-Training set score: {training_score:.2f}%
-Testing set score: {test_score:.2f}%"""
with open(model_info_filename, "w") as f:
f.write(info)
msg = f"Wrote {model_info_filename}:"