# Importing necessary libraries
import streamlit as st

# set_page_config is issued before the remaining imports, as in the original
# statement order.
st.set_page_config(
    page_title="Saved Scenarios",
    page_icon="⚖️",
    layout="wide",
    initial_sidebar_state="collapsed",
)

import io
import os
import json
import math
import pickle
import sqlite3
import zipfile
import numpy as np
import pandas as pd
from classes import numerize
from openpyxl import Workbook
from collections import OrderedDict
from utilities import (
    project_selection,
    initialize_data,
    set_header,
    load_local_css,
    name_formating,
)

# Styling
load_local_css("styles.css")
set_header()

# Create project_dct
if "project_dct" not in st.session_state:
    project_selection()
    st.stop()

database_file = r"DB\User.db"

# NOTE(review): this connection and cursor are not used anywhere in the
# visible code and are never closed, so each script run opens another
# connection. Confirm nothing else relies on `conn` / `c` before removing.
conn = sqlite3.connect(
    database_file, check_same_thread=False
)  # connection with sql db
c = conn.cursor()

# Display project info
col_project_data = st.columns([2, 1])
with col_project_data[0]:
    st.markdown(f"**Welcome {st.session_state['username']}**")
with col_project_data[1]:
    st.markdown(f"**Current Project: {st.session_state['project_name']}**")

# Page Title
st.title("Saved Scenarios")
scenarios_name_placeholder = st.empty()

# Types that json.dumps can emit without any conversion
_JSON_PRIMITIVES = (int, float, str, bool, type(None))


def _saved_scenarios_path():
    """Return the path of the pickle file holding the project's saved scenarios."""
    return os.path.join(st.session_state["project_path"], "saved_scenarios.pkl")


# Function to get saved scenarios dictionary
def get_saved_scenarios_dict():
    """Load the saved scenarios for the current project.

    Returns:
        The unpickled scenarios mapping, or an empty ``OrderedDict`` when the
        scenarios file does not exist yet.
    """
    saved_scenarios_dict_path = _saved_scenarios_path()

    if not os.path.exists(saved_scenarios_dict_path):
        return OrderedDict()

    # NOTE(review): pickle.load can execute arbitrary code; this is only safe
    # while the project directory contains files written by this application.
    with open(saved_scenarios_dict_path, "rb") as f:
        return pickle.load(f)


# Function to format values based on their size
def format_value(value):
    """Round ``value`` to 4 decimals when its magnitude is below 1, else to 1.

    The magnitude (not the signed value) decides the precision, so large
    negative numbers are rounded like their positive counterparts.
    """
    return round(value, 4) if abs(value) < 1 else round(value, 1)


def _to_json_key(key):
    """Return ``key`` in a form json.dumps accepts as an object key."""
    if isinstance(key, np.generic):
        key = key.item()
    return key if isinstance(key, _JSON_PRIMITIVES) else str(key)


# Function to recursively convert non-serializable types to serializable ones
def convert_to_serializable(obj):
    """Recursively convert ``obj`` into a structure json.dumps can serialize.

    NumPy arrays become lists, NumPy scalars become the matching Python
    scalar, dicts/lists/tuples are converted element by element, and any
    other unsupported object falls back to its ``str()`` representation.
    """
    if isinstance(obj, np.ndarray):
        return obj.tolist()

    if isinstance(obj, np.generic):
        # NumPy scalars (np.int64, np.bool_, ...) are not all subclasses of
        # the Python builtins and would otherwise be exported as strings.
        scalar = obj.item()
        if isinstance(scalar, np.generic):
            # item() could not produce a plain Python object; avoid recursing
            # forever on the same value.
            return str(scalar)
        return convert_to_serializable(scalar)

    if isinstance(obj, dict):
        return {
            _to_json_key(key): convert_to_serializable(value)
            for key, value in obj.items()
        }

    if isinstance(obj, (list, tuple)):
        return [convert_to_serializable(element) for element in obj]

    if isinstance(obj, _JSON_PRIMITIVES):
        return obj

    # Fallback: convert the object to a string
    return str(obj)


def _excel_cell_value(value):
    """Return ``value`` for an Excel cell, mapping NaN/inf floats to None (blank)."""
    if isinstance(value, float) and not math.isfinite(value):
        # Excel has no numeric representation for NaN or infinity.
        return None
    return value


# Function to generate zip file of current scenario
@st.cache_data(show_spinner=False)
def download_as_zip(
    df,
    scenario_data,
    excel_name="optimization_results.xlsx",
    json_name="scenario_params.json",
):
    """Build an in-memory ZIP with the results workbook and scenario JSON.

    Args:
        df: DataFrame written to a sheet named "Results" (header row first).
        scenario_data: Scenario dictionary, exported as indented JSON after
            passing through ``convert_to_serializable``.
        excel_name: Name of the Excel file inside the archive.
        json_name: Name of the JSON file inside the archive.

    Returns:
        An ``io.BytesIO`` positioned at the start of the ZIP content.
    """
    # Build the workbook with openpyxl
    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "Results"

    # Write DataFrame headers
    for col_num, column_title in enumerate(df.columns, 1):
        sheet.cell(row=1, column=col_num, value=column_title)

    # Write DataFrame data
    for row_num, row_data in enumerate(df.values, 2):
        for col_num, cell_value in enumerate(row_data, 1):
            sheet.cell(
                row=row_num, column=col_num, value=_excel_cell_value(cell_value)
            )

    excel_buffer = io.BytesIO()
    workbook.save(excel_buffer)

    scenario_json = json.dumps(convert_to_serializable(scenario_data), indent=4)

    # Assemble both files into the ZIP archive
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as zip_file:
        zip_file.writestr(excel_name, excel_buffer.getvalue())
        zip_file.writestr(json_name, scenario_json.encode("utf-8"))

    buffer.seek(0)  # Rewind the buffer to the beginning
    return buffer


