# v6Mastercardapp / pages / 10_Saved_Scenarios.py
# BlendMMM's picture
# Upload 11 files
# ff89010 verified
import streamlit as st
from numerize.numerize import numerize
import io
import pandas as pd
from utilities import (
format_numbers,
decimal_formater,
channel_name_formating,
load_local_css,
set_header,
initialize_data,
load_authenticator,
)
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, PatternFill
import pickle
import streamlit_authenticator as stauth
import yaml
from yaml import SafeLoader
from classes import class_from_dict
from utilities import update_db
# Page setup: wide layout, project stylesheet, shared header.
# NOTE(review): Streamlit normally expects set_page_config to be the first
# Streamlit command on a page - keep it ahead of the two helper calls; confirm
# that neither helper needs to run earlier.
st.set_page_config(layout="wide")
load_local_css("styles.css")
set_header()
# Disabled snippet kept for reference: it would re-assign every session-state
# entry to itself, skipping the 'logout', 'login' and 'config' keys and any
# key starting with 'FormSubmitter'.
# for k, v in st.session_state.items():
#     if k not in ['logout', 'login','config'] and not k.startswith('FormSubmitter'):
#         st.session_state[k] = v
def create_scenario_summary(scenario_dict):
    """Build the actual-vs-simulated summary table for a single scenario.

    One row is produced per entry of ``scenario_dict["channels"]``, followed
    by a final "Total" row taken from the scenario-level totals. Channel
    spends are multiplied by the channel's ``conversion_rate`` before being
    reported or used in a ratio; scenario-level totals are used as stored.

    Args:
        scenario_dict: Mapping with a ``"channels"`` list of per-channel
            mappings plus scenario-level ``actual_*`` / ``modified_*`` totals.

    Returns:
        A ``pandas.DataFrame`` with two-level columns: ``("", "Channel")``
        followed by Actual/Simulated pairs for Spends, NRPU, ROI, MROI and
        Spend per NRPU.
    """
    level_names = ["first", "second"]

    def _channel_row(channel):
        # One table row for a single channel.
        label = channel_name_formating(channel["name"])
        rate = channel.get("conversion_rate")
        actual_spend = channel.get("actual_total_spends") * rate
        simulated_spend = channel.get("modified_total_spends") * rate
        actual_sales = channel.get("actual_total_sales")
        simulated_sales = channel.get("modified_total_sales")
        return [
            label,
            actual_spend,
            simulated_spend,
            actual_sales,
            simulated_sales,
            actual_sales / actual_spend,
            simulated_sales / simulated_spend,
            channel.get("actual_mroi"),
            channel.get("modified_mroi"),
            actual_spend / actual_sales,
            simulated_spend / simulated_sales,
        ]

    table_rows = [_channel_row(channel) for channel in scenario_dict["channels"]]

    # Scenario-level totals are reported without applying a conversion rate.
    total_actual_spend = scenario_dict.get("actual_total_spends")
    total_simulated_spend = scenario_dict.get("modified_total_spends")
    total_actual_sales = scenario_dict.get("actual_total_sales")
    total_simulated_sales = scenario_dict.get("modified_total_sales")
    table_rows.append(
        [
            "Total",
            total_actual_spend,
            total_simulated_spend,
            total_actual_sales,
            total_simulated_sales,
            total_actual_sales / total_actual_spend,
            total_simulated_sales / total_simulated_spend,
            "-",  # no MROI at the scenario level
            "-",
            total_actual_spend / total_actual_sales,
            total_simulated_spend / total_simulated_sales,
        ]
    )

    channel_header = pd.MultiIndex.from_product(
        [[""], ["Channel"]], names=level_names
    )
    metric_headers = pd.MultiIndex.from_product(
        [
            ["Spends", "NRPU", "ROI", "MROI", "Spend per NRPU"],
            ["Actual", "Simulated"],
        ],
        names=level_names,
    )
    return pd.DataFrame(table_rows, columns=channel_header.append(metric_headers))
