import copy
from collections import defaultdict
from dataclasses import dataclass
from functools import partial
from typing import List, Optional

import numpy as np
from sklearn import ensemble, impute, linear_model
from sklearn import metrics as skmetrics
from sklearn import naive_bayes, neighbors, pipeline, preprocessing, svm, tree
from xgboost import XGBClassifier, XGBRegressor


# Model-card template rendered with ``str.format``. The placeholders are
# ``{task}``, ``{dataset}``, ``{metrics}`` and ``{params}``. The line structure
# matters: the YAML front matter, the headings and the fenced usage snippet
# each need their own lines to render correctly.
MARKDOWN = """
---
tags:
- autotrain
- tabular
- {task}
- tabular-{task}
datasets:
- {dataset}
---

# Model Trained Using AutoTrain

- Problem type: Tabular {task}

## Validation Metrics

{metrics}

## Best Params

{params}

## Usage

```python
import json
import joblib
import pandas as pd

model = joblib.load('model.joblib')
config = json.load(open('config.json'))

features = config['features']

# data = pd.read_csv("data.csv")
data = data[features]
predictions = model.predict(data)  # or model.predict_proba(data)

# predictions can be converted to original labels using label_encoders.pkl

```
"""

# Registry of estimator classes, indexed as ``_MODELS[model_name][task_family]``
# where ``task_family`` is "classification" or "regression". A defaultdict is
# used so that further registrations can add new model names without first
# creating the inner dict.
_MODELS: dict = defaultdict(dict)
_MODELS["xgboost"]["classification"] = XGBClassifier
_MODELS["xgboost"]["regression"] = XGBRegressor
_MODELS["logistic_regression"]["classification"] = linear_model.LogisticRegression
# NOTE(review): LogisticRegression is a classifier; this regression entry is kept
# for backward compatibility, but it looks unintended -- confirm before relying on it.
_MODELS["logistic_regression"]["regression"] = linear_model.LogisticRegression
_MODELS["random_forest"]["classification"] = ensemble.RandomForestClassifier
_MODELS["random_forest"]["regression"] = ensemble.RandomForestRegressor
_MODELS["extra_trees"]["classification"] = ensemble.ExtraTreesClassifier
_MODELS["extra_trees"]["regression"] = ensemble.ExtraTreesRegressor
_MODELS["gradient_boosting"]["classification"] = ensemble.GradientBoostingClassifier
_MODELS["gradient_boosting"]["regression"] = ensemble.GradientBoostingRegressor
_MODELS["adaboost"]["classification"] = ensemble.AdaBoostClassifier
_MODELS["adaboost"]["regression"] = ensemble.AdaBoostRegressor
_MODELS["ridge"]["classification"] = linear_model.RidgeClassifier
_MODELS["ridge"]["regression"] = linear_model.Ridge
_MODELS["svm"]["classification"] = svm.LinearSVC
_MODELS["svm"]["regression"] = svm.LinearSVR
_MODELS["decision_tree"]["classification"] = tree.DecisionTreeClassifier
_MODELS["decision_tree"]["regression"] = tree.DecisionTreeRegressor
_MODELS["lasso"]["regression"] = linear_model.Lasso
_MODELS["linear_regression"]["regression"] = linear_model.LinearRegression
_MODELS["naive_bayes"]["classification"] = naive_bayes.GaussianNB
_MODELS["knn"]["classification"] = neighbors.KNeighborsClassifier
_MODELS["knn"]["regression"] = neighbors.KNeighborsRegressor

CLASSIFICATION_TASKS = ("binary_classification", "multi_class_classification", "multi_label_classification")
REGRESSION_TASKS = ("single_column_regression", "multi_column_regression")

# Multi-class metrics that are scored on the probability matrix; every other
# multi-class metric is scored on the arg-max class index.
_PROBABILITY_METRICS = ("logloss", "mlogloss")


def _root_mean_error(metric_func, y_true, y_pred):
    """
    Return the root of a squared-error metric, averaged uniformly over outputs.

    This replaces the ``squared=False`` keyword of
    ``sklearn.metrics.mean_squared_error`` / ``mean_squared_log_error``, which
    newer scikit-learn releases no longer accept. The square root is taken per
    output column first and the results are then averaged, which is what
    ``squared=False`` computed.

    Parameters:
        metric_func (callable): A squared-error metric accepting
            ``(y_true, y_pred, multioutput="raw_values")``.
        y_true (array-like): True values.
        y_pred (array-like): Predicted values.

    Returns:
        numpy.floating: The averaged root error.
    """
    per_output = metric_func(y_true, y_pred, multioutput="raw_values")
    return np.mean(np.sqrt(per_output))


