Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import plotly.express as px | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
import io | |
import json | |
import warnings | |
import google.generativeai as genai | |
import os | |
from contextlib import redirect_stdout | |
# --- Configuration ---
# Suppress every Python warning process-wide.
warnings.filterwarnings('ignore')
# --- Expert-Crafted Dark Theme CSS ---
# Stylesheet string passed to gr.Blocks(css=CSS) when the interface is built.
# It covers the ".stat-card" tiles, the ".sidebar" navigation buttons
# (including their ".selected" state) and the co-pilot ".code-block" /
# ".explanation-block" containers.
CSS = """
/* --- Phoenix UI Custom Dark CSS --- */
/* Stat Card Styling */
.stat-card {
border-radius: 12px !important;
padding: 20px !important;
background: #1f2937 !important; /* Dark blue-gray background */
border: 1px solid #374151 !important;
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06);
text-align: center;
}
.stat-card-title { font-size: 16px; font-weight: 500; color: #9ca3af !important; margin-bottom: 8px; }
.stat-card-value { font-size: 32px; font-weight: 700; color: #f9fafb !important; }
/* General Layout & Feel */
.gradio-container { font-family: 'Inter', sans-serif; }
.gr-button { box-shadow: 0 1px 2px 0 rgba(0,0,0,0.05); }
/* Sidebar Styling */
.sidebar {
background-color: #111827 !important; /* Very dark blue-gray */
padding: 15px;
border-right: 1px solid #374151 !important;
min-height: 100vh;
}
.sidebar .gr-button {
width: 100%;
text-align: left !important;
background: none !important;
border: none !important;
box-shadow: none !important;
color: #d1d5db !important; /* Light gray text for readability */
font-size: 16px !important;
padding: 12px 10px !important;
margin-bottom: 8px !important;
border-radius: 8px !important;
}
.sidebar .gr-button:hover { background-color: #374151 !important; } /* Hover state */
.sidebar .gr-button.selected { background-color: #4f46e5 !important; font-weight: 600 !important; color: white !important; } /* Selected state with primary color */
/* AI Co-pilot Styling */
.code-block { border: 1px solid #374151 !important; border-radius: 8px; }
.explanation-block {
background-color: #1e3a8a !important; /* Dark blue background */
border-left: 4px solid #3b82f6 !important; /* Brighter blue border */
padding: 12px;
color: #e5e7eb !important;
}
"""
# --- Helper Functions --- | |
def safe_exec(code_string: str, local_vars: dict):
    """Run ``code_string`` and collect its printed output and well-known results.

    The code executes in a single namespace made of this module's globals
    overlaid with ``local_vars``.  Using one namespace (instead of separate
    globals/locals dicts) lets functions, lambdas and comprehensions defined
    by the executed code see the injected variables such as ``df``.

    Args:
        code_string: Python source to execute.
        local_vars: Names to expose to the code.  Names the code creates or
            rebinds are written back into this dict.

    Returns:
        A 4-tuple ``(stdout, fig, result_df, error)``.  On success ``error``
        is ``None`` and ``fig`` / ``result_df`` are whatever the code bound to
        those names (or ``None``).  On failure the first three items are
        ``None`` and ``error`` is an ``"Execution Error: ..."`` message.

    SECURITY: this is plain ``exec`` with full builtins and is NOT a sandbox.
    The code comes from a language model prompted with user input, so it must
    be treated as untrusted; only run this in an isolated environment.
    """
    module_globals = globals()
    namespace = {**module_globals, **local_vars}
    output_buffer = io.StringIO()
    error = None
    try:
        with redirect_stdout(output_buffer):
            exec(code_string, namespace)
    except Exception as e:
        error = f"Execution Error: {str(e)}"

    # Mirror names the code created or rebound back into the caller's dict,
    # skipping untouched module globals so the dict is not flooded with them.
    for name, value in namespace.items():
        if name not in module_globals or module_globals[name] is not value:
            local_vars[name] = value

    if error is not None:
        return None, None, None, error
    return output_buffer.getvalue(), local_vars.get('fig'), local_vars.get('result_df'), None