def summary_df_to_worksheet(df, ws):
    """Write the cross-scenario summary table into worksheet ``ws``.

    ``df`` is expected in the layout produced by ``download_scenarios``:
    two-level column headers where column 1 is ``("", "Channel")`` and every
    scenario contributes a group of five columns (Spends, NRPU, ROI, MROI,
    Spends per NRPU) whose first header level is the scenario name.

    Rows 1-2 receive the two header levels, data starts at row 3. The
    scenario name in row 1 is merged across its whole five-column group and
    centred; the Spends and NRPU columns of every group get a currency
    number format.

    Args:
        df: Summary DataFrame with two-level columns (index already reset).
        ws: openpyxl worksheet to write into.
    """
    group_width = 5  # sub-columns per scenario group
    first_group_col = 2  # column 1 holds the channel name
    currency_offsets = (0, 1)  # Spends and NRPU within a group

    heading_fill = PatternFill(
        fill_type="solid", start_color="FF11B6BD", end_color="FF11B6BD"
    )
    # NOTE(review): the heading font colour equals the fill colour, as in the
    # original code - confirm the header text is meant to look like this.
    heading_font = Font(bold=True, color="FF11B6BD")

    for col, header in enumerate(df.columns.values, start=1):
        for level in (0, 1):
            cell = ws.cell(row=level + 1, column=col, value=header[level])
            cell.font = heading_font
            cell.fill = heading_fill

        in_group = col >= first_group_col
        closes_group = (col - first_group_col) % group_width == group_width - 1
        if in_group and closes_group:
            # Merge the full group. The previous code started one column too
            # late and centred a non-anchor cell, whose style is not the one
            # shown for the merged range.
            group_start = col - group_width + 1
            ws.merge_cells(
                start_row=1, end_row=1, start_column=group_start, end_column=col
            )
            ws.cell(row=1, column=group_start).alignment = Alignment(
                horizontal="center"
            )

    for row_offset, row in enumerate(df.itertuples(index=False, name=None)):
        for col, value in enumerate(row, start=1):
            cell = ws.cell(row=row_offset + 3, column=col, value=value)
            # Position inside the 5-wide group decides the format; the old
            # period-4 test drifted onto the wrong columns from the second
            # scenario onwards.
            if (
                col >= first_group_col
                and (col - first_group_col) % group_width in currency_offsets
            ):
                cell.number_format = "$#,##0.0"
from openpyxl.utils import get_column_letter
from openpyxl.styles import Font, PatternFill
import logging
def scenario_df_to_worksheet(df, ws):
    """Write one scenario's DataFrame into worksheet ``ws``.

    Row 1 receives the column names as bold, filled headings; data rows start
    at row 2 (the DataFrame index is not written). ``int``/``float`` values
    get a currency number format, strings are cut to 32767 characters, and
    any other accepted value is stored as its ``str()`` form. A value the
    worksheet rejects with ``ValueError`` is logged and its cell left empty.

    Args:
        df: DataFrame to export.
        ws: openpyxl worksheet to write into.

    Returns:
        The same worksheet ``ws``.
    """
    heading_fill = PatternFill(
        start_color="FF11B6BD", end_color="FF11B6BD", fill_type="solid"
    )
    # NOTE(review): the heading font colour equals the fill colour, as in the
    # original code - confirm the header text is meant to look like this.
    heading_font = Font(bold=True, color="FF11B6BD")
    for col, header in enumerate(df.columns.values, start=1):
        cell = ws.cell(row=1, column=col, value=header)
        cell.font = heading_font
        cell.fill = heading_fill

    rows = df.itertuples(index=False, name=None)
    for row_number, row in enumerate(rows, start=2):
        for col, value in enumerate(row, start=1):
            # Fetch the target cell before assigning. Previously a failing
            # ws.cell(..., value=value) left ``cell`` pointing at the
            # previously written cell, and the handler blanked that one.
            cell = ws.cell(row=row_number, column=col)
            try:
                cell.value = value
                if isinstance(value, (int, float)):
                    cell.number_format = "$#,##0.0"
                elif isinstance(value, str):
                    cell.value = value[:32767]
                else:
                    cell.value = str(value)
            except ValueError as e:
                logging.error(
                    f"Error assigning value '{value}' to cell {get_column_letter(col)}{row_number}: {e}"
                )
                cell.value = None  # leave the offending cell empty
    return ws
