Spaces:
Sleeping
Sleeping
| # Importing necessary libraries | |
| import streamlit as st | |
# Streamlit expects the page configuration to be the first Streamlit command
# on a page, which is presumably why it runs ahead of the remaining imports.
st.set_page_config(
    page_title="Transformations",
    page_icon=":shark:",
    layout="wide",
    initial_sidebar_state="collapsed",
)
| import pickle | |
| import numpy as np | |
| import pandas as pd | |
| from utilities import set_header, load_local_css | |
| import streamlit_authenticator as stauth | |
| import yaml | |
| from yaml import SafeLoader | |
| import os | |
| import sqlite3 | |
| from utilities import update_db | |
load_local_css("styles.css")
set_header()

# Re-assign every stored value to itself, skipping the login/logout/config
# entries and the form-submitter keys.
# NOTE(review): presumably this keeps widget-bound values alive across page
# switches -- confirm against the other pages of the app.
for key, value in st.session_state.items():
    if key not in ("logout", "login", "config") and not key.startswith(
        "FormSubmitter"
    ):
        st.session_state[key] = value

# Authentication settings; SafeLoader restricts the YAML to plain data.
with open("config.yaml") as file:
    config = yaml.load(file, Loader=SafeLoader)
st.session_state["config"] = config

authenticator = stauth.Authenticate(
    config["credentials"],
    config["cookie"]["name"],
    config["cookie"]["key"],
    config["cookie"]["expiry_days"],
    config["preauthorized"],
)
st.session_state["authenticator"] = authenticator
name, authentication_status, username = authenticator.login("Login", "main")
auth_status = st.session_state.get("authentication_status")

# Everything below is for authenticated users only: halt the script run here
# instead of nesting the remainder of the page under one large ``if``.
if not auth_status:
    st.stop()

authenticator.logout("Logout", "main")
is_state_initiaized = st.session_state.get("initialized", False)

if "project_dct" not in st.session_state:
    st.error("Please load a project from Home page")
    st.stop()

# Connection with the SQL db; neither handle is used further in the visible
# part of this page.
conn = sqlite3.connect(r"DB/User.db", check_same_thread=False)
c = conn.cursor()

if not is_state_initiaized:
    if "session_name" not in st.session_state:
        st.session_state["session_name"] = None

data_import_path = os.path.join(
    st.session_state["project_path"], "data_import.pkl"
)
if not os.path.exists(data_import_path):
    st.error("Please move to Data Import page")
    # Without the imported data nothing below can work; stop here rather than
    # crashing with FileNotFoundError on the open() that follows.
    st.stop()

# Deserialize the objects written by the Data Import page.
# NOTE(review): pickle executes arbitrary code on load -- only safe while
# project folders are written exclusively by this application.
with open(data_import_path, "rb") as f:
    data = pickle.load(f)

final_df_loaded = data["final_df"]
bin_dict_loaded = data["bin_dict"]

# Session-state defaults, set only when the key is not present yet.
if "transformed_columns_dict" not in st.session_state:
    st.session_state["transformed_columns_dict"] = {}
if "final_df" not in st.session_state:
    st.session_state["final_df"] = final_df_loaded  # original dataframe
if "summary_string" not in st.session_state:
    st.session_state["summary_string"] = None

# Original columns of the categories that can be transformed.
original_columns = {
    category: bin_dict_loaded[category]
    for category in ["Media", "Internal", "Exogenous"]
    if category in bin_dict_loaded
}

# Panel columns (either level may be absent); transformations are applied
# per panel when at least the first level exists.
panel_1 = bin_dict_loaded.get("Panel Level 1")
panel_2 = bin_dict_loaded.get("Panel Level 2")
if panel_1:
    panel = panel_1 + panel_2 if panel_2 else panel_1
else:
    panel = []
