CCockrum's picture
Create app.py
20adb37 verified
raw
history blame
14.1 kB
import gradio as gr
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from scipy import stats
from typing import Dict, List, Tuple, Any, Optional
import warnings
warnings.filterwarnings('ignore')
class OutlierDetective:
def __init__(self):
    """Create a detector that has no data loaded and no results yet."""
    # Names of the numeric-dtype columns; filled in by load_data().
    self.numeric_columns = []
    # Most recent results stored by analyze_outliers().
    self.outlier_results = {}
    # The loaded DataFrame; stays None until load_data() succeeds.
    self.df = None
def load_data(self, file_path: str) -> pd.DataFrame:
    """Load a tabular file into a DataFrame and record its numeric columns.

    The reader is picked from the file extension.  The extension is compared
    case-insensitively so that names such as ``DATA.XLSX`` or ``export.JSON``
    are not mistakenly parsed as CSV.  Unrecognised extensions fall back to
    ``pd.read_csv`` with default settings.

    Args:
        file_path: Path of the file to read.

    Returns:
        The loaded DataFrame.  It is also stored on ``self.df``, and the
        names of its numeric-dtype columns on ``self.numeric_columns``.

    Raises:
        Exception: If anything goes wrong while reading.  The message is
            ``"Error loading file: <original message>"`` and the original
            exception is chained as ``__cause__``.
    """
    try:
        # Lower-case once so every extension test below ignores case.
        lowered = str(file_path).lower()
        if lowered.endswith('.csv'):
            df = pd.read_csv(file_path, encoding='utf-8')
        elif lowered.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
        elif lowered.endswith('.json'):
            df = pd.read_json(file_path)
        elif lowered.endswith('.parquet'):
            df = pd.read_parquet(file_path)
        else:
            # Unknown extension: best effort, treat the file as CSV.
            df = pd.read_csv(file_path)
        self.df = df
        # Identify numeric columns
        self.numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
        return df
    except Exception as e:
        # Keep the generic exception type callers already catch, but chain
        # the cause so the original traceback is not lost.
        raise Exception(f"Error loading file: {str(e)}") from e
def detect_iqr_outliers(self, column: str) -> Dict[str, Any]:
    """Detect outliers in one column with the interquartile-range rule.

    Missing values are dropped first.  A value is flagged when it lies below
    ``Q1 - 1.5 * IQR`` or above ``Q3 + 1.5 * IQR``.

    Args:
        column: Name of the column to analyse.

    Returns:
        An empty dict when ``column`` is not listed in
        ``self.numeric_columns``.  Otherwise a dict with the keys
        ``method``, ``lower_bound``, ``upper_bound``, ``outlier_indices``,
        ``outlier_values``, ``outlier_count``, ``outlier_percentage`` and
        ``explanation``.  If the column holds no non-missing values the
        bounds are NaN and the count and percentage are zero.
    """
    if column not in self.numeric_columns:
        return {}
    series = self.df[column].dropna()
    if series.empty:
        # An all-NaN column would otherwise divide by len(series) == 0 below.
        return {
            'method': 'IQR',
            'lower_bound': float('nan'),
            'upper_bound': float('nan'),
            'outlier_indices': [],
            'outlier_values': [],
            'outlier_count': 0,
            'outlier_percentage': 0.0,
            'explanation': "No non-missing values in column - no outliers detected using this method"
        }
    Q1 = series.quantile(0.25)
    Q3 = series.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    outliers = series[(series < lower_bound) | (series > upper_bound)]
    outlier_indices = outliers.index.tolist()
    return {
        'method': 'IQR',
        'lower_bound': lower_bound,
        'upper_bound': upper_bound,
        'outlier_indices': outlier_indices,
        'outlier_values': outliers.tolist(),
        'outlier_count': len(outlier_indices),
        'outlier_percentage': (len(outlier_indices) / len(series)) * 100,
        'explanation': f"Values below {lower_bound:.2f} or above {upper_bound:.2f} are considered outliers"
    }