def download_scenarios():
    """
    Build an Excel workbook for the scenarios picked in the download
    multiselect and store it in ``st.session_state["xlsx_buffer"]``.

    The workbook holds one sheet per scenario (a "Date" column, one column of
    simulated spends per channel multiplied by the channel's
    ``conversion_rate``, and the summed simulated sales as "NRPU") plus a
    leading "Summary" sheet with Spends / NRPU / ROI / MROI / Spends per NRPU
    for every channel and scenario. Nothing is written to disk. On success
    ``st.session_state["disable_download_button"]`` is set to ``False``.

    Does nothing when no scenario is selected.
    """
    if len(scenarios_to_download) == 0:
        return

    wb = Workbook()
    wb.iso_dates = True
    wb.remove(wb.active)  # drop the default sheet; sheets are created below
    st.session_state["xlsx_buffer"] = io.BytesIO()
    summary_df = None

    for scenario_name in scenarios_to_download:
        scenario_dict = st.session_state["saved_scenarios"][scenario_name]
        sheet_columns = []
        column_names = ["Date"]
        total_sales = None
        dates = None
        summary_rows = []

        for channel in scenario_dict["channels"]:
            rate = channel.get("conversion_rate")
            if dates is None:
                dates = channel.get("dates")
                sheet_columns.append(dates)

            channel_sales = channel.get("modified_sales")
            if total_sales is None:
                total_sales = channel_sales
            else:
                # Build a new object instead of ``+=``: the in-place form
                # modified the first channel's "modified_sales" stored in
                # the saved scenario on every export.
                total_sales = total_sales + channel_sales

            sheet_columns.append(channel.get("modified_spends") * rate)
            column_names.append(channel.get("name"))

            name_mod = channel_name_formating(channel["name"])
            spend = channel.get("modified_total_spends") * rate
            sales = channel.get("modified_total_sales")
            summary_rows.append(
                [
                    name_mod,
                    spend,
                    sales,
                    # ROI = sales / converted spend, as in
                    # create_scenario_summary (parentheses were missing).
                    sales / spend,
                    channel.get("modified_mroi"),
                    # Spends per NRPU = converted spend / sales, matching
                    # the "Total" row (it used to repeat the ROI formula).
                    spend / sales,
                ]
            )

        sheet_columns.append(total_sales)
        column_names.append("NRPU")
        scenario_df = pd.DataFrame(sheet_columns).T
        scenario_df.columns = column_names

        ## write to sheet
        ws = wb.create_sheet(scenario_name)
        scenario_df_to_worksheet(scenario_df, ws)

        total_spend = scenario_dict.get("modified_total_spends")
        total_scenario_sales = scenario_dict.get("modified_total_sales")
        summary_rows.append(
            [
                "Total",
                total_spend,
                total_scenario_sales,
                total_scenario_sales / total_spend,
                "-",
                total_spend / total_scenario_sales,
            ]
        )

        columns_index = pd.MultiIndex.from_product(
            [[""], ["Channel"]], names=["first", "second"]
        )
        columns_index = columns_index.append(
            pd.MultiIndex.from_product(
                [[scenario_name], ["Spends", "NRPU", "ROI", "MROI", "Spends per NRPU"]],
                names=["first", "second"],
            )
        )
        scenario_summary = pd.DataFrame(summary_rows, columns=columns_index)
        scenario_summary = scenario_summary.set_index(("", "Channel"))
        if summary_df is None:
            summary_df = scenario_summary
        else:
            # NOTE(review): default inner join - a channel missing from one
            # scenario is dropped from the whole summary; confirm intended.
            summary_df = summary_df.merge(
                scenario_summary, left_index=True, right_index=True
            )

    ws = wb.create_sheet("Summary", 0)
    summary_df_to_worksheet(summary_df.reset_index(), ws)
    wb.save(st.session_state["xlsx_buffer"])
    st.session_state["disable_download_button"] = False
def disable_download_button():
    """Mark the download button as disabled in session state.

    Used as the change callback of the download multiselect and as the click
    callback of the download button itself.
    """
    flag_key = "disable_download_button"
    st.session_state[flag_key] = True
def transform(x):
    """Format one column of the scenario summary for display.

    Args:
        x: A Series whose ``name`` is the column's two-level header tuple.

    Returns:
        The channel-name column unchanged; for ROI/MROI columns, each
        non-string value passed through ``format_numbers`` (4 decimals, no
        indicator) and then ``decimal_formater``; for every other column,
        each non-string value passed through ``format_numbers``. String
        entries are always kept as they are.
    """
    if x.name == ("", "Channel"):
        return x

    def _format_ratio(value):
        # ROI / MROI cells: 4-decimal formatting.
        if isinstance(value, str):
            return value
        formatted = format_numbers(value, include_indicator=False, n_decimals=4)
        return decimal_formater(formatted, n_decimals=4)

    def _format_amount(value):
        # Any other metric cell: default number formatting.
        if isinstance(value, str):
            return value
        return format_numbers(value)

    cell_formatter = _format_ratio if x.name[0] in ("ROI", "MROI") else _format_amount
    return x.apply(cell_formatter)
def delete_scenario():
    """Delete the currently selected scenario and persist the remainder.

    If the selected scenario is among the saved scenarios in session state it
    is removed and the remaining mapping is pickled to
    ``../saved_scenarios.pkl``; otherwise nothing happens.
    """
    stored_scenarios = st.session_state["saved_scenarios"]
    if selected_scenario not in stored_scenarios:
        return

    del stored_scenarios[selected_scenario]
    with open("../saved_scenarios.pkl", "wb") as pickle_file:
        pickle.dump(stored_scenarios, pickle_file)
