Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import plotly.graph_objects as go | |
import plotly.express as px | |
from plotly.subplots import make_subplots | |
from sklearn.ensemble import IsolationForest | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.cluster import DBSCAN | |
from scipy import stats | |
from typing import Dict, List, Tuple, Any, Optional | |
import warnings | |
warnings.filterwarnings('ignore') | |
class OutlierDetective: | |
def __init__(self): | |
self.df = None | |
self.outlier_results = {} | |
self.numeric_columns = [] | |
def load_data(self, file_path: str) -> pd.DataFrame: | |
"""Load data from various file formats""" | |
try: | |
if file_path.endswith('.csv'): | |
df = pd.read_csv(file_path, encoding='utf-8') | |
elif file_path.endswith(('.xlsx', '.xls')): | |
df = pd.read_excel(file_path) | |
elif file_path.endswith('.json'): | |
df = pd.read_json(file_path) | |
elif file_path.endswith('.parquet'): | |
df = pd.read_parquet(file_path) | |
else: | |
df = pd.read_csv(file_path) | |
self.df = df | |
# Identify numeric columns | |
self.numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist() | |
return df | |
except Exception as e: | |
raise Exception(f"Error loading file: {str(e)}") | |
def detect_iqr_outliers(self, column: str) -> Dict[str, Any]: | |
"""Detect outliers using Interquartile Range (IQR) method""" | |
if column not in self.numeric_columns: | |
return {} | |
series = self.df[column].dropna() | |
Q1 = series.quantile(0.25) | |
Q3 = series.quantile(0.75) | |
IQR = Q3 - Q1 | |
lower_bound = Q1 - 1.5 * IQR | |
upper_bound = Q3 + 1.5 * IQR | |
outlier_mask = (series < lower_bound) | (series > upper_bound) | |
outlier_indices = series[outlier_mask].index.tolist() | |
outlier_values = series[outlier_mask].tolist() | |
return { | |
'method': 'IQR', | |
'lower_bound': lower_bound, | |
'upper_bound': upper_bound, | |
'outlier_indices': outlier_indices, | |
'outlier_values': outlier_values, | |
'outlier_count': len(outlier_indices), | |
'outlier_percentage': (len(outlier_indices) / len(series)) * 100, | |
'explanation': f"Values below {lower_bound:.2f} or above {upper_bound:.2f} are considered outliers" | |
} | |
def detect_zscore_outliers(self, column: str, threshold: float = 3) -> Dict[str, Any]: | |
"""Detect outliers using Z-score method""" | |
if column not in self.numeric_columns: | |
return {} | |
series = self.df[column].dropna() | |
z_scores = np.abs(stats.zscore(series)) | |
outlier_mask = z_scores > threshold | |
outlier_indices = series[outlier_mask].index.tolist() | |
outlier_values = series[outlier_mask].tolist() | |
outlier_zscores = z_scores[outlier_mask].tolist() | |
return { | |
'method': 'Z-Score', | |
'threshold': threshold, | |
'outlier_indices': outlier_indices, | |
'outlier_values': outlier_values, | |
'outlier_zscores': outlier_zscores, | |
'outlier_count': len(outlier_indices), | |
'outlier_percentage': (len(outlier_indices) / len(series)) * 100, | |
'explanation': f"Values with |z-score| > {threshold} are considered outliers" | |
} | |
def detect_modified_zscore_outliers(self, column: str, threshold: float = 3.5) -> Dict[str, Any]: | |
"""Detect outliers using Modified Z-score (MAD) method""" | |
if column not in self.numeric_columns: | |
return {} | |
series = self.df[column].dropna() | |
median = series.median() | |
mad = stats.median_abs_deviation(series) | |
if mad == 0: | |
return { | |
'method': 'Modified Z-Score', | |
'outlier_count': 0, | |
'outlier_percentage': 0, | |
'explanation': "MAD is zero - no outliers detected using this method" | |
} | |
modified_z_scores = 0.6745 * (series - median) / mad | |
outlier_mask = np.abs(modified_z_scores) > threshold | |
outlier_indices = series[outlier_mask].index.tolist() | |
outlier_values = series[outlier_mask].tolist() | |
outlier_scores = modified_z_scores[outlier_mask].tolist() | |
return { | |
'method': 'Modified Z-Score', | |
'threshold': threshold, | |
'median': median, | |
'mad': mad, | |
'outlier_indices': outlier_indices, | |
'outlier_values': outlier_values, | |
'outlier_scores': outlier_scores, | |
'outlier_count': len(outlier_indices), | |
'outlier_percentage': (len(outlier_indices) / len(series)) * 100, | |
'explanation': f"Values with |modified z-score| > {threshold} are considered outliers (robust to extreme values)" | |
} | |
def detect_isolation_forest_outliers(self, columns: List[str], contamination: float = 0.1) -> Dict[str, Any]: | |
"""Detect multivariate outliers using Isolation Forest""" | |
if not columns or len(columns) < 1: | |
return {} | |
# Filter to only numeric columns that exist | |
valid_columns = [col for col in columns if col in self.numeric_columns] | |
if not valid_columns: | |
return {} | |
# Prepare data | |
data = self.df[valid_columns].dropna() | |
if len(data) < 10: # Need minimum data points | |
return {} | |
# Standardize the data | |
scaler = StandardScaler() | |
scaled_data = scaler.fit_transform(data) | |
# Fit Isolation Forest | |
iso_forest = IsolationForest(contamination=contamination, random_state=42) | |
outlier_labels = iso_forest.fit_predict(scaled_data) | |
# Get outlier indices and scores | |
outlier_mask = outlier_labels == -1 | |
outlier_indices = data[outlier_mask].index.tolist() | |
outlier_scores = iso_forest.score_samples(scaled_data) | |
outlier_score_values = outlier_scores[outlier_mask].tolist() | |
return { | |
'method': 'Isolation Forest', | |
'contamination': contamination, | |
'columns_used': valid_columns, | |
'outlier_indices': outlier_indices, | |
'outlier_scores': outlier_score_values, | |
'outlier_count': len(outlier_indices), | |
'outlier_percentage': (len(outlier_indices) / len(data)) * 100, | |
'explanation': f"Multivariate outlier detection using {len(valid_columns)} features with {contamination*100}% expected contamination" | |
} | |
def detect_dbscan_outliers(self, columns: List[str], eps: float = 0.5, min_samples: int = 5) -> Dict[str, Any]: | |
"""Detect outliers using DBSCAN clustering""" | |
if not columns or len(columns) < 1: | |
return {} | |
# Filter to only numeric columns that exist | |
valid_columns = [col for col in columns if col in self.numeric_columns] | |
if not valid_columns: | |
return {} | |
# Prepare data | |
data = self.df[valid_columns].dropna() | |
if len(data) < min_samples * 2: # Need minimum data points | |
return {} | |
# Standardize the data | |
scaler = StandardScaler() | |
scaled_data = scaler.fit_transform(data) | |
# Apply DBSCAN | |
dbscan = DBSCAN(eps=eps, min_samples=min_samples) | |
cluster_labels = dbscan.fit_predict(scaled_data) | |
# Points labeled as -1 are outliers | |
outlier_mask = cluster_labels == -1 | |
outlier_indices = data[outlier_mask].index.tolist() | |
# Count clusters | |
n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0) | |
return { | |
'method': 'DBSCAN', | |
'eps': eps, | |
'min_samples': min_samples, | |
'columns_used': valid_columns, | |
'n_clusters': n_clusters, | |
'outlier_indices': outlier_indices, | |
'outlier_count': len(outlier_indices), | |
'outlier_percentage': (len(outlier_indices) / len(data)) * 100, | |
'explanation': f"Density-based outlier detection found {n_clusters} clusters using {len(valid_columns)} features" | |
} | |
def analyze_outliers(self, selected_columns: List[str] = None, methods: List[str] = None) -> Dict[str, Any]: | |
"""Comprehensive outlier analysis""" | |
if self.df is None: | |
return {} | |
if selected_columns is None: | |
selected_columns = self.numeric_columns | |
else: | |
# Filter to only numeric columns | |
selected_columns = [col for col in selected_columns if col in self.numeric_columns] | |
if not selected_columns: | |
return {} | |
if methods is None: | |
methods = ['IQR', 'Z-Score', 'Modified Z-Score', 'Isolation Forest'] | |
results = {} | |
# Single-column methods | |
for column in selected_columns: | |
results[column] = {} | |
if 'IQR' in methods: | |
results[column]['IQR'] = self.detect_iqr_outliers(column) | |
if 'Z-Score' in methods: | |
results[column]['Z-Score'] = self.detect_zscore_outliers(column) | |
if 'Modified Z-Score' in methods: | |
results[column]['Modified Z-Score'] = self.detect_modified_zscore_outliers(column) | |
# Multi-column methods | |
if len(selected_columns) > 1: | |
if 'Isolation Forest' in methods: | |
results['Multivariate'] = {} | |
results['Multivariate']['Isolation Forest'] = self.detect_isolation_forest_outliers(selected_columns) | |
if 'DBSCAN' in methods: | |
if 'Multivariate' not in results: | |
results['Multivariate'] = {} | |
results['Multivariate']['DBSCAN'] = self.detect_dbscan_outliers(selected_columns) | |
self.outlier_results = results | |
return results | |
def generate_outlier_report(self) -> str: | |
"""Generate comprehensive outlier analysis report""" | |
if not self.outlier_results: | |
return "No outlier analysis results available. Please run the analysis first." | |
report = "#Outlier Detection Report\n\n" | |
total_outliers_by_method = {} | |
all_outlier_indices = set() | |
for column, methods in self.outlier_results.items(): | |
if column == 'Multivariate': | |
continue | |
for method, result in methods.items(): | |
if isinstance(result, dict) and 'outlier_count' in result: | |
total_outliers_by_method.setdefault(method, 0) | |
total_outliers_by_method[method] += result['outlier_count'] | |
if 'outlier_indices' in result: | |
all_outlier_indices.update(result['outlier_indices']) | |
if 'Multivariate' in self.outlier_results: | |
for method, result in self.outlier_results['Multivariate'].items(): | |
if isinstance(result, dict) and 'outlier_count' in result: | |
total_outliers_by_method[method] = result['outlier_count'] | |
if 'outlier_indices' in result: | |
all_outlier_indices.update(result['outlier_indices']) | |
report += "## Summary\n" | |
report += f"- **Total rows analyzed:** {len(self.df):,}\n" | |
report += f"- **Unique outlier rows found:** {len(all_outlier_indices)}\n" | |
report += f"- **Percentage of outlier rows:** {(len(all_outlier_indices)/len(self.df)*100):.2f}%\n\n" | |
report += "### Outliers by Method:\n" | |
for method, count in total_outliers_by_method.items(): | |
report += f"- **{method}:** {count} outliers\n" | |
report += "\n## Detailed Results\n\n" | |
for column, methods in self.outlier_results.items(): | |
if column == 'Multivariate': | |
continue | |
report += f"### Column: `{column}`\n\n" | |
for method, result in methods.items(): | |
if not isinstance(result, dict) or ('outlier_count' in result and result['outlier_count'] == 0): | |
report += f"**{method}:** No outliers detected\n" | |
continue | |
report += f"**{method}:**\n" | |
report += f"- Outliers found: {result['outlier_count']} ({result['outlier_percentage']:.2f}%)\n" | |
report += f"- Explanation: {result['explanation']}\n" | |
if 'outlier_values' in result and result['outlier_values']: | |
sample_values = result['outlier_values'][:5] | |
formatted_values = ', '.join([f'{v:.3f}' if isinstance(v, (int, float)) else str(v) for v in sample_values]) | |
report += f"- Example outliers: {formatted_values}" | |
if len(result['outlier_values']) > 5: | |
report += f" (and {len(result['outlier_values']) - 5} more)" | |
report += "\n" | |
report += "\n" | |
if 'Multivariate' in self.outlier_results: | |
report += "### Multivariate Analysis\n\n" | |
for method, result in self.outlier_results['Multivariate'].items(): | |
if not isinstance(result, dict) or 'outlier_count' not in result: | |
continue | |
report += f"**{method}:**\n" | |
report += f"- Outliers found: {result['outlier_count']} ({result['outlier_percentage']:.2f}%)\n" | |
report += f"- Explanation: {result['explanation']}\n\n" | |
return report | |
if __name__ == "__main__": | |
def run_outlier_detection(file): | |
detector = OutlierDetective() | |
df = detector.load_data(file.name) | |
detector.analyze_outliers() | |
return detector.generate_outlier_report() | |
iface = gr.Interface(fn=run_outlier_detection, | |
inputs=gr.File(label="Upload a dataset"), | |
outputs="text", | |
title="Outlier Detection App") | |
iface.launch() |