from __future__ import annotations

from abc import ABC, abstractmethod
import datetime
import glob
import logging
import os
import pickle
import random
import sys
from types import SimpleNamespace
from typing import Any, List, NamedTuple, Optional, Set, Tuple

import numpy as np
from sklearn.model_selection import train_test_split  # type: ignore
from sklearn.preprocessing import MinMaxScaler  # type: ignore

from ansi import bold, reset
import argparse_utils
import config
from decorator_utils import timed
import parallelize as par

logger = logging.getLogger(__file__)

parser = config.add_commandline_args(
    f"ML Model Trainer ({__file__})",
    "Arguments related to training an ML model",
)
parser.add_argument(
    "--ml_trainer_quiet",
    action="store_true",
    help="Don't prompt the user for anything.",
)
parser.add_argument(
    "--ml_trainer_delete",
    action="store_true",
    help="Delete invalid/incomplete features files in addition to warning.",
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
    "--ml_trainer_dry_run",
    action="store_true",
    help="Do not write a new model, just report efficacy.",
)
group.add_argument(
    "--ml_trainer_persist_threshold",
    type=argparse_utils.valid_percentage,
    help="Persist the model if the test set score is >= this threshold.",
)

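# These flags surface at runtime in config.config, keyed by flag name
# (see InputSpec.populate_from_config below).

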
class InputSpec(SimpleNamespace):
    """Describes the inputs to the training process.  The per-trainer
    fields (file_glob, feature_count, etc.) are set by the caller; the
    rest can be populated from the command line."""

    file_glob: str
    feature_count: int
    features_to_skip: Set[str]
    key_value_delimiter: str
    training_parameters: List
    label: str
    basename: str
    dry_run: Optional[bool]
    quiet: Optional[bool]
    persist_percentage_threshold: Optional[float]
    delete_bad_inputs: Optional[bool]

    @staticmethod
    def populate_from_config() -> InputSpec:
        return InputSpec(
            dry_run=config.config["ml_trainer_dry_run"],
            quiet=config.config["ml_trainer_quiet"],
            persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
            delete_bad_inputs=config.config["ml_trainer_delete"],
        )

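# Because InputSpec is a SimpleNamespace, the per-trainer fields can be
# assigned after construction, e.g. (hypothetical values):
#
#   spec = InputSpec.populate_from_config()
#   spec.file_glob = "/tmp/features/*.txt"
#   spec.feature_count = 20

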
class OutputSpec(NamedTuple):
    model_filename: Optional[str]
    model_info_filename: Optional[str]
    scaler_filename: Optional[str]
    training_score: np.float64
    test_score: np.float64

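# TrainingBlueprint is the shared skeleton for model training: it reads
# feature files, splits and scales the data, trains one model per entry
# in spec.training_parameters (via the abstract train_model), picks the
# best performer, and optionally persists it.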
class TrainingBlueprint(ABC):
    def __init__(self):
        self.y_train = None
        self.y_test = None
        self.X_test_scaled = None
        self.X_train_scaled = None
        self.file_done_count = 0
        self.total_file_count = 0
        self.spec = None

    def train(self, spec: InputSpec) -> OutputSpec:
        import smart_future

        self.spec = spec

        X_, y_ = self.read_input_files()
        num_examples = len(y_)

        # Every example's features
        X = np.array(X_)

        # Every example's label
        y = np.array(y_)

        print("Doing random test/train split...")
        X_train, X_test, self.y_train, self.y_test = self.test_train_split(
            X,
            y,
        )

        print("Scaling training data...")
        scaler, self.X_train_scaled, self.X_test_scaled = self.scale_data(
            X_train,
            X_test,
        )

        print("Training model(s)...")
        models = []
        modelid_to_params = {}
        for params in self.spec.training_parameters:
            # When train_model is decorated with @par.parallelize in a
            # subclass, this call returns a SmartFuture immediately.
            model = self.train_model(
                params,
                self.X_train_scaled,
                self.y_train,
            )
            models.append(model)
            modelid_to_params[model.get_id()] = str(params)

        best_model = None
        best_score = None
        best_test_score = None
        best_training_score = None
        best_params = None
        for model in smart_future.wait_any(models):
            params = modelid_to_params[model.get_id()]
            if isinstance(model, smart_future.SmartFuture):
                model = model._resolve()
            if model is not None:
                training_score, test_score = self.evaluate_model(
                    model,
                    self.X_train_scaled,
                    self.y_train,
                    self.X_test_scaled,
                    self.y_test,
                )
                # Blended score weighting test set performance twenty
                # times more heavily than training set performance.
                score = (training_score + test_score * 20) / 21
                if not self.spec.quiet:
                    print(
                        f"{bold()}{params}{reset()}: "
                        f"Training set score={training_score:.2f}%, "
                        f"test set score={test_score:.2f}%",
                        file=sys.stderr,
                    )
                if best_score is None or score > best_score:
                    best_model = model
                    best_score = score
                    best_test_score = test_score
                    best_training_score = training_score
                    best_params = params
                    if not self.spec.quiet:
                        print(
                            f"New best score {best_score:.2f}% with params {params}"
                        )

        if not self.spec.quiet:
            msg = f"Done training; best test set score was: {best_test_score:.1f}%"
            print(msg)
            logger.info(msg)
        scaler_filename, model_filename, model_info_filename = (
            self.maybe_persist_scaler_and_model(
                best_training_score,
                best_test_score,
                best_params,
                num_examples,
                scaler,
                best_model,
            )
        )
        return OutputSpec(
            model_filename=model_filename,
            model_info_filename=model_info_filename,
            scaler_filename=scaler_filename,
            training_score=best_training_score,
            test_score=best_test_score,
        )

    @par.parallelize(method=par.Method.THREAD)
    def read_files_from_list(
            self,
            files: List[str],
            n: int,
    ) -> Tuple[List, List]:
        # All examples' features...
        X = []

        # ...and their labels.
        y = []

        for filename in files:
            wrote_label = False
            with open(filename, "r") as f:
                lines = f.readlines()

            # This example's features
            x = []
            for line in lines:

                # We expect lines in features files to be of the form:
                #
                # key: value
                line = line.strip()
                try:
                    (key, value) = line.split(self.spec.key_value_delimiter)
                except Exception:
                    logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped")
                    continue

                key = key.strip()
                value = value.strip()
                if (self.spec.features_to_skip is not None
                        and key in self.spec.features_to_skip):
                    logger.debug(f"Skipping feature {key}")
                    continue

                value = self.normalize_feature(value)

                if key == self.spec.label:
                    y.append(value)
                    wrote_label = True
                else:
                    x.append(value)

            # Make sure we saw a label and the requisite number of features.
            if len(x) == self.spec.feature_count and wrote_label:
                X.append(x)
                self.file_done_count += 1
            else:
                # Discard the label too so that X and y stay aligned.
                if wrote_label:
                    y.pop()

                if self.spec.delete_bad_inputs:
                    msg = f"WARNING: {filename}: missing features or label. DELETING."
                    print(msg, file=sys.stderr)
                    logger.warning(msg)
                    os.remove(filename)
                    self.file_done_count += 1
                else:
                    msg = f"WARNING: {filename}: missing features or label. Skipped."
                    print(msg, file=sys.stderr)
                    logger.warning(msg)
                    self.file_done_count += 1
        return (X, y)

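    # For illustration only (hypothetical file content): assuming a ":"
    # delimiter, a features file parsed above might look like:
    #
    #   some_feature: 12
    #   another_feature: 0.75
    #   label: True
    #
    # Every non-label key contributes one feature value; the line whose
    # key equals spec.label supplies the example's label.
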
    def make_progress_graph(self) -> None:
        if not self.spec.quiet:
            from text_utils import progress_graph
            progress_graph(
                self.file_done_count,
                self.total_file_count,
            )

    @timed
    def read_input_files(self):
        import list_utils
        import smart_future

        # All examples' features...
        X = []

        # ...and their labels.
        y = []

        # Read the files in parallel, 500 per shard.
        results = []
        all_files = glob.glob(self.spec.file_glob)
        self.total_file_count = len(all_files)
        for n, files in enumerate(list_utils.shard(all_files, 500)):
            file_list = list(files)
            results.append(self.read_files_from_list(file_list, n))

        for result in smart_future.wait_any(results, callback=self.make_progress_graph):
            result = result._resolve()
            for z in result[0]:
                X.append(z)
            for z in result[1]:
                y.append(z)
        if not self.spec.quiet:
            print(" " * 80 + "\n")
        return (X, y)

    def normalize_feature(self, value: str) -> Any:
        # Convert a feature's string value into a number: booleans and
        # None map onto an 8-bit range, floats are scaled to integer
        # percentages, and everything else is parsed as an int.
        if value in ("False", "None"):
            ret = 0
        elif value == "True":
            ret = 255
        elif isinstance(value, str) and "." in value:
            ret = round(float(value) * 100.0)
        else:
            ret = int(value)
        return ret

    def test_train_split(self, X, y) -> List:
        logger.debug("Performing test/train split")
        return train_test_split(
            X,
            y,
            random_state=random.randrange(0, 1000),
        )

    def scale_data(self,
                   X_train: np.ndarray,
                   X_test: np.ndarray) -> Tuple[Any, np.ndarray, np.ndarray]:
        logger.debug("Scaling data")
        # Fit the scaler on the training data only so that no information
        # about the test set leaks into training.
        scaler = MinMaxScaler()
        scaler.fit(X_train)
        return (scaler, scaler.transform(X_train), scaler.transform(X_test))

    # Note: children should implement.  Consider using @parallelize.
    @abstractmethod
    def train_model(self,
                    parameters,
                    X_train_scaled: np.ndarray,
                    y_train: np.ndarray) -> Any:
        pass

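    # A minimal sketch of a concrete subclass (illustrative only; the
    # class name, model type, and meaning of `parameters` are
    # hypothetical).  Decorating train_model with @par.parallelize makes
    # each call return a SmartFuture whose get_id() the train() loop
    # above uses to map models back to their params:
    #
    #   class KNNTrainer(TrainingBlueprint):
    #       @par.parallelize(method=par.Method.THREAD)
    #       def train_model(self, parameters, X_train_scaled, y_train):
    #           from sklearn.neighbors import KNeighborsClassifier
    #           model = KNeighborsClassifier(n_neighbors=parameters)
    #           model.fit(X_train_scaled, y_train)
    #           return model
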
    def evaluate_model(self,
                       model: Any,
                       X_train_scaled: np.ndarray,
                       y_train: np.ndarray,
                       X_test_scaled: np.ndarray,
                       y_test: np.ndarray) -> Tuple[np.float64, np.float64]:
        logger.debug("Evaluating the model")
        training_score = model.score(X_train_scaled, y_train) * 100.0
        test_score = model.score(X_test_scaled, y_test) * 100.0
        logger.info(
            f"Model evaluation results: test_score={test_score:.5f}, "
            f"train_score={training_score:.5f}"
        )
        return (training_score, test_score)

    def maybe_persist_scaler_and_model(
            self,
            training_score: np.float64,
            test_score: np.float64,
            params: str,
            num_examples: int,
            scaler: Any,
            model: Any) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        if not self.spec.dry_run:
            import datetime_utils
            import input_utils
            import string_utils

            # Persist if we beat the threshold or, interactively, if the
            # user says to.
            if (
                (self.spec.persist_percentage_threshold is not None and
                 test_score > self.spec.persist_percentage_threshold)
                or
                (not self.spec.quiet
                 and input_utils.yn_response("Write the model? [y,n]: ") == "y")
            ):
                scaler_filename = f"{self.spec.basename}_scaler.sav"
                with open(scaler_filename, "wb") as f:
                    pickle.dump(scaler, f)
                msg = f"Wrote {scaler_filename}"
                print(msg)
                logger.info(msg)
                model_filename = f"{self.spec.basename}_model.sav"
                with open(model_filename, "wb") as f:
                    pickle.dump(model, f)
                msg = f"Wrote {model_filename}"
                print(msg)
                logger.info(msg)
                model_info_filename = f"{self.spec.basename}_model_info.txt"
                now: datetime.datetime = datetime_utils.now_pst()
                info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
Model params: {params}
Training examples: {num_examples}
Training set score: {training_score:.2f}%
Testing set score: {test_score:.2f}%"""
                with open(model_info_filename, "w") as f:
                    f.write(info)
                msg = f"Wrote {model_info_filename}:"
                print(msg)
                logger.info(msg)
                print(string_utils.indent(info, 2))
                return (scaler_filename, model_filename, model_info_filename)
        return (None, None, None)

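
# End-to-end sketch (illustrative; reuses the hypothetical KNNTrainer
# from the comment above and assumes command-line flags have already
# been parsed by this project's config module):
#
#   spec = InputSpec.populate_from_config()
#   spec.file_glob = "/tmp/features/*.txt"   # hypothetical path
#   spec.feature_count = 20
#   spec.features_to_skip = set()
#   spec.key_value_delimiter = ":"
#   spec.label = "label"
#   spec.basename = "knn"
#   spec.training_parameters = [3, 5, 7]
#   output: OutputSpec = KNNTrainer().train(spec)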