# NOTE: hosting-status banner ("Spaces: Sleeping") captured with the page; not part of the source.
# Imports, grouped per PEP 8 (stdlib / third-party / local).
# st.set_page_config must remain the FIRST Streamlit command executed,
# so it is called before load_local_css/set_header render anything.
import pickle

import numpy as np
import pandas as pd
import streamlit as st
import streamlit_authenticator as stauth
import yaml
from yaml import SafeLoader

from utilities import set_header, load_local_css

st.set_page_config(
    page_title="Transformations",
    page_icon=":shark:",
    layout="wide",
    initial_sidebar_state="collapsed",
)

# Page chrome: local stylesheet and the shared app header.
load_local_css("styles.css")
set_header()
# Re-assign existing session-state entries so Streamlit keeps them alive
# across page switches; widget-owned keys (login/logout/config and form
# submitters) must not be written back.
for key, value in st.session_state.items():
    if key not in ["logout", "login", "config"] and not key.startswith(
        "FormSubmitter"
    ):
        st.session_state[key] = value

# Load the authenticator configuration.
# yaml.safe_load is the idiomatic equivalent of yaml.load(..., Loader=SafeLoader).
with open("config.yaml") as file:
    config = yaml.safe_load(file)
st.session_state["config"] = config

authenticator = stauth.Authenticate(
    config["credentials"],
    config["cookie"]["name"],
    config["cookie"]["key"],
    config["cookie"]["expiry_days"],
    config["preauthorized"],
)
st.session_state["authenticator"] = authenticator

# Render the login form; the result is also mirrored into
# st.session_state["authentication_status"] by the library.
name, authentication_status, username = authenticator.login("Login", "main")
auth_status = st.session_state.get("authentication_status")
# auth_status is True / False / None; truthiness replaces the
# non-idiomatic `== True` comparison (PEP 8 E712) with identical effect.
if auth_status:
    authenticator.logout("Logout", "main")

    # Fixed typo: is_state_initiaized -> is_state_initialized (local name only).
    is_state_initialized = st.session_state.get("initialized", False)
    if not is_state_initialized:
        if "session_name" not in st.session_state:
            st.session_state["session_name"] = None

    # Deserialize the objects prepared by the data-import page.
    # NOTE: pickle is only safe here because the file is produced locally
    # by this app, never from untrusted input.
    with open("data_import.pkl", "rb") as f:
        data = pickle.load(f)

    # Accessing the loaded objects
    final_df_loaded = data["final_df"]
    bin_dict_loaded = data["bin_dict"]

    # Session-state defaults for this page.
    if "transformed_columns_dict" not in st.session_state:
        st.session_state["transformed_columns_dict"] = {}  # Default empty dictionary
    if "final_df" not in st.session_state:
        st.session_state["final_df"] = final_df_loaded  # Default as original dataframe
    if "summary_string" not in st.session_state:
        st.session_state["summary_string"] = None  # Default as None
# Original columns for the categories that support transformations,
# as recorded by the data-import step.
original_columns = {
    category: bin_dict_loaded[category]
    for category in ["Media", "Internal", "Exogenous"]
    if category in bin_dict_loaded
}

# Retrieve panel columns (either level may be absent).
panel_1 = bin_dict_loaded.get("Panel Level 1")
panel_2 = bin_dict_loaded.get("Panel Level 2")

# # For testing on non panel level
# final_df_loaded = final_df_loaded.drop("Panel_1", axis=1)
# final_df_loaded = final_df_loaded.groupby("date").mean().reset_index()
# panel_1 = None

st.write("")

# Combined list of panel columns used for group-wise transformations;
# empty list means there is no panel dimension.
if panel_1:
    panel = panel_1 + panel_2 if panel_2 else panel_1
else:
    panel = []