def load_scenario():
    """Load the currently selected scenario into ``st.session_state["scenario"]``.

    The scenario object is rebuilt from the selected scenario's stored
    details with ``class_from_dict``. Nothing happens when the selected name
    is not among the saved scenarios.
    """
    if selected_scenario not in st.session_state["saved_scenarios"]:
        return
    st.session_state["scenario"] = class_from_dict(selected_scenario_details)
# ---------------------------------------------------------------------------
# Page body: authenticate, then list saved scenarios with controls to load,
# delete and download them, and show a summary of the selected scenario.
# The callbacks defined above read the module-level names
# ``selected_scenario``, ``selected_scenario_details`` and
# ``scenarios_to_download`` assigned here.
# ---------------------------------------------------------------------------
authenticator = st.session_state.get("authenticator")
if authenticator is None:
    authenticator = load_authenticator()
# The return values are not used below; the status is read from session state.
name, authentication_status, username = authenticator.login("Login", "main")
auth_status = st.session_state.get("authentication_status")

if auth_status is True:
    if not st.session_state.get("initialized", False):
        initialize_data()

    saved_scenarios = st.session_state["saved_scenarios"]
    if len(saved_scenarios) == 0:
        st.header("No saved scenarios")
    else:
        selected_scenario_list = list(saved_scenarios.keys())

        # The remembered position can point past the end of the list once
        # scenarios have been deleted, which would break both the lookup
        # below and st.selectbox(index=...). Fall back to the first entry.
        scenario_page_state = st.session_state["project_dct"]["saved_scenarios"]
        selected_index = scenario_page_state["selected_scenario_selectbox_key"]
        if not 0 <= selected_index < len(selected_scenario_list):
            selected_index = 0
            scenario_page_state["selected_scenario_selectbox_key"] = selected_index

        if "selected_scenario_selectbox_key" not in st.session_state:
            st.session_state["selected_scenario_selectbox_key"] = (
                selected_scenario_list[selected_index]
            )

        col_a, col_b = st.columns(2)
        # The selectbox is driven by the stored index rather than a widget key.
        selected_scenario = col_a.selectbox(
            "Pick a scenario to view details",
            selected_scenario_list,
            index=selected_index,
        )
        scenario_page_state["selected_scenario_selectbox_key"] = (
            selected_scenario_list.index(selected_scenario)
        )

        scenarios_to_download = col_b.multiselect(
            "Select scenarios to download",
            list(saved_scenarios.keys()),
            on_change=disable_download_button,
        )

        with col_a:
            col3, col4 = st.columns(2)
            col4.button(
                "Delete scenarios",
                on_click=delete_scenario,
                use_container_width=True,
            )
            col3.button(
                "Load Scenario",
                on_click=load_scenario,
                use_container_width=True,
            )

        with col_b:
            col1, col2 = st.columns(2)
            col1.button(
                "Prepare download",
                on_click=download_scenarios,
                use_container_width=True,
            )
            col2.download_button(
                label="Download Scenarios",
                data=st.session_state["xlsx_buffer"].getvalue(),
                file_name="scenarios.xlsx",
                # The workbook is an .xlsx file; "application/vnd.ms-excel"
                # is the legacy .xls type.
                mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                disabled=st.session_state["disable_download_button"],
                on_click=disable_download_button,
                use_container_width=True,
            )

        selected_scenario_details = saved_scenarios[selected_scenario]

        pd.set_option("display.max_colwidth", 100)
        white_background = [("background-color", "#FFFFFF")]
        summary_html = (
            create_scenario_summary(selected_scenario_details)
            .transform(transform)
            .style.set_table_styles(
                [
                    {"selector": "th", "props": white_background},
                    {"selector": "tr:nth-child(even)", "props": white_background},
                ]
            )
            .to_html()
        )
        st.markdown(summary_html, unsafe_allow_html=True)

elif auth_status is False:
    st.error("Username/Password is incorrect")

if auth_status is not True:
    # Offer password recovery to anyone who is not logged in.
    try:
        username_forgot_pw, email_forgot_password, random_password = (
            authenticator.forgot_password("Forgot password")
        )
        if username_forgot_pw:
            st.success("New password sent securely")
            # Random password to be transferred to user securely
        elif username_forgot_pw is False:
            st.error("Username not found")
    except Exception as e:
        # Page-level boundary: show the failure instead of crashing the page.
        st.error(e)