# Streamlit page: Transformations
# Importing necessary libraries
import streamlit as st

# st.set_page_config must be the first Streamlit call on the page, which is
# why the remaining imports are deferred until after it.
st.set_page_config(
    page_title="Transformations",
    page_icon=":shark:",
    layout="wide",
    initial_sidebar_state="collapsed",
)

import pickle
import numpy as np
import pandas as pd
from utilities import set_header, load_local_css
import streamlit_authenticator as stauth
import yaml
from yaml import SafeLoader

# Inject the app-wide stylesheet and shared page header (project helpers).
load_local_css("styles.css")
set_header()
# Re-assign every existing session-state entry to itself so widget state
# survives Streamlit page switches (self-assignment marks the keys as
# user-set and keeps them alive). Auth/form bookkeeping keys are skipped.
for k, v in st.session_state.items():
    if k not in ["logout", "login", "config"] and not k.startswith(
        "FormSubmitter"
    ):
        st.session_state[k] = v

# Load the authenticator configuration (credentials + cookie settings).
# SafeLoader is used, so the local YAML cannot instantiate arbitrary objects.
with open("config.yaml") as file:
    config = yaml.load(file, Loader=SafeLoader)

st.session_state["config"] = config
authenticator = stauth.Authenticate(
    config["credentials"],
    config["cookie"]["name"],
    config["cookie"]["key"],
    config["cookie"]["expiry_days"],
    config["preauthorized"],
)
st.session_state["authenticator"] = authenticator

# Render the login widget; the call also populates
# st.session_state["authentication_status"], which the page gates on below.
name, authentication_status, username = authenticator.login("Login", "main")
auth_status = st.session_state.get("authentication_status")
# Only render the page body for an authenticated user.
# (Idiom fix: truthiness test instead of `== True`.)
if auth_status:
    authenticator.logout("Logout", "main")

    # One-time page initialisation. NOTE(review): the "initialized" flag is
    # read here but never set on this page — presumably another page manages
    # it; confirm. (Typo fixed: is_state_initiaized -> is_state_initialized.)
    is_state_initialized = st.session_state.get("initialized", False)
    if not is_state_initialized and "session_name" not in st.session_state:
        st.session_state["session_name"] = None

    # Deserialize the objects produced by the data-import page.
    with open("data_import.pkl", "rb") as f:
        data = pickle.load(f)

    # Accessing the loaded objects
    final_df_loaded = data["final_df"]
    bin_dict_loaded = data["bin_dict"]

    # Initialize session-state defaults (only on the first run of this page).
    if "transformed_columns_dict" not in st.session_state:
        st.session_state["transformed_columns_dict"] = {}  # Default empty dictionary
    if "final_df" not in st.session_state:
        st.session_state["final_df"] = final_df_loaded  # Default as original dataframe
    if "summary_string" not in st.session_state:
        st.session_state["summary_string"] = None  # Default as None

    # Extract original columns for the transformable categories.
    original_columns = {
        category: bin_dict_loaded[category]
        for category in ["Media", "Internal", "Exogenous"]
        if category in bin_dict_loaded
    }

    # Retrieve panel (group-by) columns, if the data has them.
    panel_1 = bin_dict_loaded.get("Panel Level 1")
    panel_2 = bin_dict_loaded.get("Panel Level 2")

    st.write("")
    # Combine both panel levels into a single list of group-by columns;
    # empty list means transformations apply to the whole frame.
    if panel_1:
        panel = panel_1 + panel_2 if panel_2 else panel_1
    else:
        panel = []