@dataclass
class TabularMetrics:
    """
    A class to calculate various metrics for different types of tabular tasks.

    Attributes:
    -----------
    sub_task : str
        The type of sub-task. It can be one of the following:
        - "binary_classification"
        - "multi_class_classification"
        - "single_column_regression"
        - "multi_column_regression"
        - "multi_label_classification"
    labels : Optional[List], optional
        The list of labels for multi-class classification tasks (default is None).
    valid_metrics : dict
        Mapping of metric name to metric callable, populated in ``__post_init__``.

    Methods:
    --------
    __post_init__():
        Initializes the valid metrics based on the sub-task type.

    calculate(y_true, y_pred):
        Calculates the metrics based on the true and predicted values.

        Parameters:
        -----------
        y_true : array-like
            True labels or values.
        y_pred : array-like
            Predicted labels or values.

        Returns:
        --------
        dict
            A dictionary with metric names as keys and their calculated values as values.
    """

    sub_task: str
    labels: Optional[List] = None

    def __post_init__(self):
        """Select the metric callables for ``sub_task``; raise ValueError if it is unknown."""
        if self.sub_task == "binary_classification":
            self.valid_metrics = {
                "auc": skmetrics.roc_auc_score,
                "logloss": skmetrics.log_loss,
                "f1": skmetrics.f1_score,
                "accuracy": skmetrics.accuracy_score,
                "precision": skmetrics.precision_score,
                "recall": skmetrics.recall_score,
            }
        elif self.sub_task == "multi_class_classification":
            self.valid_metrics = {
                "logloss": partial(skmetrics.log_loss, labels=self.labels),
                "accuracy": skmetrics.accuracy_score,
                "mlogloss": partial(skmetrics.log_loss, labels=self.labels),
                "f1_macro": partial(skmetrics.f1_score, average="macro", labels=self.labels),
                "f1_micro": partial(skmetrics.f1_score, average="micro", labels=self.labels),
                "f1_weighted": partial(skmetrics.f1_score, average="weighted", labels=self.labels),
                "precision_macro": partial(skmetrics.precision_score, average="macro", labels=self.labels),
                "precision_micro": partial(skmetrics.precision_score, average="micro", labels=self.labels),
                "precision_weighted": partial(skmetrics.precision_score, average="weighted", labels=self.labels),
                "recall_macro": partial(skmetrics.recall_score, average="macro", labels=self.labels),
                "recall_micro": partial(skmetrics.recall_score, average="micro", labels=self.labels),
                "recall_weighted": partial(skmetrics.recall_score, average="weighted", labels=self.labels),
            }
        elif self.sub_task in REGRESSION_TASKS:
            self.valid_metrics = {
                "r2": skmetrics.r2_score,
                "mse": skmetrics.mean_squared_error,
                "mae": skmetrics.mean_absolute_error,
                # Root metrics are derived here rather than via ``squared=False``,
                # which recent scikit-learn versions reject.
                "rmse": partial(_root_mean_error, skmetrics.mean_squared_error),
                "rmsle": partial(_root_mean_error, skmetrics.mean_squared_log_error),
            }
        elif self.sub_task == "multi_label_classification":
            self.valid_metrics = {
                "logloss": skmetrics.log_loss,
            }
        else:
            raise ValueError("Invalid problem type")

    def calculate(self, y_true, y_pred):
        """
        Compute every metric registered for this sub-task.

        Parameters:
        -----------
        y_true : array-like
            True labels or values.
        y_pred : array-like
            Model output. For binary and multi-class tasks it is indexed as a
            2-D array of per-class scores (``y_pred[:, 1]`` / ``argmax(axis=1)``).

        Returns:
        --------
        dict
            Metric name mapped to its computed value.
        """
        metrics = {}
        for metric_name, metric_func in self.valid_metrics.items():
            if self.sub_task == "binary_classification":
                if metric_name == "auc":
                    # Ranking metric: score of the positive class.
                    metrics[metric_name] = metric_func(y_true, y_pred[:, 1])
                elif metric_name == "logloss":
                    metrics[metric_name] = metric_func(y_true, y_pred)
                else:
                    # Label metrics: threshold the positive-class score at 0.5.
                    metrics[metric_name] = metric_func(y_true, y_pred[:, 1] >= 0.5)
            elif self.sub_task == "multi_class_classification":
                if metric_name in _PROBABILITY_METRICS:
                    metrics[metric_name] = metric_func(y_true, y_pred)
                else:
                    metrics[metric_name] = metric_func(y_true, np.argmax(y_pred, axis=1))
            else:
                if metric_name == "rmsle":
                    # The log-error metric rejects negative values, so negative
                    # predictions are clipped to 0. np.clip returns a new array,
                    # leaving the caller's y_pred untouched.
                    metrics[metric_name] = metric_func(y_true, np.clip(y_pred, 0, None))
                else:
                    metrics[metric_name] = metric_func(y_true, y_pred)
        return metrics


