v6Mastercardapp / pages /3_Transformations.py
BlendMMM's picture
Upload 11 files
ff89010 verified
# Importing necessary libraries
import streamlit as st
st.set_page_config(
page_title="Transformations",
page_icon=":shark:",
layout="wide",
initial_sidebar_state="collapsed",
)
import pickle
import numpy as np
import pandas as pd
from utilities import set_header, load_local_css
import streamlit_authenticator as stauth
import yaml
from yaml import SafeLoader
import os
import sqlite3
from utilities import update_db
load_local_css("styles.css")
set_header()
# Check for authentication status
for k, v in st.session_state.items():
if k not in ["logout", "login", "config"] and not k.startswith("FormSubmitter"):
st.session_state[k] = v
with open("config.yaml") as file:
config = yaml.load(file, Loader=SafeLoader)
st.session_state["config"] = config
authenticator = stauth.Authenticate(
config["credentials"],
config["cookie"]["name"],
config["cookie"]["key"],
config["cookie"]["expiry_days"],
config["preauthorized"],
)
st.session_state["authenticator"] = authenticator
name, authentication_status, username = authenticator.login("Login", "main")
auth_status = st.session_state.get("authentication_status")
if auth_status == True:
authenticator.logout("Logout", "main")
is_state_initiaized = st.session_state.get("initialized", False)
if "project_dct" not in st.session_state:
st.error("Please load a project from Home page")
st.stop()
conn = sqlite3.connect(
r"DB/User.db", check_same_thread=False
) # connection with sql db
c = conn.cursor()
if not is_state_initiaized:
if "session_name" not in st.session_state:
st.session_state["session_name"] = None
if not os.path.exists(
os.path.join(st.session_state["project_path"], "data_import.pkl")
):
st.error("Please move to Data Import page")
# Deserialize and load the objects from the pickle file
with open(
os.path.join(st.session_state["project_path"], "data_import.pkl"), "rb"
) as f:
data = pickle.load(f)
# Accessing the loaded objects
final_df_loaded = data["final_df"]
bin_dict_loaded = data["bin_dict"]
# final_df_loaded.to_csv("Test/final_df_loaded.csv",index=False)
# Initialize session state==-
if "transformed_columns_dict" not in st.session_state:
st.session_state["transformed_columns_dict"] = {} # Default empty dictionary
if "final_df" not in st.session_state:
st.session_state["final_df"] = final_df_loaded # Default as original dataframe
if "summary_string" not in st.session_state:
st.session_state["summary_string"] = None # Default as None
# Extract original columns for specified categories
original_columns = {
category: bin_dict_loaded[category]
for category in ["Media", "Internal", "Exogenous"]
if category in bin_dict_loaded
}
# Retrive Panel columns
panel_1 = bin_dict_loaded.get("Panel Level 1")
panel_2 = bin_dict_loaded.get("Panel Level 2")
# # For testing on non panel level
# final_df_loaded = final_df_loaded.drop("Panel_1", axis=1)
# final_df_loaded = final_df_loaded.groupby("date").mean().reset_index()
# panel_1 = None
# Apply transformations on panel level
if panel_1:
panel = panel_1 + panel_2 if panel_2 else panel_1
else:
panel = []
# Function to build transformation widgets
def transformation_widgets(category, transform_params, date_granularity):
if (
st.session_state["project_dct"]["transformations"] is None
or st.session_state["project_dct"]["transformations"] == {}
):
st.session_state["project_dct"]["transformations"] = {}
if category not in st.session_state["project_dct"]["transformations"].keys():
st.session_state["project_dct"]["transformations"][category] = {}
# Define a dict of pre-defined default values of every transformation
predefined_defualts = {
"Lag": (1, 2),
"Lead": (1, 2),
"Moving Average": (1, 2),
"Saturation": (10, 20),
"Power": (2, 4),
"Adstock": (0.5, 0.7),
}
def selection_change():
# Handles removing transformations
if f"transformation_{category}" in st.session_state:
current_selection = st.session_state[f"transformation_{category}"]
past_selection = st.session_state["project_dct"]["transformations"][
category
][f"transformation_{category}"]
removed_selection = list(set(past_selection) - set(current_selection))
for selection in removed_selection:
# Option 1 - revert to defualt
# st.session_state['project_dct']['transformations'][category][selection] = predefined_defualts[selection]
# option 2 - delete from dict
del st.session_state["project_dct"]["transformations"][category][
selection
]
# Transformation Options
transformation_options = {
"Media": [
"Lag",
"Moving Average",
"Saturation",
"Power",
"Adstock",
],
"Internal": ["Lead", "Lag", "Moving Average"],
"Exogenous": ["Lead", "Lag", "Moving Average"],
}
expanded = st.session_state["project_dct"]["transformations"][category].get(
"expanded", False
)
st.session_state["project_dct"]["transformations"][category]["expanded"] = False
with st.expander(f"{category} Transformations", expanded=expanded):
st.session_state["project_dct"]["transformations"][category][
"expanded"
] = True
# Let users select which transformations to apply
sel_transformations = st.session_state["project_dct"]["transformations"][
category
].get(f"transformation_{category}", [])
transformations_to_apply = st.multiselect(
"Select transformations to apply",
options=transformation_options[category],
default=sel_transformations,
key=f"transformation_{category}",
# on_change=selection_change(),
)
st.session_state["project_dct"]["transformations"][category][
"transformation_" + category
] = transformations_to_apply
# Determine the number of transformations to put in each column
transformations_per_column = (
len(transformations_to_apply) // 2 + len(transformations_to_apply) % 2
)
# Create two columns
col1, col2 = st.columns(2)
# Assign transformations to each column
transformations_col1 = transformations_to_apply[:transformations_per_column]
transformations_col2 = transformations_to_apply[transformations_per_column:]
# Define a helper function to create widgets for each transformation
def create_transformation_widgets(column, transformations):
with column:
for transformation in transformations:
# Conditionally create widgets for selected transformations
if transformation == "Lead":
lead_default = st.session_state["project_dct"][
"transformations"
][category].get("Lead", predefined_defualts["Lead"])
st.markdown(f"**Lead ({date_granularity})**")
lead = st.slider(
"Lead periods",
1,
10,
lead_default,
1,
key=f"lead_{category}",
label_visibility="collapsed",
)
st.session_state["project_dct"]["transformations"][
category
]["Lead"] = lead
start = lead[0]
end = lead[1]
step = 1
transform_params[category]["Lead"] = np.arange(
start, end + step, step
)
if transformation == "Lag":
lag_default = st.session_state["project_dct"][
"transformations"
][category].get("Lag", predefined_defualts["Lag"])
st.markdown(f"**Lag ({date_granularity})**")
lag = st.slider(
"Lag periods",
1,
10,
(1, 2), # lag_default,
1,
key=f"lag_{category}",
label_visibility="collapsed",
)
st.session_state["project_dct"]["transformations"][
category
]["Lag"] = lag
start = lag[0]
end = lag[1]
step = 1
transform_params[category]["Lag"] = np.arange(
start, end + step, step
)
if transformation == "Moving Average":
ma_default = st.session_state["project_dct"][
"transformations"
][category].get("MA", predefined_defualts["Moving Average"])
st.markdown(f"**Moving Average ({date_granularity})**")
window = st.slider(
"Window size for Moving Average",
1,
10,
ma_default,
1,
key=f"ma_{category}",
label_visibility="collapsed",
)
st.session_state["project_dct"]["transformations"][
category
]["MA"] = window
start = window[0]
end = window[1]
step = 1
transform_params[category]["Moving Average"] = np.arange(
start, end + step, step
)
if transformation == "Saturation":
st.markdown("**Saturation (%)**")
saturation_default = st.session_state["project_dct"][
"transformations"
][category].get(
"Saturation", predefined_defualts["Saturation"]
)
saturation_point = st.slider(
f"Saturation Percentage",
0,
100,
saturation_default,
10,
key=f"sat_{category}",
label_visibility="collapsed",
)
st.session_state["project_dct"]["transformations"][
category
]["Saturation"] = saturation_point
start = saturation_point[0]
end = saturation_point[1]
step = 10
transform_params[category]["Saturation"] = np.arange(
start, end + step, step
)
if transformation == "Power":
st.markdown("**Power**")
power_default = st.session_state["project_dct"][
"transformations"
][category].get("Power", predefined_defualts["Power"])
power = st.slider(
f"Power",
0,
10,
power_default,
1,
key=f"power_{category}",
label_visibility="collapsed",
)
st.session_state["project_dct"]["transformations"][
category
]["Power"] = power
start = power[0]
end = power[1]
step = 1
transform_params[category]["Power"] = np.arange(
start, end + step, step
)
if transformation == "Adstock":
ads_default = st.session_state["project_dct"][
"transformations"
][category].get("Adstock", predefined_defualts["Adstock"])
st.markdown("**Adstock**")
rate = st.slider(
f"Factor ({category})",
0.0,
1.0,
ads_default,
0.05,
key=f"adstock_{category}",
label_visibility="collapsed",
)
st.session_state["project_dct"]["transformations"][
category
]["Adstock"] = rate
start = rate[0]
end = rate[1]
step = 0.05
adstock_range = [
round(a, 3) for a in np.arange(start, end + step, step)
]
transform_params[category]["Adstock"] = adstock_range
# Create widgets in each column
create_transformation_widgets(col1, transformations_col1)
create_transformation_widgets(col2, transformations_col2)
# Function to apply Lag transformation
def apply_lag(df, lag):
return df.shift(lag)
# Function to apply Lead transformation
def apply_lead(df, lead):
return df.shift(-lead)
# Function to apply Moving Average transformation
def apply_moving_average(df, window_size):
return df.rolling(window=window_size).mean()
# Function to apply Saturation transformation
def apply_saturation(df, saturation_percent_100):
# Convert saturation percentage from 100-based to fraction
saturation_percent = saturation_percent_100 / 100.0
# Calculate saturation point and steepness
column_max = df.max()
column_min = df.min()
saturation_point = (column_min + column_max) / 2
numerator = np.log(
(1 / (saturation_percent if saturation_percent != 1 else 1 - 1e-9)) - 1
)
denominator = np.log(saturation_point / max(column_max, 1e-9))
steepness = numerator / max(
denominator, 1e-9
) # Avoid division by zero with a small constant
# Apply the saturation transformation
transformed_series = df.apply(
lambda x: (1 / (1 + (saturation_point / x) ** steepness)) * x
)
return transformed_series
# Function to apply Power transformation
def apply_power(df, power):
return df**power
# Function to apply Adstock transformation
def apply_adstock(df, factor):
x = 0
# Use the walrus operator to update x iteratively with the Adstock formula
adstock_var = [x := x * factor + v for v in df]
ans = pd.Series(adstock_var, index=df.index)
return ans
# Function to generate transformed columns names
@st.cache_resource(show_spinner=False)
def generate_transformed_columns(original_columns, transform_params):
transformed_columns, summary = {}, {}
for category, columns in original_columns.items():
for column in columns:
transformed_columns[column] = []
summary_details = (
[]
) # List to hold transformation details for the current column
if category in transform_params:
for transformation, values in transform_params[category].items():
# Generate transformed column names for each value
for value in values:
transformed_name = f"{column}@{transformation}_{value}"
transformed_columns[column].append(transformed_name)
# Format the values list as a string with commas and "and" before the last item
if len(values) > 1:
formatted_values = (
", ".join(map(str, values[:-1]))
+ " and "
+ str(values[-1])
)
else:
formatted_values = str(values[0])
# Add transformation details
summary_details.append(f"{transformation} ({formatted_values})")
# Only add to summary if there are transformation details for the column
if summary_details:
formatted_summary = "⮕ ".join(summary_details)
# Use <strong> tags to make the column name bold
summary[column] = f"<strong>{column}</strong>: {formatted_summary}"
# Generate a comprehensive summary string for all columns
summary_items = [
f"{idx + 1}. {details}" for idx, details in enumerate(summary.values())
]
summary_string = "\n".join(summary_items)
return transformed_columns, summary_string
# Function to apply transformations to DataFrame slices based on specified categories and parameters
@st.cache_resource(show_spinner=False)
def apply_category_transformations(df, bin_dict, transform_params, panel):
# Dictionary for function mapping
transformation_functions = {
"Lead": apply_lead,
"Lag": apply_lag,
"Moving Average": apply_moving_average,
"Saturation": apply_saturation,
"Power": apply_power,
"Adstock": apply_adstock,
}
# Initialize category_df as an empty DataFrame
category_df = pd.DataFrame()
# Iterate through each category specified in transform_params
for category in ["Media", "Internal", "Exogenous"]:
if (
category not in transform_params
or category not in bin_dict
or not transform_params[category]
):
continue # Skip categories without transformations
# Slice the DataFrame based on the columns specified in bin_dict for the current category
df_slice = df[bin_dict[category] + panel]
# Iterate through each transformation and its parameters for the current category
for transformation, parameters in transform_params[category].items():
transformation_function = transformation_functions[transformation]
# Check if there is panel data to group by
if len(panel) > 0:
# Apply the transformation to each group
category_df = pd.concat(
[
df_slice.groupby(panel)
.transform(transformation_function, p)
.add_suffix(f"@{transformation}_{p}")
for p in parameters
],
axis=1,
)
# Replace all NaN or null values in category_df with 0
category_df.fillna(0, inplace=True)
# Update df_slice
df_slice = pd.concat(
[df[panel], category_df],
axis=1,
)
else:
for p in parameters:
# Apply the transformation function to each column
temp_df = df_slice.apply(
lambda x: transformation_function(x, p), axis=0
).rename(
lambda x: f"{x}@{transformation}_{p}",
axis="columns",
)
# Concatenate the transformed DataFrame slice to the category DataFrame
category_df = pd.concat([category_df, temp_df], axis=1)
# Replace all NaN or null values in category_df with 0
category_df.fillna(0, inplace=True)
# Update df_slice
df_slice = pd.concat(
[df[panel], category_df],
axis=1,
)
# If category_df has been modified, concatenate it with the panel and response metrics from the original DataFrame
if not category_df.empty:
final_df = pd.concat([df, category_df], axis=1)
else:
# If no transformations were applied, use the original DataFrame
final_df = df
return final_df
# Function to infers the granularity of the date column in a DataFrame
@st.cache_resource(show_spinner=False)
def infer_date_granularity(df):
# Find the most common difference
common_freq = pd.Series(df["date"].unique()).diff().dt.days.dropna().mode()[0]
# Map the most common difference to a granularity
if common_freq == 1:
return "daily"
elif common_freq == 7:
return "weekly"
elif 28 <= common_freq <= 31:
return "monthly"
else:
return "irregular"
#########################################################################################################################################################
# User input for transformations
#########################################################################################################################################################
# Infer date granularity
date_granularity = infer_date_granularity(final_df_loaded)
# Initialize the main dictionary to store the transformation parameters for each category
transform_params = {"Media": {}, "Internal": {}, "Exogenous": {}}
# User input for transformations
st.markdown("### Select Transformations to Apply")
for category in ["Media", "Internal", "Exogenous"]:
# Skip Internal
if category == "Internal":
continue
transformation_widgets(category, transform_params, date_granularity)
#########################################################################################################################################################
# Apply transformations
#########################################################################################################################################################
# Apply category-based transformations to the DataFrame
if st.button("Accept and Proceed", use_container_width=True):
with st.spinner("Applying transformations..."):
final_df = apply_category_transformations(
final_df_loaded, bin_dict_loaded, transform_params, panel
)
# Generate a dictionary mapping original column names to lists of transformed column names
transformed_columns_dict, summary_string = generate_transformed_columns(
original_columns, transform_params
)
# Store into transformed dataframe and summary session state
st.session_state["final_df"] = final_df
st.session_state["summary_string"] = summary_string
#########################################################################################################################################################
# Display the transformed DataFrame and summary
#########################################################################################################################################################
# Display the transformed DataFrame in the Streamlit app
st.markdown("### Transformed DataFrame")
final_df = st.session_state["final_df"].copy()
sort_col = []
for col in final_df.columns:
if col in ["Panel_1", "Panel_2", "date"]:
sort_col.append(col)
sorted_final_df = final_df.sort_values(
by=sort_col, ascending=True, na_position="first"
)
st.dataframe(sorted_final_df, hide_index=True)
# Total rows and columns
total_rows, total_columns = st.session_state["final_df"].shape
st.markdown(
f"<p style='text-align: justify;'>The transformed DataFrame contains <strong>{total_rows}</strong> rows and <strong>{total_columns}</strong> columns.</p>",
unsafe_allow_html=True,
)
# Display the summary of transformations as markdown
if "summary_string" in st.session_state and st.session_state["summary_string"]:
with st.expander("Summary of Transformations"):
st.markdown("### Summary of Transformations")
st.markdown(st.session_state["summary_string"], unsafe_allow_html=True)
@st.cache_resource(show_spinner=False)
def save_to_pickle(file_path, final_df):
# Open the file in write-binary mode and dump the objects
with open(file_path, "wb") as f:
pickle.dump({"final_df_transformed": final_df}, f)
# Data is now saved to file
if st.button("Accept and Save", use_container_width=True):
save_to_pickle(
os.path.join(st.session_state["project_path"], "final_df_transformed.pkl"),
st.session_state["final_df"],
)
project_dct_path = os.path.join(
st.session_state["project_path"], "project_dct.pkl"
)
with open(project_dct_path, "wb") as f:
pickle.dump(st.session_state["project_dct"], f)
update_db("3_Transformations.py")
st.toast("💾 Saved Successfully!")