# Function to build transformation widgets
def transformation_widgets(category, transform_params, date_granularity):
    """Render the transformation-selection UI for one variable category.

    Parameters
    ----------
    category : str
        One of "Media", "Internal" or "Exogenous".
    transform_params : dict
        Mutated in place: transform_params[category][transformation] is set
        to the range of parameter values chosen by the user.
    date_granularity : str
        Human-readable granularity ("daily", "weekly", ...) shown in labels.
    """
    # Which transformations are offered for each category.
    transformation_options = {
        "Media": ["Lag", "Moving Average", "Saturation", "Power", "Adstock"],
        "Internal": ["Lead", "Lag", "Moving Average"],
        "Exogenous": ["Lead", "Lag", "Moving Average"],
    }

    # Widget spec per transformation:
    # (header markdown, slider label, min, max, default range, step, key prefix)
    # Ranges, steps, labels and widget keys mirror the original hand-written
    # widget stanzas exactly.
    slider_specs = {
        "Lead": (
            f"**Lead ({date_granularity})**",
            "Lead periods",
            1, 10, (1, 2), 1, "lead",
        ),
        "Lag": (
            f"**Lag ({date_granularity})**",
            "Lag periods",
            1, 10, (1, 2), 1, "lag",
        ),
        "Moving Average": (
            f"**Moving Average ({date_granularity})**",
            "Window size for Moving Average",
            1, 10, (1, 2), 1, "ma",
        ),
        "Saturation": (
            "**Saturation (%)**",
            "Saturation Percentage",
            0, 100, (10, 20), 10, "sat",
        ),
        "Power": ("**Power**", "Power", 0, 10, (2, 4), 1, "power"),
        "Adstock": (
            "**Adstock**",
            f"Factor ({category})",
            0.0, 1.0, (0.5, 0.7), 0.05, "adstock",
        ),
    }

    with st.expander(f"{category} Transformations"):
        # Let users select which transformations to apply
        transformations_to_apply = st.multiselect(
            "Select transformations to apply",
            options=transformation_options[category],
            default=[],
            key=f"transformation_{category}",
        )

        # Split the selected transformations evenly across two columns
        # (first column gets the extra one when the count is odd).
        transformations_per_column = (
            len(transformations_to_apply) // 2 + len(transformations_to_apply) % 2
        )
        col1, col2 = st.columns(2)

        def create_transformation_widgets(column, transformations):
            # Render one range slider per transformation and record the
            # chosen parameter values in transform_params.
            with column:
                for transformation in transformations:
                    header, label, lo, hi, default, step, prefix = slider_specs[
                        transformation
                    ]
                    st.markdown(header)
                    chosen = st.slider(
                        label,
                        lo,
                        hi,
                        default,
                        step,
                        key=f"{prefix}_{category}",
                        label_visibility="collapsed",
                    )
                    start, end = chosen
                    values = np.arange(start, end + step, step)
                    if transformation == "Adstock":
                        # Float steps accumulate error; round for clean
                        # column-name suffixes.
                        values = [round(v, 3) for v in values]
                    transform_params[category][transformation] = values

        # Create widgets in each column
        create_transformation_widgets(
            col1, transformations_to_apply[:transformations_per_column]
        )
        create_transformation_widgets(
            col2, transformations_to_apply[transformations_per_column:]
        )
def apply_lag(df, lag):
    """Shift the series forward by `lag` periods (leading rows become NaN)."""
    return df.shift(periods=lag)
def apply_lead(df, lead):
    """Shift the series backward by `lead` periods (trailing rows become NaN)."""
    return df.shift(periods=-lead)
def apply_moving_average(df, window_size):
    """Rolling mean over `window_size` periods (first window-1 rows are NaN)."""
    rolling_window = df.rolling(window=window_size)
    return rolling_window.mean()
def apply_saturation(df, saturation_percent_100):
    """Apply a logistic saturation curve to the series.

    The curve is parameterised so that the column maximum retains
    `saturation_percent_100` percent of its value, with the midpoint of the
    column's observed range acting as the inflection point.

    Parameters
    ----------
    df : pd.Series
        Series of (assumed non-negative) values to saturate.
    saturation_percent_100 : number
        Saturation percentage on a 0-100 scale.

    Returns
    -------
    pd.Series
        Saturated series, same index as the input.
    """
    # Convert saturation percentage from 100-based to fraction
    saturation_percent = saturation_percent_100 / 100.0

    # Inflection point: midpoint of the column's observed range.
    column_max = df.max()
    column_min = df.min()
    saturation_point = (column_min + column_max) / 2

    numerator = np.log(
        (1 / (saturation_percent if saturation_percent != 1 else 1 - 1e-9)) - 1
    )
    denominator = np.log(saturation_point / max(column_max, 1e-9))
    # BUG FIX: the original guarded with max(denominator, 1e-9), which
    # discards the sign of the (normally negative, since
    # saturation_point < column_max) denominator and clamps it to 1e-9,
    # producing an enormous steepness that degenerates the curve into a hard
    # threshold. Preserve the sign and only guard against division by zero.
    if abs(denominator) < 1e-9:
        denominator = -1e-9
    steepness = numerator / denominator

    # Apply the saturation transformation element-wise; zero maps to zero to
    # avoid dividing by zero inside the logistic term.
    transformed_series = df.apply(
        lambda x: (1 / (1 + (saturation_point / x) ** steepness)) * x
        if x != 0
        else 0.0
    )
    return transformed_series