def transformation_widgets(category, transform_params, date_granularity):
    """Render the transformation selector and range sliders for a category.

    Inside an expander titled "<category> Transformations" the user picks
    transformations from a multiselect; every picked transformation gets a
    two-handle range slider, laid out over two columns. Each choice is
    mirrored into ``st.session_state["project_dct"]["transformations"]
    [category]`` and read back from there as the widget default on later runs.

    Args:
        category: "Media", "Internal" or "Exogenous" (the keys of the options
            table below).
        transform_params: dict holding one sub-dict per category; mutated in
            place. Each selected transformation is mapped to the inclusive
            sequence of values between the two slider handles.
        date_granularity: text shown in the Lead / Lag / Moving Average
            headings.
    """
    project_dct = st.session_state["project_dct"]
    # Start from an empty store when nothing has been saved yet (key missing,
    # None or empty).
    if not project_dct.get("transformations"):
        project_dct["transformations"] = {}
    saved_settings = project_dct["transformations"].setdefault(category, {})

    transformation_options = {
        "Media": [
            "Lag",
            "Moving Average",
            "Saturation",
            "Power",
            "Adstock",
        ],
        "Internal": ["Lead", "Lag", "Moving Average"],
        "Exogenous": ["Lead", "Lag", "Moving Average"],
    }

    # Per transformation: heading, slider label, min, max, step, fallback
    # range, key inside the saved settings, Streamlit widget key.
    # Moving Average is persisted under "MA"; that key is kept so previously
    # saved projects still load their value.
    slider_specs = {
        "Lead": (
            f"**Lead ({date_granularity})**",
            "Lead periods",
            1,
            10,
            1,
            (1, 2),
            "Lead",
            f"lead_{category}",
        ),
        "Lag": (
            f"**Lag ({date_granularity})**",
            "Lag periods",
            1,
            10,
            1,
            (1, 2),
            "Lag",
            f"lag_{category}",
        ),
        "Moving Average": (
            f"**Moving Average ({date_granularity})**",
            "Window size for Moving Average",
            1,
            10,
            1,
            (1, 2),
            "MA",
            f"ma_{category}",
        ),
        "Saturation": (
            "**Saturation (%)**",
            "Saturation Percentage",
            0,
            100,
            10,
            (10, 20),
            "Saturation",
            f"sat_{category}",
        ),
        "Power": (
            "**Power**",
            "Power",
            0,
            10,
            1,
            (2, 4),
            "Power",
            f"power_{category}",
        ),
        "Adstock": (
            "**Adstock**",
            f"Factor ({category})",
            0.0,
            1.0,
            0.05,
            (0.5, 0.7),
            "Adstock",
            f"adstock_{category}",
        ),
    }

    def render_range_slider(transformation):
        """Draw one range slider and record the resulting parameter values."""
        spec = slider_specs.get(transformation)
        if spec is None:
            return
        (
            heading,
            label,
            low,
            high,
            step,
            fallback,
            saved_key,
            widget_key,
        ) = spec

        st.markdown(heading)
        # Every slider -- Lag included -- starts from the range saved with
        # the project and falls back to the predefined default.
        selected = st.slider(
            label,
            min_value=low,
            max_value=high,
            value=saved_settings.get(saved_key, fallback),
            step=step,
            key=widget_key,
            label_visibility="collapsed",
        )
        saved_settings[saved_key] = selected

        start, end = selected
        if isinstance(step, float):
            # Build fractional grids from an integer step count: np.arange
            # with a non-integer step can gain or lose the end point through
            # rounding error.
            step_count = int(round((end - start) / step))
            values = [
                round(start + index * step, 3)
                for index in range(step_count + 1)
            ]
        else:
            values = np.arange(start, end + step, step)
        transform_params[category][transformation] = values

    # The stored flag is read before the expander is built and set back to
    # True inside it, so it is True after every run of this function.
    expanded = saved_settings.get("expanded", False)
    saved_settings["expanded"] = False
    with st.expander(f"{category} Transformations", expanded=expanded):
        saved_settings["expanded"] = True

        selection_key = f"transformation_{category}"
        transformations_to_apply = st.multiselect(
            "Select transformations to apply",
            options=transformation_options[category],
            default=saved_settings.get(selection_key, []),
            key=selection_key,
        )
        saved_settings[selection_key] = transformations_to_apply

        # Split the selection over two columns; the first column takes the
        # extra slider when the count is odd.
        first_column_count = (len(transformations_to_apply) + 1) // 2
        col1, col2 = st.columns(2)
        layout = (
            (col1, transformations_to_apply[:first_column_count]),
            (col2, transformations_to_apply[first_column_count:]),
        )
        for column, transformations in layout:
            with column:
                for transformation in transformations:
                    render_range_slider(transformation)
