Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
""" | |
🚀 AutoEDA: AI-Powered Exploratory Data Analysis Tool | |
An advanced Gradio application for automated exploratory data analysis, | |
data profiling, and AI-driven insights using Google's Gemini API. | |
Key Features: | |
- Unified Analysis Workflow: Upload a CSV and get a full report across all tabs. | |
- AI-Powered Storytelling: Generates a narrative overview, use cases, and findings. | |
- Actionable AI Suggestions: Provides data cleaning recommendations. | |
- Interactive Visualizations: Users can select columns to generate plots dynamically. | |
- In-depth Profiling: Detailed statistics for numeric and categorical data. | |
- Column-Level Drilldown: Inspect individual features in detail. | |
- Report Download: Export the AI-generated analysis as a Markdown file. | |
Author: World-Class MCP Expert | |
Version: 2.0 | |
""" | |
from __future__ import annotations | |
import warnings | |
import logging | |
import os | |
import pandas as pd | |
import numpy as np | |
import plotly.express as px | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
import gradio as gr | |
import google.generativeai as genai | |
from typing import Optional, Dict, Any, Tuple, List | |
from datetime import datetime | |
# --- Configuration & Setup --- | |
warnings.filterwarnings('ignore') | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# --- Core Analysis Logic (The "Engine") --- | |
class DataAnalyzer: | |
""" | |
A comprehensive class to encapsulate all data analysis operations. | |
It holds the dataframe and provides methods for profiling, visualization, | |
and AI-powered analysis, ensuring data is processed only once. | |
""" | |
def __init__(self, df: pd.DataFrame): | |
if not isinstance(df, pd.DataFrame): | |
raise TypeError("Input must be a pandas DataFrame.") | |
self.df = df | |
self._metadata: Optional[Dict[str, Any]] = None | |
logging.info(f"DataAnalyzer initialized with DataFrame of shape: {self.df.shape}") | |
def metadata(self) -> Dict[str, Any]: | |
"""Lazy-loads and caches dataset metadata.""" | |
if self._metadata is None: | |
self._metadata = self._extract_metadata() | |
return self._metadata | |
def _extract_metadata(self) -> Dict[str, Any]: | |
"""Extracts comprehensive metadata from the DataFrame.""" | |
logging.info("Extracting dataset metadata...") | |
rows, cols = self.df.shape | |
numeric_cols = self.df.select_dtypes(include=np.number).columns.tolist() | |
categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns.tolist() | |
datetime_cols = self.df.select_dtypes(include=['datetime64']).columns.tolist() | |
# High correlation pairs | |
high_corr_pairs = [] | |
if len(numeric_cols) > 1: | |
corr_matrix = self.df[numeric_cols].corr().abs() | |
upper_tri = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)) | |
high_corr_pairs = ( | |
upper_tri.stack() | |
.reset_index() | |
.rename(columns={'level_0': 'Var 1', 'level_1': 'Var 2', 0: 'Correlation'}) | |
.query('Correlation > 0.7') | |
.sort_values('Correlation', ascending=False) | |
.head(5) | |
.to_dict('records') | |
) | |
return { | |
'shape': (rows, cols), | |
'columns': self.df.columns.tolist(), | |
'numeric_cols': numeric_cols, | |
'categorical_cols': categorical_cols, | |
'datetime_cols': datetime_cols, | |
'memory_usage': f"{self.df.memory_usage(deep=True).sum() / 1e6:.2f} MB", | |
'total_missing': int(self.df.isnull().sum().sum()), | |
'data_quality_score': round((self.df.notna().sum().sum() / self.df.size) * 100, 1), | |
'high_corr_pairs': high_corr_pairs, | |
} | |
def get_profiling_report(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: | |
"""Generates detailed data profiling tables.""" | |
logging.info("Generating data profiling report.") | |
# Missing data | |
missing = self.df.isnull().sum() | |
missing_df = pd.DataFrame({ | |
'Missing Values': missing, | |
'Percentage (%)': (missing / len(self.df) * 100).round(2) | |
}).reset_index().rename(columns={'index': 'Column'}).sort_values('Missing Values', ascending=False) | |
# Numeric stats | |
numeric_stats_df = self.df[self.metadata['numeric_cols']].describe().round(3).T.reset_index().rename(columns={'index': 'Column'}) | |
# Categorical stats | |
cat_stats_list = [] | |
for col in self.metadata['categorical_cols']: | |
stats = { | |
'Column': col, | |
'Unique Values': self.df[col].nunique(), | |
'Top Value': self.df[col].mode().iloc[0] if not self.df[col].mode().empty else 'N/A', | |
'Frequency': self.df[col].value_counts().iloc[0] if not self.df[col].value_counts().empty else 0 | |
} | |
cat_stats_list.append(stats) | |
categorical_stats_df = pd.DataFrame(cat_stats_list) | |
return missing_df, numeric_stats_df, categorical_stats_df | |
def get_initial_visuals(self) -> Tuple[go.Figure, go.Figure, go.Figure]: | |
"""Creates a set of standard, non-interactive overview plots.""" | |
logging.info("Generating initial overview visualizations.") | |
# Data type distribution | |
dtype_counts = self.df.dtypes.astype(str).value_counts() | |
dtype_fig = px.pie( | |
values=dtype_counts.values, names=dtype_counts.index, | |
title="📊 Data Type Distribution", hole=0.3 | |
) | |
dtype_fig.update_traces(textposition='inside', textinfo='percent+label') | |
# Missing data overview | |
missing_fig = px.bar( | |
x=self.df.isnull().sum(), y=self.df.columns, | |
orientation='h', title="🕳️ Missing Values Overview", | |
labels={'x': 'Number of Missing Values', 'y': 'Column'}, | |
).update_yaxes(categoryorder="total ascending") | |
# Correlation heatmap | |
corr_fig = go.Figure() | |
if len(self.metadata['numeric_cols']) > 1: | |
corr_matrix = self.df[self.metadata['numeric_cols']].corr() | |
corr_fig = px.imshow( | |
corr_matrix, text_auto=".2f", aspect="auto", | |
title="🔗 Correlation Matrix (Numeric Features)", | |
color_continuous_scale='RdBu_r' | |
) | |
else: | |
corr_fig.update_layout(title="🔗 Correlation Matrix (Not enough numeric columns)") | |
return dtype_fig, missing_fig, corr_fig | |
def generate_ai_report(self, api_key: str) -> str: | |
"""Generates a full data story and analysis using the Gemini API.""" | |
logging.info("Generating AI report with Gemini.") | |
prompt = f""" | |
As an expert data analyst and storyteller, your task is to analyze the provided dataset summary and generate a comprehensive, insightful, and accessible report. | |
**Dataset Metadata:** | |
- **Shape:** {self.metadata['shape'][0]} rows, {self.metadata['shape'][1]} columns. | |
- **Column Names:** {', '.join(self.metadata['columns'])} | |
- **Numeric Columns:** {', '.join(self.metadata['numeric_cols'])} | |
- **Categorical Columns:** {', '.join(self.metadata['categorical_cols'])} | |
- **Overall Data Quality:** {self.metadata['data_quality_score']}% | |
- **Total Missing Values:** {self.metadata['total_missing']:,} | |
- **Highly Correlated Pairs (>0.7):** {self.metadata['high_corr_pairs'] if self.metadata['high_corr_pairs'] else 'None detected.'} | |
- **Sample Data (First 3 Rows):** | |
{self.df.head(3).to_markdown()} | |
**Your Report Structure (Use Markdown):** | |
# 🚀 AI-Powered Data Analysis Report | |
## 📖 1. The Story of the Data | |
* **What is this dataset about?** (Deduce the purpose and subject matter of the data.) | |
* **What domain or industry does it belong to?** (e.g., E-commerce, Finance, Healthcare.) | |
* **Who might use this data?** (e.g., Marketers, Scientists, Financial Analysts.) | |
## 🎯 2. Key Insights & Interesting Findings | |
- **Finding 1:** (Describe a significant pattern, trend, or anomaly. Use emojis to highlight.) | |
- **Finding 2:** (Mention another interesting discovery, perhaps from correlations or categorical data.) | |
- **Finding 3:** (Highlight a potential business or research opportunity revealed by the data.) | |
## 🧹 3. Data Quality & Cleaning Recommendations | |
* **Overall Quality Assessment:** (Comment on the {self.metadata['data_quality_score']}% score and {self.metadata['total_missing']} missing values.) | |
* **Actionable Steps:** | |
- **Recommendation 1:** (e.g., "For column 'X' with Y% missing values, consider imputation using the mean/median/mode.") | |
- **Recommendation 2:** (e.g., "Columns 'A' and 'B' are highly correlated ({'e.g., ' + str(self.metadata['high_corr_pairs'][0]) if self.metadata['high_corr_pairs'] else ''}). Consider dropping one for modeling to avoid multicollinearity.") | |
- **Recommendation 3:** (e.g., "Column 'Z' is categorical but stored as a number. Recommend converting it to a category type.") | |
## 🔮 4. Potential Next Steps & Use Cases | |
- **Analysis Idea 1:** (e.g., "Build a predictive model for customer churn.") | |
- **Dashboard Idea 2:** (e.g., "Create a sales performance dashboard tracking KPIs over time.") | |
- **Research Question 3:** (e.g., "Investigate the factors influencing employee attrition.") | |
""" | |
try: | |
genai.configure(api_key=api_key) | |
model = genai.GenerativeModel('gemini-1.5-flash-latest') | |
response = model.generate_content(prompt) | |
return response.text | |
except Exception as e: | |
logging.error(f"Gemini API call failed: {e}") | |
return f"❌ **Error generating AI report.**\n**Reason:** {str(e)}\n\nPlease check your API key and network connection. A fallback analysis could not be generated." | |
# --- Gradio UI & Event Handlers --- | |
def process_uploaded_file(file_obj: gr.File, api_key: str) -> tuple: | |
""" | |
Main function to process the uploaded file. It runs all analyses | |
and returns updates for all UI components in one go. | |
""" | |
if file_obj is None: | |
raise gr.Error("📁 Please upload a CSV file first!") | |
if not api_key: | |
raise gr.Error("🔑 Please enter your Gemini API key!") | |
try: | |
df = pd.read_csv(file_obj.name) | |
analyzer = DataAnalyzer(df) | |
# Perform all analyses | |
ai_report = analyzer.generate_ai_report(api_key) | |
missing_df, num_stats, cat_stats = analyzer.get_profiling_report() | |
dtype_fig, missing_fig, corr_fig = analyzer.get_initial_visuals() | |
# Prepare UI updates | |
all_cols = analyzer.metadata['columns'] | |
num_cols = analyzer.metadata['numeric_cols'] | |
cat_cols = analyzer.metadata['categorical_cols'] | |
# The return dictionary maps UI components to their new values/configurations | |
return { | |
state_analyzer: analyzer, | |
# Overview Tab | |
md_ai_report: ai_report, | |
btn_download_report: gr.Button(visible=True), | |
# Profiling Tab | |
df_missing_data: missing_df, | |
df_numeric_stats: num_stats, | |
df_categorical_stats: cat_stats, | |
# Visuals Tab | |
plot_dtype: dtype_fig, | |
plot_missing: missing_fig, | |
plot_corr: corr_fig, | |
# Interactive Visuals Tab | |
dd_hist_col: gr.Dropdown(choices=num_cols, label="Select Numeric Column for Histogram", visible=True), | |
dd_scatter_x: gr.Dropdown(choices=num_cols, label="Select X-axis (Numeric)", visible=True), | |
dd_scatter_y: gr.Dropdown(choices=num_cols, label="Select Y-axis (Numeric)", visible=True), | |
dd_scatter_color: gr.Dropdown(choices=all_cols, label="Select Color (Categorical/Numeric)", visible=True), | |
dd_box_cat: gr.Dropdown(choices=cat_cols, label="Select Categorical Column for Box Plot", visible=True), | |
dd_box_num: gr.Dropdown(choices=num_cols, label="Select Numeric Column for Box Plot", visible=True), | |
# Column Drilldown Tab | |
dd_drilldown_col: gr.Dropdown(choices=all_cols, label="Select Column to Analyze", visible=True), | |
} | |
except Exception as e: | |
logging.error(f"An error occurred during file processing: {e}", exc_info=True) | |
raise gr.Error(f"Processing failed! Error: {str(e)}") | |
# --- Interactive Plotting Functions --- | |
def create_histogram(analyzer: DataAnalyzer, col: str) -> go.Figure: | |
if not col: return go.Figure() | |
return px.histogram(analyzer.df, x=col, title=f"Distribution of {col}", marginal="box") | |
def create_scatterplot(analyzer: DataAnalyzer, x_col: str, y_col: str, color_col: str) -> go.Figure: | |
if not x_col or not y_col: return go.Figure() | |
return px.scatter(analyzer.df, x=x_col, y=y_col, color=color_col, | |
title=f"Scatter Plot: {x_col} vs. {y_col}") | |
def create_boxplot(analyzer: DataAnalyzer, cat_col: str, num_col: str) -> go.Figure: | |
if not cat_col or not num_col: return go.Figure() | |
return px.box(analyzer.df, x=cat_col, y=num_col, title=f"Box Plot: {num_col} by {cat_col}") | |
def analyze_single_column(analyzer: DataAnalyzer, col: str) -> Tuple[str, go.Figure]: | |
if not col: return "", go.Figure() | |
col_series = analyzer.df[col] | |
# Generate stats markdown | |
stats_md = f"### 🔎 Analysis of Column: `{col}`\n" | |
stats_md += f"- **Data Type:** `{col_series.dtype}`\n" | |
stats_md += f"- **Missing Values:** {col_series.isnull().sum()} ({col_series.isnull().mean():.2%})\n" | |
stats_md += f"- **Unique Values:** {col_series.nunique()}\n" | |
# Generate plot based on type | |
fig = go.Figure() | |
if pd.api.types.is_numeric_dtype(col_series): | |
stats_md += f"- **Mean:** {col_series.mean():.2f}\n" | |
stats_md += f"- **Median:** {col_series.median():.2f}\n" | |
stats_md += f"- **Std Dev:** {col_series.std():.2f}\n" | |
fig = create_histogram(analyzer, col) | |
elif pd.api.types.is_categorical_dtype(col_series) or pd.api.types.is_object_dtype(col_series): | |
top5 = col_series.value_counts().head(5) | |
stats_md += f"- **Top 5 Values:**\n" | |
for val, count in top5.items(): | |
stats_md += f" - `{val}`: {count} times\n" | |
fig = px.bar(top5, x=top5.index, y=top5.values, title=f"Top 5 Value Counts for {col}") | |
fig.update_xaxes(title=col) | |
fig.update_yaxes(title="Count") | |
return stats_md, fig | |
def download_report(analyzer: DataAnalyzer, ai_report_text: str) -> str: | |
"""Saves the AI report and basic stats to a markdown file for download.""" | |
if not analyzer: return None | |
filename = f"AI_Report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md" | |
# Create the full report content | |
full_report = f"# AutoEDA Analysis Report\n\n" | |
full_report += f"**Date Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" | |
full_report += f"**Dataset Shape:** {analyzer.metadata['shape'][0]} rows x {analyzer.metadata['shape'][1]} columns\n\n" | |
full_report += "---\n\n" | |
full_report += ai_report_text | |
with open(filename, "w", encoding="utf-8") as f: | |
f.write(full_report) | |
logging.info(f"Generated download report: {filename}") | |
return filename | |
# --- Gradio Interface Definition --- | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"), title="🚀 AutoEDA Pro") as demo: | |
# State object to hold the DataAnalyzer instance | |
state_analyzer = gr.State() | |
gr.Markdown("# 🚀 AutoEDA Pro: Your AI Data Science Assistant") | |
gr.Markdown("Upload a CSV, enter your Gemini API key, and click 'Analyze!' to unlock a comprehensive, AI-powered report on your data.") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
file_input = gr.File(label="📁 Upload your CSV File", file_types=[".csv"]) | |
with gr.Column(scale=2): | |
api_key_input = gr.Textbox(label="🔑 Google Gemini API Key", type="password", placeholder="Enter your key here...") | |
with gr.Column(scale=1, min_width=150): | |
analyze_btn = gr.Button("✨ Analyze!", variant="primary", scale=1) | |
with gr.Tabs(): | |
with gr.Tab("🤖 AI Report & Overview"): | |
md_ai_report = gr.Markdown("Your AI-generated report will appear here...") | |
btn_download_report = gr.Button("⬇️ Download Full Report", visible=False) | |
with gr.Tab("📊 Data Profiling"): | |
gr.Markdown("### Detailed Data Profile") | |
gr.Markdown("**Missing Data Analysis**") | |
df_missing_data = gr.DataFrame(interactive=False) | |
gr.Markdown("**Numeric Feature Statistics**") | |
df_numeric_stats = gr.DataFrame(interactive=False) | |
gr.Markdown("**Categorical Feature Statistics**") | |
df_categorical_stats = gr.DataFrame(interactive=False) | |
with gr.Tab("📈 Overview Visuals"): | |
gr.Markdown("### At-a-Glance Visualizations") | |
with gr.Row(): | |
plot_dtype = gr.Plot() | |
plot_missing = gr.Plot() | |
with gr.Row(): | |
plot_corr = gr.Plot() | |
with gr.Tab("🎨 Interactive Visuals"): | |
gr.Markdown("### Explore Your Data Visually") | |
with gr.Row(): | |
with gr.Column(): | |
dd_hist_col = gr.Dropdown(label="Select Column", visible=False) | |
plot_hist = gr.Plot() | |
with gr.Column(): | |
dd_box_cat = gr.Dropdown(label="Select Category", visible=False) | |
dd_box_num = gr.Dropdown(label="Select Value", visible=False) | |
plot_box = gr.Plot() | |
with gr.Row(): | |
gr.Markdown("#### Scatter Plot Explorer") | |
with gr.Row(): | |
dd_scatter_x = gr.Dropdown(label="X-axis", visible=False) | |
dd_scatter_y = gr.Dropdown(label="Y-axis", visible=False) | |
dd_scatter_color = gr.Dropdown(label="Color", visible=False) | |
plot_scatter = gr.Plot() | |
with gr.Tab("🔍 Column Drilldown"): | |
gr.Markdown("### Deep Dive into a Single Column") | |
dd_drilldown_col = gr.Dropdown(label="Select Column", visible=False) | |
with gr.Row(): | |
md_drilldown_stats = gr.Markdown() | |
plot_drilldown = gr.Plot() | |
# --- Event Listeners --- | |
# Main analysis trigger | |
analyze_btn.click( | |
fn=process_uploaded_file, | |
inputs=[file_input, api_key_input], | |
outputs=[ | |
state_analyzer, md_ai_report, btn_download_report, | |
df_missing_data, df_numeric_stats, df_categorical_stats, | |
plot_dtype, plot_missing, plot_corr, | |
dd_hist_col, dd_scatter_x, dd_scatter_y, dd_scatter_color, | |
dd_box_cat, dd_box_num, dd_drilldown_col | |
] | |
) | |
# Interactive plot triggers | |
dd_hist_col.change(fn=create_histogram, inputs=[state_analyzer, dd_hist_col], outputs=plot_hist) | |
dd_scatter_x.change(fn=create_scatterplot, inputs=[state_analyzer, dd_scatter_x, dd_scatter_y, dd_scatter_color], outputs=plot_scatter) | |
dd_scatter_y.change(fn=create_scatterplot, inputs=[state_analyzer, dd_scatter_x, dd_scatter_y, dd_scatter_color], outputs=plot_scatter) | |
dd_scatter_color.change(fn=create_scatterplot, inputs=[state_analyzer, dd_scatter_x, dd_scatter_y, dd_scatter_color], outputs=plot_scatter) | |
dd_box_cat.change(fn=create_boxplot, inputs=[state_analyzer, dd_box_cat, dd_box_num], outputs=plot_box) | |
dd_box_num.change(fn=create_boxplot, inputs=[state_analyzer, dd_box_cat, dd_box_num], outputs=plot_box) | |
# Drilldown trigger | |
dd_drilldown_col.change(fn=analyze_single_column, inputs=[state_analyzer, dd_drilldown_col], outputs=[md_drilldown_stats, plot_drilldown]) | |
# Download trigger | |
btn_download_report.click(fn=download_report, inputs=[state_analyzer, md_ai_report], outputs=gr.File(label="Download Report")) | |
gr.Markdown("---") | |
gr.Markdown("💡 **Tip**: Get your free Google Gemini API key from [Google AI Studio](https://aistudio.google.com/app/apikey).") | |
gr.Markdown("MCP Expert System v2.0 - Analysis Complete.") | |
if __name__ == "__main__": | |
demo.launch(debug=True) |