class TabularModel:
    """
    A class used to represent a Tabular Model for AutoTrain training.

    Attributes
    ----------
    model : str
        The name of the model to be used.
    preprocessor : object
        The preprocessor to be applied to the data.
    sub_task : str
        The sub-task type, either classification or regression.
    params : dict
        The parameters to be passed to the model.
    use_predict_proba : bool
        A flag indicating whether to use the predict_proba method.
    pipeline : sklearn.pipeline.Pipeline
        The optional preprocessor followed by the instantiated model.

    Methods
    -------
    _get_model():
        Retrieves the appropriate model based on the sub-task and model name.
    """

    def __init__(self, model, preprocessor, sub_task, params):
        self.model = model
        self.preprocessor = preprocessor
        self.sub_task = sub_task
        self.params = params
        # Default; _get_model() switches it off for regression and for the
        # "svm" / "ridge" classifiers.
        self.use_predict_proba = True
        _model = self._get_model()
        if self.preprocessor is not None:
            self.pipeline = pipeline.Pipeline([("preprocessor", self.preprocessor), ("model", _model)])
        else:
            self.pipeline = pipeline.Pipeline([("model", _model)])

    def _get_model(self):
        """
        Instantiate the estimator registered for ``self.model`` and ``self.sub_task``.

        Returns
        -------
        object
            The estimator built with ``self.params``.

        Raises
        ------
        ValueError
            If the model name is not registered, the sub-task is unknown, or
            the model has no estimator for the sub-task's family.
        """
        if self.model not in _MODELS:
            raise ValueError("Invalid model")

        if self.sub_task in CLASSIFICATION_TASKS:
            family = "classification"
            if self.model in ("svm", "ridge"):
                self.use_predict_proba = False
        elif self.sub_task in REGRESSION_TASKS:
            family = "regression"
            self.use_predict_proba = False
        else:
            raise ValueError("Invalid task")

        try:
            model_cls = _MODELS[self.model][family]
        except KeyError as err:
            # e.g. "lasso" for classification or "naive_bayes" for regression.
            raise ValueError(f"Model '{self.model}' does not support {family} tasks") from err
        return model_cls(**self.params)


def _suggest_forest_params(trial):
    """Sample the search space shared by the random-forest and extra-trees models."""
    return {
        "n_estimators": trial.suggest_int("n_estimators", 10, 10000),
        "max_depth": trial.suggest_int("max_depth", 2, 15),
        # "auto" is intentionally not offered: scikit-learn 1.3 removed it.
        "max_features": trial.suggest_categorical("max_features", ["sqrt", "log2", None]),
        "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
        "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 20),
        "bootstrap": trial.suggest_categorical("bootstrap", [True, False]),
        "n_jobs": -1,
    }