def detect_zscore_outliers(self, column: str, threshold: float = 3) -> Dict[str, Any]:
    """Detect outliers in one column with the z-score rule.

    Missing values are dropped first.  A value is flagged when the absolute
    value of its ``scipy.stats.zscore`` exceeds ``threshold``.

    Args:
        column: Name of the column to analyse.
        threshold: Absolute z-score above which a value is an outlier.

    Returns:
        An empty dict when ``column`` is not listed in
        ``self.numeric_columns``.  Otherwise a dict with the keys
        ``method``, ``threshold``, ``outlier_indices``, ``outlier_values``,
        ``outlier_zscores`` (absolute z-scores of the flagged values),
        ``outlier_count``, ``outlier_percentage`` and ``explanation``.  If
        the column holds no non-missing values the count and percentage are
        zero.
    """
    if column not in self.numeric_columns:
        return {}
    series = self.df[column].dropna()
    if series.empty:
        # An all-NaN column would otherwise divide by len(series) == 0 below.
        return {
            'method': 'Z-Score',
            'threshold': threshold,
            'outlier_indices': [],
            'outlier_values': [],
            'outlier_zscores': [],
            'outlier_count': 0,
            'outlier_percentage': 0.0,
            'explanation': "No non-missing values in column - no outliers detected using this method"
        }
    # Force a plain ndarray so the boolean mask below is positional.
    z_scores = np.abs(np.asarray(stats.zscore(series)))
    outlier_mask = z_scores > threshold
    outliers = series[outlier_mask]
    outlier_indices = outliers.index.tolist()
    return {
        'method': 'Z-Score',
        'threshold': threshold,
        'outlier_indices': outlier_indices,
        'outlier_values': outliers.tolist(),
        'outlier_zscores': z_scores[outlier_mask].tolist(),
        'outlier_count': len(outlier_indices),
        'outlier_percentage': (len(outlier_indices) / len(series)) * 100,
        'explanation': f"Values with |z-score| > {threshold} are considered outliers"
    }
def detect_modified_zscore_outliers(self, column: str, threshold: float = 3.5) -> Dict[str, Any]:
    """Detect outliers in one column with the modified z-score (MAD) rule.

    Missing values are dropped first.  Each value is scored as
    ``0.6745 * (value - median) / MAD`` where MAD comes from
    ``scipy.stats.median_abs_deviation``; a value is flagged when the
    absolute score exceeds ``threshold``.

    Args:
        column: Name of the column to analyse.
        threshold: Absolute modified z-score above which a value is an
            outlier.

    Returns:
        An empty dict when ``column`` is not listed in
        ``self.numeric_columns``.  Otherwise a dict with the keys
        ``method``, ``threshold``, ``median``, ``mad``, ``outlier_indices``,
        ``outlier_values``, ``outlier_scores`` (signed scores of the flagged
        values), ``outlier_count``, ``outlier_percentage`` and
        ``explanation``.  When the column has no non-missing values, or its
        MAD is zero, the same keys are returned with empty lists and a zero
        count so callers can treat every result uniformly.
    """
    if column not in self.numeric_columns:
        return {}
    series = self.df[column].dropna()
    if series.empty:
        # An all-NaN column would otherwise divide by len(series) == 0 below.
        return {
            'method': 'Modified Z-Score',
            'threshold': threshold,
            'median': float('nan'),
            'mad': float('nan'),
            'outlier_indices': [],
            'outlier_values': [],
            'outlier_scores': [],
            'outlier_count': 0,
            'outlier_percentage': 0,
            'explanation': "No non-missing values in column - no outliers detected using this method"
        }
    median = series.median()
    mad = stats.median_abs_deviation(series)
    if mad == 0:
        # The score is undefined (division by zero) when MAD is zero.
        return {
            'method': 'Modified Z-Score',
            'threshold': threshold,
            'median': median,
            'mad': mad,
            'outlier_indices': [],
            'outlier_values': [],
            'outlier_scores': [],
            'outlier_count': 0,
            'outlier_percentage': 0,
            'explanation': "MAD is zero - no outliers detected using this method"
        }
    modified_z_scores = 0.6745 * (series - median) / mad
    outlier_mask = np.abs(modified_z_scores) > threshold
    outliers = series[outlier_mask]
    outlier_indices = outliers.index.tolist()
    return {
        'method': 'Modified Z-Score',
        'threshold': threshold,
        'median': median,
        'mad': mad,
        'outlier_indices': outlier_indices,
        'outlier_values': outliers.tolist(),
        'outlier_scores': modified_z_scores[outlier_mask].tolist(),
        'outlier_count': len(outlier_indices),
        'outlier_percentage': (len(outlier_indices) / len(series)) * 100,
        'explanation': f"Values with |modified z-score| > {threshold} are considered outliers (robust to extreme values)"
    }