def apply_power(df, power):
    """Raise every element of the series to the given exponent."""
    return df.pow(power)
def apply_adstock(df, factor):
    """Geometric adstock: y[t] = x[t] + factor * y[t-1].

    Each value carries over `factor` of the previous adstocked value,
    modelling decaying carry-over effects.
    """
    carried = 0
    transformed = []
    for value in df:
        carried = carried * factor + value
        transformed.append(carried)
    return pd.Series(transformed, index=df.index)
def generate_transformed_columns(original_columns, transform_params):
    """Build the mapping from each original column to its transformed names,
    plus a numbered HTML summary of the applied transformations.

    Returns
    -------
    tuple(dict, str)
        (column -> list of "<col>@<transformation>_<value>" names,
         newline-joined numbered summary string).
    """
    transformed_columns = {}
    summary = {}

    for category, columns in original_columns.items():
        params = transform_params.get(category, {})
        for column in columns:
            names = []
            details = []
            for transformation, values in params.items():
                names.extend(
                    f"{column}@{transformation}_{value}" for value in values
                )
                # "1, 2 and 3"-style listing of the parameter values.
                if len(values) > 1:
                    head = ", ".join(str(v) for v in values[:-1])
                    listed = f"{head} and {values[-1]}"
                else:
                    listed = str(values[0])
                details.append(f"{transformation} ({listed})")
            transformed_columns[column] = names
            if details:
                # Bold column name; arrows chain successive transformations.
                summary[column] = f"<strong>{column}</strong>: " + "⮕ ".join(details)

    # One numbered line per summarised column.
    summary_string = "\n".join(
        f"{idx + 1}. {text}" for idx, text in enumerate(summary.values())
    )
    return transformed_columns, summary_string
def apply_category_transformations(df, bin_dict, transform_params, panel):
    """Apply the user-selected transformations to the relevant DataFrame slices.

    Parameters
    ----------
    df : pd.DataFrame
        Full input frame; must contain the columns listed in bin_dict plus
        the panel columns.
    bin_dict : dict
        Category name ("Media", "Internal", "Exogenous") -> list of columns.
    transform_params : dict
        Category -> {transformation name -> iterable of parameter values},
        as filled in by transformation_widgets.
    panel : list
        Panel (group-by) column names; empty list when there is no panel.

    Returns
    -------
    pd.DataFrame
        df with the transformed columns appended, or df unchanged when no
        transformations were selected.
    """
    # Dictionary for function mapping: name -> callable(series, parameter)
    transformation_functions = {
        "Lead": apply_lead,
        "Lag": apply_lag,
        "Moving Average": apply_moving_average,
        "Saturation": apply_saturation,
        "Power": apply_power,
        "Adstock": apply_adstock,
    }

    # Collects transformed columns to append at the end.
    category_df = pd.DataFrame()

    for category in ["Media", "Internal", "Exogenous"]:
        if (
            category not in transform_params
            or category not in bin_dict
            or not transform_params[category]
        ):
            continue  # Skip categories without transformations

        # Slice to this category's columns plus the panel key columns.
        df_slice = df[bin_dict[category] + panel]

        # Transformations chain: df_slice is rebuilt from each
        # transformation's output, so later transformations operate on
        # already-transformed columns.
        for transformation, parameters in transform_params[category].items():
            transformation_function = transformation_functions[transformation]

            if len(panel) > 0:
                # Panel data: apply the transformation within each group.
                # NOTE(review): category_df is REASSIGNED here (not appended),
                # so output from earlier transformations/categories survives
                # only through the chained df_slice, while the non-panel
                # branch below accumulates with pd.concat — confirm this
                # asymmetry is intended.
                category_df = pd.concat(
                    [
                        df_slice.groupby(panel)
                        .transform(transformation_function, p)
                        .add_suffix(f"@{transformation}_{p}")
                        for p in parameters
                    ],
                    axis=1,
                )
                # NaNs introduced by shifts / rolling windows become 0.
                category_df.fillna(0, inplace=True)
                # Rebuild df_slice so the next transformation chains on it.
                df_slice = pd.concat(
                    [df[panel], category_df],
                    axis=1,
                )
            else:
                for p in parameters:
                    # Apply column-wise and tag the output columns with
                    # "@<transformation>_<parameter>".
                    temp_df = df_slice.apply(
                        lambda x: transformation_function(x, p), axis=0
                    ).rename(lambda x: f"{x}@{transformation}_{p}", axis="columns")
                    # Accumulate the transformed slice.
                    category_df = pd.concat([category_df, temp_df], axis=1)
                # NaNs introduced by shifts / rolling windows become 0.
                category_df.fillna(0, inplace=True)
                # Rebuild df_slice so the next transformation chains on it.
                df_slice = pd.concat(
                    [df[panel], category_df],
                    axis=1,
                )

    # Append any generated columns to the original DataFrame.
    if not category_df.empty:
        final_df = pd.concat([df, category_df], axis=1)
    else:
        # If no transformations were applied, use the original DataFrame
        final_df = df
    return final_df