# --- Core Data Processing & State Management --- | |
def load_and_process_file(file_obj, state_dict):
    """Load an uploaded CSV into session state and build the UI updates.

    Object columns that parse cleanly as datetimes are converted; columns
    that fail to parse are left unchanged.

    Args:
        file_obj: Uploaded file object; its ``name`` attribute is read as the
            CSV path.  ``None`` means nothing was uploaded.
        state_dict: Current session state, returned unchanged on failure.

    Returns:
        A 13-item tuple matching the outputs wired to ``file_input.upload``:
        new state, status message, visibility updates for the welcome,
        cockpit, deep-dive and co-pilot pages, the four stat values, and
        updates for the X-axis, Y-axis and plot-type dropdowns.
    """
    # Component updates expected after (state, status message).
    extra_outputs = 11
    if file_obj is None:
        # Return the full output count; a shorter tuple makes Gradio raise.
        return state_dict, "Please upload a file.", *[gr.update()] * extra_outputs
    try:
        df = pd.read_csv(file_obj.name, low_memory=False)
        for col in df.select_dtypes(include=['object']).columns:
            try:
                df[col] = pd.to_datetime(df[col], errors='raise')
            except (ValueError, TypeError):
                # Not a datetime column; keep the original values.
                continue
        metadata = extract_dataset_metadata(df)
        state_dict = {'df': df, 'metadata': metadata, 'filename': os.path.basename(file_obj.name), 'dashboard_plots': []}
        status_msg = f"β **{state_dict['filename']}** loaded successfully."
        cockpit_update = gr.update(visible=True)
        welcome_update = gr.update(visible=False)
        rows, cols = metadata['shape']
        quality = metadata['data_quality']
        # Reset the selected value too, so a column picked for a previous
        # file cannot linger after the choices change.
        x_axis_update = gr.update(choices=metadata['columns'], value=None)
        y_axis_update = gr.update(choices=metadata['columns'], value=None)
        # The last output is the plot-type dropdown: leave it untouched.
        # Sending column names here would replace the plot-type choices.
        plot_type_update = gr.update()
        return (state_dict, status_msg, welcome_update, cockpit_update, gr.update(visible=False), gr.update(visible=False),
                gr.update(value=f"{rows:,}"), gr.update(value=cols), gr.update(value=f"{quality}%"), gr.update(value=f"{len(metadata['datetime_cols'])}"),
                x_axis_update, y_axis_update, plot_type_update)
    except Exception as e:
        # UI boundary: report any load/parse failure in the status area.
        return state_dict, f"β **Error:** {e}", *[gr.update()] * extra_outputs
def extract_dataset_metadata(df: pd.DataFrame):
    """Summarise a DataFrame for the dashboard and the AI prompts.

    Args:
        df: The dataset to describe.

    Returns:
        A dict with keys ``shape`` (rows, cols), ``columns``, ``numeric_cols``,
        ``categorical_cols``, ``datetime_cols``, ``dtypes`` and ``head`` (both
        rendered as strings) and ``data_quality`` (percentage of non-null
        cells rounded to one decimal, or 0 for an empty frame).
    """
    rows, cols = df.shape
    numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()
    # 'datetimetz' also picks up timezone-aware columns, which the plain
    # datetime64 selector skips.
    datetime_cols = df.select_dtypes(include=['datetime', 'datetimetz']).columns.tolist()
    total_cells = rows * cols
    if total_cells > 0:
        data_quality = round((df.notna().sum().sum() / total_cells) * 100, 1)
    else:
        # Avoid dividing by zero for an empty frame.
        data_quality = 0
    return {
        'shape': (rows, cols),
        'columns': df.columns.tolist(),
        'numeric_cols': numeric_cols,
        'categorical_cols': categorical_cols,
        'datetime_cols': datetime_cols,
        'dtypes': df.dtypes.to_string(),
        'head': df.head().to_string(),
        'data_quality': data_quality,
    }