def apply_lag(df, lag):
    """Return ``df`` shifted by ``lag`` positions via its ``shift`` method.

    For a pandas object and a positive ``lag`` the values move towards later
    rows and the vacated leading rows become NaN.
    """
    lagged = df.shift(periods=lag)
    return lagged
def apply_lead(df, lead):
    """Return ``df`` shifted by ``-lead`` positions via its ``shift`` method.

    For a pandas object and a positive ``lead`` the values move towards
    earlier rows and the vacated trailing rows become NaN.
    """
    offset = -lead
    return df.shift(periods=offset)
def apply_moving_average(df, window_size):
    """Return the rolling mean of ``df`` over ``window_size`` observations.

    Positions that do not yet have a full window are NaN (pandas' default
    ``min_periods`` for a fixed-size window).
    """
    rolling_window = df.rolling(window=window_size)
    return rolling_window.mean()
def apply_saturation(df, saturation_percent_100):
    """Scale a numeric pandas Series by a logistic-style saturation curve.

    The curve is ``f(x) = 1 / (1 + (midpoint / x) ** steepness)`` where
    ``midpoint`` is halfway between the Series minimum and maximum, and the
    result is ``f(x) * x``. The steepness is solved so that ``f`` equals the
    requested fraction at the Series maximum, i.e. the largest observation is
    reduced to ``saturation_percent_100`` percent of itself.

    Args:
        df: Series to transform (the scalar ``max``/``min`` handling below
            does not work for a whole DataFrame).
        saturation_percent_100: target percentage on a 0-100 scale.

    Returns:
        The transformed Series, indexed like ``df``.
    """
    epsilon = 1e-9

    # Convert to a fraction and keep it strictly inside (0, 1): exactly 0 or
    # 1 would put a division by zero or log(0) into the numerator.
    saturation_percent = saturation_percent_100 / 100.0
    saturation_percent = min(max(saturation_percent, epsilon), 1 - epsilon)

    column_max = df.max()
    column_min = df.min()
    saturation_point = (column_min + column_max) / 2

    numerator = np.log(1 / saturation_percent - 1)
    denominator = np.log(saturation_point / max(column_max, epsilon))
    # Guard only against a (near-)zero denominator and keep its sign. The
    # midpoint never exceeds the maximum, so the log is normally negative;
    # clamping with max(denominator, 1e-9) discarded that value every time
    # and turned the curve into a hard threshold at the midpoint.
    if abs(denominator) < epsilon:
        denominator = -epsilon if denominator < 0 else epsilon
    steepness = numerator / denominator

    # Vectorised form of the element-wise formula.
    return (1 / (1 + (saturation_point / df) ** steepness)) * df
def apply_power(df, power):
    """Return ``df`` raised to ``power`` using the ``**`` operator."""
    powered = df ** power
    return powered
def apply_adstock(df, factor):
    """Return the adstocked version of a Series.

    Each output value is the current observation plus ``factor`` times the
    previous output value; the running total starts at 0. The result is a new
    Series carrying the index of ``df``.
    """
    carry_over = 0
    adstocked = []
    for observation in df:
        carry_over = carry_over * factor + observation
        adstocked.append(carry_over)
    return pd.Series(adstocked, index=df.index)