def get_params(trial, model, task):
    """
    Sample a hyperparameter dictionary for ``model`` from an Optuna-style trial.

    Parameters:
        trial: Object exposing ``suggest_float``, ``suggest_int`` and
            ``suggest_categorical``.
        model (str): Model name. Search spaces exist for "xgboost",
            "logistic_regression", "random_forest", "extra_trees",
            "decision_tree", "linear_regression", "svm", "ridge", "lasso"
            and "knn".
        task (str): Sub-task name; must be in CLASSIFICATION_TASKS or
            REGRESSION_TASKS for models whose space depends on the task.

    Returns:
        dict: Keyword arguments for the model's constructor.

    Raises:
        ValueError: "Task not supported" if the model has no search space for
            ``task``; "Invalid model" if the model has no search space at all.
    """
    if model == "xgboost":
        return {
            "learning_rate": trial.suggest_float("learning_rate", 1e-2, 0.25, log=True),
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 100.0, log=True),
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 100.0, log=True),
            "subsample": trial.suggest_float("subsample", 0.1, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.1, 1.0),
            "max_depth": trial.suggest_int("max_depth", 1, 9),
            "early_stopping_rounds": trial.suggest_int("early_stopping_rounds", 100, 500),
            "n_estimators": trial.suggest_categorical("n_estimators", [7000, 15000, 20000]),
            "tree_method": "hist",
            "random_state": 42,
        }

    if model == "logistic_regression":
        if task in CLASSIFICATION_TASKS:
            return {
                "C": trial.suggest_float("C", 1e-8, 1e3, log=True),
                "fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
                "solver": trial.suggest_categorical("solver", ["liblinear", "saga"]),
                "penalty": trial.suggest_categorical("penalty", ["l1", "l2"]),
                "n_jobs": -1,
            }
        raise ValueError("Task not supported")

    if model == "random_forest":
        params = _suggest_forest_params(trial)
        if task in CLASSIFICATION_TASKS:
            params["criterion"] = trial.suggest_categorical("criterion", ["gini", "entropy"])
            return params
        if task in REGRESSION_TASKS:
            params["criterion"] = trial.suggest_categorical(
                "criterion", ["squared_error", "absolute_error", "poisson"]
            )
            return params
        raise ValueError("Task not supported")

    if model == "extra_trees":
        params = _suggest_forest_params(trial)
        if task in CLASSIFICATION_TASKS:
            params["criterion"] = trial.suggest_categorical("criterion", ["gini", "entropy"])
            return params
        if task in REGRESSION_TASKS:
            params["criterion"] = trial.suggest_categorical("criterion", ["squared_error", "absolute_error"])
            return params
        raise ValueError("Task not supported")

    if model == "decision_tree":
        params = {
            "max_depth": trial.suggest_int("max_depth", 1, 15),
            "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
            "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 20),
            # "auto" is intentionally not offered: scikit-learn 1.3 removed it.
            "max_features": trial.suggest_categorical("max_features", ["sqrt", "log2", None]),
            "splitter": trial.suggest_categorical("splitter", ["best", "random"]),
        }
        if task in CLASSIFICATION_TASKS:
            params["criterion"] = trial.suggest_categorical("criterion", ["gini", "entropy"])
            return params
        if task in REGRESSION_TASKS:
            params["criterion"] = trial.suggest_categorical(
                "criterion", ["squared_error", "absolute_error", "friedman_mse", "poisson"]
            )
            return params
        raise ValueError("Task not supported")

    if model == "linear_regression":
        if task in REGRESSION_TASKS:
            return {
                "fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
            }
        raise ValueError("Task not supported")

    if model == "svm":
        if task in CLASSIFICATION_TASKS:
            return {
                "C": trial.suggest_float("C", 1e-8, 1e3, log=True),
                "fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
                "penalty": "l2",
                "max_iter": trial.suggest_int("max_iter", 1000, 10000),
            }
        if task in REGRESSION_TASKS:
            return {
                "C": trial.suggest_float("C", 1e-8, 1e3, log=True),
                "fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
                "loss": trial.suggest_categorical("loss", ["epsilon_insensitive", "squared_epsilon_insensitive"]),
                "epsilon": trial.suggest_float("epsilon", 1e-8, 1e-1, log=True),
                "max_iter": trial.suggest_int("max_iter", 1000, 10000),
            }
        raise ValueError("Task not supported")

    if model == "ridge":
        params = {
            "alpha": trial.suggest_float("alpha", 1e-8, 1e3, log=True),
            "fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
            "max_iter": trial.suggest_int("max_iter", 1000, 10000),
        }
        if task in CLASSIFICATION_TASKS or task in REGRESSION_TASKS:
            return params
        raise ValueError("Task not supported")

    if model == "lasso":
        if task in REGRESSION_TASKS:
            return {
                "alpha": trial.suggest_float("alpha", 1e-8, 1e3, log=True),
                "fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
                "max_iter": trial.suggest_int("max_iter", 1000, 10000),
            }
        raise ValueError("Task not supported")

    if model == "knn":
        params = {
            "n_neighbors": trial.suggest_int("n_neighbors", 1, 25),
            "weights": trial.suggest_categorical("weights", ["uniform", "distance"]),
            "algorithm": trial.suggest_categorical("algorithm", ["ball_tree", "kd_tree", "brute"]),
            "leaf_size": trial.suggest_int("leaf_size", 1, 100),
            "p": trial.suggest_categorical("p", [1, 2]),
            "metric": trial.suggest_categorical("metric", ["minkowski", "euclidean", "manhattan"]),
        }
        if task in CLASSIFICATION_TASKS or task in REGRESSION_TASKS:
            return params
        raise ValueError("Task not supported")

    # Raised rather than returned, so that a model without a search space fails
    # here instead of handing an exception object to the caller as its params.
    raise ValueError("Invalid model")


