# Importing necessary libraries
import streamlit as st

st.set_page_config(
    page_title="Transformations",
    page_icon=":shark:",
    layout="wide",
    initial_sidebar_state="collapsed",
)

import pickle
import numpy as np
import pandas as pd
from utilities import set_header, load_local_css, update_db
import streamlit_authenticator as stauth
import yaml
from yaml import SafeLoader
import os
import sqlite3

load_local_css("styles.css")
set_header()
# Re-assign session state values so widget state persists across page navigation
for k, v in st.session_state.items():
    if k not in ["logout", "login", "config"] and not k.startswith("FormSubmitter"):
        st.session_state[k] = v

with open("config.yaml") as file:
    config = yaml.load(file, Loader=SafeLoader)
st.session_state["config"] = config
authenticator = stauth.Authenticate(
    config["credentials"],
    config["cookie"]["name"],
    config["cookie"]["key"],
    config["cookie"]["expiry_days"],
    config["preauthorized"],
)
st.session_state["authenticator"] = authenticator
name, authentication_status, username = authenticator.login("Login", "main")
auth_status = st.session_state.get("authentication_status")
if auth_status:
    authenticator.logout("Logout", "main")

    is_state_initialized = st.session_state.get("initialized", False)

    if "project_dct" not in st.session_state:
        st.error("Please load a project from the Home page")
        st.stop()

    conn = sqlite3.connect(
        r"DB/User.db", check_same_thread=False
    )  # Connection with the SQLite DB
    c = conn.cursor()

    if not is_state_initialized:
        if "session_name" not in st.session_state:
            st.session_state["session_name"] = None

    if not os.path.exists(
        os.path.join(st.session_state["project_path"], "data_import.pkl")
    ):
        st.error("Please move to the Data Import page first")
        st.stop()
    # Deserialize and load the objects from the pickle file
    with open(
        os.path.join(st.session_state["project_path"], "data_import.pkl"), "rb"
    ) as f:
        data = pickle.load(f)

    # Accessing the loaded objects
    final_df_loaded = data["final_df"]
    bin_dict_loaded = data["bin_dict"]
    # final_df_loaded.to_csv("Test/final_df_loaded.csv", index=False)
    # Initialize session state
    if "transformed_columns_dict" not in st.session_state:
        st.session_state["transformed_columns_dict"] = {}  # Default: empty dictionary
    if "final_df" not in st.session_state:
        st.session_state["final_df"] = final_df_loaded  # Default: original dataframe
    if "summary_string" not in st.session_state:
        st.session_state["summary_string"] = None  # Default: None

    # Extract original columns for the specified categories
    original_columns = {
        category: bin_dict_loaded[category]
        for category in ["Media", "Internal", "Exogenous"]
        if category in bin_dict_loaded
    }
    # Retrieve panel columns
    panel_1 = bin_dict_loaded.get("Panel Level 1")
    panel_2 = bin_dict_loaded.get("Panel Level 2")

    # # For testing on non-panel level
    # final_df_loaded = final_df_loaded.drop("Panel_1", axis=1)
    # final_df_loaded = final_df_loaded.groupby("date").mean().reset_index()
    # panel_1 = None

    # Apply transformations on panel level
    if panel_1:
        panel = panel_1 + panel_2 if panel_2 else panel_1
    else:
        panel = []
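
    # Illustrative example (hypothetical column names): if bin_dict_loaded holds
    # "Panel Level 1" = ["Panel_1"] and "Panel Level 2" = ["Panel_2"], then
    # panel == ["Panel_1", "Panel_2"]; with no panel levels, panel == [] and
    # transformations are applied to the full DataFrame without grouping.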
    # Function to build transformation widgets
    def transformation_widgets(category, transform_params, date_granularity):
        if not st.session_state["project_dct"]["transformations"]:
            st.session_state["project_dct"]["transformations"] = {}
        if category not in st.session_state["project_dct"]["transformations"]:
            st.session_state["project_dct"]["transformations"][category] = {}

        # Pre-defined default (start, end) slider values for every transformation
        predefined_defaults = {
            "Lag": (1, 2),
            "Lead": (1, 2),
            "Moving Average": (1, 2),
            "Saturation": (10, 20),
            "Power": (2, 4),
            "Adstock": (0.5, 0.7),
        }
        def selection_change():
            # Handles removing transformations
            if f"transformation_{category}" in st.session_state:
                current_selection = st.session_state[f"transformation_{category}"]
                past_selection = st.session_state["project_dct"]["transformations"][
                    category
                ][f"transformation_{category}"]
                removed_selection = list(set(past_selection) - set(current_selection))
                for selection in removed_selection:
                    # Option 1 - revert to default:
                    # st.session_state['project_dct']['transformations'][category][selection] = predefined_defaults[selection]
                    # Option 2 - delete from dict:
                    del st.session_state["project_dct"]["transformations"][category][
                        selection
                    ]
        # Transformation options
        transformation_options = {
            "Media": [
                "Lag",
                "Moving Average",
                "Saturation",
                "Power",
                "Adstock",
            ],
            "Internal": ["Lead", "Lag", "Moving Average"],
            "Exogenous": ["Lead", "Lag", "Moving Average"],
        }

        expanded = st.session_state["project_dct"]["transformations"][category].get(
            "expanded", False
        )
        st.session_state["project_dct"]["transformations"][category]["expanded"] = False

        with st.expander(f"{category} Transformations", expanded=expanded):
            st.session_state["project_dct"]["transformations"][category][
                "expanded"
            ] = True
            # Let users select which transformations to apply
            sel_transformations = st.session_state["project_dct"]["transformations"][
                category
            ].get(f"transformation_{category}", [])
            transformations_to_apply = st.multiselect(
                "Select transformations to apply",
                options=transformation_options[category],
                default=sel_transformations,
                key=f"transformation_{category}",
                # on_change=selection_change,  # pass the callback, don't call it
            )
            st.session_state["project_dct"]["transformations"][category][
                f"transformation_{category}"
            ] = transformations_to_apply

            # Determine the number of transformations to put in each column
            transformations_per_column = (
                len(transformations_to_apply) // 2 + len(transformations_to_apply) % 2
            )

            # Create two columns
            col1, col2 = st.columns(2)

            # Assign transformations to each column
            transformations_col1 = transformations_to_apply[:transformations_per_column]
            transformations_col2 = transformations_to_apply[transformations_per_column:]
            # Helper function to create widgets for each transformation
            def create_transformation_widgets(column, transformations):
                with column:
                    for transformation in transformations:
                        # Conditionally create widgets for selected transformations
                        if transformation == "Lead":
                            lead_default = st.session_state["project_dct"][
                                "transformations"
                            ][category].get("Lead", predefined_defaults["Lead"])
                            st.markdown(f"**Lead ({date_granularity})**")
                            lead = st.slider(
                                "Lead periods",
                                1,
                                10,
                                lead_default,
                                1,
                                key=f"lead_{category}",
                                label_visibility="collapsed",
                            )
                            st.session_state["project_dct"]["transformations"][
                                category
                            ]["Lead"] = lead
                            start, end, step = lead[0], lead[1], 1
                            transform_params[category]["Lead"] = np.arange(
                                start, end + step, step
                            )
if transformation == "Lag": | |
lag_default = st.session_state["project_dct"][ | |
"transformations" | |
][category].get("Lag", predefined_defualts["Lag"]) | |
st.markdown(f"**Lag ({date_granularity})**") | |
lag = st.slider( | |
"Lag periods", | |
1, | |
10, | |
(1, 2), # lag_default, | |
1, | |
key=f"lag_{category}", | |
label_visibility="collapsed", | |
) | |
st.session_state["project_dct"]["transformations"][ | |
category | |
]["Lag"] = lag | |
start = lag[0] | |
end = lag[1] | |
step = 1 | |
transform_params[category]["Lag"] = np.arange( | |
start, end + step, step | |
) | |
if transformation == "Moving Average": | |
ma_default = st.session_state["project_dct"][ | |
"transformations" | |
][category].get("MA", predefined_defualts["Moving Average"]) | |
st.markdown(f"**Moving Average ({date_granularity})**") | |
window = st.slider( | |
"Window size for Moving Average", | |
1, | |
10, | |
ma_default, | |
1, | |
key=f"ma_{category}", | |
label_visibility="collapsed", | |
) | |
st.session_state["project_dct"]["transformations"][ | |
category | |
]["MA"] = window | |
start = window[0] | |
end = window[1] | |
step = 1 | |
transform_params[category]["Moving Average"] = np.arange( | |
start, end + step, step | |
) | |
if transformation == "Saturation": | |
st.markdown("**Saturation (%)**") | |
saturation_default = st.session_state["project_dct"][ | |
"transformations" | |
][category].get( | |
"Saturation", predefined_defualts["Saturation"] | |
) | |
saturation_point = st.slider( | |
f"Saturation Percentage", | |
0, | |
100, | |
saturation_default, | |
10, | |
key=f"sat_{category}", | |
label_visibility="collapsed", | |
) | |
st.session_state["project_dct"]["transformations"][ | |
category | |
]["Saturation"] = saturation_point | |
start = saturation_point[0] | |
end = saturation_point[1] | |
step = 10 | |
transform_params[category]["Saturation"] = np.arange( | |
start, end + step, step | |
) | |
if transformation == "Power": | |
st.markdown("**Power**") | |
power_default = st.session_state["project_dct"][ | |
"transformations" | |
][category].get("Power", predefined_defualts["Power"]) | |
power = st.slider( | |
f"Power", | |
0, | |
10, | |
power_default, | |
1, | |
key=f"power_{category}", | |
label_visibility="collapsed", | |
) | |
st.session_state["project_dct"]["transformations"][ | |
category | |
]["Power"] = power | |
start = power[0] | |
end = power[1] | |
step = 1 | |
transform_params[category]["Power"] = np.arange( | |
start, end + step, step | |
) | |
if transformation == "Adstock": | |
ads_default = st.session_state["project_dct"][ | |
"transformations" | |
][category].get("Adstock", predefined_defualts["Adstock"]) | |
st.markdown("**Adstock**") | |
rate = st.slider( | |
f"Factor ({category})", | |
0.0, | |
1.0, | |
ads_default, | |
0.05, | |
key=f"adstock_{category}", | |
label_visibility="collapsed", | |
) | |
st.session_state["project_dct"]["transformations"][ | |
category | |
]["Adstock"] = rate | |
start = rate[0] | |
end = rate[1] | |
step = 0.05 | |
adstock_range = [ | |
round(a, 3) for a in np.arange(start, end + step, step) | |
] | |
transform_params[category]["Adstock"] = adstock_range | |
# Create widgets in each column | |
create_transformation_widgets(col1, transformations_col1) | |
create_transformation_widgets(col2, transformations_col2) | |
    # Function to apply Lag transformation
    def apply_lag(df, lag):
        return df.shift(lag)

    # Function to apply Lead transformation
    def apply_lead(df, lead):
        return df.shift(-lead)

    # Function to apply Moving Average transformation
    def apply_moving_average(df, window_size):
        return df.rolling(window=window_size).mean()
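
    # Illustrative example (not executed): on a series [10, 20, 30],
    # apply_lag(s, 1)            -> [NaN, 10, 20]   (past values shifted forward)
    # apply_lead(s, 1)           -> [20, 30, NaN]   (future values shifted backward)
    # apply_moving_average(s, 2) -> [NaN, 15, 25]   (mean over a 2-period window)
    # The NaNs introduced at the edges are filled with 0 downstream.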
    # Function to apply Saturation transformation (logistic-style curve)
    def apply_saturation(df, saturation_percent_100):
        # Convert saturation percentage from a 0-100 scale to a fraction,
        # clipped away from 0 and 1 to keep the log terms finite
        saturation_percent = np.clip(saturation_percent_100 / 100.0, 1e-9, 1 - 1e-9)

        # Calculate saturation point and steepness
        column_max = df.max()
        column_min = df.min()
        saturation_point = (column_min + column_max) / 2

        numerator = np.log((1 / saturation_percent) - 1)
        denominator = np.log(saturation_point / max(column_max, 1e-9))
        # Avoid division by zero while preserving the sign of the denominator
        if abs(denominator) < 1e-9:
            denominator = 1e-9
        steepness = numerator / denominator

        # Apply the saturation transformation
        transformed_series = df.apply(
            lambda x: (1 / (1 + (saturation_point / x) ** steepness)) * x
        )
        return transformed_series
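
    # The curve above is a logistic in disguise: writing s = saturation_point and
    # k = steepness, each value x is scaled by the factor 1 / (1 + (s / x)^k), so
    # transformed = x / (1 + (s / x)^k). Steepness is solved so that the scaling
    # factor equals saturation_percent at x = column_max:
    #   k = log(1 / saturation_percent - 1) / log(s / column_max)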
    # Function to apply Power transformation
    def apply_power(df, power):
        return df**power

    # Function to apply Adstock transformation
    def apply_adstock(df, factor):
        x = 0
        # Use the walrus operator to update x iteratively with the Adstock formula
        adstock_var = [x := x * factor + v for v in df]
        ans = pd.Series(adstock_var, index=df.index)
        return ans
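
    # Illustrative example (not executed): adstock is the recurrence
    # a[t] = v[t] + factor * a[t-1], i.e. carryover of past values.
    # With factor = 0.5 on a series [100, 0, 0], apply_adstock returns
    # [100.0, 50.0, 25.0] - each period retains half of the previous total.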
    # Function to generate transformed column names
    def generate_transformed_columns(original_columns, transform_params):
        transformed_columns, summary = {}, {}
        for category, columns in original_columns.items():
            for column in columns:
                transformed_columns[column] = []
                # List to hold transformation details for the current column
                summary_details = []
                if category in transform_params:
                    for transformation, values in transform_params[category].items():
                        # Generate transformed column names for each value
                        for value in values:
                            transformed_name = f"{column}@{transformation}_{value}"
                            transformed_columns[column].append(transformed_name)

                        # Format the values list as a string with commas and "and" before the last item
                        if len(values) > 1:
                            formatted_values = (
                                ", ".join(map(str, values[:-1]))
                                + " and "
                                + str(values[-1])
                            )
                        else:
                            formatted_values = str(values[0])

                        # Add transformation details
                        summary_details.append(f"{transformation} ({formatted_values})")

                # Only add to summary if there are transformation details for the column
                if summary_details:
                    formatted_summary = "⮕ ".join(summary_details)
                    # Use <strong> tags to make the column name bold
                    summary[column] = f"<strong>{column}</strong>: {formatted_summary}"

        # Generate a comprehensive summary string for all columns
        summary_items = [
            f"{idx + 1}. {details}" for idx, details in enumerate(summary.values())
        ]
        summary_string = "\n".join(summary_items)
        return transformed_columns, summary_string
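
    # Illustrative example (hypothetical column name): with
    # transform_params = {"Media": {"Lag": [1, 2]}} and a Media column "tv_spend",
    # transformed_columns == {"tv_spend": ["tv_spend@Lag_1", "tv_spend@Lag_2"]}
    # and the summary line reads "<strong>tv_spend</strong>: Lag (1 and 2)".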
    # Function to apply transformations to DataFrame slices based on specified categories and parameters
    def apply_category_transformations(df, bin_dict, transform_params, panel):
        # Dictionary for function mapping
        transformation_functions = {
            "Lead": apply_lead,
            "Lag": apply_lag,
            "Moving Average": apply_moving_average,
            "Saturation": apply_saturation,
            "Power": apply_power,
            "Adstock": apply_adstock,
        }

        # Initialize category_df as an empty DataFrame
        category_df = pd.DataFrame()

        # Iterate through each category specified in transform_params
        for category in ["Media", "Internal", "Exogenous"]:
            if (
                category not in transform_params
                or category not in bin_dict
                or not transform_params[category]
            ):
                continue  # Skip categories without transformations

            # Slice the DataFrame based on the columns specified in bin_dict for the current category
            df_slice = df[bin_dict[category] + panel]

            # Iterate through each transformation and its parameters for the current
            # category. Note that transformations are chained: after each one,
            # df_slice is replaced by its output, so the next transformation is
            # applied to the already-transformed columns.
            for transformation, parameters in transform_params[category].items():
                transformation_function = transformation_functions[transformation]

                # Check if there is panel data to group by
                if len(panel) > 0:
                    # Apply the transformation to each group
                    category_df = pd.concat(
                        [
                            df_slice.groupby(panel)
                            .transform(transformation_function, p)
                            .add_suffix(f"@{transformation}_{p}")
                            for p in parameters
                        ],
                        axis=1,
                    )

                    # Replace all NaN or null values in category_df with 0
                    category_df.fillna(0, inplace=True)

                    # Update df_slice
                    df_slice = pd.concat(
                        [df[panel], category_df],
                        axis=1,
                    )
                else:
                    for p in parameters:
                        # Apply the transformation function to each column
                        temp_df = df_slice.apply(
                            lambda x: transformation_function(x, p), axis=0
                        ).rename(
                            lambda x: f"{x}@{transformation}_{p}",
                            axis="columns",
                        )

                        # Concatenate the transformed DataFrame slice to the category DataFrame
                        category_df = pd.concat([category_df, temp_df], axis=1)

                    # Replace all NaN or null values in category_df with 0
                    category_df.fillna(0, inplace=True)

                    # Update df_slice
                    df_slice = pd.concat(
                        [df[panel], category_df],
                        axis=1,
                    )

        # If transformations were applied, concatenate the transformed columns with the original DataFrame
        if not category_df.empty:
            final_df = pd.concat([df, category_df], axis=1)
        else:
            # If no transformations were applied, use the original DataFrame
            final_df = df
        return final_df
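
    # Illustrative example (hypothetical columns): with panel = ["Panel_1"],
    # transform_params = {"Media": {"Lag": [1], "Adstock": [0.5]}} and a Media
    # column "tv_spend", the chaining above first produces "tv_spend@Lag_1"
    # within each panel group, then adstocks that result, so the final column
    # appended to the output is "tv_spend@Lag_1@Adstock_0.5".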
    # Function to infer the granularity of the date column in a DataFrame
    def infer_date_granularity(df):
        # Find the most common difference between consecutive (sorted) unique dates
        common_freq = (
            pd.Series(df["date"].unique()).sort_values().diff().dt.days.dropna().mode()[0]
        )

        # Map the most common difference to a granularity
        if common_freq == 1:
            return "daily"
        elif common_freq == 7:
            return "weekly"
        elif 28 <= common_freq <= 31:
            return "monthly"
        else:
            return "irregular"
    #########################################################################################################################################################
    # User input for transformations
    #########################################################################################################################################################

    # Infer date granularity
    date_granularity = infer_date_granularity(final_df_loaded)

    # Initialize the main dictionary to store the transformation parameters for each category
    transform_params = {"Media": {}, "Internal": {}, "Exogenous": {}}

    # User input for transformations
    st.markdown("### Select Transformations to Apply")
    for category in ["Media", "Internal", "Exogenous"]:
        # Skip Internal
        if category == "Internal":
            continue
        transformation_widgets(category, transform_params, date_granularity)
    #########################################################################################################################################################
    # Apply transformations
    #########################################################################################################################################################

    # Apply category-based transformations to the DataFrame
    if st.button("Accept and Proceed", use_container_width=True):
        with st.spinner("Applying transformations..."):
            final_df = apply_category_transformations(
                final_df_loaded, bin_dict_loaded, transform_params, panel
            )

            # Generate a dictionary mapping original column names to lists of transformed column names
            transformed_columns_dict, summary_string = generate_transformed_columns(
                original_columns, transform_params
            )

            # Store the transformed DataFrame and summary in session state
            st.session_state["final_df"] = final_df
            st.session_state["summary_string"] = summary_string
    #########################################################################################################################################################
    # Display the transformed DataFrame and summary
    #########################################################################################################################################################

    # Display the transformed DataFrame in the Streamlit app
    st.markdown("### Transformed DataFrame")
    final_df = st.session_state["final_df"].copy()
    sort_col = [col for col in final_df.columns if col in ["Panel_1", "Panel_2", "date"]]
    sorted_final_df = final_df.sort_values(
        by=sort_col, ascending=True, na_position="first"
    )
    st.dataframe(sorted_final_df, hide_index=True)

    # Total rows and columns
    total_rows, total_columns = st.session_state["final_df"].shape
    st.markdown(
        f"<p style='text-align: justify;'>The transformed DataFrame contains <strong>{total_rows}</strong> rows and <strong>{total_columns}</strong> columns.</p>",
        unsafe_allow_html=True,
    )

    # Display the summary of transformations as markdown
    if "summary_string" in st.session_state and st.session_state["summary_string"]:
        with st.expander("Summary of Transformations"):
            st.markdown("### Summary of Transformations")
            st.markdown(st.session_state["summary_string"], unsafe_allow_html=True)
    def save_to_pickle(file_path, final_df):
        # Open the file in write-binary mode and dump the objects
        with open(file_path, "wb") as f:
            pickle.dump({"final_df_transformed": final_df}, f)
        # Data is now saved to file

    if st.button("Accept and Save", use_container_width=True):
        save_to_pickle(
            os.path.join(st.session_state["project_path"], "final_df_transformed.pkl"),
            st.session_state["final_df"],
        )

        project_dct_path = os.path.join(
            st.session_state["project_path"], "project_dct.pkl"
        )
        with open(project_dct_path, "wb") as f:
            pickle.dump(st.session_state["project_dct"], f)

        update_db("3_Transformations.py")
        st.toast("💾 Saved Successfully!")