def generate_transformed_columns(original_columns, transform_params):
    """Build per-column transformed names and an HTML summary of the settings.

    Args:
        original_columns: mapping of category -> list of column names.
        transform_params: mapping of category -> {transformation: values}.

    Returns:
        A tuple ``(transformed_columns, summary_string)``.
        ``transformed_columns`` maps every original column to the names
        "<column>@<transformation>_<value>" (an empty list when its category
        has no parameters). ``summary_string`` holds one numbered line per
        column that has at least one transformation, with the column name
        wrapped in <strong> tags and the transformations joined by "⮕ ".
    """
    transformed_columns = {}
    summary = {}

    for category, columns in original_columns.items():
        category_params = transform_params.get(category, {})
        for column in columns:
            names = []
            details = []
            for transformation, values in category_params.items():
                names.extend(
                    f"{column}@{transformation}_{value}" for value in values
                )
                # "1, 2 and 3" for several values, plain "1" for a single one
                rendered = [str(value) for value in values]
                if len(rendered) > 1:
                    formatted_values = (
                        ", ".join(rendered[:-1]) + " and " + rendered[-1]
                    )
                else:
                    formatted_values = rendered[0]
                details.append(f"{transformation} ({formatted_values})")

            transformed_columns[column] = names
            # Columns without any transformation stay out of the summary
            if details:
                chain = "⮕ ".join(details)
                summary[column] = f"<strong>{column}</strong>: {chain}"

    summary_string = "\n".join(
        f"{position}. {line}"
        for position, line in enumerate(summary.values(), start=1)
    )
    return transformed_columns, summary_string
def apply_category_transformations(df, bin_dict, transform_params, panel):
    """Apply the selected transformations category by category.

    For each of "Media", "Internal" and "Exogenous" that has parameters and
    an entry in ``bin_dict``, the category's columns are transformed once per
    parameter value; the new columns are named
    "<source column>@<transformation>_<value>". Later transformations of the
    same category operate on the output of the earlier ones. With panel
    columns the functions run per panel group and only the columns of the
    last transformation in the chain are kept; without panels the columns of
    every step are kept and fed into the next step. NaN values in the new
    columns are replaced by 0.

    Each category is processed independently and all results are appended at
    the end, so one category's output is neither overwritten nor
    re-transformed by the next category.

    Args:
        df: source DataFrame; it is not modified.
        bin_dict: mapping of category -> list of column names.
        transform_params: mapping of category -> {transformation: values}.
        panel: list of panel column names; empty when there is no panel.

    Returns:
        ``df`` with the transformed columns appended, or ``df`` itself when
        nothing was transformed.
    """
    transformation_functions = {
        "Lead": apply_lead,
        "Lag": apply_lag,
        "Moving Average": apply_moving_average,
        "Saturation": apply_saturation,
        "Power": apply_power,
        "Adstock": apply_adstock,
    }
    has_panel = len(panel) > 0
    category_frames = []

    for category in ("Media", "Internal", "Exogenous"):
        category_params = transform_params.get(category)
        if not category_params or category not in bin_dict:
            continue  # nothing selected for this category

        # Working slice: the category's columns plus the panel columns.
        df_slice = df[bin_dict[category] + panel]
        category_df = pd.DataFrame()

        for transformation, parameters in category_params.items():
            if len(parameters) == 0:
                continue  # pd.concat cannot combine an empty list
            transformation_function = transformation_functions[transformation]

            if has_panel:
                # Transform every panel group separately, once per value.
                category_df = pd.concat(
                    [
                        df_slice.groupby(panel)
                        .transform(transformation_function, p)
                        .add_suffix(f"@{transformation}_{p}")
                        for p in parameters
                    ],
                    axis=1,
                )
            else:
                for p in parameters:
                    temp_df = df_slice.apply(
                        lambda x: transformation_function(x, p), axis=0
                    ).rename(
                        lambda x: f"{x}@{transformation}_{p}",
                        axis="columns",
                    )
                    category_df = pd.concat([category_df, temp_df], axis=1)

            # Shifts and rolling windows leave NaN at the edges.
            category_df = category_df.fillna(0)
            # The next transformation of this category builds on this output.
            df_slice = pd.concat([df[panel], category_df], axis=1)

        if not category_df.empty:
            category_frames.append(category_df)

    if not category_frames:
        # No transformations were applied: hand back the original DataFrame.
        return df
    return pd.concat([df, *category_frames], axis=1)