# --- Page Navigation --- | |
def switch_page(page_name):
    """Build visibility updates that show only the requested page.

    Returns three updates, ordered cockpit, deep-dive, co-pilot; an update is
    visible only when ``page_name`` equals that page's key.
    """
    show_cockpit = page_name == "cockpit"
    show_deep_dive = page_name == "deep_dive"
    show_copilot = page_name == "co-pilot"
    return (
        gr.update(visible=show_cockpit),
        gr.update(visible=show_deep_dive),
        gr.update(visible=show_copilot),
    )
# --- Page 1: Data Cockpit --- | |
def get_ai_suggestions(state_dict, api_key):
    """Ask Gemini for analytical questions and map them onto the suggestion buttons.

    Args:
        state_dict: Session state; its ``'metadata'`` entry feeds the prompt.
        api_key: Gemini API key.

    Returns:
        Six updates: one for the status Markdown followed by one per
        suggestion button (always five, so the count matches the wired
        outputs).  On failure the status shows the reason and every button is
        hidden.
    """
    max_suggestions = 5

    def status_only(text):
        # Make the status visible again explicitly: a previous successful
        # call hides it, and a bare string would only change its value.
        return (gr.update(value=text, visible=True),
                *[gr.update(visible=False) for _ in range(max_suggestions)])

    if not api_key:
        return status_only("Enter your Gemini API key to get suggestions.")
    if not state_dict:
        return status_only("Upload data first.")
    metadata = state_dict['metadata']
    prompt = f"""
Based on the following dataset metadata, generate 3 to 5 specific, actionable, and interesting analytical questions a user might want to ask. Frame them as questions.
- **Columns:** {', '.join(metadata['columns'])}
- **Numeric:** {', '.join(metadata['numeric_cols'])}
- **Categorical:** {', '.join(metadata['categorical_cols'])}
- **Datetime:** {', '.join(metadata['datetime_cols'])}
Return ONLY a JSON list of strings. Example: ["What is the trend of sales over time?", "Which category has the highest average price?"]
"""
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-1.5-flash')
        response = model.generate_content(prompt)
        # Strip Markdown code fences the same way the chat handler does.
        raw_text = response.text.strip().replace("```json", "").replace("```", "").strip()
        parsed = json.loads(raw_text)
        if not isinstance(parsed, list):
            raise ValueError("expected a JSON list of strings")
        # Keep usable strings only and never exceed the number of buttons.
        suggestions = [item.strip() for item in parsed if isinstance(item, str) and item.strip()]
        suggestions = suggestions[:max_suggestions]
        if not suggestions:
            raise ValueError("the model returned no suggestions")
        buttons = [gr.update(value=text, variant="secondary", visible=True) for text in suggestions]
        buttons += [gr.update(visible=False) for _ in range(max_suggestions - len(buttons))]
        return (gr.update(visible=False), *buttons)
    except Exception as e:
        # UI boundary: API, parsing and validation errors all surface here.
        return status_only(f"Could not generate suggestions: {e}")
def handle_suggestion_click(question_text):
    """Jump to the co-pilot page and pass the clicked question through.

    Returns visibility updates for the cockpit, deep-dive and co-pilot pages
    (only the last is shown) followed by ``question_text`` unchanged.
    """
    cockpit_update = gr.update(visible=False)
    deep_dive_update = gr.update(visible=False)
    copilot_update = gr.update(visible=True)
    return cockpit_update, deep_dive_update, copilot_update, question_text