# Function to delete the selected scenario from the saved scenarios dictionary
def delete_selected_scenarios(selected_scenario):
    """Remove ``selected_scenario`` from the saved scenarios file.

    Does nothing when the scenario is not present (including when the
    scenarios file does not exist); the file is rewritten only after an
    actual deletion.
    """
    saved_scenarios_dict = get_saved_scenarios_dict()

    if selected_scenario not in saved_scenarios_dict:
        return

    del saved_scenarios_dict[selected_scenario]

    # Save the updated dictionary back to the file
    with open(_saved_scenarios_path(), "wb") as f:
        pickle.dump(saved_scenarios_dict, f)


def _spends_per_metric(spends, metric_total):
    """Return ``spends / metric_total``, or NaN when the metric total is zero."""
    if metric_total == 0:
        return float("nan")
    return spends / metric_total


# Get saved scenarios dictionary and scenario name list
saved_scenarios_dict = get_saved_scenarios_dict()
scenarios_list = list(saved_scenarios_dict.keys())

# Check if the list of saved scenarios is empty
if not scenarios_list:
    # Display a warning message if no scenarios are saved
    st.warning("No scenarios saved. Please save a scenario to load.", icon="⚠️")
    st.stop()

# Display a dropdown saved scenario list
selected_scenario = st.selectbox(
    "Pick a Scenario", sorted(scenarios_list), key="selected_scenario"
)
selected_scenario_data = saved_scenarios_dict[selected_scenario]

# Scenarios Name
metrics_name = selected_scenario_data["metrics_selected"]
panel_name = selected_scenario_data["panel_selected"]
optimization_name = selected_scenario_data["optimization"]

# Display the scenario details with bold "Metric," "Panel," and "Optimization"
scenarios_name_placeholder.markdown(
    f"**Metric**: {name_formating(metrics_name)}; **Panel**: {name_formating(panel_name)}; **Optimization**: {name_formating(optimization_name)}"
)

# Create columns for download and delete buttons
download_col, delete_col = st.columns(2)

channels_list = list(selected_scenario_data["channels"].keys())

# Column label for the selected metric, computed once for every row
metric_label = name_formating(metrics_name)

# List to hold data for all channels
channels_data = []

# Iterate through each channel and gather required data
for channel in channels_list:
    channel_data = selected_scenario_data["channels"][channel]
    channel_conversion_rate = channel_data["conversion_rate"]

    # Spends are multiplied by the channel's conversion rate
    channel_actual_spends = (
        channel_data["actual_total_spends"] * channel_conversion_rate
    )
    channel_optimized_spends = (
        channel_data["modified_total_spends"] * channel_conversion_rate
    )
    channel_actual_metrics = channel_data["actual_total_sales"]
    channel_optimized_metrics = channel_data["modified_total_sales"]

    # Extract the ROI and MROI data
    channel_roi_mroi_data = selected_scenario_data["channel_roi_mroi"][channel]
    actual_roi = channel_roi_mroi_data["actual_roi"]
    optimized_roi = channel_roi_mroi_data["optimized_roi"]
    actual_mroi = channel_roi_mroi_data["actual_mroi"]
    optimized_mroi = channel_roi_mroi_data["optimized_mroi"]

    # Calculate spends per metric (NaN when the metric total is zero)
    spends_per_metrics_actual = _spends_per_metric(
        channel_actual_spends, channel_actual_metrics
    )
    spends_per_metrics_optimized = _spends_per_metric(
        channel_optimized_spends, channel_optimized_metrics
    )

    # Append the collected data as a dictionary to the list
    channels_data.append(
        {
            "Channel Name": channel,
            "Spends Actual": numerize(channel_actual_spends),
            "Spends Optimized": numerize(channel_optimized_spends),
            f"{metric_label} Actual": numerize(channel_actual_metrics),
            f"{metric_label} Optimized": numerize(channel_optimized_metrics),
            "ROI Actual": format_value(actual_roi),
            "ROI Optimized": format_value(optimized_roi),
            "MROI Actual": format_value(actual_mroi),
            "MROI Optimized": format_value(optimized_mroi),
            f"Spends per {metric_label} Actual": round(
                spends_per_metrics_actual, 2
            ),
            f"Spends per {metric_label} Optimized": round(
                spends_per_metrics_optimized, 2
            ),
        }
    )

# Create a DataFrame from the collected data
df = pd.DataFrame(channels_data)

# Display the DataFrame
st.dataframe(df, hide_index=True)

# Generate downloadable data for selected scenario
buffer = download_as_zip(
    df,
    selected_scenario_data,
    excel_name="optimization_results.xlsx",
    json_name="scenario_params.json",
)

# Provide the buffer as a downloadable ZIP file in Streamlit
download_col.download_button(
    label="Download",
    data=buffer,
    file_name=f"{selected_scenario}_scenario_data.zip",
    mime="application/zip",
    use_container_width=True,
)

# Button to trigger the deletion of the selected scenario
delete_col.button(
    "Delete",
    use_container_width=True,
    on_click=delete_selected_scenarios,
    args=(selected_scenario,),
)