#!/usr/bin/env python3

"""A blueprint for training sklearn ML models from "features" files on disk."""

from __future__ import annotations

import datetime
import glob
import logging
import os
import pickle
import random
from abc import ABC, abstractmethod
from types import SimpleNamespace
from typing import Any, List, NamedTuple, Optional, Set, Tuple

import numpy as np
from sklearn.model_selection import train_test_split  # type: ignore
from sklearn.preprocessing import MinMaxScaler  # type: ignore

import argparse_utils
import config
import executors
import list_utils
import parallelize as par
import smart_future
from ansi import bold, reset
from decorator_utils import timed

logger = logging.getLogger(__file__)

parser = config.add_commandline_args(
    f"ML Model Trainer ({__file__})",
    "Arguments related to training an ML model",
)
parser.add_argument(
    "--ml_trainer_quiet",
    action="store_true",
    help="Don't prompt the user for anything.",
)
parser.add_argument(
    "--ml_trainer_delete",
    action="store_true",
    help="Delete invalid/incomplete features files in addition to warning.",
)
group = parser.add_mutually_exclusive_group()
group.add_argument(
    "--ml_trainer_dry_run",
    action="store_true",
    help="Do not write a new model, just report efficacy.",
)
group.add_argument(
    "--ml_trainer_persist_threshold",
    type=argparse_utils.valid_percentage,
    help="Persist the model if the test set score is >= this threshold.",
)

class InputSpec(SimpleNamespace):
    """Describes the inputs used to train a model."""

    file_glob: str
    feature_count: int
    features_to_skip: Set[str]
    key_value_delimiter: str
    training_parameters: List
    label: str
    basename: str
    dry_run: Optional[bool]
    quiet: Optional[bool]
    persist_percentage_threshold: Optional[float]
    delete_bad_inputs: Optional[bool]
    @staticmethod
    def populate_from_config() -> InputSpec:
        return InputSpec(
            dry_run=config.config["ml_trainer_dry_run"],
            quiet=config.config["ml_trainer_quiet"],
            persist_percentage_threshold=config.config["ml_trainer_persist_threshold"],
            delete_bad_inputs=config.config["ml_trainer_delete"],
        )
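
# Note that populate_from_config() fills in only the flag-driven fields
# (dry_run, quiet, persist_percentage_threshold, delete_bad_inputs); callers
# must still assign file_glob, feature_count, features_to_skip,
# key_value_delimiter, label, basename, and training_parameters themselves
# (see the sketch at the bottom of this file).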

class OutputSpec(NamedTuple):
    """Describes the results of training a model."""

    model_filename: Optional[str]
    model_info_filename: Optional[str]
    scaler_filename: Optional[str]
    training_score: np.float64
    test_score: np.float64

class TrainingBlueprint(ABC):
    """A blueprint for training sklearn ML models.  Subclasses implement
    train_model; this base class handles reading features files, splitting
    and scaling the data, evaluating candidate models, and optionally
    persisting the best one."""

    def __init__(self):
        self.y_train = None
        self.y_test = None
        self.X_test_scaled = None
        self.X_train_scaled = None
        self.file_done_count = 0
        self.total_file_count = 0
        self.spec = None
    def train(self, spec: InputSpec) -> OutputSpec:
        random.seed()
        self.spec = spec

        X_, y_ = self.read_input_files()
        num_examples = len(y_)

        # Every example's features
        X = np.array(X_)

        # Every example's label
        y = np.array(y_)

        print("Doing random test/train split...")
        X_train, X_test, self.y_train, self.y_test = self.test_train_split(
            X,
            y,
        )

        print("Scaling training data...")
        scaler, self.X_train_scaled, self.X_test_scaled = self.scale_data(
            X_train,
            X_test,
        )

        print("Training model(s)...")
        models = []
        modelid_to_params = {}
        for params in self.spec.training_parameters:
            model = self.train_model(params, self.X_train_scaled, self.y_train)
            models.append(model)
            modelid_to_params[model.get_id()] = str(params)

        best_model = None
        best_params = None
        best_score: Optional[np.float64] = None
        best_test_score: Optional[np.float64] = None
        best_training_score: Optional[np.float64] = None
        for model in smart_future.wait_any(models):
            params = modelid_to_params[model.get_id()]
            if isinstance(model, smart_future.SmartFuture):
                model = model._resolve()
            if model is not None:
                training_score, test_score = self.evaluate_model(
                    model,
                    self.X_train_scaled,
                    self.y_train,
                    self.X_test_scaled,
                    self.y_test,
                )

                # Rank models on a blend that weighs the test set score
                # twenty times more heavily than the training set score.
                score = (training_score + test_score * 20) / 21
                if not self.spec.quiet:
                    print(
                        f"{bold()}{params}{reset()}: "
                        f"Training set score={training_score:.2f}%, "
                        f"test set score={test_score:.2f}%"
                    )
                if best_score is None or score > best_score:
                    best_model = model
                    best_params = params
                    best_score = score
                    best_test_score = test_score
                    best_training_score = training_score
                    if not self.spec.quiet:
                        print(f"New best score {best_score:.2f}% with params {params}")

        if not self.spec.quiet:
            executors.DefaultExecutors().shutdown()
            msg = f"Done training; best test set score was: {best_test_score:.1f}%"
            print(msg)
            logger.info(msg)

        assert best_training_score is not None
        assert best_test_score is not None
        assert best_params is not None
        (
            scaler_filename,
            model_filename,
            model_info_filename,
        ) = self.maybe_persist_scaler_and_model(
            best_training_score,
            best_test_score,
            best_params,
            num_examples,
            scaler,
            best_model,
        )
        return OutputSpec(
            model_filename=model_filename,
            model_info_filename=model_info_filename,
            scaler_filename=scaler_filename,
            training_score=best_training_score,
            test_score=best_test_score,
        )
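
    # A hedged sketch of a concrete subclass, for illustration only: the
    # class name and KNeighborsClassifier parameters are hypothetical.
    # Decorating train_model with @par.parallelize makes it return a
    # smart_future SmartFuture (which supplies the get_id() used above) so
    # that all candidate models train concurrently.
    #
    #   class KNNTrainer(TrainingBlueprint):
    #       @par.parallelize(method=par.Method.PROCESS)
    #       def train_model(self, parameters, X_train_scaled, y_train):
    #           from sklearn.neighbors import KNeighborsClassifier
    #           model = KNeighborsClassifier(**parameters)
    #           model.fit(X_train_scaled, y_train)
    #           return model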
    @par.parallelize(method=par.Method.THREAD)
    def read_files_from_list(self, files: List[str], n: int) -> Tuple[List, List]:
        X = []  # Every example's features
        y = []  # Every example's label
        for filename in files:
            wrote_label = False
            with open(filename, "r") as f:
                lines = f.readlines()

            # This example's features
            x = []
            for line in lines:

                # We expect lines in features files to be of the form:
                # key: value
                line = line.strip()
                try:
                    (key, value) = line.split(self.spec.key_value_delimiter)
                except Exception:
                    logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped")
                    continue

                key = key.strip()
                value = value.strip()
                if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
                    logger.debug(f"Skipping feature {key}")
                    continue

                value = self.normalize_feature(value)
                if key == self.spec.label:
                    y.append(value)
                    wrote_label = True
                else:
                    x.append(value)

            # Make sure we saw a label and the requisite number of features.
            if len(x) == self.spec.feature_count and wrote_label:
                X.append(x)
                self.file_done_count += 1
            else:
                if wrote_label:
                    y.pop()  # Discard this bad example's label too.
                if self.spec.delete_bad_inputs:
                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. DELETING."
                    logger.warning(msg)
                    os.remove(filename)
                else:
                    msg = f"WARNING: {filename}: missing features or label; expected {self.spec.feature_count} but saw {len(x)}. Skipping."
                    logger.warning(msg)
        return (X, y)
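
    # For example, with key_value_delimiter=":" and label="label", a valid
    # three-feature file (hypothetical values) might contain:
    #
    #   width: 173
    #   height: 0.25
    #   area: 43
    #   label: True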
    def make_progress_graph(self) -> None:
        if not self.spec.quiet:
            from text_utils import progress_graph

            progress_graph(self.file_done_count, self.total_file_count)
    @timed
    def read_input_files(self):
        X = []  # Every example's features
        y = []  # Every example's label

        # Fan out the reading of features files, 500 files per shard.
        results = []
        all_files = glob.glob(self.spec.file_glob)
        self.total_file_count = len(all_files)
        for n, files in enumerate(list_utils.shard(all_files, 500)):
            file_list = list(files)
            results.append(self.read_files_from_list(file_list, n))

        # Collate the results as each shard finishes.
        for result in smart_future.wait_any(results, callback=self.make_progress_graph):
            result = result._resolve()
            X.extend(result[0])
            y.extend(result[1])
        if not self.spec.quiet:
            print(" " * 80 + "\n")
        return (X, y)
    def normalize_feature(self, value: str) -> Any:
        if value in ("False", "None"):
            ret = 0
        elif value == "True":
            ret = 255
        elif isinstance(value, str) and "." in value:
            ret = round(float(value) * 100.0)
        else:
            ret = int(value)
        return ret
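
    # For example: normalize_feature("True") == 255,
    # normalize_feature("0.25") == 25, and normalize_feature("7") == 7.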
    def test_train_split(self, X, y) -> List:
        logger.debug("Performing test/train split")
        return train_test_split(
            X,
            y,
            random_state=random.randrange(0, 1000),
        )
    def scale_data(
        self, X_train: np.ndarray, X_test: np.ndarray
    ) -> Tuple[Any, np.ndarray, np.ndarray]:
        logger.debug("Scaling data")

        # Fit the scaler on the training data only; apply it to both splits.
        scaler = MinMaxScaler()
        scaler.fit(X_train)
        return (scaler, scaler.transform(X_train), scaler.transform(X_test))
    # Note: children should implement.  Consider using @parallelize.
    @abstractmethod
    def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
        pass
    def evaluate_model(
        self,
        model: Any,
        X_train_scaled: np.ndarray,
        y_train: np.ndarray,
        X_test_scaled: np.ndarray,
        y_test: np.ndarray,
    ) -> Tuple[np.float64, np.float64]:
        logger.debug("Evaluating the model")
        training_score = model.score(X_train_scaled, y_train) * 100.0
        test_score = model.score(X_test_scaled, y_test) * 100.0
        logger.info(
            f"Model evaluation results: test_score={test_score:.5f}, "
            f"train_score={training_score:.5f}"
        )
        return (training_score, test_score)
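
    # Note: sklearn's model.score returns mean accuracy for classifiers (and
    # R**2 for regressors), so these values are on a 0..100 percentage scale.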
    def maybe_persist_scaler_and_model(
        self,
        training_score: np.float64,
        test_score: np.float64,
        params: str,
        num_examples: int,
        scaler: Any,
        model: Any,
    ) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        if not self.spec.dry_run:
            import datetime_utils
            import input_utils
            import string_utils

            now: datetime.datetime = datetime_utils.now_pacific()
            info = f"""Timestamp: {datetime_utils.datetime_to_string(now)}
Model params: {params}
Training examples: {num_examples}
Training set score: {training_score:.2f}%
Testing set score: {test_score:.2f}%"""

            # Persist if the test score clears the threshold or, failing
            # that, if the user approves interactively.
            if (
                self.spec.persist_percentage_threshold is not None
                and test_score > self.spec.persist_percentage_threshold
            ) or (
                not self.spec.quiet and input_utils.yn_response("Write the model? [y,n]: ") == "y"
            ):
                scaler_filename = f"{self.spec.basename}_scaler.sav"
                with open(scaler_filename, "wb") as fb:
                    pickle.dump(scaler, fb)
                msg = f"Wrote {scaler_filename}"
                print(msg)
                logger.info(msg)

                model_filename = f"{self.spec.basename}_model.sav"
                with open(model_filename, "wb") as fb:
                    pickle.dump(model, fb)
                msg = f"Wrote {model_filename}"
                print(msg)
                logger.info(msg)

                model_info_filename = f"{self.spec.basename}_model_info.txt"
                with open(model_info_filename, "w") as f:
                    f.write(info)
                msg = f"Wrote {model_info_filename}:"
                print(msg)
                logger.info(msg)
                print(string_utils.indent(info, 2))
                return (scaler_filename, model_filename, model_info_filename)
        return (None, None, None)
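
# A hedged end-to-end sketch, for illustration only; KNNTrainer is the
# hypothetical subclass sketched above and every spec value is made up.
#
#   spec = InputSpec.populate_from_config()
#   spec.file_glob = "/tmp/features/*.txt"
#   spec.feature_count = 50
#   spec.features_to_skip = {"filename"}
#   spec.key_value_delimiter = ":"
#   spec.label = "label"
#   spec.basename = "my_model"
#   spec.training_parameters = [{"n_neighbors": k} for k in range(3, 13, 2)]
#   output = KNNTrainer().train(spec)
#   print(f"Test score: {output.test_score:.2f}%")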