def detect_isolation_forest_outliers(self, columns: List[str], contamination: float = 0.1) -> Dict[str, Any]:
    """Flag multivariate outliers across several columns with Isolation Forest.

    Only the requested columns that appear in ``self.numeric_columns`` are
    used.  Rows with a missing value in any of those columns are dropped,
    the remaining data is standardized with ``StandardScaler``, and an
    ``IsolationForest`` (``random_state=42``) is fitted on it.  Rows the
    model labels ``-1`` are reported as outliers.

    Args:
        columns: Candidate column names.
        contamination: Passed straight to ``IsolationForest``.

    Returns:
        An empty dict when no columns are given, none of them is numeric, or
        fewer than 10 complete rows remain.  Otherwise a dict with the keys
        ``method``, ``contamination``, ``columns_used``, ``outlier_indices``,
        ``outlier_scores`` (the model's ``score_samples`` values for the
        flagged rows), ``outlier_count``, ``outlier_percentage`` and
        ``explanation``.
    """
    if not columns or len(columns) < 1:
        return {}

    # Keep only the requested columns that are known to be numeric.
    usable = [name for name in columns if name in self.numeric_columns]
    if not usable:
        return {}

    # Work on complete rows only; too few of them gives no result.
    frame = self.df[usable].dropna()
    if len(frame) < 10:
        return {}

    features = StandardScaler().fit_transform(frame)

    forest = IsolationForest(contamination=contamination, random_state=42)
    flagged = forest.fit_predict(features) == -1
    flagged_rows = frame[flagged].index.tolist()
    flagged_scores = forest.score_samples(features)[flagged].tolist()
    n_flagged = len(flagged_rows)

    return {
        'method': 'Isolation Forest',
        'contamination': contamination,
        'columns_used': usable,
        'outlier_indices': flagged_rows,
        'outlier_scores': flagged_scores,
        'outlier_count': n_flagged,
        'outlier_percentage': (n_flagged / len(frame)) * 100,
        'explanation': f"Multivariate outlier detection using {len(usable)} features with {contamination*100}% expected contamination"
    }
def detect_dbscan_outliers(self, columns: List[str], eps: float = 0.5, min_samples: int = 5) -> Dict[str, Any]:
    """Flag multivariate outliers across several columns with DBSCAN.

    Only the requested columns that appear in ``self.numeric_columns`` are
    used.  Rows with a missing value in any of those columns are dropped,
    the remaining data is standardized with ``StandardScaler``, and
    ``DBSCAN`` is run on it.  Rows given the cluster label ``-1`` are
    reported as outliers.

    Args:
        columns: Candidate column names.
        eps: Passed straight to ``DBSCAN``.
        min_samples: Passed straight to ``DBSCAN``; also sets the minimum
            number of complete rows required (``min_samples * 2``).

    Returns:
        An empty dict when no columns are given, none of them is numeric, or
        fewer than ``min_samples * 2`` complete rows remain.  Otherwise a
        dict with the keys ``method``, ``eps``, ``min_samples``,
        ``columns_used``, ``n_clusters`` (distinct labels excluding ``-1``),
        ``outlier_indices``, ``outlier_count``, ``outlier_percentage`` and
        ``explanation``.
    """
    if not columns or len(columns) < 1:
        return {}

    # Keep only the requested columns that are known to be numeric.
    usable = [name for name in columns if name in self.numeric_columns]
    if not usable:
        return {}

    # Work on complete rows only; too few of them gives no result.
    frame = self.df[usable].dropna()
    if len(frame) < min_samples * 2:
        return {}

    features = StandardScaler().fit_transform(frame)
    labels = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(features)

    # Label -1 marks points that belong to no cluster.
    is_noise = labels == -1
    noise_rows = frame[is_noise].index.tolist()
    n_noise = len(noise_rows)

    # The noise label is not a cluster, so leave it out of the count.
    n_clusters = len(set(labels)) - int(bool(is_noise.any()))

    return {
        'method': 'DBSCAN',
        'eps': eps,
        'min_samples': min_samples,
        'columns_used': usable,
        'n_clusters': n_clusters,
        'outlier_indices': noise_rows,
        'outlier_count': n_noise,
        'outlier_percentage': (n_noise / len(frame)) * 100,
        'explanation': f"Density-based outlier detection found {n_clusters} clusters using {len(usable)} features"
    }