# --- Page 2: Deep Dive Dashboard --- | |
def add_plot_to_dashboard(state_dict, x_col, y_col, plot_type):
    """Create a Plotly figure from the selected columns and append it to the dashboard.

    Args:
        state_dict: Session state holding ``'df'`` and ``'dashboard_plots'``.
        x_col: Column for the X axis (required).
        y_col: Optional column for the Y axis (passed to scatter and box plots).
        plot_type: One of ``'histogram'``, ``'box'``, ``'scatter'``, ``'bar'``;
            any other value adds nothing.

    Returns:
        ``(state_dict, plots)`` where ``plots`` is the current list of
        dashboard figures sent to the gallery output.
    """
    current_plots = state_dict.get('dashboard_plots', []) if state_dict else []
    # The session state starts out as an empty dict, so 'df' may be missing.
    if not state_dict or state_dict.get('df') is None:
        gr.Warning("Please upload a dataset first.")
        return state_dict, current_plots
    if not x_col:
        gr.Warning("Please select at least an X-axis column.")
        return state_dict, current_plots
    df = state_dict['df']
    try:
        # Built inside the try so a missing plot type is reported as a
        # plotting error instead of crashing the handler.
        title = f"{plot_type.capitalize()}: {y_col} by {x_col}" if y_col else f"Distribution of {x_col}"
        fig = None
        if plot_type == 'histogram':
            fig = px.histogram(df, x=x_col, title=title)
        elif plot_type == 'box':
            fig = px.box(df, x=x_col, y=y_col, title=title)
        elif plot_type == 'scatter':
            fig = px.scatter(df, x=x_col, y=y_col, title=title, trendline="ols")
        elif plot_type == 'bar':
            # Only the 20 most frequent values are charted.
            counts = df[x_col].value_counts().nlargest(20)
            fig = px.bar(counts, x=counts.index, y=counts.values, title=f"Top 20 Categories for {x_col}")
            fig.update_xaxes(title=x_col)
        if fig is not None:
            fig.update_layout(template="plotly_dark")
            state_dict.setdefault('dashboard_plots', []).append(fig)
        # NOTE(review): the list is wired to a gr.Gallery, which is documented
        # for images; confirm it can render Plotly figure objects.
        return state_dict, state_dict.get('dashboard_plots', [])
    except Exception as e:
        # UI boundary: surface any plotting failure as a toast warning.
        gr.Warning(f"Plotting Error: {e}")
        return state_dict, state_dict.get('dashboard_plots', [])
def clear_dashboard(state_dict):
    """Discard every stored dashboard plot and blank the gallery.

    Mutates ``state_dict`` in place and returns it together with a separate
    empty list for the gallery output.
    """
    emptied_gallery = []
    state_dict['dashboard_plots'] = list(emptied_gallery)
    return state_dict, emptied_gallery
# --- Page 3: AI Co-pilot --- | |
def respond_to_chat(user_message, history, state_dict, api_key):
    """Answer a chat message by asking Gemini for code, running it and reporting the result.

    Args:
        user_message: The question typed by the user.
        history: Chat history as a list of ``(user, bot)`` tuples; may be ``None``.
        state_dict: Session state holding ``'df'`` and ``'metadata'``.
        api_key: Gemini API key.

    Returns:
        Five values: the updated history followed by updates for the
        explanation, code, plot and table outputs (in that order).
    """
    # A chatbot with no messages may hand over None instead of a list.
    history = history if history is not None else []

    def hidden_outputs():
        return [gr.update(visible=False) for _ in range(4)]

    if not user_message or not user_message.strip():
        # Nothing to ask: leave the conversation and the outputs as they are.
        return (history, *[gr.update() for _ in range(4)])
    if not api_key:
        history.append((user_message, "I need a Gemini API key to function. Please provide it in the sidebar."))
        return (history, *hidden_outputs())
    if not state_dict:
        history.append((user_message, "Please upload a dataset first."))
        return (history, *hidden_outputs())
    history.append((user_message, None))
    # Unused by the abbreviated prompt below; presumably the full prompt text
    # interpolates it, so it is kept.
    metadata = state_dict['metadata']
    prompt = f"""
You are 'Phoenix Co-pilot', an expert AI data analyst... [Prompt remains the same]
**User Question:** "{user_message}"
**Your JSON Response:**
"""
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-1.5-flash')
        response = model.generate_content(prompt)
        # Remove Markdown code fences before parsing the JSON payload.
        response_json = json.loads(response.text.strip().replace("```json", "").replace("```", ""))
        thought = response_json.get("thought", "Thinking...")
        code_to_run = response_json.get("code", "")
        explanation = response_json.get("explanation", "Here is the result.")
        stdout, fig_result, df_result, error = safe_exec(code_to_run, {'df': state_dict['df'], 'px': px, 'pd': pd, 'np': np})
        history[-1] = (user_message, f"π€ **Thought:** *{thought}*")

        # Build the explanation text first, then wrap it in one update, rather
        # than reading fields back out of an update object.
        explanation_text = f"**Phoenix Co-pilot:** {explanation}" if explanation else ""
        if stdout:
            explanation_text += f"\n\n**Console Output:**\n```\n{stdout}\n```"
        if error:
            # An execution error replaces the model's explanation.
            explanation_text = f"**Phoenix Co-pilot:** I encountered an error. Here's the details:\n\n`{error}`"

        def show_or_hide(show, value):
            if show:
                return gr.update(visible=True, value=value)
            return gr.update(visible=False, value=None)

        output_updates = [
            show_or_hide(bool(explanation_text), explanation_text),
            show_or_hide(bool(code_to_run), code_to_run),
            show_or_hide(fig_result is not None, fig_result),
            show_or_hide(df_result is not None, df_result),
        ]
        return (history, *output_updates)
    except Exception as e:
        # UI boundary: API failures and malformed model output land here.
        history[-1] = (user_message, f"A critical error occurred: {e}. The AI may have returned an invalid response.")
        return (history, *hidden_outputs())
