# Spaces: Sleeping (hosting-page status residue captured with the source; not program code)
# -*- coding: utf-8 -*- | |
# | |
# PROJECT: CognitiveEDA - The AI-Augmented Data Discovery Platform | |
# | |
# DESCRIPTION: An enterprise-grade Gradio application that revolutionizes Exploratory | |
# Data Analysis (EDA). By integrating Google's Gemini Pro LLM, this | |
# tool transcends traditional data profiling. It automates the generation | |
# of statistical summaries, interactive visualizations, and, most | |
# importantly, a rich, narrative-driven analysis. It delivers | |
# executive summaries, data quality assessments, actionable insights, | |
# and strategic recommendations in a single, streamlined workflow. | |
# | |
# ARCHITECTURE: The application is built upon a robust, object-oriented foundation. | |
# - DataAnalyzer (Core Engine): An encapsulated class that holds the | |
# DataFrame state and performs all statistical calculations and | |
# metadata extraction efficiently, ensuring data is processed once. | |
# - AI Integration: A dedicated module communicates with the Gemini API, | |
# using a sophisticated, structured prompt to ensure consistent, | |
# high-quality analytical narratives. | |
# - Gradio Interface (UI Layer): A multi-tabbed, interactive dashboard | |
# that logically separates the AI narrative, data profiling, static | |
# visuals, and interactive exploration tools. State is managed | |
# efficiently to provide a responsive user experience. | |
# | |
# FEATURES: | |
# - AI-Powered Executive Summary: Generates a high-level overview for stakeholders. | |
# - Automated Data Quality Audit: Provides a quality score and actionable cleaning steps. | |
# - Insight Discovery Engine: Uncovers hidden patterns, correlations, and anomalies. | |
# - Strategic Recommendations: Suggests next steps, modeling approaches, and business use cases. | |
# - Comprehensive Profiling: Detailed statistical tables for all data types. | |
# - Interactive Visualization Suite: Dynamic plots for deep-dive analysis. | |
# - One-Click Report Export: Downloads the complete AI-generated analysis as a Markdown file. | |
# | |
# AUTHOR: An MCP Expert in Data & AI Solutions | |
# VERSION: 3.0 (Enterprise Edition) | |
# LAST-UPDATE: 2023-10-27 | |
from __future__ import annotations | |
import warnings | |
import logging | |
import os | |
from datetime import datetime | |
from typing import Any, Dict, List, Optional, Tuple | |
import gradio as gr | |
import numpy as np | |
import pandas as pd | |
import plotly.express as px | |
import plotly.graph_objects as go | |
import google.generativeai as genai | |
# --- Configuration & Constants --- | |
# Configure the root logger once at import time: INFO level, with the
# emitting file name and line number included in every record.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - (%(filename)s:%(lineno)d) - %(message)s',
)
# Suppress FutureWarning messages globally so they do not clutter the log output.
warnings.filterwarnings('ignore', category=FutureWarning)
class Config:
    """Application-wide configuration settings."""

    # Title shown in the browser tab and as the page heading.
    # NOTE(review): the leading symbol looks like a mis-encoded emoji — confirm
    # the intended character against the original source.
    APP_TITLE = "π CognitiveEDA: AI-Augmented Data Discovery Platform"
    # Name of the Gemini model passed to genai.GenerativeModel.
    GEMINI_MODEL = 'gemini-1.5-flash-latest'
    # Absolute correlation above which a numeric column pair is reported.
    CORR_THRESHOLD = 0.75
    # Maximum number of categories shown in categorical bar charts.
    TOP_N_CATEGORIES = 10