def infer_date_granularity(df):
    """Infer the reporting frequency from the ``date`` column of ``df``.

    The distinct dates are sorted and the most common gap (in whole days)
    between neighbouring dates decides the label.

    Args:
        df: DataFrame with a datetime-typed ``date`` column; dates may repeat
            (e.g. once per panel) and may appear in any order.

    Returns:
        "daily" for a 1-day gap, "weekly" for 7 days, "monthly" for 28-31
        days, and "irregular" otherwise -- including when there are fewer
        than two distinct dates.
    """
    # Sort first: unique() keeps first-appearance order, which would produce
    # negative or meaningless gaps for unsorted data.
    unique_dates = pd.Series(df["date"].unique()).sort_values()
    day_gaps = unique_dates.diff().dt.days.dropna()
    if day_gaps.empty:
        return "irregular"

    common_freq = day_gaps.mode().iloc[0]
    if common_freq == 1:
        return "daily"
    if common_freq == 7:
        return "weekly"
    if 28 <= common_freq <= 31:
        return "monthly"
    return "irregular"
###############################################################################
# User input for transformations
###############################################################################

# Granularity label shown next to the period-based sliders
date_granularity = infer_date_granularity(final_df_loaded)

# Transformation parameters per category, filled in by the widgets below
transform_params = {"Media": {}, "Internal": {}, "Exogenous": {}}

st.markdown("### Select Transformations to Apply")
# "Internal" is skipped on purpose: no widgets are offered for it, so its
# parameter dict stays empty.
for category in ("Media", "Exogenous"):
    transformation_widgets(category, transform_params, date_granularity)

###############################################################################
# Apply transformations
###############################################################################

if st.button("Accept and Proceed", use_container_width=True):
    with st.spinner("Applying transformations..."):
        final_df = apply_category_transformations(
            final_df_loaded, bin_dict_loaded, transform_params, panel
        )

        # Mapping of original column -> transformed column names, plus the
        # human-readable summary of the selected settings
        transformed_columns_dict, summary_string = (
            generate_transformed_columns(original_columns, transform_params)
        )

        # Keep all three results in session state. The column mapping used to
        # be computed and then dropped, leaving its session entry at the
        # empty default forever.
        st.session_state["final_df"] = final_df
        st.session_state["summary_string"] = summary_string
        st.session_state["transformed_columns_dict"] = transformed_columns_dict

###############################################################################
# Display the transformed DataFrame and summary
###############################################################################

st.markdown("### Transformed DataFrame")
st.dataframe(st.session_state["final_df"], hide_index=True)

# Total rows and columns
total_rows, total_columns = st.session_state["final_df"].shape
st.markdown(
    f"<p style='text-align: justify;'>The transformed DataFrame contains <strong>{total_rows}</strong> rows and <strong>{total_columns}</strong> columns.</p>",
    unsafe_allow_html=True,
)

# The summary exists only after transformations have been applied
if st.session_state["summary_string"]:
    with st.expander("Summary of Transformations"):
        st.markdown("### Summary of Transformations")
        st.markdown(
            st.session_state["summary_string"], unsafe_allow_html=True
        )
def save_to_pickle(file_path, final_df):
    """Pickle ``final_df`` to ``file_path`` under the key "final_df_transformed".

    The file is opened in binary write mode, so an existing file at that
    path is overwritten.
    """
    payload = {"final_df_transformed": final_df}
    with open(file_path, "wb") as handle:
        pickle.dump(payload, handle)
# Persist the transformed data and the project settings on request
if st.button("Accept and Save", use_container_width=True):
    project_path = st.session_state["project_path"]

    # Transformed DataFrame
    transformed_path = os.path.join(project_path, "final_df_transformed.pkl")
    save_to_pickle(transformed_path, st.session_state["final_df"])

    # Project dictionary, which includes the widget selections made above
    project_dct_path = os.path.join(project_path, "project_dct.pkl")
    with open(project_dct_path, "wb") as f:
        pickle.dump(st.session_state["project_dct"], f)

    update_db("3_Transformations.py")
    st.toast("💾 Saved Successfully!")