def analyze_outliers(self, selected_columns: List[str] = None, methods: List[str] = None) -> Dict[str, Any]:
    """Run the requested detectors and store the combined results.

    Args:
        selected_columns: Columns to analyse.  ``None`` means every column
            in ``self.numeric_columns``; otherwise names that are not in
            ``self.numeric_columns`` are ignored.
        methods: Detector labels to run.  ``None`` means ``'IQR'``,
            ``'Z-Score'``, ``'Modified Z-Score'`` and ``'Isolation Forest'``.
            ``'DBSCAN'`` is run only when listed explicitly.

    Returns:
        An empty dict when no data is loaded or no usable column remains.
        Otherwise a dict mapping each column name to a
        ``{method label: result}`` dict.  When more than one column is
        analysed and a multi-column method is requested, a ``'Multivariate'``
        entry holds those results.  The same dict is stored on
        ``self.outlier_results``.
    """
    if self.df is None:
        return {}

    if selected_columns is None:
        targets = self.numeric_columns
    else:
        # Filter to only numeric columns
        targets = [name for name in selected_columns if name in self.numeric_columns]
    if not targets:
        return {}

    if methods is None:
        methods = ['IQR', 'Z-Score', 'Modified Z-Score', 'Isolation Forest']

    # Label -> detector method name, in the order results are recorded.
    per_column_detectors = (
        ('IQR', 'detect_iqr_outliers'),
        ('Z-Score', 'detect_zscore_outliers'),
        ('Modified Z-Score', 'detect_modified_zscore_outliers'),
    )

    findings = {}
    for name in targets:
        findings[name] = {
            label: getattr(self, detector)(name)
            for label, detector in per_column_detectors
            if label in methods
        }

    # Multi-column methods are only run when several columns are analysed.
    if len(targets) > 1:
        if 'Isolation Forest' in methods:
            findings['Multivariate'] = {
                'Isolation Forest': self.detect_isolation_forest_outliers(targets)
            }
        if 'DBSCAN' in methods:
            dbscan_result = self.detect_dbscan_outliers(targets)
            findings.setdefault('Multivariate', {})['DBSCAN'] = dbscan_result

    self.outlier_results = findings
    return findings
def generate_outlier_report(self) -> str:
"""Generate comprehensive outlier analysis report"""
if not self.outlier_results:
return "No outlier analysis results available. Please run the analysis first."
report = "#Outlier Detection Report\n\n"
# Summary statistics
total_outliers_by_method = {}
all_outlier_indices = set()
for column, methods in self.outlier_results.items():
if column == 'Multivariate':
continue
for method, result in methods.items():
if isinstance(result, dict) and 'outlier_count' in result:
if method not in total_outliers_by_method:
total_outliers_by_method[method] = 0
total_outliers_by_method[method] += result['outlier_count']
if 'outlier_indices' in result:
all_outlier_indices.update(result['outlier_indices'])
# Add multivariate results
if 'Multivariate' in self.outlier_results:
for method, result in self.outlier_results['Multivariate'].items():
if isinstance(result, dict) and 'outlier_count' in result:
total_outliers_by_method[method] = result['outlier_count']
if 'outlier_indices' in result:
all_outlier_indices.update(result['outlier_indices'])
report += "## Summary\n"
report += f"- **Total rows analyzed:** {len(self.df):,}\n"
report += f"- **Unique outlier rows found:** {len(all_outlier_indices)}\n"
report += f"- **Percentage of outlier rows:** {(len(all_outlier_indices)/len(self.df)*100):.2f}%\n\n"
report += "### Outliers by Method:\n"
for method, count in total_outliers_by_method.items():
report += f"- **{method}:** {count} outliers\n"
report += "\n"
# Detailed results by column
report += "## Detailed Results\n\n"
for column, methods in self.outlier_results.items():
if column == 'Multivariate':
continue
report += f"### Column: `{column}`\n\n"
for method, result in methods.items():
if not isinstance(result, dict) or 'outlier_count' in result and result['outlier_count'] == 0:
report += f"**{method}:** No outliers detected\n"
continue
report += f"**{method}:**\n"
report += f"- Outliers found: {result['outlier_count']} ({result['outlier_percentage']:.2f}%)\n"
report += f"- Explanation: {result['explanation']}\n"
# Show some example outlier values
if 'outlier_values' in result and result['outlier_values']:
sample_values = result['outlier_values'][:5] # Show first 5
report += f"- Example outliers: {', '.join([f'{v:.3f}' if isinstance(v, (int, float)) else str(v) for v in sample_values])}"
if len(result['outlier_values']) > 5:
report += f" (and {len(result['outlier_values']) - 5} more)"
report += "\n"
report += "\n"
# Multivariate results
if 'Multivariate' in self.outlier_results:
report += "### Multivariate Analysis\n\n"
for method, result in self.outlier_results['Multivariate'].items():
if not isinstance(result, dict):
continue
report += f"**{method}:**\n"
report += f"- Outliers found: {result['outli