def get_imputer(imputer_name):
    """
    Returns an imputer object based on the specified imputer name.

    Parameters:
        imputer_name (str): The name of the imputer to use. Can be one of the following:
            - "median": Uses the median value for imputation.
            - "mean": Uses the mean value for imputation.
            - "most_frequent": Uses the most frequent value for imputation.
            If None, returns None.

    Returns:
        impute.SimpleImputer or None: An instance of SimpleImputer with the specified strategy,
        or None if imputer_name is None.

    Raises:
        ValueError: If an invalid imputer_name is provided.
    """
    if imputer_name is None:
        return None
    if imputer_name == "median":
        return impute.SimpleImputer(strategy="median")
    if imputer_name == "mean":
        return impute.SimpleImputer(strategy="mean")
    if imputer_name == "most_frequent":
        return impute.SimpleImputer(strategy="most_frequent")
    raise ValueError("Invalid imputer")


def get_scaler(scaler_name):
    """
    Returns a scaler object based on the provided scaler name.

    Parameters:
        scaler_name (str): The name of the scaler to be returned.
            Possible values are "standard", "minmax", "robust", and "normal".
            If None, returns None.

    Returns:
        scaler: An instance of the corresponding scaler from sklearn.preprocessing.
            If the scaler_name is None, returns None.

    Raises:
        ValueError: If the scaler_name is not one of the expected values.
    """
    if scaler_name is None:
        return None
    if scaler_name == "standard":
        return preprocessing.StandardScaler()
    if scaler_name == "minmax":
        return preprocessing.MinMaxScaler()
    if scaler_name == "robust":
        return preprocessing.RobustScaler()
    if scaler_name == "normal":
        return preprocessing.Normalizer()
    raise ValueError("Invalid scaler")


def get_metric_direction(sub_task):
    """
    Determines the appropriate metric and its optimization direction based on the given sub-task.

    Parameters:
        sub_task (str): The type of sub-task. Must be one of the following:
            - "binary_classification"
            - "multi_class_classification"
            - "single_column_regression"
            - "multi_label_classification"
            - "multi_column_regression"

    Returns:
        tuple: A tuple containing:
            - str: The metric to be used (e.g., "logloss", "mlogloss", "rmse").
            - str: The direction of optimization ("minimize").

    Raises:
        ValueError: If the provided sub_task is not one of the recognized types.
    """
    if sub_task == "binary_classification":
        return "logloss", "minimize"
    if sub_task == "multi_class_classification":
        return "mlogloss", "minimize"
    if sub_task == "single_column_regression":
        return "rmse", "minimize"
    if sub_task == "multi_label_classification":
        return "logloss", "minimize"
    if sub_task == "multi_column_regression":
        return "rmse", "minimize"
    raise ValueError("Invalid sub_task")


def get_categorical_columns(df):
    """
    Extracts the names of categorical columns from a DataFrame.

    Parameters:
        df (pandas.DataFrame): The DataFrame from which to extract categorical columns.

    Returns:
        list: A list of column names that are of categorical data type (either 'category' or 'object').
    """
    return list(df.select_dtypes(include=["category", "object"]).columns)


def get_numerical_columns(df):
    """
    Extracts and returns a list of numerical column names from a given DataFrame.

    Args:
        df (pandas.DataFrame): The DataFrame from which to extract numerical columns.

    Returns:
        list: A list of column names that have numerical data types.
    """
    return list(df.select_dtypes(include=["number"]).columns)


def create_model_card(config, sub_task, best_params, best_metrics):
    """
    Generates a markdown formatted model card with the given configuration, sub-task, best parameters, and best metrics.

    Args:
        config (object): Configuration object; its ``task`` and ``data_path`` attributes are read.
        sub_task (str): The specific sub-task for which the model card is being created.
            Accepted for interface compatibility; the template does not use it.
        best_params (dict): Dictionary containing the best hyperparameters for the model.
        best_metrics (dict): Dictionary containing the best performance metrics for the model.

    Returns:
        str: A string containing the formatted model card in markdown.
    """
    # Each mapping is rendered as a markdown bullet list, one "- key: value" per line.
    metric_lines = "\n".join(f"- {name}: {value}" for name, value in best_metrics.items())
    param_lines = "\n".join(f"- {name}: {value}" for name, value in best_params.items())
    return MARKDOWN.format(
        task=config.task,
        dataset=config.data_path,
        metrics=metric_lines,
        params=param_lines,
    )