# --- Gradio UI Definition --- | |
def create_gradio_interface():
    """Build the Gradio Blocks app: sidebar, three pages and all event wiring.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(theme=gr.themes.Glass(primary_hue="indigo", secondary_hue="blue"), css=CSS, title="Phoenix AI Data Explorer") as demo:
        # Per-session state; filled in by load_and_process_file.
        global_state = gr.State({})
        with gr.Row():
            # Sidebar
            with gr.Column(scale=1, elem_classes="sidebar"):
                gr.Markdown("## π Phoenix UI")
                cockpit_btn = gr.Button("π Data Cockpit", elem_classes="selected")
                deep_dive_btn = gr.Button("π Deep Dive Builder")
                copilot_btn = gr.Button("π€ AI Co-pilot")
                gr.Markdown("---")
                file_input = gr.File(label="π Upload New CSV", file_types=[".csv"])
                status_output = gr.Markdown("Status: Awaiting data...")
                gr.Markdown("---")
                api_key_input = gr.Textbox(label="π Gemini API Key", type="password", placeholder="Enter key here...")
                suggestion_btn = gr.Button("Get Smart Suggestions", variant="secondary")
            # Main Content Area
            with gr.Column(scale=4):
                with gr.Column(visible=True) as welcome_page:
                    gr.Markdown("# Welcome to the AI Data Explorer (Phoenix UI)")
                    gr.Markdown("Please **upload a CSV file** and **enter your Gemini API key** in the sidebar to begin.")
                    gr.Image(value="workflow.png", label="Workflow", show_label=False, show_download_button=False, container=False)
                with gr.Column(visible=False) as cockpit_page:
                    gr.Markdown("## π Data Cockpit")
                    # NOTE(review): the stat-card definitions were elided in
                    # this file although the upload handler writes to them.
                    # These are minimal stand-ins so the wiring resolves;
                    # replace with the original cards if available.
                    with gr.Row():
                        rows_stat = gr.Textbox(label="Rows", interactive=False, elem_classes="stat-card")
                        cols_stat = gr.Textbox(label="Columns", interactive=False, elem_classes="stat-card")
                        quality_stat = gr.Textbox(label="Data Quality", interactive=False, elem_classes="stat-card")
                        time_cols_stat = gr.Textbox(label="Datetime Columns", interactive=False, elem_classes="stat-card")
                    suggestion_status = gr.Markdown(visible=True)
                    with gr.Accordion(label="β¨ AI Smart Suggestions", open=True):
                        suggestion_buttons = [gr.Button(visible=False) for _ in range(5)]
                with gr.Column(visible=False) as deep_dive_page:
                    gr.Markdown("## π Deep Dive Dashboard Builder")
                    gr.Markdown("Create a custom dashboard by adding multiple plots to the gallery below.")
                    with gr.Row():
                        plot_type_dd = gr.Dropdown(['histogram', 'bar', 'scatter', 'box'], label="Plot Type", value='histogram')
                        x_col_dd = gr.Dropdown([], label="X-Axis / Column")
                        y_col_dd = gr.Dropdown([], label="Y-Axis (for Scatter/Box)")
                    with gr.Row():
                        add_plot_btn = gr.Button("Add to Dashboard", variant="primary")
                        clear_plots_btn = gr.Button("Clear Dashboard")
                    dashboard_gallery = gr.Gallery(label="π Your Custom Dashboard", height="auto", columns=2)
                with gr.Column(visible=False) as copilot_page:
                    gr.Markdown("## π€ AI Co-pilot")
                    # NOTE(review): the co-pilot components were elided in
                    # this file although the chat handlers reference them.
                    # Minimal stand-ins chosen to match the values
                    # respond_to_chat sends (Markdown text, code string,
                    # figure, DataFrame); replace with the originals if
                    # available.
                    chatbot = gr.Chatbot(label="Conversation")
                    with gr.Row():
                        chat_input = gr.Textbox(label="Your Question", placeholder="Ask a question about your data...")
                        chat_submit_btn = gr.Button("Submit", variant="primary")
                    copilot_explanation = gr.Markdown(visible=False, elem_classes="explanation-block")
                    copilot_code = gr.Code(language="python", visible=False, elem_classes="code-block")
                    copilot_plot = gr.Plot(visible=False)
                    copilot_table = gr.Dataframe(visible=False)

        # --- Event Handlers ---
        pages = [cockpit_page, deep_dive_page, copilot_page]
        nav_buttons = [cockpit_btn, deep_dive_btn, copilot_btn]
        # Explicit page keys, in the same order as nav_buttons/pages. Deriving
        # the key from the button label never matched what switch_page
        # compares against, so every navigation click hid all pages.
        page_keys = ["cockpit", "deep_dive", "co-pilot"]
        for index, (btn, page_key) in enumerate(zip(nav_buttons, page_keys)):
            # Default arguments bind the current loop values to each lambda.
            btn.click(lambda key=page_key: switch_page(key), outputs=pages) \
                .then(lambda selected=index: [gr.update(elem_classes="selected" if j == selected else "") for j in range(len(nav_buttons))], outputs=nav_buttons)

        file_input.upload(load_and_process_file, [file_input, global_state],
                          [global_state, status_output, welcome_page, cockpit_page, deep_dive_page, copilot_page,
                           rows_stat, cols_stat, quality_stat, time_cols_stat,
                           x_col_dd, y_col_dd, plot_type_dd]) \
            .then(lambda: switch_page("cockpit"), outputs=pages) \
            .then(lambda: [gr.update(elem_classes="selected"), gr.update(elem_classes=""), gr.update(elem_classes="")], outputs=nav_buttons)

        suggestion_btn.click(get_ai_suggestions, [global_state, api_key_input], [suggestion_status, *suggestion_buttons])
        for btn in suggestion_buttons:
            # The button is its own input, so the handler receives its label.
            btn.click(handle_suggestion_click, inputs=[btn], outputs=[cockpit_page, deep_dive_page, copilot_page, chat_input]) \
                .then(lambda: (gr.update(elem_classes=""), gr.update(elem_classes=""), gr.update(elem_classes="selected")), outputs=nav_buttons)

        add_plot_btn.click(add_plot_to_dashboard, [global_state, x_col_dd, y_col_dd, plot_type_dd], [global_state, dashboard_gallery])
        clear_plots_btn.click(clear_dashboard, [global_state], [global_state, dashboard_gallery])

        # Button click and Enter key run the same handler, then clear the box.
        chat_submit_btn.click(respond_to_chat, [chat_input, chatbot, global_state, api_key_input],
                              [chatbot, copilot_explanation, copilot_code, copilot_plot, copilot_table]) \
            .then(lambda: "", outputs=[chat_input])
        chat_input.submit(respond_to_chat, [chat_input, chatbot, global_state, api_key_input],
                          [chatbot, copilot_explanation, copilot_code, copilot_plot, copilot_table]) \
            .then(lambda: "", outputs=[chat_input])
    return demo
# Script entry point: build the interface and start the Gradio server with
# debug=True passed to launch().
if __name__ == "__main__":
    app = create_gradio_interface()
    app.launch(debug=True)