# Function to build transformation widgets
def transformation_widgets(category, transform_params, date_granularity):
    """Render the transformation pickers for one category inside an expander.

    Parameters:
        category: "Media", "Internal" or "Exogenous" — selects the set of
            available transformations and namespaces the widget keys.
        transform_params: dict mutated in place; transform_params[category]
            gains one entry per chosen transformation, mapping to the
            iterable of parameter values to sweep.
        date_granularity: label ("daily"/"weekly"/...) shown in the headings
            of the period-based transformations.
    """
    # Transformations offered per category.
    transformation_options = {
        "Media": ["Lag", "Moving Average", "Saturation", "Power", "Adstock"],
        "Internal": ["Lead", "Lag", "Moving Average"],
        "Exogenous": ["Lead", "Lag", "Moving Average"],
    }

    with st.expander(f"{category} Transformations"):
        # Let the user pick which transformations to apply.
        transformations_to_apply = st.multiselect(
            "Select transformations to apply",
            options=transformation_options[category],
            default=[],
            key=f"transformation_{category}",
        )

        # Split the chosen transformations across two layout columns
        # (the first column gets the extra one when the count is odd).
        transformations_per_column = (
            len(transformations_to_apply) // 2 + len(transformations_to_apply) % 2
        )
        col1, col2 = st.columns(2)
        transformations_col1 = transformations_to_apply[:transformations_per_column]
        transformations_col2 = transformations_to_apply[transformations_per_column:]

        def create_transformation_widgets(column, transformations):
            # Render one range slider per transformation and record the
            # resulting parameter sweep in transform_params[category].
            with column:
                for transformation in transformations:
                    if transformation == "Lead":
                        st.markdown(f"**Lead ({date_granularity})**")
                        lead = st.slider(
                            "Lead periods",
                            1,
                            10,
                            (1, 2),
                            1,
                            key=f"lead_{category}",
                            label_visibility="collapsed",
                        )
                        start, end, step = lead[0], lead[1], 1
                        transform_params[category]["Lead"] = np.arange(
                            start, end + step, step
                        )
                    if transformation == "Lag":
                        st.markdown(f"**Lag ({date_granularity})**")
                        lag = st.slider(
                            "Lag periods",
                            1,
                            10,
                            (1, 2),
                            1,
                            key=f"lag_{category}",
                            label_visibility="collapsed",
                        )
                        start, end, step = lag[0], lag[1], 1
                        transform_params[category]["Lag"] = np.arange(
                            start, end + step, step
                        )
                    if transformation == "Moving Average":
                        st.markdown(f"**Moving Average ({date_granularity})**")
                        window = st.slider(
                            "Window size for Moving Average",
                            1,
                            10,
                            (1, 2),
                            1,
                            key=f"ma_{category}",
                            label_visibility="collapsed",
                        )
                        start, end, step = window[0], window[1], 1
                        transform_params[category]["Moving Average"] = np.arange(
                            start, end + step, step
                        )
                    if transformation == "Saturation":
                        st.markdown("**Saturation (%)**")
                        # Plain string: the previous f-string had no placeholder.
                        saturation_point = st.slider(
                            "Saturation Percentage",
                            0,
                            100,
                            (10, 20),
                            10,
                            key=f"sat_{category}",
                            label_visibility="collapsed",
                        )
                        start, end, step = saturation_point[0], saturation_point[1], 10
                        transform_params[category]["Saturation"] = np.arange(
                            start, end + step, step
                        )
                    if transformation == "Power":
                        st.markdown("**Power**")
                        power = st.slider(
                            "Power",
                            0,
                            10,
                            (2, 4),
                            1,
                            key=f"power_{category}",
                            label_visibility="collapsed",
                        )
                        start, end, step = power[0], power[1], 1
                        transform_params[category]["Power"] = np.arange(
                            start, end + step, step
                        )
                    if transformation == "Adstock":
                        st.markdown("**Adstock**")
                        rate = st.slider(
                            f"Factor ({category})",
                            0.0,
                            1.0,
                            (0.5, 0.7),
                            0.05,
                            key=f"adstock_{category}",
                            label_visibility="collapsed",
                        )
                        start, end, step = rate[0], rate[1], 0.05
                        # Round to tame float drift from np.arange with a 0.05 step.
                        transform_params[category]["Adstock"] = [
                            round(a, 3) for a in np.arange(start, end + step, step)
                        ]

        # Populate both layout columns.
        create_transformation_widgets(col1, transformations_col1)
        create_transformation_widgets(col2, transformations_col2)
# Lag transformation: push the series later in time.
def apply_lag(df, lag):
    """Shift the series forward by `lag` periods (leading values become NaN)."""
    return df.shift(periods=lag)
# Lead transformation: pull the series earlier in time.
def apply_lead(df, lead):
    """Shift the series backward by `lead` periods (trailing values become NaN)."""
    return df.shift(periods=-lead)
# Moving-average transformation.
def apply_moving_average(df, window_size):
    """Rolling mean over `window_size` periods (NaN until the window fills)."""
    return df.rolling(window_size).mean()
