import gradio as gr
import pandas as pd
import numpy as np
import plotly.express as px
import io
import json
import warnings
import google.generativeai as genai
import os
from contextlib import redirect_stdout  # needed by _safe_exec to capture print output
from typing import List, Dict, Any, Tuple, Optional

# --- Configuration & Constants ---
warnings.filterwarnings('ignore')
CSS = """
/* --- Phoenix UI Professional Dark CSS --- */
body { --body-background-fill: #111827; }
.stat-card { border-radius: 12px !important; padding: 20px !important; background: #1f2937 !important; border: 1px solid #374151 !important; text-align: center; transition: all 0.3s ease; }
.stat-card:hover { transform: translateY(-5px); box-shadow: 0 10px 15px -3px rgba(0,0,0,0.1), 0 4px 6px -2px rgba(0,0,0,0.05); }
.stat-card-title { font-size: 16px; font-weight: 500; color: #9ca3af !important; margin-bottom: 8px; }
.stat-card-value { font-size: 32px; font-weight: 700; color: #f9fafb !important; }
.sidebar { background-color: #111827 !important; padding: 15px; border-right: 1px solid #374151 !important; min-height: 100vh; }
.sidebar .gr-button { width: 100%; text-align: left !important; background: none !important; border: none !important; box-shadow: none !important; color: #d1d5db !important; font-size: 16px !important; padding: 12px 10px !important; margin-bottom: 8px !important; border-radius: 8px !important; transition: background-color 0.2s ease; }
.sidebar .gr-button:hover { background-color: #374151 !important; }
.sidebar .gr-button.selected { background-color: #4f46e5 !important; font-weight: 600 !important; color: white !important; }
.explanation-block { background-color: #1e3a8a !important; border-left: 4px solid #3b82f6 !important; padding: 12px; color: #e5e7eb !important; border-radius: 4px; }
"""
class DataExplorerApp:
    """A professional-grade, AI-powered data exploration application."""

    def __init__(self):
        """Initializes the application state and builds the UI."""
        self.state: Dict[str, Any] = {}
        self.demo = self._create_layout()
        # Event listeners must be attached inside the Blocks context, so the
        # already-built Blocks is re-entered before wiring up the handlers.
        with self.demo:
            self._register_event_handlers()

    def _create_layout(self) -> gr.Blocks:
        """Defines all UI components and arranges them in the layout."""
        with gr.Blocks(theme=gr.themes.Glass(primary_hue="indigo", secondary_hue="blue"), css=CSS, title="Professional AI Data Explorer") as demo:
            # --- State Management ---
            self.state_var = gr.State({})
            # --- Component Definition ---
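            # Components are created here with `render=False` and placed into the page
            # later with `.render()`, so their definitions stay separate from the layout
            # while each widget still appears exactly where the layout references it.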
            # Sidebar
            self.cockpit_btn = gr.Button("📊 Data Cockpit", elem_classes="selected", elem_id="cockpit", render=False)
            self.deep_dive_btn = gr.Button("🔍 Deep Dive Builder", elem_id="deep_dive", render=False)
            self.copilot_btn = gr.Button("🤖 Chief Data Scientist", elem_id="co-pilot", render=False)
            self.file_input = gr.File(label="📁 Upload CSV File", file_types=[".csv"], render=False)
            self.status_output = gr.Markdown("Status: Awaiting data...", render=False)
            self.api_key_input = gr.Textbox(label="🔑 Gemini API Key", type="password", placeholder="Enter key to enable AI...", render=False)
            self.suggestion_btn = gr.Button("Get Smart Suggestions", variant="secondary", interactive=False, render=False)
            # Cockpit
            self.rows_stat = gr.Textbox("0", interactive=False, elem_classes="stat-card-value", render=False)
            self.cols_stat = gr.Textbox("0", interactive=False, elem_classes="stat-card-value", render=False)
            self.quality_stat = gr.Textbox("0%", interactive=False, elem_classes="stat-card-value", render=False)
            self.time_cols_stat = gr.Textbox("0", interactive=False, elem_classes="stat-card-value", render=False)
            self.suggestion_buttons = [gr.Button(visible=False, render=False) for _ in range(5)]
            # Deep Dive
            self.plot_type_dd = gr.Dropdown(['histogram', 'bar', 'scatter', 'box'], label="Plot Type", value='histogram', render=False)
            self.x_col_dd = gr.Dropdown([], label="X-Axis / Column", interactive=False, render=False)
            self.y_col_dd = gr.Dropdown([], label="Y-Axis (for Scatter/Box)", visible=False, interactive=False, render=False)
            self.add_plot_btn = gr.Button("Add to Dashboard", variant="primary", interactive=False, render=False)
            self.clear_plots_btn = gr.Button("Clear Dashboard", render=False)
            self.dashboard_gallery = gr.Gallery(label="📊 Your Custom Dashboard", height="auto", columns=2, preview=True, render=False)
            # Co-pilot
            self.chatbot = gr.Chatbot(height=500, label="Conversation", show_copy_button=True, render=False)
            self.copilot_explanation = gr.Markdown(visible=False, elem_classes="explanation-block", render=False)
            self.copilot_code = gr.Code(language="python", visible=False, label="Executed Code", render=False)
            self.copilot_plot = gr.Plot(visible=False, label="Generated Visualization", render=False)
            self.copilot_table = gr.Dataframe(visible=False, label="Generated Table", wrap=True, render=False)
            self.chat_input = gr.Textbox(label="Your Question", placeholder="e.g., 'What is the relationship between age and salary?'", scale=4, render=False)
            self.chat_submit_btn = gr.Button("Ask AI", variant="primary", render=False)
            # --- Layout Arrangement ---
            with gr.Row():
                with gr.Column(scale=1, elem_classes="sidebar"):
                    gr.Markdown("## 🚀 AI Explorer Pro")
                    self.cockpit_btn.render(); self.deep_dive_btn.render(); self.copilot_btn.render()
                    gr.Markdown("---")
                    self.file_input.render(); self.status_output.render()
                    gr.Markdown("---")
                    self.api_key_input.render(); self.suggestion_btn.render()
                with gr.Column(scale=4):
                    self.welcome_page = gr.Column(visible=True)
                    with self.welcome_page:
                        gr.Markdown("# Welcome to the AI Data Explorer Pro\n> Please **upload a CSV file** and **enter your Gemini API key** to begin your analysis.")
                    self.cockpit_page = gr.Column(visible=False)
                    with self.cockpit_page:
                        gr.Markdown("## 📊 Data Cockpit: At-a-Glance Overview")
                        with gr.Row():
                            with gr.Column(elem_classes="stat-card"): gr.Markdown("<div class='stat-card-title'>Rows</div>"); self.rows_stat.render()
                            with gr.Column(elem_classes="stat-card"): gr.Markdown("<div class='stat-card-title'>Columns</div>"); self.cols_stat.render()
                            with gr.Column(elem_classes="stat-card"): gr.Markdown("<div class='stat-card-title'>Data Quality</div>"); self.quality_stat.render()
                            with gr.Column(elem_classes="stat-card"): gr.Markdown("<div class='stat-card-title'>Date/Time Cols</div>"); self.time_cols_stat.render()
                        with gr.Accordion(label="✨ AI Smart Suggestions", open=True):
                            for btn in self.suggestion_buttons: btn.render()
                    self.deep_dive_page = gr.Column(visible=False)
                    with self.deep_dive_page:
                        gr.Markdown("## 🔍 Deep Dive: Manual Dashboard Builder")
                        gr.Markdown("Construct your own visualizations to investigate specific relationships.")
                        with gr.Row(): self.plot_type_dd.render(); self.x_col_dd.render(); self.y_col_dd.render()
                        with gr.Row(): self.add_plot_btn.render(); self.clear_plots_btn.render()
                        self.dashboard_gallery.render()
                    self.copilot_page = gr.Column(visible=False)
                    with self.copilot_page:
                        gr.Markdown("## 🤖 Chief Data Scientist: Your AI Partner")
                        self.chatbot.render()
                        with gr.Accordion("AI's Detailed Response", open=True):
                            self.copilot_explanation.render(); self.copilot_code.render(); self.copilot_plot.render(); self.copilot_table.render()
                        with gr.Row(): self.chat_input.render(); self.chat_submit_btn.render()
        return demo
    def _register_event_handlers(self):
        """Connects UI components to their backend logic functions."""
        # Navigation
        nav_buttons = [self.cockpit_btn, self.deep_dive_btn, self.copilot_btn]
        pages = [self.cockpit_page, self.deep_dive_page, self.copilot_page]
        for i, btn in enumerate(nav_buttons):
            btn.click(
                lambda id=btn.elem_id: self._switch_page(id), outputs=pages
            ).then(
                lambda i=i: [gr.update(elem_classes="selected" if j == i else "") for j in range(len(nav_buttons))], outputs=nav_buttons
            )
        # File Upload
        self.file_input.upload(self.load_and_process_file, inputs=[self.file_input], outputs=[
            self.state_var, self.status_output, self.welcome_page, self.cockpit_page,
            self.rows_stat, self.cols_stat, self.quality_stat, self.time_cols_stat,
            self.x_col_dd, self.y_col_dd, self.add_plot_btn
        ]).then(lambda: self._switch_page("cockpit"), outputs=pages) \
          .then(lambda: [gr.update(elem_classes="selected"), gr.update(elem_classes=""), gr.update(elem_classes="")], outputs=nav_buttons)
        # API Key Input
        self.api_key_input.change(lambda x: gr.update(interactive=bool(x)), inputs=[self.api_key_input], outputs=[self.suggestion_btn])
        # Deep Dive Page Logic
        self.plot_type_dd.change(self._update_plot_controls, inputs=[self.plot_type_dd], outputs=[self.y_col_dd])
        self.add_plot_btn.click(self.add_plot_to_dashboard, inputs=[self.state_var, self.x_col_dd, self.y_col_dd, self.plot_type_dd], outputs=[self.state_var, self.dashboard_gallery])
        self.clear_plots_btn.click(self.clear_dashboard, inputs=[self.state_var], outputs=[self.state_var, self.dashboard_gallery])
        # Co-pilot & Suggestions
        self.suggestion_btn.click(self.get_ai_suggestions, inputs=[self.state_var, self.api_key_input], outputs=self.suggestion_buttons)
        for btn in self.suggestion_buttons:
            btn.click(self.handle_suggestion_click, inputs=[btn], outputs=[self.cockpit_page, self.deep_dive_page, self.copilot_page, self.chat_input]) \
               .then(lambda: self._switch_page("co-pilot"), outputs=pages) \
               .then(lambda: (gr.update(elem_classes=""), gr.update(elem_classes=""), gr.update(elem_classes="selected")), outputs=nav_buttons)
        self.chat_submit_btn.click(self.respond_to_chat, [self.state_var, self.api_key_input, self.chat_input, self.chatbot], [self.chatbot, self.copilot_explanation, self.copilot_code, self.copilot_plot, self.copilot_table]).then(lambda: "", outputs=[self.chat_input])
        self.chat_input.submit(self.respond_to_chat, [self.state_var, self.api_key_input, self.chat_input, self.chatbot], [self.chatbot, self.copilot_explanation, self.copilot_code, self.copilot_plot, self.copilot_table]).then(lambda: "", outputs=[self.chat_input])
    def launch(self):
        """Launches the Gradio application."""
        self.demo.launch(debug=True)

    # --- Backend Logic Methods ---
    def _switch_page(self, page_id: str) -> Tuple[gr.update, ...]:
        """Returns one visibility update per page, in the same order as the `pages` list."""
        return gr.update(visible=page_id == "cockpit"), gr.update(visible=page_id == "deep_dive"), gr.update(visible=page_id == "co-pilot")

    def _update_plot_controls(self, plot_type: str) -> gr.update:
        """Shows the Y-axis dropdown only for plot types that need a second column."""
        is_bivariate = plot_type in ['scatter', 'box']
        return gr.update(visible=is_bivariate)
    def load_and_process_file(self, file_obj: Any) -> Tuple[Any, ...]:
        try:
            df = pd.read_csv(file_obj.name, low_memory=False)
            # Opportunistically parse object columns as datetimes; non-date columns are left untouched.
            for col in df.select_dtypes(include=['object']).columns:
                try: df[col] = pd.to_datetime(df[col], errors='raise')
                except (ValueError, TypeError): continue
            metadata = self._extract_dataset_metadata(df)
            state = {'df': df, 'metadata': metadata, 'dashboard_plots': []}
            status_msg = f"✅ **{os.path.basename(file_obj.name)}** loaded."
            rows, cols, quality = metadata['shape'][0], metadata['shape'][1], metadata['data_quality']
            return (state, status_msg, gr.update(visible=False), gr.update(visible=True),
                    f"{rows:,}", f"{cols}", f"{quality}%", f"{len(metadata['datetime_cols'])}",
                    gr.update(choices=metadata['columns'], interactive=True), gr.update(choices=metadata['columns'], interactive=True), gr.update(interactive=True))
        except Exception as e:
            gr.Warning(f"File Load Error: {e}")
            return {}, f"❌ Error: {e}", gr.update(visible=True), gr.update(visible=False), "0", "0", "0%", "0", gr.update(choices=[], interactive=False), gr.update(choices=[], interactive=False), gr.update(interactive=False)

    def _extract_dataset_metadata(self, df: pd.DataFrame) -> Dict[str, Any]:
        rows, cols = df.shape
        quality = round((df.notna().sum().sum() / (rows * cols)) * 100, 1) if rows * cols > 0 else 0
        return {'shape': (rows, cols), 'columns': df.columns.tolist(), 'data_quality': quality,
                'numeric_cols': df.select_dtypes(include=np.number).columns.tolist(),
                'categorical_cols': df.select_dtypes(include=['object', 'category']).columns.tolist(),
                'datetime_cols': df.select_dtypes(include=['datetime64', 'datetime64[ns]']).columns.tolist(),
                'dtypes_head': df.head().to_string()}
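    # For a small illustrative frame such as pd.DataFrame({"age": [25, 30], "city": ["NY", "LA"]}),
    # the metadata above would look roughly like:
    #   {'shape': (2, 2), 'columns': ['age', 'city'], 'data_quality': 100.0,
    #    'numeric_cols': ['age'], 'categorical_cols': ['city'], 'datetime_cols': [],
    #    'dtypes_head': '...first rows rendered as text...'}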
    def add_plot_to_dashboard(self, state: Dict, x_col: str, y_col: str, plot_type: str) -> Tuple[Dict, List]:
        if not x_col:
            gr.Warning("Please select at least an X-axis column.")
            return state, state.get('dashboard_plots', [])
        df = state['df']
        title = f"{plot_type.capitalize()}: {y_col} by {x_col}" if y_col else f"Distribution of {x_col}"
        try:
            fig = None
            if plot_type == 'histogram': fig = px.histogram(df, x=x_col, title=title)
            elif plot_type == 'box': fig = px.box(df, x=x_col, y=y_col, title=title)
            elif plot_type == 'scatter': fig = px.scatter(df, x=x_col, y=y_col, title=title, trendline="ols", trendline_color_override="red")
            elif plot_type == 'bar':
                counts = df[x_col].value_counts().nlargest(20)
                fig = px.bar(counts, x=counts.index, y=counts.values, title=f"Top 20 Categories for {x_col}", labels={'index': x_col, 'y': 'Count'})
            if fig is not None:
                fig.update_layout(template="plotly_dark")
                state['dashboard_plots'].append(fig)
                gr.Info(f"Added '{title}' to the dashboard.")
            return state, state['dashboard_plots']
        except Exception as e:
            gr.Warning(f"Plotting Error: {e}")
            return state, state.get('dashboard_plots', [])
    def clear_dashboard(self, state: Dict) -> Tuple[Dict, List]:
        state['dashboard_plots'] = []
        gr.Info("Dashboard cleared.")
        return state, []

    def get_ai_suggestions(self, state: Dict, api_key: str) -> List[gr.update]:
        if not api_key: gr.Warning("API Key is required for suggestions."); return [gr.update(visible=False)] * 5
        if not state: gr.Warning("Please load data first."); return [gr.update(visible=False)] * 5
        metadata = state['metadata']
        prompt = f"""Based on this metadata (columns: {metadata['columns']}), generate 4 impactful analytical questions. Return ONLY a JSON list of strings."""
        try:
            genai.configure(api_key=api_key)
            response_text = genai.GenerativeModel('gemini-1.5-flash').generate_content(prompt).text
            # Strip any markdown fences around the JSON and cap at the 5 pre-allocated buttons.
            suggestions = json.loads(response_text.strip().replace("```json", "").replace("```", ""))[:5]
            return [gr.Button(s, visible=True) for s in suggestions] + [gr.Button(visible=False)] * (5 - len(suggestions))
        except Exception as e:
            gr.Warning(f"AI Suggestion Error: {e}")
            return [gr.update(visible=False)] * 5
    def handle_suggestion_click(self, question: str) -> Tuple[gr.update, ...]:
        return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), question
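    # respond_to_chat is a generator: it first yields a placeholder "Thinking..." turn so the
    # chatbot updates immediately, then yields the final plan, code, plot and/or table once the
    # Gemini response has been parsed and executed.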
    def respond_to_chat(self, state: Dict, api_key: str, user_message: str, history: List) -> Tuple[List, ...]:
        if not api_key or not state:
            msg = "I need a Gemini API key and a dataset to work."
            history.append((user_message, msg))
            yield history, *[gr.update(visible=False)] * 4
            return
        history.append((user_message, "Thinking... 🤔"))
        yield history, *[gr.update(visible=False)] * 4
        metadata = state['metadata']
        prompt = f"""You are 'Chief Data Scientist', an expert AI analyst. Your goal is to answer a user's question about a pandas DataFrame (`df`) by writing and executing Python code.
**Instructions:**
1. **Analyze the Request:** Understand the user's intent, even if it's vague.
2. **Choose the Best Method:** Decide if the answer is a table (e.g., `df.describe()`), a single value, or a visualization. If a plot is needed, choose the BEST plot type (e.g., 'histogram' for distribution, 'scatter' for two numerics, 'bar' for categorical counts).
3. **Formulate a Plan:** Briefly explain your plan of attack.
4. **Write Code:** Generate the Python code. Use pandas (`pd`), numpy (`np`), and plotly express (`px`).
   - For plots, assign to `fig` and add `template='plotly_dark'`.
   - For tables, assign the final DataFrame to `result_df`.
5. **Provide Insights:** After the result, give a one or two-sentence INSIGHT. What does the result mean? What is the business implication?
6. **Respond ONLY with a single JSON object with keys: "plan", "code", "insight".**
**DataFrame Metadata:** {metadata['dtypes_head']}
**User Question:** "{user_message}"
"""
        try:
            genai.configure(api_key=api_key)
            raw_text = genai.GenerativeModel('gemini-1.5-flash').generate_content(prompt).text
            response_json = json.loads(raw_text.strip().replace("```json", "").replace("```", ""))
            plan, code_to_run, insight = response_json.get("plan"), response_json.get("code"), response_json.get("insight")
            stdout, fig_result, df_result, error = self._safe_exec(code_to_run, {'df': state['df'], 'px': px, 'pd': pd, 'np': np})
            history[-1] = (user_message, f"**Plan:** {plan}")
            explanation = f"**Insight:** {insight}"
            if stdout: explanation += f"\n\n**Console Output:**\n```\n{stdout}\n```"
            if error: gr.Warning(f"AI Code Execution Failed: {error}")
            yield (history, gr.update(visible=bool(explanation), value=explanation), gr.update(visible=bool(code_to_run), value=code_to_run),
                   gr.update(visible=fig_result is not None, value=fig_result), gr.update(visible=df_result is not None, value=df_result))
        except Exception as e:
            history[-1] = (user_message, f"I'm sorry, I encountered an error. Please try rephrasing your question. (Error: {e})")
            yield history, *[gr.update(visible=False)] * 4

    def _safe_exec(self, code_string: str, local_vars: Dict) -> Tuple[Any, ...]:
        """Executes AI-generated code, capturing stdout and any `fig`/`result_df` it defines."""
        output_buffer = io.StringIO()
        try:
            with redirect_stdout(output_buffer):
                exec(code_string, globals(), local_vars)
            return output_buffer.getvalue(), local_vars.get('fig'), local_vars.get('result_df'), None
        except Exception as e:
            return None, None, None, f"Execution Error: {str(e)}"
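    # Hedged usage sketch for _safe_exec (the code string here is made up for illustration):
    #   stdout, fig, result_df, err = self._safe_exec("result_df = df.describe()",
    #                                                 {'df': df, 'px': px, 'pd': pd, 'np': np})
    #   -> stdout == "", fig is None, result_df holds the summary table, err is None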
if __name__ == "__main__": | |
app = DataExplorerApp() | |
app.launch() |
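# Runtime note: beyond the standard library, this script assumes gradio, pandas, numpy,
# plotly, and google-generativeai are installed (all are imported above). A typical local
# run would be `python app.py` (the filename is an assumption), with the Gemini API key
# pasted into the sidebar at runtime.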