Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| from io import StringIO | |
| import pandas as pd | |
| import numpy as np | |
| import xgboost as xgb | |
| from math import sqrt | |
| from sklearn.metrics import mean_squared_error | |
| from sklearn.model_selection import train_test_split | |
| import plotly.express as px | |
| import logging | |
| from datetime import datetime | |
| import plotly.graph_objects as go | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import plotly.graph_objs as go | |
| from plotly.subplots import make_subplots | |
| from matplotlib import pyplot | |
| import whisper | |
| from openai import AzureOpenAI | |
| import json | |
| import re | |
| import gradio as gr | |
# Logging setup: records are appended to a log file next to the process, and
# the module-level LOGGER is pinned to the level named by ``log_level_env``.
logging.basicConfig(
    filename='demand_forecasting.log',  # You can adjust the log file name here
    filemode='a',
    format='[%(asctime)s] [%(levelname)s] [%(filename)s] [%(lineno)s:%(funcName)s()] %(message)s',
    datefmt='%Y-%b-%d %H:%M:%S',
)

# Supported level names mapped to their numeric ``logging`` constants.
log_level_dict = {
    level_name: getattr(logging, level_name)
    for level_name in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
}

log_level_env = 'INFO'  # You can adjust the log level here

# Unknown level names fall back to INFO.
log_level = log_level_dict.get(log_level_env, log_level_dict['INFO'])

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(log_level)
class DemandForecasting:
    """Voice-driven demand forecasting pipeline with a Gradio front end.

    The pipeline transcribes a recorded query with Whisper, asks an Azure
    OpenAI chat deployment to turn the transcript into feature values, trains
    an XGBoost regressor on an uploaded CSV and predicts the target for the
    extracted inputs.
    """

    def __init__(self):
        # NOTE(review): AzureOpenAI() is built without arguments, so the
        # endpoint/key/API version presumably come from the environment.
        self.client = AzureOpenAI()
        self.whisper_model = whisper.load_model("medium.en")

    def get_column(self, train_csv_path: str):
        """
        Return the column names of a CSV file.
        Args:
            train_csv_path (str): Path (or readable buffer) of the CSV file.
        Returns:
            list: Column names in file order.
        """
        # nrows=0 parses only the header instead of loading the whole file.
        return pd.read_csv(train_csv_path, nrows=0).columns.tolist()

    def load_data(self, train_csv_path: str) -> "pd.DataFrame | None":
        """
        Load training data from a CSV file.
        Args:
            train_csv_path (str): Path to the training CSV file.
        Returns:
            pd.DataFrame: DataFrame containing the training data, or None if
            the file could not be read (the error is logged).
        """
        try:
            return pd.read_csv(train_csv_path)
        except Exception as e:
            LOGGER.exception("Error loading data: %s", e)
            return None

    def find_date_column(self, df_data: pd.DataFrame) -> "str | None":
        """
        Find the first column containing date-type values in the DataFrame.
        Args:
            - df_data (pd.DataFrame): Input DataFrame.
        Returns:
            - str: Name of the first column that holds dates, or None if no
              such column exists.
        """
        for column in df_data.columns:
            series = df_data[column]
            if pd.api.types.is_datetime64_any_dtype(series):
                return column
            # pd.to_datetime happily converts plain numbers (treating them as
            # epoch offsets), so numeric columns must never count as dates.
            if pd.api.types.is_numeric_dtype(series):
                continue
            try:
                parsed = pd.to_datetime(series)
            except (ValueError, TypeError):
                continue
            # An all-null column converts without error but carries no dates.
            if parsed.notna().any():
                return column
        return None

    @staticmethod
    def _add_date_features(df_data: pd.DataFrame, dates: pd.Series) -> pd.DataFrame:
        """Add the calendar features derived from ``dates`` to ``df_data`` in place."""
        df_data['day'] = dates.dt.day  # Day of the month
        df_data['month'] = dates.dt.month
        df_data['year'] = dates.dt.year
        # Cyclical encoding so that December and January end up close together
        df_data['month_sin'] = np.sin(2 * np.pi * df_data['month'] / 12)
        df_data['month_cos'] = np.cos(2 * np.pi * df_data['month'] / 12)
        df_data['day_of_week'] = dates.dt.weekday  # 0 = Monday, 6 = Sunday
        # isocalendar() yields a nullable UInt32 column; cast to a plain int
        df_data['week_of_year'] = dates.dt.isocalendar().week.astype(int)
        return df_data

    def preprocess_data(self, df_data: pd.DataFrame, list_columns: list, target_column: str) -> pd.DataFrame:
        """
        Keep the requested columns and replace the date column by date features.
        Args:
            - df_data (pd.DataFrame): Input DataFrame (left unmodified).
            - list_columns (list): List of column names to retain (left unmodified).
            - target_column (str): Name of the target column, always retained.
        Returns:
            - pd.DataFrame: New DataFrame with the retained columns, the date
              column removed and day/month/year/month_sin/month_cos/
              day_of_week/week_of_year added.
        Raises:
            - ValueError: If none of the retained columns contains dates.
        """
        # Build a new list so the caller's list_columns is not mutated.
        columns_to_keep = [*list_columns, target_column]
        columns_to_drop = [col for col in df_data.columns if col not in columns_to_keep]
        # drop() returns a new frame, so the caller's DataFrame stays untouched.
        df_data = df_data.drop(columns=columns_to_drop)

        date_column = self.find_date_column(df_data)
        if date_column is None:
            raise ValueError("No date column found in the provided list of columns.")

        dates = pd.to_datetime(df_data[date_column])
        self._add_date_features(df_data, dates)
        return df_data.drop(columns=[date_column])

    def train_model(self, train: pd.DataFrame, target_column, list_columns) -> tuple:
        """
        Train an XGBoost model using the provided training data.
        Args:
            - train (pd.DataFrame): DataFrame containing training data.
            - target_column: Name of the column to predict.
            - list_columns: Unused; kept for interface compatibility.
        Returns:
            - tuple: (trained model, true validation labels, predicted
              validation labels, validation RMSE message). All four entries
              are None if training fails (the error is logged).
        """
        try:
            # Extract features and target variable
            X = train.drop(columns=[target_column])
            y = train[target_column]
            # Cannot use cross validation because it will use future data
            # NOTE(review): train_test_split shuffles by default, so this
            # hold-out is random rather than chronological - confirm intent.
            X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=333)
            # Convert data into DMatrix format for XGBoost
            dtrain = xgb.DMatrix(X_train, label=y_train)
            dval = xgb.DMatrix(X_val, label=y_val)
            param = {
                'max_depth': 9,
                'eta': 0.3,
                'objective': 'reg:squarederror',
            }
            num_round = 60
            model_xgb = xgb.train(param, dtrain, num_round)
            # Score the hold-out set
            y_val_pred = model_xgb.predict(dval)
            mse = mean_squared_error(y_val, y_val_pred)
            validation = f"Validation RMSE: {np.sqrt(mse)}"
            return model_xgb, y_val, y_val_pred, validation
        except Exception as e:
            LOGGER.exception("Error training model: %s", e)
            # Same arity as the success path so callers can always unpack.
            return None, None, None, None

    def _plot_actual_vs_predicted(self, y_val, y_val_pred, mode, title, num_data_points,
                                  actual_marker=None, predicted_marker=None):
        """Build a Plotly figure overlaying actual and predicted values."""
        y_val = y_val[:num_data_points]
        y_val_pred = y_val_pred[:num_data_points]
        positions = np.arange(len(y_val))

        fig = make_subplots(rows=1, cols=1)
        for name, values, marker in (("Actual", y_val, actual_marker),
                                     ("Predicted", y_val_pred, predicted_marker)):
            marker_kwargs = {"marker": marker} if marker else {}
            fig.add_trace(go.Scatter(x=positions, y=values, mode=mode, name=name, **marker_kwargs),
                          row=1, col=1)
        fig.update_layout(title=title, xaxis_title='Time', yaxis_title='Value')
        # fig.show() is deliberately not called: the figure is returned so the
        # Gradio Plot component renders it, and showing it from a server
        # process would try to start a local renderer/browser.
        return fig

    def plot_line_graph(self, y_val, y_val_pred, num_data_points: int = 1000):
        """
        Build a line plot of actual vs predicted values.
        Args:
            - y_val: True validation labels.
            - y_val_pred: Predicted validation labels.
            - num_data_points (int): Maximum number of leading points to plot.
        Returns:
            - The Plotly figure.
        """
        return self._plot_actual_vs_predicted(
            y_val, y_val_pred, mode='lines',
            title='Actual vs Predicted Over Time',
            num_data_points=num_data_points,
        )

    def plot_scatter_plot(self, y_val, y_val_pred, num_data_points: int = 1000):
        """
        Build a scatter plot of actual vs predicted values.
        Args:
            - y_val: True validation labels.
            - y_val_pred: Predicted validation labels.
            - num_data_points (int): Maximum number of leading points to plot.
        Returns:
            - The Plotly figure.
        """
        return self._plot_actual_vs_predicted(
            y_val, y_val_pred, mode='markers',
            title='Actual vs Predicted Over Time (Scatter Plot)',
            num_data_points=num_data_points,
            actual_marker=dict(color='blue', size=8),
            predicted_marker=dict(color='orange', size=8),
        )

    def predict_sales_for_date(self, input_data, model: "xgb.Booster") -> "str | None":
        """
        Predict the target for one set of input values using the trained model.
        Args:
            - input_data (dict): Feature name -> value. A string value shaped
              like 'dd-mm-yyyy' marks the date column.
            - model (xgb.Booster): Trained XGBoost model.
        Returns:
            - str: Message containing the predicted value (prefixed by the
              date when one was supplied), or None if prediction fails (the
              error is logged).
        """
        try:
            input_features = pd.DataFrame([input_data])
            # Locate the value written as 'dd-mm-yyyy'; stays None when the
            # request carries no date.
            date_column = None
            for key, value in input_data.items():
                if isinstance(value, str) and re.match(r'\d{2}-\d{2}-\d{4}', value):
                    date_column = key

            if date_column is not None:
                # dayfirst=True: the value is dd-mm-yyyy, not month-first.
                dates = pd.to_datetime(input_features[date_column], dayfirst=True)
                self._add_date_features(input_features, dates)
                input_features = input_features.drop(columns=[date_column])

            # Present the columns in the order the model was trained with;
            # the keys of input_data arrive in arbitrary order.
            feature_names = getattr(model, "feature_names", None)
            if feature_names:
                input_features = input_features.reindex(columns=feature_names)

            dinput = xgb.DMatrix(input_features)
            predicted_sales = model.predict(dinput)[0]
            if date_column is None:
                return f"Predicted Value Is {predicted_sales}"
            return f"""{input_data[date_column]}Predicted Value Is {predicted_sales}"""
        except Exception as e:
            LOGGER.exception("Error predicting sales: %s", e)
            return None

    def audio_to_text(self, audio_path):
        """
        Transcribe the audio file at ``audio_path`` to text with Whisper.
        """
        result = self.whisper_model.transcribe(audio_path)
        LOGGER.info("audio_to_text %s", result["text"])
        return result["text"]

    @staticmethod
    def _extract_json(generated_text):
        """Parse a JSON object from model output, tolerating surrounding text or code fences."""
        try:
            return json.loads(generated_text)
        except (TypeError, ValueError):
            # Chat models frequently wrap JSON in ``` fences or add prose;
            # fall back to the outermost {...} span.
            match = re.search(r"\{.*\}", generated_text or "", re.DOTALL)
            if match is None:
                raise ValueError("Model response does not contain a JSON object.")
            return json.loads(match.group(0))

    def parse_text(self, text, column_list):
        """
        Ask the chat model to extract values for ``column_list`` from ``text``.
        Args:
            - text (str): Transcribed user request.
            - column_list (list): Names of the values to extract.
        Returns:
            - The decoded JSON returned by the model.
        Raises:
            - ValueError: If the response contains no parseable JSON.
        """
        prompt = (
            f"extract the {column_list}. "
            "all values should be integer data type. "
            "if date in there the format is dd-mm-YYYY.\n"
            f"text```{text}```\n"
            "return result should be in JSON format:\n"
        )
        conversation = [
            {"role": "system", "content": ""},
            {"role": "user", "content": prompt},
        ]
        # "GPT-3" is the model/deployment name passed to the chat API.
        chat_completion = self.client.chat.completions.create(
            model="GPT-3",
            messages=conversation,
            max_tokens=500,
            temperature=0,
            n=1,
            stop=None,
        )
        generated_text = chat_completion.choices[0].message.content
        json_data = self._extract_json(generated_text)
        LOGGER.info("parse_text %s", json_data)
        return json_data

    @staticmethod
    def _resolve_column_names(column_list, available):
        """Split a comma-separated string into names, trimming padding unless the padded name exists."""
        resolved = []
        for raw_name in column_list.split(","):
            name = raw_name if raw_name in available else raw_name.strip()
            if name:
                resolved.append(name)
        return resolved

    def main(self, train_csv_path: str, audio_path, target_column, column_list) -> tuple:
        """
        Main function to execute the demand forecasting pipeline.
        Args:
            - train_csv_path (str): Path to the training CSV file.
            - audio_path: Path of the recorded request to transcribe.
            - target_column (str): Name of the column to predict.
            - column_list (str): Comma-separated names of the feature columns.
        Returns:
            - tuple: (line graph figure, prediction message, validation
              message). On failure the error is logged and
              (None, "Error: ...", None) is returned so the three UI outputs
              still receive a value each.
        """
        try:
            # Load the CSV first: it is cheap and lets a bad upload fail
            # before the transcription and chat-model calls are made.
            train_data = self.load_data(train_csv_path)
            if train_data is None:
                raise ValueError("Training data could not be loaded.")

            selected_columns = self._resolve_column_names(column_list, train_data.columns)
            if target_column not in train_data.columns:
                target_column = target_column.strip()

            text = self.audio_to_text(audio_path)
            input_data = self.parse_text(text, selected_columns)

            train_df = self.preprocess_data(train_data, selected_columns, target_column)
            trained_model, y_val, y_val_pred, validation = self.train_model(
                train_df, target_column, selected_columns
            )
            if trained_model is None:
                raise RuntimeError("Model training failed; see the log for details.")

            # The UI exposes a single plot output, which shows the line graph.
            line_graph = self.plot_line_graph(y_val, y_val_pred)
            predicted_value = self.predict_sales_for_date(input_data, trained_model)
            return line_graph, predicted_value, validation
        except Exception as e:
            LOGGER.exception("Error in main function: %s", e)
            return None, f"Error: {e}", None

    def gradio_interface(self):
        """Build the Gradio Blocks UI, wire its events and launch it."""
        with gr.Blocks(css="style.css", theme="freddyaboulton/test-blue") as demo:
            gr.HTML("""<center><h1 style="color:#fff">Demand Forecasting</h1></center>""")
            with gr.Row():
                with gr.Column(scale=0.50):
                    train_csv = gr.File(elem_classes="uploadbutton")
                with gr.Column(scale=0.50):
                    column_list = gr.Textbox(label="Column List")
            with gr.Row():
                with gr.Column(scale=0.50):
                    audio_path = gr.Audio(sources=["microphone"], type="filepath")
            with gr.Row():
                with gr.Column(scale=0.50):
                    selected_column = gr.Textbox(label="Select column")
                with gr.Column(scale=0.50):
                    target_column = gr.Textbox(label="target column")
            with gr.Row():
                validation = gr.Textbox(label="Validation")
                predicted_result = gr.Textbox(label="Predicted Result")
            plot = gr.Plot()
            # Uploading a CSV fills the "Column List" box with its column names.
            train_csv.upload(self.get_column, train_csv, column_list)
            # Finishing a recording runs the pipeline; the "Select column" box
            # supplies main()'s column_list argument.
            audio_path.stop_recording(self.main, [train_csv, audio_path, target_column, selected_column], [plot, predicted_result, validation])
        demo.launch(debug=True)
if __name__ == "__main__":
    # Script entry point: construct the forecaster and start its Gradio UI.
    DemandForecasting().gradio_interface()