# Function to apply Saturation transformation
def apply_saturation(df, saturation_percent_100):
    """Apply a logistic (S-curve) saturation transformation to a series.

    Parameters:
        df: pandas Series of (assumed positive) media values — TODO confirm
            values are positive; x == 0 would still divide by zero, as in the
            original implementation.
        saturation_percent_100: saturation level on a 0-100 scale.

    Returns the saturated Series.
    """
    # Convert saturation percentage from the 0-100 scale to a fraction,
    # clamped away from 0 and 1 where log(1/p - 1) is undefined.
    # (The slider's lower bound is 0, so p == 0 was previously a
    # ZeroDivisionError.)
    saturation_percent = saturation_percent_100 / 100.0
    saturation_percent = min(max(saturation_percent, 1e-9), 1 - 1e-9)

    # The midpoint of the observed range acts as the curve's inflection point.
    column_max = df.max()
    column_min = df.min()
    saturation_point = (column_min + column_max) / 2

    numerator = np.log((1 / saturation_percent) - 1)
    denominator = np.log(saturation_point / max(column_max, 1e-9))
    # BUG FIX: saturation_point <= column_max, so this log is <= 0 and the
    # previous `max(denominator, 1e-9)` ALWAYS discarded the true value,
    # dividing by 1e-9 and producing an enormous steepness. Clamp the
    # magnitude away from zero while preserving the sign instead.
    if abs(denominator) < 1e-9:
        denominator = -1e-9
    steepness = numerator / denominator

    # Apply the saturation transformation element-wise.
    transformed_series = df.apply(
        lambda x: (1 / (1 + (saturation_point / x) ** steepness)) * x
    )
    return transformed_series
# Power transformation.
def apply_power(df, power):
    """Raise every value of the series to the given exponent."""
    return df.pow(power)
# Adstock transformation (geometric decay carry-over).
def apply_adstock(df, factor):
    """Return the adstocked series: out[t] = out[t-1] * factor + df[t]."""
    carried = 0
    decayed = []
    for value in df:
        carried = carried * factor + value
        decayed.append(carried)
    return pd.Series(decayed, index=df.index)
# Function to generate transformed column names
def generate_transformed_columns(original_columns, transform_params):
    """Build the column-name mapping plus an HTML summary of transformations.

    Returns a tuple (transformed_columns, summary_string):
        transformed_columns maps each original column to the list of
        "<col>@<transformation>_<value>" names that will be generated;
        summary_string is a numbered, newline-separated description,
        one line per column, with the column name wrapped in <strong>.
    """
    transformed_columns = {}
    summary = {}
    for category, columns in original_columns.items():
        params = transform_params.get(category, {})
        for column in columns:
            # One generated name per (transformation, parameter value) pair.
            transformed_columns[column] = [
                f"{column}@{transformation}_{value}"
                for transformation, values in params.items()
                for value in values
            ]

            # Human-readable description of each transformation's sweep,
            # joining the values with commas and "and" before the last one.
            details = []
            for transformation, values in params.items():
                values = list(values)
                if len(values) > 1:
                    joined = (
                        ", ".join(map(str, values[:-1])) + " and " + str(values[-1])
                    )
                else:
                    joined = str(values[0])
                details.append(f"{transformation} ({joined})")
            if details:
                summary[column] = f"<strong>{column}</strong>: " + "⮕ ".join(details)

    # Number the per-column summaries into a single string.
    numbered = [f"{idx + 1}. {text}" for idx, text in enumerate(summary.values())]
    return transformed_columns, "\n".join(numbered)