def infer_date_granularity(df):
    """Infer the sampling frequency of the `date` column.

    Returns one of "daily", "weekly", "monthly" or "irregular", based on the
    most common day-difference between consecutive unique dates.
    """
    unique_dates = pd.Series(df["date"].unique())
    day_gaps = unique_dates.diff().dt.days.dropna()
    common_freq = day_gaps.mode()[0]

    if common_freq == 1:
        return "daily"
    if common_freq == 7:
        return "weekly"
    if 28 <= common_freq <= 31:
        return "monthly"
    return "irregular"
#########################################################################################################################################################
# User input for transformations
#########################################################################################################################################################
# Infer date granularity so slider labels show the right period unit.
date_granularity = infer_date_granularity(final_df_loaded)

# Main dictionary collecting the transformation parameters for each category;
# mutated in place by transformation_widgets.
transform_params = {"Media": {}, "Internal": {}, "Exogenous": {}}

# User input for transformations
st.markdown("### Select Transformations to Apply")
for category in ["Media", "Internal", "Exogenous"]:
    # Skip Internal (no transformations offered for it on this page).
    if category == "Internal":
        continue
    transformation_widgets(category, transform_params, date_granularity)

#########################################################################################################################################################
# Apply transformations
#########################################################################################################################################################
# Apply category-based transformations to the DataFrame
if st.button("Accept and Proceed", use_container_width=True):
    with st.spinner("Applying transformations..."):
        final_df = apply_category_transformations(
            final_df_loaded, bin_dict_loaded, transform_params, panel
        )
    # Generate a dictionary mapping original column names to lists of
    # transformed column names, plus a human-readable summary.
    # NOTE(review): transformed_columns_dict is computed but never written to
    # st.session_state["transformed_columns_dict"] — confirm whether that was
    # intended.
    transformed_columns_dict, summary_string = generate_transformed_columns(
        original_columns, transform_params
    )
    # Persist results so they survive Streamlit script reruns.
    st.session_state["final_df"] = final_df
    st.session_state["summary_string"] = summary_string
#########################################################################################################################################################
# Display the transformed DataFrame and summary
#########################################################################################################################################################
# Display the (possibly transformed) DataFrame held in session state.
st.markdown("### Transformed DataFrame")
st.dataframe(st.session_state["final_df"], hide_index=True)

# Total rows and columns
total_rows, total_columns = st.session_state["final_df"].shape
st.markdown(
    f"<p style='text-align: justify;'>The transformed DataFrame contains <strong>{total_rows}</strong> rows and <strong>{total_columns}</strong> columns.</p>",
    unsafe_allow_html=True,
)

# Display the summary of transformations as markdown; HTML is allowed
# because the summary embeds <strong> tags.
if st.session_state["summary_string"]:
    with st.expander("Summary of Transformations"):
        st.markdown("### Summary of Transformations")
        st.markdown(st.session_state["summary_string"], unsafe_allow_html=True)
def save_to_pickle(file_path, final_df):
    """Serialise the transformed DataFrame to `file_path` as a pickle.

    The payload is a dict with the single key "final_df_transformed",
    matching what downstream pages expect to load.
    """
    payload = {"final_df_transformed": final_df}
    with open(file_path, "wb") as f:
        pickle.dump(payload, f)
# Persist the transformed DataFrame for downstream pages on demand.
if st.button("Accept and Save", use_container_width=True):
    save_to_pickle(
        "final_df_transformed.pkl", st.session_state["final_df"]
    )
    # Transient confirmation toast.
    st.toast("💾 Saved Successfully!")