# --- Core Analysis Engine --- | |
class DataAnalyzer:
    """
    Encapsulates all data analysis logic, acting as the single source of truth
    for the uploaded dataset and its derived metadata.

    The wrapped DataFrame is available as ``df``; derived metadata is computed
    on first access of the ``metadata`` property and cached afterwards.
    """

    def __init__(self, df: pd.DataFrame):
        """Store *df* for analysis.

        Raises:
            TypeError: if *df* is not a pandas DataFrame.
        """
        if not isinstance(df, pd.DataFrame):
            raise TypeError("Input must be a pandas DataFrame.")
        self.df = df
        self._metadata: Optional[Dict[str, Any]] = None
        logging.info("DataAnalyzer instantiated with DataFrame of shape: %s", self.df.shape)

    @property
    def metadata(self) -> Dict[str, Any]:
        """Lazy-loads and caches comprehensive dataset metadata for efficient reuse."""
        # Exposed as a property because every caller reads it as an attribute
        # (e.g. ``analyzer.metadata['numeric_cols']``).
        if self._metadata is None:
            logging.info("First access to metadata, performing extraction...")
            self._metadata = self._extract_metadata()
        return self._metadata

    def _extract_metadata(self) -> Dict[str, Any]:
        """Performs a scan of the DataFrame to extract key characteristics.

        Returns a dict with the keys ``shape``, ``columns``, ``numeric_cols``,
        ``categorical_cols``, ``memory_usage_mb`` (formatted string),
        ``total_missing``, ``data_quality_score`` (percentage of non-missing
        cells) and ``high_corr_pairs`` (list of record dicts).
        """
        rows, cols = self.df.shape
        numeric_cols = self.df.select_dtypes(include=np.number).columns.tolist()
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns.tolist()

        # High correlation pair detection: keep only the strict upper triangle
        # so each unordered pair is reported once and the diagonal is skipped.
        high_corr_pairs: List[Dict[str, Any]] = []
        if len(numeric_cols) > 1:
            corr_matrix = self.df[numeric_cols].corr().abs()
            upper_mask = np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)
            stacked = corr_matrix.where(upper_mask).stack()
            # NaN entries (masked cells, constant columns) never satisfy ">".
            strong = stacked[stacked > Config.CORR_THRESHOLD]
            high_corr_pairs = [
                {'Feature 1': first, 'Feature 2': second, 'Correlation': float(value)}
                for (first, second), value in strong.items()
            ]

        total_cells = self.df.size
        non_missing = int(self.df.notna().sum().sum())
        # An empty frame has no cells; report 0.0 instead of dividing by zero.
        quality_score = round(non_missing / total_cells * 100, 2) if total_cells else 0.0

        return {
            'shape': (rows, cols),
            'columns': self.df.columns.tolist(),
            'numeric_cols': numeric_cols,
            'categorical_cols': categorical_cols,
            'memory_usage_mb': f"{self.df.memory_usage(deep=True).sum() / 1e6:.2f}",
            'total_missing': int(self.df.isnull().sum().sum()),
            'data_quality_score': quality_score,
            'high_corr_pairs': high_corr_pairs,
        }

    def get_profiling_tables(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Generates structured DataFrames for data profiling.

        Returns:
            ``(missing_df, numeric_stats_df, cat_stats_df)``. The numeric and
            categorical tables are empty (a single ``Column`` header) when the
            dataset has no columns of that kind.
        """
        logging.info("Generating profiling tables for missing, numeric, and categorical data.")

        # Missing data profile
        missing = self.df.isnull().sum()
        missing_df = (
            pd.DataFrame({
                'Missing Count': missing,
                'Missing Percentage (%)': (missing / len(self.df) * 100).round(2),
            })
            .rename_axis('Column')
            .reset_index()
            .sort_values('Missing Count', ascending=False)
        )

        # Numeric features profile. describe() raises on a frame without
        # columns, so only call it when there is something to describe.
        numeric_cols = self.metadata['numeric_cols']
        if numeric_cols:
            numeric_stats_df = (
                self.df[numeric_cols]
                .describe(percentiles=[.01, .25, .5, .75, .99])
                .T.round(3)
                .rename_axis('Column')
                .reset_index()
            )
        else:
            numeric_stats_df = pd.DataFrame(columns=['Column'])

        # Categorical features profile (same empty-selection guard).
        categorical_cols = self.metadata['categorical_cols']
        if categorical_cols:
            cat_stats_df = (
                self.df[categorical_cols]
                .describe(include=['object', 'category'])
                .T.rename_axis('Column')
                .reset_index()
            )
        else:
            cat_stats_df = pd.DataFrame(columns=['Column'])

        return missing_df, numeric_stats_df, cat_stats_df

    def get_overview_visuals(self) -> Tuple[go.Figure, go.Figure, go.Figure]:
        """Creates a set of key visualizations for a high-level overview.

        Returns:
            ``(fig_types, fig_missing, fig_corr)``: a dtype pie chart, a bar
            chart of per-column missing counts, and a correlation heatmap (an
            empty titled figure when fewer than two numeric columns exist).
        """
        logging.info("Generating overview visualizations (types, missing data, correlation).")
        meta = self.metadata

        # NOTE(review): the symbols inside the chart titles look like
        # mis-encoded emoji — confirm the intended characters.
        dtype_counts = self.df.dtypes.astype(str).value_counts()
        fig_types = px.pie(
            values=dtype_counts.values, names=dtype_counts.index,
            title="<b>π Data Type Composition</b>", hole=0.4,
            color_discrete_sequence=px.colors.qualitative.Pastel,
        )
        fig_types.update_traces(textposition='outside', textinfo='percent+label')

        # Name the axis explicitly so the 'index' column exists after
        # reset_index() regardless of how the column index is named.
        missing_counts = self.df.isnull().sum().rename_axis('index').reset_index(name='count')
        missing_df = missing_counts[missing_counts['count'] > 0]
        fig_missing = px.bar(
            missing_df, x='index', y='count', title="<b>π³οΈ Missing Values Distribution</b>",
            labels={'index': 'Column Name', 'count': 'Number of Missing Values'},
        ).update_xaxes(categoryorder="total descending")

        fig_corr = go.Figure()
        if len(meta['numeric_cols']) > 1:
            corr_matrix = self.df[meta['numeric_cols']].corr()
            fig_corr = px.imshow(
                corr_matrix, text_auto=".2f", aspect="auto",
                title=f"<b>π Correlation Matrix (Threshold > {Config.CORR_THRESHOLD})</b>",
                color_continuous_scale='RdBu_r', zmin=-1, zmax=1,
            )
        else:
            fig_corr.update_layout(title="<b>π Correlation Matrix (Insufficient Numeric Data)</b>")

        return fig_types, fig_missing, fig_corr

    def generate_ai_narrative(self, api_key: str) -> str:
        """Orchestrates the generation of the full AI-driven report using Gemini.

        Args:
            api_key: Google Gemini API key used to configure the client.

        Returns:
            The Markdown text returned by the model, or a Markdown-formatted
            error message if the API call fails for any reason (the exception
            is logged, not propagated).
        """
        logging.info("Generating AI narrative with the Gemini API.")
        meta = self.metadata

        # Column names are not guaranteed to be strings, so stringify before joining.
        numeric_list = ', '.join(map(str, meta['numeric_cols'])) or 'None'
        categorical_list = ', '.join(map(str, meta['categorical_cols'])) or 'None'
        corr_summary = meta['high_corr_pairs'] or 'None detected.'

        preview = self.df.head(5)
        try:
            data_snippet = preview.to_markdown(index=False)
        except ImportError:
            # to_markdown needs the optional 'tabulate' package; fall back to
            # a plain-text table so the report can still be generated.
            data_snippet = preview.to_string(index=False)

        # The prompt body is kept flush-left so no stray indentation is sent
        # to the model.
        # NOTE(review): several heading symbols look like mis-encoded emoji —
        # confirm the intended characters.
        prompt = f"""
As "Cognitive Analyst," an elite AI data scientist, your task is to generate a comprehensive, multi-part data discovery report.
Analyze the following dataset context and produce a professional, insightful, and clear analysis in Markdown format.

**DATASET CONTEXT:**
- **Shape:** {meta['shape'][0]} rows, {meta['shape'][1]} columns.
- **Column Schema:**
    - Numeric: {numeric_list}
    - Categorical: {categorical_list}
- **Data Quality Score:** {meta['data_quality_score']}% (Percentage of non-missing cells)
- **Total Missing Values:** {meta['total_missing']:,}
- **High-Correlation Pairs (>{Config.CORR_THRESHOLD}):** {corr_summary}
- **Data Snippet (First 5 Rows):**
{data_snippet}

**REQUIRED REPORT STRUCTURE (Strictly use this Markdown format):**
# π AI Data Discovery Report

## π 1. Executive Summary
* **Primary Objective:** (Deduce the most likely purpose of this dataset. What problem is it trying to solve?)
* **Key Finding:** (State the single most interesting or impactful insight you've discovered.)
* **Overall State:** (Briefly comment on the data's quality and readiness for analysis.)

## π§ 2. Data Profile & Quality Assessment
* **First Impression:** (Describe the dataset's structure, size, and composition.)
* **Data Quality Audit:** (Elaborate on the **{meta['data_quality_score']}%** quality score. Are the **{meta['total_missing']}** missing values concentrated in specific columns? Is this a major concern?)
* **Redundancy Check:** (Comment on the detected high-correlation pairs. Is there a risk of multicollinearity in modeling?)

## π‘ 3. Key Insights & Potential Stories
* **Insight 1 (e.g., Anomaly Detected π΅οΈ):** (Describe a surprising pattern, outlier, or distribution in a key numeric column.)
* **Insight 2 (e.g., Categorical Trend π):** (Analyze a key categorical column. What does its distribution reveal? Is there a dominant category?)
* **Insight 3 (e.g., Relationship Hint π):** (Speculate on a potential relationship between two or more columns, even if not highly correlated.)

## π οΈ 4. Actionable Recommendations
* **Data Cleaning:**
    - **Step 1:** (Provide a specific recommendation for handling missing data, e.g., "For `column_name`, with X% missing, consider imputation using the median due to its skewed distribution.")
    - **Step 2:** (Suggest actions for correlated features, e.g., "Consider dropping `Feature A` or using dimensionality reduction (PCA) due to its high correlation with `Feature B`.")
* **Feature Engineering:**
    - **Idea 1:** (Suggest creating a new feature, e.g., "Combine `year` and `month` into a `date` feature for time-series analysis.")
* **Next Analytical Steps:**
    - **Hypothesis to Test:** (Propose a business or research question to investigate further, e.g., "Does `customer_segment` significantly impact `total_spend`?")
    - **Modeling Potential:** (Suggest a suitable machine learning model, e.g., "This dataset is well-suited for a classification model to predict `is_churn`.")
"""
        try:
            genai.configure(api_key=api_key)
            model = genai.GenerativeModel(Config.GEMINI_MODEL)
            response = model.generate_content(prompt)
            return response.text
        except Exception as e:
            # Deliberately broad: any failure is shown to the user as a
            # readable report instead of crashing the analysis pipeline.
            logging.error(f"Gemini API call failed: {e}", exc_info=True)
            error_message = (
                "β **AI Report Generation Failed**\n\n"
                f"**Error Details:** `{str(e)}`\n\n"
                "**Troubleshooting Steps:**\n"
                "1. Verify that your Google Gemini API key is correct and active.\n"
                "2. Check your network connection and firewall settings.\n"
                "3. Ensure the Gemini API is not experiencing an outage."
            )
            return error_message
# --- Gradio UI & Event Handlers --- | |
def create_ui():
    """Defines and builds the Gradio user interface.

    Returns:
        The assembled ``gr.Blocks`` application with all event listeners
        attached, ready to be launched by the caller.
    """

    # --- Interactive Plotting Functions (scoped inside UI creation for clarity) ---
    def create_histogram(analyzer: DataAnalyzer, col: str) -> go.Figure:
        """Return a histogram (with marginal box plot) of *col*, or an empty figure."""
        if not col or not analyzer:
            return go.Figure()
        return px.histogram(
            analyzer.df, x=col, title=f"<b>Distribution of {col}</b>",
            marginal="box", template="plotly_white",
        )

    def create_scatterplot(analyzer: DataAnalyzer, x_col: str, y_col: str, color_col: str) -> go.Figure:
        """Return a scatter plot of *x_col* vs. *y_col*, optionally coloured by *color_col*."""
        if not all([analyzer, x_col, y_col]):
            return go.Figure()
        return px.scatter(
            analyzer.df, x=x_col, y=y_col,
            # A cleared dropdown may yield an empty value; treat it as "no colour".
            color=color_col or None,
            title=f"<b>Scatter Plot: {x_col} vs. {y_col}</b>", template="plotly_white",
            color_continuous_scale=px.colors.sequential.Viridis,
        )

    def analyze_single_column(analyzer: DataAnalyzer, col: str) -> Tuple[str, go.Figure]:
        """Return ``(markdown_stats, figure)`` describing a single column."""
        if not col or not analyzer:
            return "", go.Figure()
        series = analyzer.df[col]
        # NOTE(review): the heading symbol looks like a mis-encoded emoji.
        stats_md = f"### π **Deep Dive: `{col}`**\n"
        stats_md += f"- **Data Type:** `{series.dtype}`\n"
        stats_md += f"- **Unique Values:** `{series.nunique()}`\n"
        stats_md += f"- **Missing:** `{series.isnull().sum()}` ({series.isnull().mean():.2%})\n"
        fig = go.Figure()
        if pd.api.types.is_numeric_dtype(series):
            stats_md += f"- **Mean:** `{series.mean():.3f}` | **Std Dev:** `{series.std():.3f}`\n"
            stats_md += f"- **Median:** `{series.median():.3f}` | **Min:** `{series.min():.3f}` | **Max:** `{series.max():.3f}`\n"
            fig = create_histogram(analyzer, col)
        else:
            top_n = series.value_counts().nlargest(Config.TOP_N_CATEGORIES)
            if top_n.empty:
                # Column holds only missing values: there is no top category
                # to report and nothing to chart.
                stats_md += "- **Top Value:** `N/A` (no non-missing values)\n"
                return stats_md, fig
            stats_md += f"- **Top Value:** `{top_n.index[0]}` ({top_n.iloc[0]} occurrences)\n"
            fig = px.bar(
                top_n, y=top_n.index, x=top_n.values, orientation='h',
                title=f"<b>Top {Config.TOP_N_CATEGORIES} Categories in `{col}`</b>",
                labels={'y': col, 'x': 'Count'}, template="plotly_white",
            ).update_yaxes(categoryorder="total ascending")
        return stats_md, fig

    # --- Main UI Blocks ---
    # NOTE(review): symbols in button and tab labels look like mis-encoded
    # emoji (and the "Profile" tab label has lost its symbol) — confirm.
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="cyan"), title=Config.APP_TITLE) as demo:
        # Store for the main DataAnalyzer object
        state_analyzer = gr.State()

        gr.Markdown(f"<h1>{Config.APP_TITLE}</h1>")
        gr.Markdown("Upload a CSV file, provide your Gemini API key, and receive an instant, AI-driven analysis of your data.")

        with gr.Row():
            with gr.Column(scale=3):
                upload_button = gr.File(label="1. Upload CSV File", file_types=[".csv"])
            with gr.Column(scale=2):
                api_key_input = gr.Textbox(label="2. Enter Google Gemini API Key", type="password")
            with gr.Column(scale=1, min_width=150):
                analyze_button = gr.Button("β¨ Generate Analysis", variant="primary")

        with gr.Tabs():
            with gr.Tab("π€ AI Narrative", id=0):
                ai_report_output = gr.Markdown("Your AI-generated report will appear here once analysis is complete...")
                download_report_button = gr.Button("β¬οΈ Download Full Report", visible=False)

            with gr.Tab(" Profile", id=1):
                gr.Markdown("### **Detailed Data Profile**")
                gr.Markdown("#### Missing Data Summary")
                profile_missing_df = gr.DataFrame(interactive=False, label="Missing Values")
                gr.Markdown("#### Numeric Features Summary")
                profile_numeric_df = gr.DataFrame(interactive=False, label="Numeric Stats")
                gr.Markdown("#### Categorical Features Summary")
                profile_categorical_df = gr.DataFrame(interactive=False, label="Categorical Stats")

            with gr.Tab("π Overview Visuals", id=2):
                gr.Markdown("### **At-a-Glance Visualizations**")
                with gr.Row():
                    plot_types = gr.Plot()
                    plot_missing = gr.Plot()
                plot_correlation = gr.Plot()

            with gr.Tab("π¨ Interactive Explorer", id=3):
                gr.Markdown("### **Visually Explore Feature Relationships**")
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("#### Univariate Analysis")
                        dd_hist_col = gr.Dropdown(label="Select Column for Histogram", visible=False)
                    with gr.Column(scale=2):
                        plot_histogram = gr.Plot()
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("#### Bivariate Analysis (Scatter Plot)")
                        dd_scatter_x = gr.Dropdown(label="X-Axis (Numeric)", visible=False)
                        dd_scatter_y = gr.Dropdown(label="Y-Axis (Numeric)", visible=False)
                        dd_scatter_color = gr.Dropdown(label="Color By (Optional)", visible=False)
                    with gr.Column(scale=2):
                        plot_scatter = gr.Plot()

            with gr.Tab("π Column Deep-Dive", id=4):
                gr.Markdown("### **Inspect a Single Column in Detail**")
                dd_drilldown_col = gr.Dropdown(label="Select Column to Analyze", visible=False)
                with gr.Row():
                    md_drilldown_stats = gr.Markdown()
                    plot_drilldown = gr.Plot()

        gr.HTML("""
<div style="text-align: center; margin-top: 20px; font-family: sans-serif; color: #777;">
<p>π‘ Need an API key? Get one from <a href="https://aistudio.google.com/app/apikey" target="_blank">Google AI Studio</a>.</p>
<p>CognitiveEDA v3.0 | An MCP Expert System</p>
</div>
""")

        # --- Event Listeners & Control Flow ---
        # The order of this list defines the order in which the main analysis
        # handler's results are mapped onto components.
        outputs_for_main_analysis = [
            state_analyzer, ai_report_output, download_report_button,
            profile_missing_df, profile_numeric_df, profile_categorical_df,
            plot_types, plot_missing, plot_correlation,
            dd_hist_col, dd_scatter_x, dd_scatter_y, dd_scatter_color, dd_drilldown_col,
        ]
        analyze_button.click(
            fn=run_full_analysis,
            inputs=[upload_button, api_key_input],
            outputs=outputs_for_main_analysis,
        )

        # Interactive plot triggers
        dd_hist_col.change(fn=create_histogram, inputs=[state_analyzer, dd_hist_col], outputs=plot_histogram)

        # Any of the three scatter dropdowns re-renders the same plot.
        scatter_inputs = [state_analyzer, dd_scatter_x, dd_scatter_y, dd_scatter_color]
        dd_scatter_x.change(fn=create_scatterplot, inputs=scatter_inputs, outputs=plot_scatter)
        dd_scatter_y.change(fn=create_scatterplot, inputs=scatter_inputs, outputs=plot_scatter)
        dd_scatter_color.change(fn=create_scatterplot, inputs=scatter_inputs, outputs=plot_scatter)

        dd_drilldown_col.change(
            fn=analyze_single_column,
            inputs=[state_analyzer, dd_drilldown_col],
            outputs=[md_drilldown_stats, plot_drilldown],
        )
        download_report_button.click(
            fn=download_report_file,
            inputs=[state_analyzer, ai_report_output],
            outputs=gr.File(label="Download Report"),
        )

    return demo
# --- Main Application Logic --- | |
def run_full_analysis(file_obj: Any, api_key: str) -> Tuple[Any, ...]:
    """
    Orchestrates the entire analysis pipeline upon button click.

    Args:
        file_obj: The uploaded file as delivered by the file component:
            either a path string or an object exposing the path as ``.name``.
        api_key: Google Gemini API key entered by the user.

    Returns:
        A tuple of new values/updates, one per output component, in the same
        order as the ``outputs`` list wired to the analyze button: analyzer
        state, AI report, download button, the three profiling tables, the
        three overview figures, and the five dropdown updates.

    Raises:
        gr.Error: if no file or API key was supplied, or if any step of the
            analysis fails.
    """
    if file_obj is None:
        raise gr.Error("CRITICAL: No file uploaded. Please select a CSV file.")
    if not api_key:
        raise gr.Error("CRITICAL: Gemini API key is missing. Please provide your key.")

    # The upload may arrive as a plain path or as a file-like wrapper.
    csv_path = file_obj if isinstance(file_obj, str) else file_obj.name

    try:
        logging.info(f"Processing uploaded file: {csv_path}")
        df = pd.read_csv(csv_path)
        analyzer = DataAnalyzer(df)

        # --- Execute all analysis tasks (sequentially) ---
        ai_report = analyzer.generate_ai_narrative(api_key)
        missing_df, num_df, cat_df = analyzer.get_profiling_tables()
        fig_types, fig_missing, fig_corr = analyzer.get_overview_visuals()

        # --- Prepare UI component updates ---
        meta = analyzer.metadata
        all_cols, num_cols = meta['columns'], meta['numeric_cols']

        # Results are returned positionally: the UI components are local to
        # create_ui() and cannot be referenced by name from module level.
        return (
            # State & AI Report
            analyzer,
            ai_report,
            gr.Button(visible=True),
            # Profiling Tab
            missing_df,
            num_df,
            cat_df,
            # Overview Visuals Tab
            fig_types,
            fig_missing,
            fig_corr,
            # Interactive Explorer & Drilldown Dropdown Updates
            gr.Dropdown(choices=num_cols, label="Select Numeric Column", visible=True),
            gr.Dropdown(choices=num_cols, label="X-Axis (Numeric)", visible=True),
            gr.Dropdown(choices=num_cols, label="Y-Axis (Numeric)", visible=True),
            gr.Dropdown(choices=all_cols, label="Color By (Optional)", visible=True),
            gr.Dropdown(choices=all_cols, label="Select Column to Analyze", visible=True),
        )
    except Exception as e:
        # Top-level boundary: log the full traceback, then surface a concise
        # message to the user while keeping the original cause chained.
        logging.error(f"A critical error occurred during file processing: {e}", exc_info=True)
        raise gr.Error(f"Analysis Failed! The process stopped due to: {str(e)}") from e
def download_report_file(analyzer: DataAnalyzer, ai_report_text: str) -> Optional[str]:
    """Generates a comprehensive Markdown file for download.

    Args:
        analyzer: Analyzer whose ``metadata`` supplies the overview section.
        ai_report_text: AI-generated report body; ``None`` or empty is
            treated as an empty body.

    Returns:
        The name of the Markdown file written to the current working
        directory, or ``None`` when no analyzer is available.
    """
    if not analyzer:
        logging.warning("Download attempted without a valid analyzer object.")
        return None

    # Take a single timestamp so the file name and the report header agree.
    generated_at = datetime.now()
    filename = f"CognitiveEDA_Report_{generated_at.strftime('%Y%m%d_%H%M%S')}.md"
    meta = analyzer.metadata

    # Assemble the full report
    full_report = "".join([
        "# CognitiveEDA - Data Discovery Report\n",
        f"**Generated:** {generated_at.strftime('%Y-%m-%d %H:%M:%S')}\n\n",
        "## Dataset Overview\n",
        f"- **Shape:** {meta['shape'][0]} rows x {meta['shape'][1]} columns\n",
        f"- **Memory Footprint:** {meta['memory_usage_mb']} MB\n",
        f"- **Data Quality Score:** {meta['data_quality_score']}%\n\n",
        "---\n\n",
        ai_report_text or "",
    ])

    with open(filename, "w", encoding="utf-8") as f:
        f.write(full_report)

    logging.info(f"Report file generated successfully: {filename}")
    return filename
if __name__ == "__main__":
    # Build the interface and start the server only when run as a script.
    app_instance = create_ui()
    # server_name="0.0.0.0" listens on all network interfaces; debug=True
    # enables Gradio's debug mode.
    app_instance.launch(debug=True, server_name="0.0.0.0")