# Function to apply transformations to DataFrame slices based on specified categories and parameters
def apply_category_transformations(df, bin_dict, transform_params, panel):
    """Apply every selected transformation to each category's columns.

    Parameters:
        df: the full input DataFrame.
        bin_dict: maps category name -> list of column names in that category.
        transform_params: maps category -> {transformation name -> iterable of
            parameter values}, as built by the transformation widgets.
        panel: list of panel columns to group by (empty list = no panel).

    Returns `df` with the transformed columns concatenated, or `df`
    unchanged when no transformations were selected.
    """
    # Dispatch table: transformation name -> implementing function.
    transformation_functions = {
        "Lead": apply_lead,
        "Lag": apply_lag,
        "Moving Average": apply_moving_average,
        "Saturation": apply_saturation,
        "Power": apply_power,
        "Adstock": apply_adstock,
    }

    # Accumulates the generated columns across categories.
    category_df = pd.DataFrame()

    for category in ["Media", "Internal", "Exogenous"]:
        if (
            category not in transform_params
            or category not in bin_dict
            or not transform_params[category]
        ):
            continue  # Skip categories without transformations

        # Slice the category's columns plus the panel columns (needed for groupby).
        df_slice = df[bin_dict[category] + panel]

        for transformation, parameters in transform_params[category].items():
            transformation_function = transformation_functions[transformation]

            if len(panel) > 0:
                # Panel data: apply the transformation within each panel group,
                # producing one suffixed output column per parameter value.
                # NOTE(review): this assignment *replaces* category_df on every
                # transformation iteration, while the non-panel branch below
                # accumulates into it — so with panel data only the last
                # transformation's columns (applied on top of the previous ones
                # via the df_slice update) survive. Looks inconsistent; confirm
                # intended behavior before changing.
                category_df = pd.concat(
                    [
                        df_slice.groupby(panel)
                        .transform(transformation_function, p)
                        .add_suffix(f"@{transformation}_{p}")
                        for p in parameters
                    ],
                    axis=1,
                )
                # Replace all NaN or null values in category_df with 0
                category_df.fillna(0, inplace=True)
                # Feed the transformed columns into the next transformation so
                # transformations compose (e.g. Lag of Moving Average).
                df_slice = pd.concat(
                    [df[panel], category_df],
                    axis=1,
                )
            else:
                for p in parameters:
                    # Apply the transformation column-wise and suffix the names.
                    temp_df = df_slice.apply(
                        lambda x: transformation_function(x, p), axis=0
                    ).rename(lambda x: f"{x}@{transformation}_{p}", axis="columns")
                    # Accumulate the transformed slice.
                    category_df = pd.concat([category_df, temp_df], axis=1)
                    # Replace all NaN or null values in category_df with 0
                    category_df.fillna(0, inplace=True)
                    # Update df_slice so later transformations compose here too.
                    df_slice = pd.concat(
                        [df[panel], category_df],
                        axis=1,
                    )

    # Concatenate the generated columns (if any) onto the original DataFrame.
    if not category_df.empty:
        final_df = pd.concat([df, category_df], axis=1)
    else:
        # If no transformations were applied, use the original DataFrame
        final_df = df
    return final_df
# Infer the granularity of the "date" column from the modal day gap.
def infer_date_granularity(df):
    """Return "daily", "weekly", "monthly" or "irregular" for df["date"]."""
    day_gaps = pd.Series(df["date"].unique()).diff().dt.days.dropna()
    common_freq = day_gaps.mode()[0]

    if common_freq == 1:
        return "daily"
    if common_freq == 7:
        return "weekly"
    if 28 <= common_freq <= 31:
        return "monthly"
    return "irregular"
#########################################################################################################################
# User input for transformations
#########################################################################################################################

# Infer date granularity from the loaded data (used in widget headings).
date_granularity = infer_date_granularity(final_df_loaded)

# Per-category transformation parameters, filled in by the widgets below.
transform_params = {"Media": {}, "Internal": {}, "Exogenous": {}}

st.markdown("### Select Transformations to Apply")
for category in ["Media", "Internal", "Exogenous"]:
    # Internal transformations are intentionally not exposed in the UI.
    if category == "Internal":
        continue
    transformation_widgets(category, transform_params, date_granularity)

#########################################################################################################################
# Apply transformations
#########################################################################################################################

# Apply the selected transformations when the user confirms.
if st.button("Accept and Proceed", use_container_width=True):
    with st.spinner("Applying transformations..."):
        final_df = apply_category_transformations(
            final_df_loaded, bin_dict_loaded, transform_params, panel
        )
        # Map original column names to their generated transformed names.
        transformed_columns_dict, summary_string = generate_transformed_columns(
            original_columns, transform_params
        )
        # Persist results so they survive the rerun and reach other pages.
        st.session_state["final_df"] = final_df
        st.session_state["summary_string"] = summary_string

#########################################################################################################################
# Display the transformed DataFrame and summary
#########################################################################################################################

st.markdown("### Transformed DataFrame")
st.dataframe(st.session_state["final_df"], hide_index=True)

# Total rows and columns of the (possibly transformed) DataFrame.
total_rows, total_columns = st.session_state["final_df"].shape
st.markdown(
    f"<p style='text-align: justify;'>The transformed DataFrame contains <strong>{total_rows}</strong> rows and <strong>{total_columns}</strong> columns.</p>",
    unsafe_allow_html=True,
)

# Summary of transformations (only present after "Accept and Proceed").
if st.session_state["summary_string"]:
    with st.expander("Summary of Transformations"):
        st.markdown("### Summary of Transformations")
        st.markdown(st.session_state["summary_string"], unsafe_allow_html=True)
def save_to_pickle(file_path, final_df):
    """Serialize `final_df` to `file_path` under the "final_df_transformed" key."""
    with open(file_path, "wb") as f:
        pickle.dump({"final_df_transformed": final_df}, f)
# Persist the transformed DataFrame for the downstream modeling pages.
if st.button("Accept and Save", use_container_width=True):
    save_to_pickle(
        "final_df_transformed.pkl", st.session_state["final_df"]
    )
    st.toast("💾 Saved Successfully!")