# Importing necessary libraries
import streamlit as st

st.set_page_config(
    page_title="Transformations",
    page_icon=":shark:",
    layout="wide",
    initial_sidebar_state="collapsed",
)

import pickle
import numpy as np
import pandas as pd
from utilities import set_header, load_local_css
import streamlit_authenticator as stauth
import yaml
from yaml import SafeLoader

load_local_css("styles.css")
set_header()

# Re-save session state keys so they persist across page switches
for k, v in st.session_state.items():
    if k not in ["logout", "login", "config"] and not k.startswith("FormSubmitter"):
        st.session_state[k] = v

with open("config.yaml") as file:
    config = yaml.load(file, Loader=SafeLoader)
st.session_state["config"] = config

authenticator = stauth.Authenticate(
    config["credentials"],
    config["cookie"]["name"],
    config["cookie"]["key"],
    config["cookie"]["expiry_days"],
    config["preauthorized"],
)
st.session_state["authenticator"] = authenticator

name, authentication_status, username = authenticator.login("Login", "main")
auth_status = st.session_state.get("authentication_status")

if auth_status:
    authenticator.logout("Logout", "main")

    is_state_initialized = st.session_state.get("initialized", False)
    if not is_state_initialized:
        if "session_name" not in st.session_state:
            st.session_state["session_name"] = None

    # Deserialize and load the objects from the pickle file
    with open("data_import.pkl", "rb") as f:
        data = pickle.load(f)

    # Accessing the loaded objects
    final_df_loaded = data["final_df"]
    bin_dict_loaded = data["bin_dict"]

    # Initialize session state
    if "transformed_columns_dict" not in st.session_state:
        st.session_state["transformed_columns_dict"] = {}  # Default empty dictionary
    if "final_df" not in st.session_state:
        st.session_state["final_df"] = final_df_loaded  # Default to original DataFrame
    if "summary_string" not in st.session_state:
        st.session_state["summary_string"] = None  # Default as None

    # Extract original columns for specified categories
    original_columns = {
        category: bin_dict_loaded[category]
        for category in ["Media", "Internal", "Exogenous"]
        if category in bin_dict_loaded
    }

    # Retrieve panel columns
    panel_1 = bin_dict_loaded.get("Panel Level 1")
    panel_2 = bin_dict_loaded.get("Panel Level 2")

    # # For testing on non-panel level
    # final_df_loaded = final_df_loaded.drop("Panel_1", axis=1)
    # final_df_loaded = final_df_loaded.groupby("date").mean().reset_index()
    # panel_1 = None

    # Apply transformations on panel level
    st.write("")
    if panel_1:
        panel = panel_1 + panel_2 if panel_2 else panel_1
    else:
        panel = []
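    # Illustrative sketch (not part of the original app): the shapes this page
    # assumes for the objects loaded from data_import.pkl. The column names
    # below are hypothetical placeholders.
    #
    #   bin_dict_loaded = {
    #       "Media": ["tv_spend", "digital_spend"],
    #       "Internal": ["price"],
    #       "Exogenous": ["holiday_flag"],
    #       "Panel Level 1": ["Panel_1"],
    #       "Panel Level 2": ["Panel_2"],
    #   }
    #
    #   final_df_loaded: one row per date (and per panel member, if panels
    #   exist), with a "date" column plus all columns listed above.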
    # Function to build transformation widgets
    def transformation_widgets(category, transform_params, date_granularity):

        # Transformation options per category
        transformation_options = {
            "Media": ["Lag", "Moving Average", "Saturation", "Power", "Adstock"],
            "Internal": ["Lead", "Lag", "Moving Average"],
            "Exogenous": ["Lead", "Lag", "Moving Average"],
        }

        with st.expander(f"{category} Transformations"):
            # Let users select which transformations to apply
            transformations_to_apply = st.multiselect(
                "Select transformations to apply",
                options=transformation_options[category],
                default=[],
                key=f"transformation_{category}",
            )

            # Determine the number of transformations to put in each column
            transformations_per_column = (
                len(transformations_to_apply) // 2 + len(transformations_to_apply) % 2
            )

            # Create two columns
            col1, col2 = st.columns(2)

            # Assign transformations to each column
            transformations_col1 = transformations_to_apply[:transformations_per_column]
            transformations_col2 = transformations_to_apply[transformations_per_column:]

            # Define a helper function to create widgets for each transformation
            def create_transformation_widgets(column, transformations):
                with column:
                    for transformation in transformations:
                        # Conditionally create widgets for selected transformations
                        if transformation == "Lead":
                            st.markdown(f"**Lead ({date_granularity})**")
                            lead = st.slider(
                                "Lead periods",
                                1,
                                10,
                                (1, 2),
                                1,
                                key=f"lead_{category}",
                                label_visibility="collapsed",
                            )
                            start, end, step = lead[0], lead[1], 1
                            transform_params[category]["Lead"] = np.arange(
                                start, end + step, step
                            )

                        if transformation == "Lag":
                            st.markdown(f"**Lag ({date_granularity})**")
                            lag = st.slider(
                                "Lag periods",
                                1,
                                10,
                                (1, 2),
                                1,
                                key=f"lag_{category}",
                                label_visibility="collapsed",
                            )
                            start, end, step = lag[0], lag[1], 1
                            transform_params[category]["Lag"] = np.arange(
                                start, end + step, step
                            )

                        if transformation == "Moving Average":
                            st.markdown(f"**Moving Average ({date_granularity})**")
                            window = st.slider(
                                "Window size for Moving Average",
                                1,
                                10,
                                (1, 2),
                                1,
                                key=f"ma_{category}",
                                label_visibility="collapsed",
                            )
                            start, end, step = window[0], window[1], 1
                            transform_params[category]["Moving Average"] = np.arange(
                                start, end + step, step
                            )

                        if transformation == "Saturation":
                            st.markdown("**Saturation (%)**")
                            saturation_point = st.slider(
                                "Saturation Percentage",
                                0,
                                100,
                                (10, 20),
                                10,
                                key=f"sat_{category}",
                                label_visibility="collapsed",
                            )
                            start, end, step = saturation_point[0], saturation_point[1], 10
                            transform_params[category]["Saturation"] = np.arange(
                                start, end + step, step
                            )

                        if transformation == "Power":
                            st.markdown("**Power**")
                            power = st.slider(
                                "Power",
                                0,
                                10,
                                (2, 4),
                                1,
                                key=f"power_{category}",
                                label_visibility="collapsed",
                            )
                            start, end, step = power[0], power[1], 1
                            transform_params[category]["Power"] = np.arange(
                                start, end + step, step
                            )

                        if transformation == "Adstock":
                            st.markdown("**Adstock**")
                            rate = st.slider(
                                f"Factor ({category})",
                                0.0,
                                1.0,
                                (0.5, 0.7),
                                0.05,
                                key=f"adstock_{category}",
                                label_visibility="collapsed",
                            )
                            start, end, step = rate[0], rate[1], 0.05
                            adstock_range = [
                                round(a, 3) for a in np.arange(start, end + step, step)
                            ]
                            transform_params[category]["Adstock"] = adstock_range

            # Create widgets in each column
            create_transformation_widgets(col1, transformations_col1)
            create_transformation_widgets(col2, transformations_col2)

    # Function to apply Lag transformation
    def apply_lag(df, lag):
        return df.shift(lag)

    # Function to apply Lead transformation
    def apply_lead(df, lead):
        return df.shift(-lead)

    # Function to apply Moving Average transformation
    def apply_moving_average(df, window_size):
        return df.rolling(window=window_size).mean()

    # Function to apply Saturation transformation
    def apply_saturation(df, saturation_percent_100):
        # Convert saturation percentage from a 0-100 scale to a fraction
        saturation_percent = saturation_percent_100 / 100.0

        # Calculate saturation point and steepness
        column_max = df.max()
        column_min = df.min()
        saturation_point = (column_min + column_max) / 2
        numerator = np.log(
            (1 / (saturation_percent if saturation_percent != 1 else 1 - 1e-9)) - 1
        )
        denominator = np.log(saturation_point / max(column_max, 1e-9))
        steepness = numerator / max(
            denominator, 1e-9
        )  # Avoid division by zero with a small constant

        # Apply the saturation transformation
        transformed_series = df.apply(
            lambda x: (1 / (1 + (saturation_point / x) ** steepness)) * x
        )
        return transformed_series

    # Function to apply Power transformation
    def apply_power(df, power):
        return df**power

    # Function to apply Adstock transformation
    def apply_adstock(df, factor):
        x = 0
        # Use the walrus operator to update x iteratively with the Adstock formula
        adstock_var = [x := x * factor + v for v in df]
        ans = pd.Series(adstock_var, index=df.index)
        return ans
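    # Worked sanity checks (illustrative only; values follow from the
    # definitions above and are not executed by the app):
    #
    #   s = pd.Series([100.0, 200.0, 300.0])
    #   apply_lag(s, 1)            -> [NaN, 100.0, 200.0]   (shift forward in time)
    #   apply_lead(s, 1)           -> [200.0, 300.0, NaN]   (shift backward in time)
    #   apply_moving_average(s, 2) -> [NaN, 150.0, 250.0]
    #   apply_adstock(s, 0.5)      -> [100.0, 250.0, 425.0]
    #       (x_t = 0.5 * x_{t-1} + v_t, starting from x_0 = 0)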
    # Function to generate transformed column names
    @st.cache_resource(show_spinner=False)
    def generate_transformed_columns(original_columns, transform_params):
        transformed_columns, summary = {}, {}

        for category, columns in original_columns.items():
            for column in columns:
                transformed_columns[column] = []
                # List to hold transformation details for the current column
                summary_details = []

                if category in transform_params:
                    for transformation, values in transform_params[category].items():
                        # Generate transformed column names for each value
                        for value in values:
                            transformed_name = f"{column}@{transformation}_{value}"
                            transformed_columns[column].append(transformed_name)

                        # Format the values list as a string with commas and "and" before the last item
                        if len(values) > 1:
                            formatted_values = (
                                ", ".join(map(str, values[:-1]))
                                + " and "
                                + str(values[-1])
                            )
                        else:
                            formatted_values = str(values[0])

                        # Add transformation details
                        summary_details.append(f"{transformation} ({formatted_values})")

                # Only add to summary if there are transformation details for the column
                if summary_details:
                    formatted_summary = "⮕ ".join(summary_details)
                    # Use <b> tags to make the column name bold
                    summary[column] = f"<b>{column}</b>: {formatted_summary}"

        # Generate a comprehensive summary string for all columns
        summary_items = [
            f"{idx + 1}. {details}" for idx, details in enumerate(summary.values())
        ]
        summary_string = "\n".join(summary_items)

        return transformed_columns, summary_string
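    # Example of the naming scheme (illustrative; "tv_spend" is a hypothetical
    # column name): with transform_params = {"Media": {"Lag": np.arange(1, 3)}},
    # a Media column "tv_spend" yields the transformed names "tv_spend@Lag_1"
    # and "tv_spend@Lag_2", and a summary entry "<b>tv_spend</b>: Lag (1 and 2)".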
    # Function to apply transformations to DataFrame slices based on specified categories and parameters
    @st.cache_resource(show_spinner=False)
    def apply_category_transformations(df, bin_dict, transform_params, panel):
        # Dictionary for function mapping
        transformation_functions = {
            "Lead": apply_lead,
            "Lag": apply_lag,
            "Moving Average": apply_moving_average,
            "Saturation": apply_saturation,
            "Power": apply_power,
            "Adstock": apply_adstock,
        }

        # Initialize category_df as an empty DataFrame
        category_df = pd.DataFrame()

        # Iterate through each category specified in transform_params
        for category in ["Media", "Internal", "Exogenous"]:
            if (
                category not in transform_params
                or category not in bin_dict
                or not transform_params[category]
            ):
                continue  # Skip categories without transformations

            # Slice the DataFrame based on the columns specified in bin_dict for the current category
            df_slice = df[bin_dict[category] + panel]

            # Iterate through each transformation and its parameters for the current category.
            # Note: df_slice is rebuilt from the transformed columns after each
            # transformation, so multiple selected transformations are chained.
            for transformation, parameters in transform_params[category].items():
                transformation_function = transformation_functions[transformation]

                # Check if there is panel data to group by
                if len(panel) > 0:
                    # Apply the transformation to each group
                    category_df = pd.concat(
                        [
                            df_slice.groupby(panel)
                            .transform(transformation_function, p)
                            .add_suffix(f"@{transformation}_{p}")
                            for p in parameters
                        ],
                        axis=1,
                    )

                    # Replace all NaN or null values in category_df with 0
                    category_df.fillna(0, inplace=True)

                    # Update df_slice
                    df_slice = pd.concat(
                        [df[panel], category_df],
                        axis=1,
                    )
                else:
                    for p in parameters:
                        # Apply the transformation function to each column
                        temp_df = df_slice.apply(
                            lambda x: transformation_function(x, p), axis=0
                        ).rename(lambda x: f"{x}@{transformation}_{p}", axis="columns")

                        # Concatenate the transformed DataFrame slice to the category DataFrame
                        category_df = pd.concat([category_df, temp_df], axis=1)

                    # Replace all NaN or null values in category_df with 0
                    category_df.fillna(0, inplace=True)

                    # Update df_slice
                    df_slice = pd.concat(
                        [df[panel], category_df],
                        axis=1,
                    )

        # If category_df has been modified, concatenate it with the panel and response metrics from the original DataFrame
        if not category_df.empty:
            final_df = pd.concat([df, category_df], axis=1)
        else:
            # If no transformations were applied, use the original DataFrame
            final_df = df

        return final_df

    # Function to infer the granularity of the date column in a DataFrame
    @st.cache_resource(show_spinner=False)
    def infer_date_granularity(df):
        # Find the most common difference between consecutive dates
        common_freq = pd.Series(df["date"].unique()).diff().dt.days.dropna().mode()[0]

        # Map the most common difference to a granularity
        if common_freq == 1:
            return "daily"
        elif common_freq == 7:
            return "weekly"
        elif 28 <= common_freq <= 31:
            return "monthly"
        else:
            return "irregular"

    #####################################################################################################
    # User input for transformations
    #####################################################################################################

    # Infer date granularity
    date_granularity = infer_date_granularity(final_df_loaded)

    # Initialize the main dictionary to store the transformation parameters for each category
    transform_params = {"Media": {}, "Internal": {}, "Exogenous": {}}

    # User input for transformations
    st.markdown("### Select Transformations to Apply")
    for category in ["Media", "Internal", "Exogenous"]:
        # Skip Internal
        if category == "Internal":
            continue
        transformation_widgets(category, transform_params, date_granularity)

    #####################################################################################################
    # Apply transformations
    #####################################################################################################

    # Apply category-based transformations to the DataFrame
    if st.button("Accept and Proceed", use_container_width=True):
        with st.spinner("Applying transformations..."):
            final_df = apply_category_transformations(
                final_df_loaded, bin_dict_loaded, transform_params, panel
            )

            # Generate a dictionary mapping original column names to lists of transformed column names
            transformed_columns_dict, summary_string = generate_transformed_columns(
                original_columns, transform_params
            )

            # Store the transformed DataFrame and summary in session state
            st.session_state["final_df"] = final_df
            st.session_state["summary_string"] = summary_string
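    # Illustrative shape of the result (column names hypothetical): if "tv_spend"
    # had Lag values 1 and 2 applied, st.session_state["final_df"] now holds the
    # original columns plus the transformed ones, e.g.:
    #   date | Panel_1 | tv_spend | ... | tv_spend@Lag_1 | tv_spend@Lag_2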

    #####################################################################################################
    # Display the transformed DataFrame and summary
    #####################################################################################################

    # Display the transformed DataFrame in the Streamlit app
    st.markdown("### Transformed DataFrame")
    st.dataframe(st.session_state["final_df"], hide_index=True)

    # Total rows and columns
    total_rows, total_columns = st.session_state["final_df"].shape
    st.markdown(
        f"The transformed DataFrame contains {total_rows} rows and {total_columns} columns.",
        unsafe_allow_html=True,
    )

    # Display the summary of transformations as markdown
    if st.session_state["summary_string"]:
        with st.expander("Summary of Transformations"):
            st.markdown("### Summary of Transformations")
            st.markdown(st.session_state["summary_string"], unsafe_allow_html=True)

    @st.cache_resource(show_spinner=False)
    def save_to_pickle(file_path, final_df):
        # Open the file in write-binary mode and dump the objects
        with open(file_path, "wb") as f:
            pickle.dump({"final_df_transformed": final_df}, f)
        # Data is now saved to file

    if st.button("Accept and Save", use_container_width=True):
        save_to_pickle("final_df_transformed.pkl", st.session_state["final_df"])
        st.toast("💾 Saved Successfully!")