Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import plotly.express as px | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
import io | |
import json | |
import warnings | |
import google.generativeai as genai | |
import os | |
from contextlib import redirect_stdout | |
# --- Configuration --- | |
warnings.filterwarnings('ignore') | |
CSS = """ | |
/* --- Phoenix UI Custom CSS --- */ | |
/* Stat Card Styling */ | |
.stat-card { | |
border-radius: 12px !important; | |
padding: 20px !important; | |
background: #f7fafc; /* light gray background */ | |
border: 1px solid #e2e8f0; | |
box-shadow: 0 4px 6px -1px rgba(0, 0, 0, 0.1), 0 2px 4px -1px rgba(0, 0, 0, 0.06); | |
text-align: center; | |
} | |
.stat-card-title { font-size: 16px; font-weight: 500; color: #4a5568; margin-bottom: 8px; } | |
.stat-card-value { font-size: 32px; font-weight: 700; color: #2d3748; } | |
/* General Layout & Feel */ | |
.gradio-container { font-family: 'Inter', sans-serif; } | |
.gr-button { box-shadow: 0 1px 2px 0 rgba(0,0,0,0.05); } | |
/* Sidebar Styling */ | |
.sidebar { | |
background-color: #f9fafb; | |
padding: 15px; | |
border-right: 1px solid #e5e7eb; | |
min-height: 100vh; | |
} | |
.sidebar .gr-button { | |
width: 100%; | |
text-align: left !important; | |
background: none !important; | |
border: none !important; | |
box-shadow: none !important; | |
color: #374151 !important; | |
font-size: 16px !important; | |
padding: 12px 10px !important; | |
margin-bottom: 8px !important; | |
border-radius: 8px !important; | |
} | |
.sidebar .gr-button:hover { background-color: #e5e7eb !important; } | |
.sidebar .gr-button.selected { background-color: #d1d5db !important; font-weight: 600 !important; } | |
/* AI Co-pilot Styling */ | |
.code-block { border: 1px solid #e5e7eb; border-radius: 8px; } | |
.explanation-block { background-color: #f0f9ff; border-left: 4px solid #3b82f6; padding: 12px; } | |
""" | |
# --- Helper Functions --- | |
def safe_exec(code_string: str, local_vars: dict): | |
"""Safely execute a string of Python code and capture its output.""" | |
output_buffer = io.StringIO() | |
try: | |
with redirect_stdout(output_buffer): | |
exec(code_string, globals(), local_vars) | |
stdout = output_buffer.getvalue() | |
fig = local_vars.get('fig') | |
result_df = local_vars.get('result_df') | |
return stdout, fig, result_df, None | |
except Exception as e: | |
return None, None, None, f"Execution Error: {str(e)}" | |
# --- Core Data Processing & State Management --- | |
def load_and_process_file(file_obj, state_dict): | |
"""Loads a CSV, processes it, and updates the entire UI state.""" | |
if file_obj is None: | |
return state_dict, "Please upload a file.", *[gr.update(visible=False)] * 3 | |
try: | |
df = pd.read_csv(file_obj.name, low_memory=False) | |
for col in df.select_dtypes(include=['object']).columns: | |
try: | |
df[col] = pd.to_datetime(df[col], errors='raise') | |
except (ValueError, TypeError): | |
continue | |
metadata = extract_dataset_metadata(df) | |
state_dict = { | |
'df': df, | |
'metadata': metadata, | |
'filename': os.path.basename(file_obj.name), | |
'dashboard_plots': [] | |
} | |
status_msg = f"β **{state_dict['filename']}** loaded successfully." | |
# Update UI elements with new data context | |
cockpit_update = gr.update(visible=True) | |
deep_dive_update = gr.update(visible=False) | |
copilot_update = gr.update(visible=False) | |
welcome_update = gr.update(visible=False) | |
# Stat cards | |
rows, cols = metadata['shape'] | |
quality = metadata['data_quality'] | |
return (state_dict, status_msg, welcome_update, cockpit_update, deep_dive_update, copilot_update, | |
gr.update(value=f"{rows:,}"), gr.update(value=cols), gr.update(value=f"{quality}%"), | |
gr.update(value=f"{len(metadata['datetime_cols'])}"), | |
gr.update(choices=metadata['columns']), gr.update(choices=metadata['columns']), gr.update(choices=metadata['columns'])) | |
except Exception as e: | |
return state_dict, f"β **Error:** {e}", *[gr.update()] * 10 | |
def extract_dataset_metadata(df: pd.DataFrame): | |
rows, cols = df.shape | |
numeric_cols = df.select_dtypes(include=np.number).columns.tolist() | |
categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist() | |
datetime_cols = df.select_dtypes(include=['datetime64', 'datetime64[ns]']).columns.tolist() | |
data_quality = round((df.notna().sum().sum() / (rows * cols)) * 100, 1) if rows * cols > 0 else 0 | |
return { | |
'shape': (rows, cols), 'columns': df.columns.tolist(), | |
'numeric_cols': numeric_cols, 'categorical_cols': categorical_cols, | |
'datetime_cols': datetime_cols, 'dtypes': df.dtypes.to_string(), | |
'head': df.head().to_string(), 'data_quality': data_quality | |
} | |
# --- Page Navigation --- | |
def switch_page(page_name): | |
"""Controls visibility of main content pages.""" | |
if page_name == "cockpit": | |
return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) | |
elif page_name == "deep_dive": | |
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) | |
elif page_name == "co-pilot": | |
return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True) | |
return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) | |
# --- Page 1: Data Cockpit --- | |
def get_ai_suggestions(state_dict, api_key): | |
"""Generates proactive analytical suggestions from the AI.""" | |
if not api_key: return "Enter your Gemini API key to get suggestions.", gr.update(visible=False) | |
if not state_dict: return "Upload data first.", gr.update(visible=False) | |
metadata = state_dict['metadata'] | |
prompt = f""" | |
Based on the following dataset metadata, generate 3 to 5 specific, actionable, and interesting analytical questions a user might want to ask. Frame them as questions. | |
- **Columns:** {', '.join(metadata['columns'])} | |
- **Numeric:** {', '.join(metadata['numeric_cols'])} | |
- **Categorical:** {', '.join(metadata['categorical_cols'])} | |
- **Datetime:** {', '.join(metadata['datetime_cols'])} | |
Return ONLY a JSON list of strings. Example: ["What is the trend of sales over time?", "Which category has the highest average price?"] | |
""" | |
try: | |
genai.configure(api_key=api_key) | |
model = genai.GenerativeModel('gemini-1.5-flash') | |
response = model.generate_content(prompt) | |
suggestions = json.loads(response.text) | |
# Create a button for each suggestion | |
buttons = [gr.Button(s, variant="secondary") for s in suggestions] | |
return gr.update(visible=False), gr.Accordion(label="β¨ AI Smart Suggestions", open=True, children=buttons) | |
except Exception as e: | |
return f"Could not generate suggestions: {e}", gr.update(visible=False) | |
# --- Page 2: Deep Dive Dashboard --- | |
def add_plot_to_dashboard(state_dict, x_col, y_col, plot_type): | |
"""Generates a plot and adds it to the state-managed dashboard.""" | |
if not x_col: return state_dict, gr.update() | |
df = state_dict['df'] | |
title = f"{plot_type.capitalize()}: {y_col} by {x_col}" if y_col else f"Distribution of {x_col}" | |
fig = None | |
try: | |
if plot_type == 'histogram': fig = px.histogram(df, x=x_col, title=title) | |
elif plot_type == 'box': fig = px.box(df, x=x_col, y=y_col, title=title) | |
elif plot_type == 'scatter': fig = px.scatter(df, x=x_col, y=y_col, title=title, trendline="ols") | |
elif plot_type == 'bar': | |
counts = df[x_col].value_counts().nlargest(20) | |
fig = px.bar(counts, x=counts.index, y=counts.values, title=f"Top 20 Categories for {x_col}") | |
fig.update_xaxes(title=x_col) | |
if fig: | |
state_dict['dashboard_plots'].append(fig) | |
# Rebuild the accordion with all plots | |
accordion_children = [gr.Plot(fig, visible=True) for fig in state_dict['dashboard_plots']] | |
return state_dict, gr.Accordion(label="Your Dashboard Plots", children=accordion_children, open=True) | |
except Exception as e: | |
gr.Warning(f"Plotting Error: {e}") | |
return state_dict, gr.update() | |
def clear_dashboard(state_dict): | |
"""Clears all plots from the dashboard.""" | |
state_dict['dashboard_plots'] = [] | |
return state_dict, gr.Accordion(label="Your Dashboard Plots", children=[]) | |
# --- Page 3: AI Co-pilot --- | |
def respond_to_chat(user_message, history, state_dict, api_key): | |
"""Handles the advanced chat interaction with the AI Co-pilot.""" | |
if not api_key: | |
history.append((user_message, "I need a Gemini API key to function. Please provide it in the sidebar.")) | |
return history, *[gr.update(visible=False)] * 4 | |
if not state_dict: | |
history.append((user_message, "Please upload a dataset first.")) | |
return history, *[gr.update(visible=False)] * 4 | |
history.append((user_message, None)) | |
metadata = state_dict['metadata'] | |
prompt = f""" | |
You are 'Phoenix Co-pilot', an expert AI data analyst. Your goal is to help a user analyze a pandas DataFrame named `df`. | |
**Instructions:** | |
1. Carefully understand the user's question. | |
2. Formulate a plan (thought process). | |
3. Write Python code to execute that plan. | |
4. The code can use pandas (pd), numpy (np), and plotly.express (px). | |
5. **For plots, assign the figure to a variable `fig` (e.g., `fig = px.histogram(...)`).** | |
6. **For table-like results, assign the final DataFrame to a variable `result_df` (e.g., `result_df = df.describe()`).** | |
7. Do not modify the original `df`. Use `df.copy()` if needed. | |
8. Provide a brief, user-friendly explanation of the result. | |
9. Respond **ONLY** with a single, raw JSON object with keys: "thought", "code", "explanation". | |
**DataFrame Metadata:** | |
- Columns and dtypes: {metadata['dtypes']} | |
- First 5 rows: {metadata['head']} | |
**User Question:** "{user_message}" | |
**Your JSON Response:** | |
""" | |
try: | |
genai.configure(api_key=api_key) | |
model = genai.GenerativeModel('gemini-1.5-flash') | |
response = model.generate_content(prompt) | |
response_text = response.text.strip().replace("```json", "").replace("```", "") | |
response_json = json.loads(response_text) | |
thought = response_json.get("thought", "Thinking...") | |
code_to_run = response_json.get("code", "") | |
explanation = response_json.get("explanation", "Here is the result.") | |
stdout, fig_result, df_result, error = safe_exec(code_to_run, {'df': state_dict['df'], 'px': px, 'pd': pd, 'np': np}) | |
bot_message = f"π€ **Thought:** *{thought}*" | |
history[-1] = (user_message, bot_message) | |
# Prepare outputs, making them visible only if they contain content | |
output_updates = [gr.update(visible=False, value=None)] * 4 # [explanation, code, plot, table] | |
if explanation: output_updates[0] = gr.update(visible=True, value=f"**Phoenix Co-pilot:** {explanation}") | |
if code_to_run: output_updates[1] = gr.update(visible=True, value=code_to_run) | |
if fig_result: output_updates[2] = gr.update(visible=True, value=fig_result) | |
if df_result is not None: output_updates[3] = gr.update(visible=True, value=df_result) | |
if stdout: | |
# Append stdout to explanation if it exists | |
new_explanation = (output_updates[0]['value'] if output_updates[0]['value'] else "") + f"\n\n**Console Output:**\n```\n{stdout}\n```" | |
output_updates[0] = gr.update(visible=True, value=new_explanation) | |
if error: | |
error_explanation = f"**Phoenix Co-pilot:** I encountered an error. Here's the details:\n\n`{error}`" | |
output_updates[0] = gr.update(visible=True, value=error_explanation) | |
return history, *output_updates | |
except Exception as e: | |
error_msg = f"A critical error occurred: {e}. The AI may have returned an invalid response. Please try rephrasing your question." | |
history[-1] = (user_message, error_msg) | |
return history, *[gr.update(visible=False)] * 4 | |
# --- Gradio UI Definition --- | |
def create_gradio_interface(): | |
with gr.Blocks(theme=gr.themes.Monochrome(primary_hue="indigo", secondary_hue="blue"), css=CSS, title="Phoenix AI Data Explorer") as demo: | |
global_state = gr.State({}) | |
with gr.Row(): | |
# --- Sidebar --- | |
with gr.Column(scale=1, elem_classes="sidebar"): | |
gr.Markdown("# π Phoenix UI") | |
gr.Markdown("AI Data Explorer") | |
# Navigation buttons | |
cockpit_btn = gr.Button("π Data Cockpit", elem_classes="selected") | |
deep_dive_btn = gr.Button("π Deep Dive Builder") | |
copilot_btn = gr.Button("π€ AI Co-pilot") | |
gr.Markdown("---") | |
file_input = gr.File(label="π Upload New CSV", file_types=[".csv"]) | |
status_output = gr.Markdown("Status: Awaiting data...") | |
gr.Markdown("---") | |
api_key_input = gr.Textbox(label="π Gemini API Key", type="password", placeholder="Enter key here...") | |
suggestion_btn = gr.Button("Get Smart Suggestions", variant="secondary") | |
# --- Main Content Area --- | |
with gr.Column(scale=4): | |
# Welcome Page (Visible initially) | |
with gr.Column(visible=True) as welcome_page: | |
gr.Markdown("# Welcome to the AI Data Explorer (Phoenix UI)", elem_id="welcome-header") | |
gr.Markdown("Please **upload a CSV file** and **enter your Gemini API key** in the sidebar to begin.") | |
gr.Image(value="https://i.imgur.com/gY5wSjV.png", label="Workflow", show_label=False, show_download_button=False, container=False) # Placeholder image | |
# Page 1: Data Cockpit (Hidden initially) | |
with gr.Column(visible=False) as cockpit_page: | |
gr.Markdown("## π Data Cockpit") | |
with gr.Row(): | |
with gr.Column(elem_classes="stat-card"): | |
gr.Markdown("<div class='stat-card-title'>Rows</div>", elem_classes="stat-card-content") | |
rows_stat = gr.Textbox("0", show_label=False, elem_classes="stat-card-value") | |
with gr.Column(elem_classes="stat-card"): | |
gr.Markdown("<div class='stat-card-title'>Columns</div>", elem_classes="stat-card-content") | |
cols_stat = gr.Textbox("0", show_label=False, elem_classes="stat-card-value") | |
with gr.Column(elem_classes="stat-card"): | |
gr.Markdown("<div class='stat-card-title'>Data Quality</div>", elem_classes="stat-card-content") | |
quality_stat = gr.Textbox("0%", show_label=False, elem_classes="stat-card-value") | |
with gr.Column(elem_classes="stat-card"): | |
gr.Markdown("<div class='stat-card-title'>Date/Time Cols</div>", elem_classes="stat-card-content") | |
time_cols_stat = gr.Textbox("0", show_label=False, elem_classes="stat-card-value") | |
suggestion_status = gr.Markdown(visible=True) | |
suggestion_accordion = gr.Accordion(label="β¨ AI Smart Suggestions", open=False, visible=False) | |
# Page 2: Deep Dive Dashboard Builder (Hidden initially) | |
with gr.Column(visible=False) as deep_dive_page: | |
gr.Markdown("## π Deep Dive Dashboard Builder") | |
gr.Markdown("Create a custom dashboard by adding multiple plots to investigate your data.") | |
with gr.Row(): | |
plot_type_dd = gr.Dropdown(['histogram', 'bar', 'scatter', 'box'], label="Plot Type", value='histogram') | |
x_col_dd = gr.Dropdown([], label="X-Axis / Column") | |
y_col_dd = gr.Dropdown([], label="Y-Axis (for Scatter/Box)") | |
with gr.Row(): | |
add_plot_btn = gr.Button("Add to Dashboard", variant="primary") | |
clear_plots_btn = gr.Button("Clear Dashboard") | |
dashboard_accordion = gr.Accordion(label="Your Dashboard Plots", open=True) | |
# Page 3: AI Co-pilot (Hidden initially) | |
with gr.Column(visible=False) as copilot_page: | |
gr.Markdown("## π€ AI Co-pilot") | |
gr.Markdown("Ask complex questions in natural language. The Co-pilot will write and execute code to find the answer.") | |
chatbot = gr.Chatbot(height=400, label="Conversation with Co-pilot") | |
# AI's multi-modal response area | |
with gr.Accordion("Co-pilot's Response Details", open=True): | |
copilot_explanation = gr.Markdown(visible=False, elem_classes="explanation-block") | |
copilot_code = gr.Code(language="python", visible=False, label="Executed Python Code", elem_classes="code-block") | |
copilot_plot = gr.Plot(visible=False, label="Generated Visualization") | |
copilot_table = gr.Dataframe(visible=False, label="Generated Table", wrap=True) | |
with gr.Row(): | |
chat_input = gr.Textbox(label="Your Question", placeholder="e.g., 'What is the correlation between age and salary?'", scale=4) | |
chat_submit_btn = gr.Button("Submit", variant="primary") | |
# --- Event Handlers --- | |
# Page Navigation | |
pages = [cockpit_page, deep_dive_page, copilot_page] | |
nav_buttons = [cockpit_btn, deep_dive_btn, copilot_btn] | |
for i, btn in enumerate(nav_buttons): | |
btn.click(lambda i=i: (gr.update(visible=i==0), gr.update(visible=i==1), gr.update(visible=i==2)), | |
outputs=pages).then( | |
lambda: [gr.update(elem_classes="selected" if j==i else "") for j in range(len(nav_buttons))], | |
outputs=nav_buttons) | |
# File Upload | |
file_input.upload( | |
fn=load_and_process_file, | |
inputs=[file_input, global_state], | |
outputs=[global_state, status_output, welcome_page, cockpit_page, deep_dive_page, copilot_page, | |
rows_stat, cols_stat, quality_stat, time_cols_stat, | |
x_col_dd, y_col_dd, copilot_input_col_ref_for_dynamic_update] # This last one is a dummy to trigger updates | |
).then( | |
fn=lambda: (gr.update(elem_classes="selected"), gr.update(elem_classes=""), gr.update(elem_classes="")), | |
outputs=nav_buttons | |
) | |
# Suggestions Button | |
suggestion_btn.click( | |
get_ai_suggestions, | |
[global_state, api_key_input], | |
[suggestion_status, suggestion_accordion] | |
).then( | |
fn=lambda: [gr.Button.update(visible=True) for _ in range(5)], # Assumes max 5 suggestions for demo | |
outputs=[b for b in suggestion_accordion.children] if isinstance(suggestion_accordion, gr.Accordion) and suggestion_accordion.children else [] | |
) | |
# Handle suggestion button clicks to populate chat | |
if isinstance(suggestion_accordion, gr.Accordion): | |
for button in suggestion_accordion.children: | |
button.click( | |
fn=lambda q=button.value: (gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), q), | |
outputs=[cockpit_page, deep_dive_page, copilot_page, chat_input] | |
).then( | |
fn=lambda: (gr.update(elem_classes=""), gr.update(elem_classes=""), gr.update(elem_classes="selected")), | |
outputs=nav_buttons | |
) | |
# Dashboard Builder | |
add_plot_btn.click(add_plot_to_dashboard, [global_state, x_col_dd, y_col_dd, plot_type_dd], [global_state, dashboard_accordion]) | |
clear_plots_btn.click(clear_dashboard, [global_state], [global_state, dashboard_accordion]) | |
# AI Co-pilot Chat | |
copilot_input_col_ref_for_dynamic_update = x_col_dd # Dummy placeholder for dynamic updates | |
chat_submit_btn.click( | |
respond_to_chat, | |
[chat_input, chatbot, global_state, api_key_input], | |
[chatbot, copilot_explanation, copilot_code, copilot_plot, copilot_table] | |
).then(lambda: "", outputs=[chat_input]) | |
chat_input.submit( | |
respond_to_chat, | |
[chat_input, chatbot, global_state, api_key_input], | |
[chatbot, copilot_explanation, copilot_code, copilot_plot, copilot_table] | |
).then(lambda: "", outputs=[chat_input]) | |
return demo | |
if __name__ == "__main__": | |
app = create_gradio_interface() | |
app.launch(debug=True) |