import copy
from collections import defaultdict
from dataclasses import dataclass
from functools import partial
from typing import List, Optional

import numpy as np
from sklearn import ensemble, impute, linear_model
from sklearn import metrics as skmetrics
from sklearn import naive_bayes, neighbors, pipeline, preprocessing, svm, tree
from xgboost import XGBClassifier, XGBRegressor
MARKDOWN = """
---
tags:
- autotrain
- tabular
- {task}
- tabular-{task}
datasets:
- {dataset}
---
# Model Trained Using AutoTrain
- Problem type: Tabular {task}
## Validation Metrics
{metrics}
## Best Params
{params}
## Usage
```python
import json
import joblib
import pandas as pd
model = joblib.load('model.joblib')
config = json.load(open('config.json'))
features = config['features']

# load your own data, for example:
# data = pd.read_csv("data.csv")
data = data[features]
predictions = model.predict(data) # or model.predict_proba(data)
# predictions can be converted to original labels using label_encoders.pkl
```
"""


_MODELS: dict = defaultdict(dict)
_MODELS["xgboost"]["classification"] = XGBClassifier
_MODELS["xgboost"]["regression"] = XGBRegressor
_MODELS["logistic_regression"]["classification"] = linear_model.LogisticRegression
_MODELS["logistic_regression"]["regression"] = linear_model.LogisticRegression
_MODELS["random_forest"]["classification"] = ensemble.RandomForestClassifier
_MODELS["random_forest"]["regression"] = ensemble.RandomForestRegressor
_MODELS["extra_trees"]["classification"] = ensemble.ExtraTreesClassifier
_MODELS["extra_trees"]["regression"] = ensemble.ExtraTreesRegressor
_MODELS["gradient_boosting"]["classification"] = ensemble.GradientBoostingClassifier
_MODELS["gradient_boosting"]["regression"] = ensemble.GradientBoostingRegressor
_MODELS["adaboost"]["classification"] = ensemble.AdaBoostClassifier
_MODELS["adaboost"]["regression"] = ensemble.AdaBoostRegressor
_MODELS["ridge"]["classification"] = linear_model.RidgeClassifier
_MODELS["ridge"]["regression"] = linear_model.Ridge
_MODELS["svm"]["classification"] = svm.LinearSVC
_MODELS["svm"]["regression"] = svm.LinearSVR
_MODELS["decision_tree"]["classification"] = tree.DecisionTreeClassifier
_MODELS["decision_tree"]["regression"] = tree.DecisionTreeRegressor
_MODELS["lasso"]["regression"] = linear_model.Lasso
_MODELS["linear_regression"]["regression"] = linear_model.LinearRegression
_MODELS["naive_bayes"]["classification"] = naive_bayes.GaussianNB
_MODELS["knn"]["classification"] = neighbors.KNeighborsClassifier
_MODELS["knn"]["regression"] = neighbors.KNeighborsRegressor

CLASSIFICATION_TASKS = ("binary_classification", "multi_class_classification", "multi_label_classification")
REGRESSION_TASKS = ("single_column_regression", "multi_column_regression")
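

# Illustrative sketch of how the registry is consumed (comments only, nothing
# executed at import time): a model name plus a problem family resolves to an
# estimator class, which is then instantiated with searched hyperparameters.
#
#     model_cls = _MODELS["extra_trees"]["regression"]  # ensemble.ExtraTreesRegressor
#     estimator = model_cls(n_estimators=100)

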
@dataclass
class TabularMetrics:
"""
A class to calculate various metrics for different types of tabular tasks.
Attributes:
-----------
sub_task : str
The type of sub-task. It can be one of the following:
- "binary_classification"
- "multi_class_classification"
- "single_column_regression"
- "multi_column_regression"
- "multi_label_classification"
labels : Optional[List], optional
The list of labels for multi-class classification tasks (default is None).
Methods:
--------
__post_init__():
Initializes the valid metrics based on the sub-task type.
calculate(y_true, y_pred):
Calculates the metrics based on the true and predicted values.
Parameters:
-----------
y_true : array-like
True labels or values.
y_pred : array-like
Predicted labels or values.
Returns:
--------
dict
A dictionary with metric names as keys and their calculated values as values.
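
    Example:
    --------
    A minimal sketch, assuming probability outputs of shape (n_samples, 2):

    >>> m = TabularMetrics(sub_task="binary_classification")
    >>> y_true = np.array([0, 1, 1, 0])
    >>> y_pred = np.array([[0.9, 0.1], [0.2, 0.8], [0.3, 0.7], [0.6, 0.4]])
    >>> round(m.calculate(y_true, y_pred)["accuracy"], 2)
    1.0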
"""
sub_task: str
labels: Optional[List] = None

    def __post_init__(self):
if self.sub_task == "binary_classification":
self.valid_metrics = {
"auc": skmetrics.roc_auc_score,
"logloss": skmetrics.log_loss,
"f1": skmetrics.f1_score,
"accuracy": skmetrics.accuracy_score,
"precision": skmetrics.precision_score,
"recall": skmetrics.recall_score,
}
elif self.sub_task == "multi_class_classification":
self.valid_metrics = {
"logloss": partial(skmetrics.log_loss, labels=self.labels),
"accuracy": skmetrics.accuracy_score,
"mlogloss": partial(skmetrics.log_loss, labels=self.labels),
"f1_macro": partial(skmetrics.f1_score, average="macro", labels=self.labels),
"f1_micro": partial(skmetrics.f1_score, average="micro", labels=self.labels),
"f1_weighted": partial(skmetrics.f1_score, average="weighted", labels=self.labels),
"precision_macro": partial(skmetrics.precision_score, average="macro", labels=self.labels),
"precision_micro": partial(skmetrics.precision_score, average="micro", labels=self.labels),
"precision_weighted": partial(skmetrics.precision_score, average="weighted", labels=self.labels),
"recall_macro": partial(skmetrics.recall_score, average="macro", labels=self.labels),
"recall_micro": partial(skmetrics.recall_score, average="micro", labels=self.labels),
"recall_weighted": partial(skmetrics.recall_score, average="weighted", labels=self.labels),
}
elif self.sub_task in ("single_column_regression", "multi_column_regression"):
self.valid_metrics = {
"r2": skmetrics.r2_score,
"mse": skmetrics.mean_squared_error,
"mae": skmetrics.mean_absolute_error,
"rmse": partial(skmetrics.mean_squared_error, squared=False),
"rmsle": partial(skmetrics.mean_squared_log_error, squared=False),
}
elif self.sub_task == "multi_label_classification":
self.valid_metrics = {
"logloss": skmetrics.log_loss,
}
else:
raise ValueError("Invalid problem type")

    def calculate(self, y_true, y_pred):
metrics = {}
for metric_name, metric_func in self.valid_metrics.items():
if self.sub_task == "binary_classification":
if metric_name == "auc":
metrics[metric_name] = metric_func(y_true, y_pred[:, 1])
elif metric_name == "logloss":
metrics[metric_name] = metric_func(y_true, y_pred)
                else:
                    # threshold positive-class probabilities at 0.5 for label-based metrics
                    metrics[metric_name] = metric_func(y_true, y_pred[:, 1] >= 0.5)
elif self.sub_task == "multi_class_classification":
if metric_name in (
"accuracy",
"f1_macro",
"f1_micro",
"f1_weighted",
"precision_macro",
"precision_micro",
"precision_weighted",
"recall_macro",
"recall_micro",
"recall_weighted",
):
metrics[metric_name] = metric_func(y_true, np.argmax(y_pred, axis=1))
else:
metrics[metric_name] = metric_func(y_true, y_pred)
else:
if metric_name == "rmsle":
temp_pred = copy.deepcopy(y_pred)
temp_pred = np.clip(temp_pred, 0, None)
metrics[metric_name] = metric_func(y_true, temp_pred)
else:
metrics[metric_name] = metric_func(y_true, y_pred)
return metrics


class TabularModel:
"""
A class used to represent a Tabular Model for AutoTrain training.
Attributes
----------
model : str
The name of the model to be used.
preprocessor : object
The preprocessor to be applied to the data.
sub_task : str
The sub-task type, either classification or regression.
params : dict
The parameters to be passed to the model.
use_predict_proba : bool
A flag indicating whether to use the predict_proba method.
Methods
-------
_get_model():
Retrieves the appropriate model based on the sub-task and model name.
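
    Example
    -------
    A minimal sketch; the hyperparameters are illustrative only:

    >>> tm = TabularModel(
    ...     model="random_forest",
    ...     preprocessor=None,
    ...     sub_task="binary_classification",
    ...     params={"n_estimators": 10},
    ... )
    >>> tm.use_predict_proba
    True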
"""

    def __init__(self, model, preprocessor, sub_task, params):
self.model = model
self.preprocessor = preprocessor
self.sub_task = sub_task
self.params = params
self.use_predict_proba = True
_model = self._get_model()
if self.preprocessor is not None:
self.pipeline = pipeline.Pipeline([("preprocessor", self.preprocessor), ("model", _model)])
else:
self.pipeline = pipeline.Pipeline([("model", _model)])

    def _get_model(self):
if self.model in _MODELS:
if self.sub_task in CLASSIFICATION_TASKS:
if self.model in ("svm", "ridge"):
self.use_predict_proba = False
return _MODELS[self.model]["classification"](**self.params)
elif self.sub_task in REGRESSION_TASKS:
self.use_predict_proba = False
return _MODELS[self.model]["regression"](**self.params)
else:
raise ValueError("Invalid task")
else:
raise ValueError("Invalid model")


def get_params(trial, model, task):
if model == "xgboost":
params = {
"learning_rate": trial.suggest_float("learning_rate", 1e-2, 0.25, log=True),
"reg_lambda": trial.suggest_float("reg_lambda", 1e-8, 100.0, log=True),
"reg_alpha": trial.suggest_float("reg_alpha", 1e-8, 100.0, log=True),
"subsample": trial.suggest_float("subsample", 0.1, 1.0),
"colsample_bytree": trial.suggest_float("colsample_bytree", 0.1, 1.0),
"max_depth": trial.suggest_int("max_depth", 1, 9),
"early_stopping_rounds": trial.suggest_int("early_stopping_rounds", 100, 500),
"n_estimators": trial.suggest_categorical("n_estimators", [7000, 15000, 20000]),
"tree_method": "hist",
"random_state": 42,
}
return params
if model == "logistic_regression":
if task in CLASSIFICATION_TASKS:
params = {
"C": trial.suggest_float("C", 1e-8, 1e3, log=True),
"fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
"solver": trial.suggest_categorical("solver", ["liblinear", "saga"]),
"penalty": trial.suggest_categorical("penalty", ["l1", "l2"]),
"n_jobs": -1,
}
return params
raise ValueError("Task not supported")
if model == "random_forest":
params = {
"n_estimators": trial.suggest_int("n_estimators", 10, 10000),
"max_depth": trial.suggest_int("max_depth", 2, 15),
"max_features": trial.suggest_categorical("max_features", ["auto", "sqrt", "log2", None]),
"min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
"min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 20),
"bootstrap": trial.suggest_categorical("bootstrap", [True, False]),
"n_jobs": -1,
}
if task in CLASSIFICATION_TASKS:
params["criterion"] = trial.suggest_categorical("criterion", ["gini", "entropy"])
return params
if task in REGRESSION_TASKS:
params["criterion"] = trial.suggest_categorical(
"criterion", ["squared_error", "absolute_error", "poisson"]
)
return params
raise ValueError("Task not supported")
if model == "extra_trees":
params = {
"n_estimators": trial.suggest_int("n_estimators", 10, 10000),
"max_depth": trial.suggest_int("max_depth", 2, 15),
"max_features": trial.suggest_categorical("max_features", ["auto", "sqrt", "log2", None]),
"min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
"min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 20),
"bootstrap": trial.suggest_categorical("bootstrap", [True, False]),
"n_jobs": -1,
}
if task in CLASSIFICATION_TASKS:
params["criterion"] = trial.suggest_categorical("criterion", ["gini", "entropy"])
return params
if task in REGRESSION_TASKS:
params["criterion"] = trial.suggest_categorical("criterion", ["squared_error", "absolute_error"])
return params
raise ValueError("Task not supported")
if model == "decision_tree":
params = {
"max_depth": trial.suggest_int("max_depth", 1, 15),
"min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
"min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 20),
"max_features": trial.suggest_categorical("max_features", ["auto", "sqrt", "log2", None]),
"splitter": trial.suggest_categorical("splitter", ["best", "random"]),
}
if task in CLASSIFICATION_TASKS:
params["criterion"] = trial.suggest_categorical("criterion", ["gini", "entropy"])
return params
if task in REGRESSION_TASKS:
params["criterion"] = trial.suggest_categorical(
"criterion", ["squared_error", "absolute_error", "friedman_mse", "poisson"]
)
return params
raise ValueError("Task not supported")
if model == "linear_regression":
if task in REGRESSION_TASKS:
params = {
"fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
}
return params
raise ValueError("Task not supported")
if model == "svm":
if task in CLASSIFICATION_TASKS:
params = {
"C": trial.suggest_float("C", 1e-8, 1e3, log=True),
"fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
"penalty": "l2",
"max_iter": trial.suggest_int("max_iter", 1000, 10000),
}
return params
if task in REGRESSION_TASKS:
params = {
"C": trial.suggest_float("C", 1e-8, 1e3, log=True),
"fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
"loss": trial.suggest_categorical("loss", ["epsilon_insensitive", "squared_epsilon_insensitive"]),
"epsilon": trial.suggest_float("epsilon", 1e-8, 1e-1, log=True),
"max_iter": trial.suggest_int("max_iter", 1000, 10000),
}
return params
raise ValueError("Task not supported")
if model == "ridge":
params = {
"alpha": trial.suggest_float("alpha", 1e-8, 1e3, log=True),
"fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
"max_iter": trial.suggest_int("max_iter", 1000, 10000),
}
        if task in CLASSIFICATION_TASKS or task in REGRESSION_TASKS:
            return params
raise ValueError("Task not supported")
if model == "lasso":
if task in REGRESSION_TASKS:
params = {
"alpha": trial.suggest_float("alpha", 1e-8, 1e3, log=True),
"fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
"max_iter": trial.suggest_int("max_iter", 1000, 10000),
}
return params
raise ValueError("Task not supported")
if model == "knn":
params = {
"n_neighbors": trial.suggest_int("n_neighbors", 1, 25),
"weights": trial.suggest_categorical("weights", ["uniform", "distance"]),
"algorithm": trial.suggest_categorical("algorithm", ["ball_tree", "kd_tree", "brute"]),
"leaf_size": trial.suggest_int("leaf_size", 1, 100),
"p": trial.suggest_categorical("p", [1, 2]),
"metric": trial.suggest_categorical("metric", ["minkowski", "euclidean", "manhattan"]),
}
if task in CLASSIFICATION_TASKS or task in REGRESSION_TASKS:
return params
raise ValueError("Task not supported")
    raise ValueError("Invalid model")
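

# Illustrative sketch of how get_params() plugs into a hyperparameter search.
# `optuna` is not imported in this module, so this stays in comments:
#
#     import optuna
#
#     def objective(trial):
#         params = get_params(trial, "random_forest", "binary_classification")
#         model = TabularModel("random_forest", None, "binary_classification", params)
#         ...  # fit model.pipeline and return a validation metric to minimize
#
#     study = optuna.create_study(direction="minimize")
#     study.optimize(objective, n_trials=25)

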
def get_imputer(imputer_name):
"""
Returns an imputer object based on the specified imputer name.
Parameters:
imputer_name (str): The name of the imputer to use. Can be one of the following:
- "median": Uses the median value for imputation.
- "mean": Uses the mean value for imputation.
- "most_frequent": Uses the most frequent value for imputation.
If None, returns None.
Returns:
impute.SimpleImputer or None: An instance of SimpleImputer with the specified strategy,
or None if imputer_name is None.
Raises:
ValueError: If an invalid imputer_name is provided.
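
    Example:
        >>> get_imputer("median")
        SimpleImputer(strategy='median')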
"""
if imputer_name is None:
return None
if imputer_name == "median":
return impute.SimpleImputer(strategy="median")
if imputer_name == "mean":
return impute.SimpleImputer(strategy="mean")
if imputer_name == "most_frequent":
return impute.SimpleImputer(strategy="most_frequent")
raise ValueError("Invalid imputer")


def get_scaler(scaler_name):
"""
Returns a scaler object based on the provided scaler name.
Parameters:
scaler_name (str): The name of the scaler to be returned.
Possible values are "standard", "minmax", "robust", and "normal".
If None, returns None.
Returns:
scaler: An instance of the corresponding scaler from sklearn.preprocessing.
If the scaler_name is None, returns None.
Raises:
ValueError: If the scaler_name is not one of the expected values.
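
    Example:
        >>> get_scaler("robust")
        RobustScaler()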
"""
if scaler_name is None:
return None
if scaler_name == "standard":
return preprocessing.StandardScaler()
if scaler_name == "minmax":
return preprocessing.MinMaxScaler()
if scaler_name == "robust":
return preprocessing.RobustScaler()
if scaler_name == "normal":
return preprocessing.Normalizer()
raise ValueError("Invalid scaler")


def get_metric_direction(sub_task):
"""
Determines the appropriate metric and its optimization direction based on the given sub-task.
Parameters:
sub_task (str): The type of sub-task. Must be one of the following:
- "binary_classification"
- "multi_class_classification"
- "single_column_regression"
- "multi_label_classification"
- "multi_column_regression"
Returns:
tuple: A tuple containing:
- str: The metric to be used (e.g., "logloss", "mlogloss", "rmse").
- str: The direction of optimization ("minimize").
Raises:
ValueError: If the provided sub_task is not one of the recognized types.
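
    Example:
        >>> get_metric_direction("multi_class_classification")
        ('mlogloss', 'minimize')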
"""
if sub_task == "binary_classification":
return "logloss", "minimize"
if sub_task == "multi_class_classification":
return "mlogloss", "minimize"
if sub_task == "single_column_regression":
return "rmse", "minimize"
if sub_task == "multi_label_classification":
return "logloss", "minimize"
if sub_task == "multi_column_regression":
return "rmse", "minimize"
raise ValueError("Invalid sub_task")


def get_categorical_columns(df):
"""
Extracts the names of categorical columns from a DataFrame.
Parameters:
df (pandas.DataFrame): The DataFrame from which to extract categorical columns.
Returns:
list: A list of column names that are of categorical data type (either 'category' or 'object').
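
    Example:
        >>> import pandas as pd
        >>> df = pd.DataFrame({"city": ["a", "b"], "price": [1.0, 2.0]})
        >>> get_categorical_columns(df)
        ['city']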
"""
return list(df.select_dtypes(include=["category", "object"]).columns)


def get_numerical_columns(df):
"""
Extracts and returns a list of numerical column names from a given DataFrame.
Args:
df (pandas.DataFrame): The DataFrame from which to extract numerical columns.
Returns:
list: A list of column names that have numerical data types.
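
    Example:
        >>> import pandas as pd
        >>> df = pd.DataFrame({"city": ["a", "b"], "price": [1.0, 2.0]})
        >>> get_numerical_columns(df)
        ['price']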
"""
return list(df.select_dtypes(include=["number"]).columns)


def create_model_card(config, sub_task, best_params, best_metrics):
"""
Generates a markdown formatted model card with the given configuration, sub-task, best parameters, and best metrics.
Args:
config (object): Configuration object containing task and data path information.
sub_task (str): The specific sub-task for which the model card is being created.
best_params (dict): Dictionary containing the best hyperparameters for the model.
best_metrics (dict): Dictionary containing the best performance metrics for the model.
Returns:
str: A string containing the formatted model card in markdown.
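
    Example:
        A sketch; `config` only needs `task` and `data_path` attributes here:

        >>> from types import SimpleNamespace
        >>> cfg = SimpleNamespace(task="classification", data_path="user/dataset")
        >>> card = create_model_card(cfg, "binary_classification", {"max_depth": 3}, {"auc": 0.9})
        >>> "- auc: 0.9" in card
        True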
"""
best_metrics = "\n".join([f"- {k}: {v}" for k, v in best_metrics.items()])
best_params = "\n".join([f"- {k}: {v}" for k, v in best_params.items()])
return MARKDOWN.format(
task=config.task,
dataset=config.data_path,
metrics=best_metrics,
params=best_params,
)