import gradio as gr
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from scipy import stats
from typing import Dict, List, Tuple, Any, Optional
import warnings

warnings.filterwarnings('ignore')


class OutlierDetective:
    def __init__(self):
        self.df = None
        self.outlier_results = {}
        self.numeric_columns = []

    def load_data(self, file_path: str) -> pd.DataFrame:
        """Load data from various file formats"""
        try:
            if file_path.endswith('.csv'):
                df = pd.read_csv(file_path, encoding='utf-8')
            elif file_path.endswith(('.xlsx', '.xls')):
                df = pd.read_excel(file_path)
            elif file_path.endswith('.json'):
                df = pd.read_json(file_path)
            elif file_path.endswith('.parquet'):
                df = pd.read_parquet(file_path)
            else:
                df = pd.read_csv(file_path)

            self.df = df
            # Identify numeric columns
            self.numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
            return df
        except Exception as e:
            raise Exception(f"Error loading file: {str(e)}")

    def detect_iqr_outliers(self, column: str) -> Dict[str, Any]:
        """Detect outliers using the Interquartile Range (IQR) method"""
        if column not in self.numeric_columns:
            return {}

        series = self.df[column].dropna()
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        # Tukey's fences: flag values more than 1.5 * IQR beyond the quartiles
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        outlier_mask = (series < lower_bound) | (series > upper_bound)
        outlier_indices = series[outlier_mask].index.tolist()
        outlier_values = series[outlier_mask].tolist()

        return {
            'method': 'IQR',
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
            'outlier_indices': outlier_indices,
            'outlier_values': outlier_values,
            'outlier_count': len(outlier_indices),
            'outlier_percentage': (len(outlier_indices) / len(series)) * 100,
            'explanation': f"Values below {lower_bound:.2f} or above {upper_bound:.2f} are considered outliers"
        }

    def detect_zscore_outliers(self, column: str, threshold: float = 3) -> Dict[str, Any]:
        """Detect outliers using the Z-score method"""
        if column not in self.numeric_columns:
            return {}

        series = self.df[column].dropna()
        z_scores = np.abs(stats.zscore(series))

        outlier_mask = z_scores > threshold
        outlier_indices = series[outlier_mask].index.tolist()
        outlier_values = series[outlier_mask].tolist()
        outlier_zscores = z_scores[outlier_mask].tolist()

        return {
            'method': 'Z-Score',
            'threshold': threshold,
            'outlier_indices': outlier_indices,
            'outlier_values': outlier_values,
            'outlier_zscores': outlier_zscores,
            'outlier_count': len(outlier_indices),
            'outlier_percentage': (len(outlier_indices) / len(series)) * 100,
            'explanation': f"Values with |z-score| > {threshold} are considered outliers"
        }

    def detect_modified_zscore_outliers(self, column: str, threshold: float = 3.5) -> Dict[str, Any]:
        """Detect outliers using the Modified Z-score (MAD) method"""
        if column not in self.numeric_columns:
            return {}

        series = self.df[column].dropna()
        median = series.median()
        mad = stats.median_abs_deviation(series)

        if mad == 0:
            return {
                'method': 'Modified Z-Score',
                'outlier_count': 0,
                'outlier_percentage': 0,
                'explanation': "MAD is zero - no outliers detected using this method"
            }

        # 0.6745 is approximately the 0.75 quantile of the standard normal,
        # which makes the modified z-score comparable to an ordinary z-score
        modified_z_scores = 0.6745 * (series - median) / mad
        outlier_mask = np.abs(modified_z_scores) > threshold
        outlier_indices = series[outlier_mask].index.tolist()
        outlier_values = series[outlier_mask].tolist()
        outlier_scores = modified_z_scores[outlier_mask].tolist()

        return {
            'method': 'Modified Z-Score',
            'threshold': threshold,
            'median': median,
            'mad': mad,
            'outlier_indices': outlier_indices,
            'outlier_values': outlier_values,
            'outlier_scores': outlier_scores,
            'outlier_count': len(outlier_indices),
            'outlier_percentage': (len(outlier_indices) / len(series)) * 100,
            'explanation': f"Values with |modified z-score| > {threshold} are considered outliers (robust to extreme values)"
        }

    def detect_isolation_forest_outliers(self, columns: List[str], contamination: float = 0.1) -> Dict[str, Any]:
        """Detect multivariate outliers using Isolation Forest"""
        if not columns or len(columns) < 1:
            return {}

        # Filter to only numeric columns that exist
        valid_columns = [col for col in columns if col in self.numeric_columns]
        if not valid_columns:
            return {}

        # Prepare data
        data = self.df[valid_columns].dropna()
        if len(data) < 10:  # Need minimum data points
            return {}

        # Standardize the data
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data)

        # Fit Isolation Forest
        iso_forest = IsolationForest(contamination=contamination, random_state=42)
        outlier_labels = iso_forest.fit_predict(scaled_data)

        # Get outlier indices and scores
        outlier_mask = outlier_labels == -1
        outlier_indices = data[outlier_mask].index.tolist()
        outlier_scores = iso_forest.score_samples(scaled_data)
        outlier_score_values = outlier_scores[outlier_mask].tolist()

        return {
            'method': 'Isolation Forest',
            'contamination': contamination,
            'columns_used': valid_columns,
            'outlier_indices': outlier_indices,
            'outlier_scores': outlier_score_values,
            'outlier_count': len(outlier_indices),
            'outlier_percentage': (len(outlier_indices) / len(data)) * 100,
            'explanation': f"Multivariate outlier detection using {len(valid_columns)} features with {contamination*100}% expected contamination"
        }

    def detect_dbscan_outliers(self, columns: List[str], eps: float = 0.5, min_samples: int = 5) -> Dict[str, Any]:
        """Detect outliers using DBSCAN clustering"""
        if not columns or len(columns) < 1:
            return {}

        # Filter to only numeric columns that exist
        valid_columns = [col for col in columns if col in self.numeric_columns]
        if not valid_columns:
            return {}

        # Prepare data
        data = self.df[valid_columns].dropna()
        if len(data) < min_samples * 2:  # Need minimum data points
            return {}

        # Standardize the data
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(data)

        # Apply DBSCAN
        dbscan = DBSCAN(eps=eps, min_samples=min_samples)
        cluster_labels = dbscan.fit_predict(scaled_data)

        # Points labeled as -1 are outliers
        outlier_mask = cluster_labels == -1
        outlier_indices = data[outlier_mask].index.tolist()

        # Count clusters
        n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)

        return {
            'method': 'DBSCAN',
            'eps': eps,
            'min_samples': min_samples,
            'columns_used': valid_columns,
            'n_clusters': n_clusters,
            'outlier_indices': outlier_indices,
            'outlier_count': len(outlier_indices),
            'outlier_percentage': (len(outlier_indices) / len(data)) * 100,
            'explanation': f"Density-based outlier detection found {n_clusters} clusters using {len(valid_columns)} features"
        }

    def analyze_outliers(self, selected_columns: List[str] = None, methods: List[str] = None) -> Dict[str, Any]:
        """Comprehensive outlier analysis"""
        if self.df is None:
            return {}

        if selected_columns is None:
            selected_columns = self.numeric_columns
        else:
            # Filter to only numeric columns
            selected_columns = [col for col in selected_columns if col in self.numeric_columns]

        if not selected_columns:
            return {}

        if methods is None:
            methods = ['IQR', 'Z-Score', 'Modified Z-Score', 'Isolation Forest']

        results = {}

        # Single-column methods
        for column in selected_columns:
            results[column] = {}

            if 'IQR' in methods:
                results[column]['IQR'] = self.detect_iqr_outliers(column)
            if 'Z-Score' in methods:
                results[column]['Z-Score'] = self.detect_zscore_outliers(column)
            if 'Modified Z-Score' in methods:
                results[column]['Modified Z-Score'] = self.detect_modified_zscore_outliers(column)

        # Multi-column methods
        if len(selected_columns) > 1:
            if 'Isolation Forest' in methods:
                results['Multivariate'] = {}
                results['Multivariate']['Isolation Forest'] = self.detect_isolation_forest_outliers(selected_columns)
            if 'DBSCAN' in methods:
                if 'Multivariate' not in results:
                    results['Multivariate'] = {}
                results['Multivariate']['DBSCAN'] = self.detect_dbscan_outliers(selected_columns)

        self.outlier_results = results
        return results

    def generate_outlier_report(self) -> str:
        """Generate a comprehensive outlier analysis report"""
        if not self.outlier_results:
            return "No outlier analysis results available. Please run the analysis first."

        report = "# Outlier Detection Report\n\n"

        total_outliers_by_method = {}
        all_outlier_indices = set()

        for column, methods in self.outlier_results.items():
            if column == 'Multivariate':
                continue
            for method, result in methods.items():
                if isinstance(result, dict) and 'outlier_count' in result:
                    total_outliers_by_method.setdefault(method, 0)
                    total_outliers_by_method[method] += result['outlier_count']
                    if 'outlier_indices' in result:
                        all_outlier_indices.update(result['outlier_indices'])

        if 'Multivariate' in self.outlier_results:
            for method, result in self.outlier_results['Multivariate'].items():
                if isinstance(result, dict) and 'outlier_count' in result:
                    total_outliers_by_method[method] = result['outlier_count']
                    if 'outlier_indices' in result:
                        all_outlier_indices.update(result['outlier_indices'])

        report += "## Summary\n"
        report += f"- **Total rows analyzed:** {len(self.df):,}\n"
        report += f"- **Unique outlier rows found:** {len(all_outlier_indices)}\n"
        report += f"- **Percentage of outlier rows:** {(len(all_outlier_indices) / len(self.df) * 100):.2f}%\n\n"

        report += "### Outliers by Method:\n"
        for method, count in total_outliers_by_method.items():
            report += f"- **{method}:** {count} outliers\n"

        report += "\n## Detailed Results\n\n"
        for column, methods in self.outlier_results.items():
            if column == 'Multivariate':
                continue
            report += f"### Column: `{column}`\n\n"
            for method, result in methods.items():
                # Treat empty result dicts the same as "zero outliers found"
                if not isinstance(result, dict) or result.get('outlier_count', 0) == 0:
                    report += f"**{method}:** No outliers detected\n"
                    continue
                report += f"**{method}:**\n"
                report += f"- Outliers found: {result['outlier_count']} ({result['outlier_percentage']:.2f}%)\n"
                report += f"- Explanation: {result['explanation']}\n"
                if 'outlier_values' in result and result['outlier_values']:
                    sample_values = result['outlier_values'][:5]
                    formatted_values = ', '.join(
                        [f'{v:.3f}' if isinstance(v, (int, float)) else str(v) for v in sample_values]
                    )
                    report += f"- Example outliers: {formatted_values}"
                    if len(result['outlier_values']) > 5:
                        report += f" (and {len(result['outlier_values']) - 5} more)"
                    report += "\n"
                report += "\n"

        if 'Multivariate' in self.outlier_results:
            report += "### Multivariate Analysis\n\n"
            for method, result in self.outlier_results['Multivariate'].items():
                if not isinstance(result, dict) or 'outlier_count' not in result:
                    continue
                report += f"**{method}:**\n"
                report += f"- Outliers found: {result['outlier_count']} ({result['outlier_percentage']:.2f}%)\n"
                report += f"- Explanation: {result['explanation']}\n\n"

        return report
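

# A minimal sketch of using OutlierDetective programmatically (outside Gradio);
# "data.csv" and the method list below are hypothetical examples:
#
#     detector = OutlierDetective()
#     detector.load_data("data.csv")
#     detector.analyze_outliers(methods=['IQR', 'Modified Z-Score', 'Isolation Forest'])
#     print(detector.generate_outlier_report())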


if __name__ == "__main__":
    def run_outlier_detection(file):
        detector = OutlierDetective()
        # gr.File may hand the function either a filepath string or a
        # tempfile-like object with a .name attribute, depending on the
        # Gradio version, so accept both.
        file_path = file if isinstance(file, str) else file.name
        detector.load_data(file_path)
        detector.analyze_outliers()
        return detector.generate_outlier_report()

    iface = gr.Interface(
        fn=run_outlier_detection,
        inputs=gr.File(label="Upload a dataset"),
        outputs="text",
        title="Outlier Detection App",
    )
    iface.launch()
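    # Optional: the report is markdown, so outputs=gr.Markdown() would render
    # headings and bold text, and iface.launch(share=True) exposes a temporary
    # public URL for sharing the demo.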