import os
import uuid
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
import threading
import time
import logging
from scipy import stats
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder, MinMaxScaler
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier, GradientBoostingRegressor, GradientBoostingClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression, Ridge, Lasso
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error, r2_score, classification_report, confusion_matrix
from sklearn.feature_selection import SelectKBest, f_regression, f_classif, mutual_info_regression
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.utils import PlotlyJSONEncoder
import io
import base64
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
import warnings
warnings.filterwarnings('ignore')
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Configuration
UPLOAD_FOLDER = '/tmp/uploads'
PROCESSED_FOLDER = '/tmp/processed'
MODELS_FOLDER = '/tmp/models'
MAX_FILE_SIZE = 1024 * 1024 * 1024  # 1GB for enterprise
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls', 'json', 'parquet', 'tsv', 'feather'}
FILE_EXPIRY_HOURS = 24  # Extended for enterprise use

# Ensure directories exist
for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER, MODELS_FOLDER]:
    os.makedirs(folder, exist_ok=True)

# Enhanced file storage with metadata
file_storage = {}
model_storage = {}
analysis_history = {}
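# Illustrative shape of the in-memory stores (values are placeholders; real entries
# are created by upload_file() and run_feature_engineering() below):
#
#   file_storage = {
#       "<session_id>": {
#           "<file_id>": {
#               "filename": "data.csv",
#               "filepath": "/tmp/uploads/<session_id>/<file_id>_data.csv",
#               "size": 1234,
#               "timestamp": "2024-01-01T00:00:00",
#               "format": "csv",
#               "status": "uploaded",
#           }
#       }
#   }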
class EnterpriseAnalytics: | |
"""Enterprise-grade analytics engine""" | |
def __init__(self): | |
self.scaler = StandardScaler() | |
self.models = {} | |
def advanced_data_profiling(self, df): | |
"""Comprehensive data profiling like enterprise tools""" | |
profile = { | |
'dataset_overview': { | |
'rows': len(df), | |
'columns': len(df.columns), | |
'memory_usage': df.memory_usage(deep=True).sum(), | |
'duplicate_rows': df.duplicated().sum() | |
}, | |
'column_analysis': {}, | |
'data_quality': {}, | |
'relationships': {}, | |
'recommendations': [] | |
} | |
for col in df.columns: | |
col_data = df[col] | |
col_profile = { | |
'dtype': str(col_data.dtype), | |
'missing_count': col_data.isnull().sum(), | |
'missing_percentage': (col_data.isnull().sum() / len(df)) * 100, | |
'unique_values': col_data.nunique(), | |
'cardinality': col_data.nunique() / len(df) if len(df) > 0 else 0 | |
} | |
if pd.api.types.is_numeric_dtype(col_data): | |
col_profile.update({ | |
'statistics': { | |
'mean': col_data.mean(), | |
'median': col_data.median(), | |
'std': col_data.std(), | |
'min': col_data.min(), | |
'max': col_data.max(), | |
'q25': col_data.quantile(0.25), | |
'q75': col_data.quantile(0.75), | |
'skewness': stats.skew(col_data.dropna()), | |
'kurtosis': stats.kurtosis(col_data.dropna()) | |
}, | |
'distribution': 'normal' if abs(stats.skew(col_data.dropna())) < 0.5 else 'skewed' | |
}) | |
else: | |
col_profile.update({ | |
'top_categories': col_data.value_counts().head(10).to_dict(), | |
'category_distribution': 'uniform' if col_data.value_counts().std() < col_data.value_counts().mean() * 0.5 else 'imbalanced' | |
}) | |
profile['column_analysis'][col] = col_profile | |
# Data quality assessment | |
profile['data_quality'] = { | |
'completeness_score': (1 - df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100, | |
'uniqueness_score': (df.nunique().sum() / (len(df) * len(df.columns))) * 100, | |
'consistency_score': self._calculate_consistency_score(df) | |
} | |
# Generate recommendations | |
profile['recommendations'] = self._generate_recommendations(df, profile) | |
return profile | |
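    # Illustrative excerpt of the dict returned by advanced_data_profiling()
    # (keys match the code above; values are placeholders):
    #   {
    #     'dataset_overview': {'rows': 1000, 'columns': 12, 'memory_usage': ..., 'duplicate_rows': 3},
    #     'column_analysis': {'<col>': {'dtype': 'float64', 'missing_percentage': 1.2, ...,
    #                                   'statistics': {...}, 'distribution': 'normal'}},
    #     'data_quality': {'completeness_score': 98.8, 'uniqueness_score': ..., 'consistency_score': ...},
    #     'recommendations': [{'type': 'data_quality', 'priority': 'high', 'message': '...', 'action': '...'}]
    #   }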
def _calculate_consistency_score(self, df): | |
"""Calculate data consistency score""" | |
score = 100 | |
for col in df.select_dtypes(include=['object']): | |
# Check for inconsistent formatting | |
values = df[col].dropna().astype(str) | |
if len(values) > 0: | |
# Check for mixed case | |
if len(set([v.lower() for v in values])) != len(set(values)): | |
score -= 5 | |
# Check for leading/trailing spaces | |
if any(v != v.strip() for v in values): | |
score -= 5 | |
return max(0, score) | |
def _generate_recommendations(self, df, profile): | |
"""Generate actionable recommendations""" | |
recommendations = [] | |
# High missing value columns | |
for col, analysis in profile['column_analysis'].items(): | |
if analysis['missing_percentage'] > 20: | |
recommendations.append({ | |
'type': 'data_quality', | |
'priority': 'high', | |
'message': f"Column '{col}' has {analysis['missing_percentage']:.1f}% missing values. Consider imputation or removal.", | |
'action': 'handle_missing_values' | |
}) | |
# High cardinality categorical columns | |
for col, analysis in profile['column_analysis'].items(): | |
if analysis.get('cardinality', 0) > 0.8 and df[col].dtype == 'object': | |
recommendations.append({ | |
'type': 'feature_engineering', | |
'priority': 'medium', | |
'message': f"Column '{col}' has high cardinality. Consider feature encoding or dimensionality reduction.", | |
'action': 'encode_categorical' | |
}) | |
# Skewed distributions | |
for col, analysis in profile['column_analysis'].items(): | |
if 'statistics' in analysis and abs(analysis['statistics']['skewness']) > 2: | |
recommendations.append({ | |
'type': 'data_transformation', | |
'priority': 'medium', | |
'message': f"Column '{col}' is highly skewed. Consider log transformation or scaling.", | |
'action': 'transform_distribution' | |
}) | |
return recommendations | |
def advanced_feature_engineering(self, df, target_column=None): | |
"""Enterprise-level feature engineering""" | |
engineered_features = {} | |
# Numeric feature engineering | |
numeric_cols = df.select_dtypes(include=[np.number]).columns | |
for col in numeric_cols: | |
if col != target_column: | |
# Polynomial features | |
engineered_features[f'{col}_squared'] = df[col] ** 2 | |
engineered_features[f'{col}_log'] = np.log1p(df[col].abs()) | |
# Binning | |
engineered_features[f'{col}_binned'] = pd.cut(df[col], bins=5, labels=False) | |
# Rolling statistics (if data has time component) | |
if len(df) > 10: | |
engineered_features[f'{col}_rolling_mean'] = df[col].rolling(window=min(5, len(df)//2)).mean() | |
# Categorical feature engineering | |
categorical_cols = df.select_dtypes(include=['object']).columns | |
for col in categorical_cols: | |
if col != target_column: | |
# Frequency encoding | |
freq_map = df[col].value_counts().to_dict() | |
engineered_features[f'{col}_frequency'] = df[col].map(freq_map) | |
# Target encoding (if target is provided) | |
if target_column and target_column in df.columns: | |
target_mean = df.groupby(col)[target_column].mean() | |
engineered_features[f'{col}_target_encoded'] = df[col].map(target_mean) | |
# Interaction features | |
if len(numeric_cols) >= 2: | |
col_pairs = [(numeric_cols[i], numeric_cols[j]) | |
for i in range(len(numeric_cols)) | |
for j in range(i+1, min(i+3, len(numeric_cols)))] # Limit combinations | |
for col1, col2 in col_pairs: | |
if col1 != target_column and col2 != target_column: | |
engineered_features[f'{col1}_{col2}_interaction'] = df[col1] * df[col2] | |
engineered_features[f'{col1}_{col2}_ratio'] = df[col1] / (df[col2] + 1e-8) | |
return pd.DataFrame(engineered_features, index=df.index) | |
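    # Illustrative example (hypothetical column names): for a numeric column 'amount'
    # and a categorical column 'region', the method above would emit features such as
    #   amount_squared, amount_log, amount_binned, amount_rolling_mean,
    #   region_frequency, region_target_encoded (only when a target column is given),
    # plus pairwise interaction/ratio features for nearby numeric columns, e.g.
    #   amount_qty_interaction and amount_qty_ratio for a second numeric column 'qty'.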
def automated_ml_pipeline(self, df, target_column, problem_type='auto'): | |
"""Enterprise AutoML pipeline""" | |
results = { | |
'preprocessing': {}, | |
'feature_selection': {}, | |
'models': {}, | |
'best_model': {}, | |
'predictions': {}, | |
'feature_importance': {} | |
} | |
# Determine problem type | |
if problem_type == 'auto': | |
if df[target_column].dtype in ['object', 'category'] or df[target_column].nunique() < 10: | |
problem_type = 'classification' | |
else: | |
problem_type = 'regression' | |
# Preprocessing | |
feature_cols = [col for col in df.columns if col != target_column] | |
X = df[feature_cols].copy() | |
y = df[target_column].copy() | |
# Handle missing values | |
X_numeric = X.select_dtypes(include=[np.number]) | |
X_categorical = X.select_dtypes(include=['object']) | |
if not X_numeric.empty: | |
X_numeric = X_numeric.fillna(X_numeric.median()) | |
if not X_categorical.empty: | |
X_categorical = X_categorical.fillna(X_categorical.mode().iloc[0] if not X_categorical.mode().empty else 'Unknown') | |
# Encode categorical variables | |
if not X_categorical.empty: | |
le = LabelEncoder() | |
for col in X_categorical.columns: | |
X_categorical[col] = le.fit_transform(X_categorical[col].astype(str)) | |
X_processed = pd.concat([X_numeric, X_categorical], axis=1) | |
# Handle target variable for classification | |
if problem_type == 'classification' and y.dtype == 'object': | |
le_target = LabelEncoder() | |
y = le_target.fit_transform(y) | |
        # Feature selection (score function chosen to match the problem type)
        if len(X_processed.columns) > 10:
            score_func = f_regression if problem_type == 'regression' else f_classif
            selector = SelectKBest(score_func, k=min(10, len(X_processed.columns)))
            X_selected = selector.fit_transform(X_processed, y)
            selected_features = X_processed.columns[selector.get_support()].tolist()
            X_processed = pd.DataFrame(X_selected, columns=selected_features)
            results['feature_selection']['selected_features'] = selected_features
# Split data | |
X_train, X_test, y_train, y_test = train_test_split( | |
X_processed, y, test_size=0.2, random_state=42 | |
) | |
# Scale features | |
scaler = StandardScaler() | |
X_train_scaled = scaler.fit_transform(X_train) | |
X_test_scaled = scaler.transform(X_test) | |
# Model selection based on problem type | |
if problem_type == 'regression': | |
models = { | |
'Linear Regression': LinearRegression(), | |
'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42), | |
'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42), | |
'Ridge Regression': Ridge() | |
} | |
        else:
            models = {
                'Logistic Regression': LogisticRegression(random_state=42, max_iter=1000),
                'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
                'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42)
            }
# Train and evaluate models | |
best_score = -np.inf if problem_type == 'regression' else 0 | |
best_model_name = None | |
for name, model in models.items(): | |
try: | |
# Cross-validation | |
if problem_type == 'regression': | |
scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='r2') | |
score = scores.mean() | |
else: | |
scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='accuracy') | |
score = scores.mean() | |
# Train final model | |
model.fit(X_train_scaled, y_train) | |
y_pred = model.predict(X_test_scaled) | |
if problem_type == 'regression': | |
test_score = r2_score(y_test, y_pred) | |
mse = mean_squared_error(y_test, y_pred) | |
results['models'][name] = { | |
'cv_score': score, | |
'test_r2': test_score, | |
'test_mse': mse, | |
'predictions': y_pred.tolist() | |
} | |
else: | |
test_score = model.score(X_test_scaled, y_test) | |
results['models'][name] = { | |
'cv_score': score, | |
'test_accuracy': test_score, | |
'predictions': y_pred.tolist() | |
} | |
# Track best model | |
if score > best_score: | |
best_score = score | |
best_model_name = name | |
# Feature importance | |
if hasattr(model, 'feature_importances_'): | |
importance = dict(zip(X_processed.columns, model.feature_importances_)) | |
results['feature_importance'] = dict(sorted(importance.items(), key=lambda x: x[1], reverse=True)) | |
except Exception as e: | |
logger.error(f"Error training {name}: {str(e)}") | |
continue | |
results['best_model'] = { | |
'name': best_model_name, | |
'score': best_score, | |
'problem_type': problem_type | |
} | |
results['preprocessing'] = { | |
'numeric_features': X_numeric.columns.tolist() if not X_numeric.empty else [], | |
'categorical_features': X_categorical.columns.tolist() if not X_categorical.empty else [], | |
'scaling_applied': True, | |
'missing_values_handled': True | |
} | |
return results | |
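    # Illustrative usage (assumes a DataFrame with a 'target' column):
    #   engine = EnterpriseAnalytics()
    #   out = engine.automated_ml_pipeline(df, target_column='target', problem_type='auto')
    #   out['best_model']          # e.g. {'name': 'Random Forest', 'score': 0.87, 'problem_type': 'regression'}
    #   out['models']              # per-model cross-validation and hold-out metrics
    #   out['feature_importance']  # note: overwritten on each loop iteration above, so it reflects
    #                              # the last tree-based model trained, not necessarily the best one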
def advanced_clustering_analysis(self, df, n_clusters=None): | |
"""Enterprise clustering with multiple algorithms""" | |
# Prepare data | |
numeric_df = df.select_dtypes(include=[np.number]) | |
if numeric_df.empty: | |
raise ValueError("No numeric columns for clustering") | |
# Handle missing values | |
numeric_df = numeric_df.fillna(numeric_df.median()) | |
# Scale data | |
scaler = StandardScaler() | |
X_scaled = scaler.fit_transform(numeric_df) | |
results = { | |
'algorithms': {}, | |
'optimal_clusters': {}, | |
'silhouette_scores': {}, | |
'recommendations': [] | |
} | |
# Determine optimal number of clusters if not provided | |
if n_clusters is None: | |
# Elbow method for K-means | |
inertias = [] | |
k_range = range(2, min(11, len(numeric_df) // 2)) | |
for k in k_range: | |
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10) | |
kmeans.fit(X_scaled) | |
inertias.append(kmeans.inertia_) | |
# Find elbow point (simplified) | |
if len(inertias) > 2: | |
diffs = np.diff(inertias) | |
second_diffs = np.diff(diffs) | |
n_clusters = k_range[np.argmax(second_diffs) + 1] if len(second_diffs) > 0 else 3 | |
else: | |
n_clusters = 3 | |
# Apply multiple clustering algorithms | |
algorithms = { | |
'K-Means': KMeans(n_clusters=n_clusters, random_state=42, n_init=10), | |
'Hierarchical': AgglomerativeClustering(n_clusters=n_clusters), | |
'DBSCAN': DBSCAN(eps=0.5, min_samples=5) | |
} | |
for name, algo in algorithms.items(): | |
try: | |
if name == 'DBSCAN': | |
labels = algo.fit_predict(X_scaled) | |
n_clusters_found = len(set(labels)) - (1 if -1 in labels else 0) | |
else: | |
labels = algo.fit_predict(X_scaled) | |
n_clusters_found = n_clusters | |
# Calculate silhouette score | |
if len(set(labels)) > 1: | |
from sklearn.metrics import silhouette_score | |
sil_score = silhouette_score(X_scaled, labels) | |
else: | |
sil_score = 0 | |
results['algorithms'][name] = { | |
'labels': labels.tolist(), | |
'n_clusters': n_clusters_found, | |
'silhouette_score': sil_score | |
} | |
results['silhouette_scores'][name] = sil_score | |
except Exception as e: | |
logger.error(f"Error in {name} clustering: {str(e)}") | |
continue | |
# PCA for visualization | |
if len(numeric_df.columns) > 2: | |
pca = PCA(n_components=2) | |
X_pca = pca.fit_transform(X_scaled) | |
results['pca_components'] = X_pca.tolist() | |
results['pca_explained_variance'] = pca.explained_variance_ratio_.tolist() | |
# Generate recommendations | |
best_algo = max(results['silhouette_scores'].items(), key=lambda x: x[1])[0] | |
results['recommendations'].append({ | |
'type': 'clustering', | |
'message': f"Best clustering algorithm: {best_algo} with silhouette score: {results['silhouette_scores'][best_algo]:.3f}", | |
'optimal_clusters': results['algorithms'][best_algo]['n_clusters'] | |
}) | |
return results | |
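    # Worked example of the elbow heuristic used above (illustrative numbers):
    #   inertias     = [100, 60, 45, 40, 38]   for k = 2..6
    #   diffs        = [-40, -15, -5, -2]
    #   second_diffs = [ 25,  10,  3]
    #   argmax(second_diffs) = 0  ->  n_clusters = k_range[0 + 1] = 3
    # i.e. the k just after the largest change in slope is taken as the elbow.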
def time_series_analysis(self, df, date_column, value_column): | |
"""Advanced time series analysis""" | |
# Convert date column | |
df[date_column] = pd.to_datetime(df[date_column]) | |
df = df.sort_values(date_column) | |
# Set date as index | |
ts_df = df.set_index(date_column)[value_column] | |
results = { | |
'trend_analysis': {}, | |
'seasonality': {}, | |
'forecasting': {}, | |
'anomalies': {}, | |
'statistics': {} | |
} | |
# Basic statistics | |
results['statistics'] = { | |
'mean': ts_df.mean(), | |
'std': ts_df.std(), | |
'min': ts_df.min(), | |
'max': ts_df.max(), | |
'trend': 'increasing' if ts_df.iloc[-1] > ts_df.iloc[0] else 'decreasing' | |
} | |
# Trend analysis using linear regression | |
X = np.arange(len(ts_df)).reshape(-1, 1) | |
y = ts_df.values | |
lr = LinearRegression() | |
lr.fit(X, y) | |
trend_slope = lr.coef_[0] | |
results['trend_analysis'] = { | |
'slope': trend_slope, | |
'direction': 'increasing' if trend_slope > 0 else 'decreasing', | |
'strength': abs(trend_slope) | |
} | |
# Simple anomaly detection using IQR | |
Q1 = ts_df.quantile(0.25) | |
Q3 = ts_df.quantile(0.75) | |
IQR = Q3 - Q1 | |
anomalies = ts_df[(ts_df < Q1 - 1.5 * IQR) | (ts_df > Q3 + 1.5 * IQR)] | |
results['anomalies'] = { | |
'count': len(anomalies), | |
'dates': anomalies.index.strftime('%Y-%m-%d').tolist(), | |
'values': anomalies.values.tolist() | |
} | |
# Simple forecasting (moving average) | |
window = min(7, len(ts_df) // 4) | |
if window > 0: | |
forecast_periods = min(10, len(ts_df) // 4) | |
last_values = ts_df.tail(window).mean() | |
results['forecasting'] = { | |
'method': 'moving_average', | |
'forecast_periods': forecast_periods, | |
'forecast_values': [last_values] * forecast_periods | |
} | |
return results | |
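    # Illustrative usage (hypothetical column names):
    #   engine.time_series_analysis(df, date_column='date', value_column='sales')
    # The forecast produced above is intentionally simple: it repeats the mean of the
    # trailing window, so forecast_values is a flat line rather than a trended projection.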
# Initialize analytics engine | |
analytics_engine = EnterpriseAnalytics() | |
def allowed_file(filename): | |
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
def cleanup_old_files(): | |
"""Enhanced cleanup with model cleanup""" | |
try: | |
# Existing cleanup logic... | |
for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER, MODELS_FOLDER]: | |
for root, dirs, files in os.walk(folder): | |
for file in files: | |
filepath = os.path.join(root, file) | |
if get_file_age(filepath) > FILE_EXPIRY_HOURS: | |
os.remove(filepath) | |
logger.info(f"Cleaned up old file: {filepath}") | |
# Clean up storage entries | |
current_time = datetime.now() | |
for storage in [file_storage, model_storage, analysis_history]: | |
sessions_to_remove = [] | |
for session_id, session_data in storage.items(): | |
if isinstance(session_data, dict): | |
items_to_remove = [] | |
for item_id, item_info in session_data.items(): | |
if 'timestamp' in item_info: | |
item_time = datetime.fromisoformat(item_info['timestamp']) | |
if (current_time - item_time).total_seconds() > FILE_EXPIRY_HOURS * 3600: | |
items_to_remove.append(item_id) | |
for item_id in items_to_remove: | |
del session_data[item_id] | |
if not session_data: | |
sessions_to_remove.append(session_id) | |
for session_id in sessions_to_remove: | |
del storage[session_id] | |
except Exception as e: | |
logger.error(f"Error during cleanup: {str(e)}") | |
def get_file_age(filepath): | |
"""Get file age in hours""" | |
if os.path.exists(filepath): | |
file_time = os.path.getmtime(filepath) | |
return (time.time() - file_time) / 3600 | |
return float('inf') | |
def load_data_file(filepath, filename): | |
"""Enhanced data loading with more formats""" | |
try: | |
file_ext = filename.rsplit('.', 1)[1].lower() | |
if file_ext == 'csv': | |
return pd.read_csv(filepath) | |
elif file_ext in ['xlsx', 'xls']: | |
return pd.read_excel(filepath) | |
elif file_ext == 'json': | |
return pd.read_json(filepath) | |
elif file_ext == 'parquet': | |
return pd.read_parquet(filepath) | |
elif file_ext == 'tsv': | |
return pd.read_csv(filepath, sep='\t') | |
elif file_ext == 'feather': | |
return pd.read_feather(filepath) | |
else: | |
raise ValueError(f"Unsupported file format: {file_ext}") | |
except Exception as e: | |
raise Exception(f"Error loading file: {str(e)}") | |
# Setup enhanced scheduler | |
scheduler = BackgroundScheduler() | |
scheduler.add_job(func=cleanup_old_files, trigger="interval", hours=1) | |
scheduler.start() | |
atexit.register(lambda: scheduler.shutdown()) | |
# API Endpoints
# Route registrations below follow the endpoint paths listed by home();
# paths not listed there (e.g. /api/health, /api/feature-engineering) are assumed.

@app.route('/api/health', methods=['GET'])
def health_check():
return jsonify({ | |
'status': 'healthy', | |
'version': '2.0.0-enterprise', | |
'features': ['advanced_profiling', 'automl', 'clustering', 'time_series'], | |
'timestamp': datetime.now().isoformat() | |
}) | |
@app.route('/api/upload', methods=['POST'])
def upload_file():
try: | |
if 'file' not in request.files: | |
return jsonify({'error': 'No file provided'}), 400 | |
file = request.files['file'] | |
session_id = request.form.get('sessionId') | |
if not session_id: | |
return jsonify({'error': 'Session ID required'}), 400 | |
if file.filename == '': | |
return jsonify({'error': 'No file selected'}), 400 | |
if not allowed_file(file.filename): | |
return jsonify({'error': 'File type not supported'}), 400 | |
# Check file size | |
file.seek(0, 2) | |
file_size = file.tell() | |
file.seek(0) | |
if file_size > MAX_FILE_SIZE: | |
return jsonify({'error': f'File too large. Maximum size is {MAX_FILE_SIZE // (1024*1024)}MB'}), 400 | |
# Generate unique file ID and secure filename | |
file_id = str(uuid.uuid4()) | |
filename = secure_filename(file.filename) | |
# Create session directory | |
session_dir = os.path.join(UPLOAD_FOLDER, session_id) | |
os.makedirs(session_dir, exist_ok=True) | |
# Save file | |
filepath = os.path.join(session_dir, f"{file_id}_{filename}") | |
file.save(filepath) | |
# Enhanced file metadata | |
if session_id not in file_storage: | |
file_storage[session_id] = {} | |
file_storage[session_id][file_id] = { | |
'filename': filename, | |
'filepath': filepath, | |
'size': file_size, | |
'timestamp': datetime.now().isoformat(), | |
'format': filename.rsplit('.', 1)[1].lower(), | |
'status': 'uploaded' | |
} | |
return jsonify({ | |
'fileId': file_id, | |
'filename': filename, | |
'size': file_size, | |
'format': filename.rsplit('.', 1)[1].lower(), | |
'message': 'File uploaded successfully' | |
}) | |
except Exception as e: | |
logger.error(f"Upload error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
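# Illustrative upload request (placeholder values), matching the form fields read above:
#   curl -X POST http://localhost:7860/api/upload \
#        -F "file=@data.csv" \
#        -F "sessionId=demo-session"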
@app.route('/api/profile/<file_id>', methods=['GET'])
def profile_data(file_id):
"""Advanced data profiling endpoint""" | |
try: | |
session_id = request.args.get('sessionId') | |
if not session_id or session_id not in file_storage: | |
return jsonify({'error': 'Invalid session'}), 400 | |
if file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
# Perform advanced profiling | |
profile = analytics_engine.advanced_data_profiling(df) | |
return jsonify(profile) | |
except Exception as e: | |
logger.error(f"Profiling error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
@app.route('/api/automl', methods=['POST'])
def run_automl():
"""Automated ML pipeline endpoint""" | |
try: | |
data = request.get_json() | |
session_id = data.get('sessionId') | |
file_id = data.get('fileId') | |
target_column = data.get('targetColumn') | |
problem_type = data.get('problemType', 'auto') | |
if not all([session_id, file_id, target_column]): | |
return jsonify({'error': 'Session ID, File ID, and target column required'}), 400 | |
if session_id not in file_storage or file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
if target_column not in df.columns: | |
return jsonify({'error': f'Target column {target_column} not found'}), 400 | |
# Run AutoML pipeline | |
results = analytics_engine.automated_ml_pipeline(df, target_column, problem_type) | |
# Save results | |
result_id = str(uuid.uuid4()) | |
result_dir = os.path.join(PROCESSED_FOLDER, session_id) | |
os.makedirs(result_dir, exist_ok=True) | |
result_filepath = os.path.join(result_dir, f"{result_id}_automl.json") | |
with open(result_filepath, 'w') as f: | |
json.dump(results, f, indent=2, default=str) | |
return jsonify({ | |
'resultId': result_id, | |
'results': results, | |
'analysisType': 'automl', | |
'timestamp': datetime.now().isoformat() | |
}) | |
except Exception as e: | |
logger.error(f"AutoML error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
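# Illustrative request body (field names match the data.get() calls above; values are placeholders):
#   POST /api/automl
#   {"sessionId": "demo-session", "fileId": "<uuid>", "targetColumn": "price", "problemType": "auto"}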
@app.route('/api/clustering', methods=['POST'])
def run_clustering():
"""Advanced clustering analysis endpoint""" | |
try: | |
data = request.get_json() | |
session_id = data.get('sessionId') | |
file_id = data.get('fileId') | |
n_clusters = data.get('nClusters') | |
if not all([session_id, file_id]): | |
return jsonify({'error': 'Session ID and File ID required'}), 400 | |
if session_id not in file_storage or file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
# Run clustering analysis | |
results = analytics_engine.advanced_clustering_analysis(df, n_clusters) | |
# Save results | |
result_id = str(uuid.uuid4()) | |
result_dir = os.path.join(PROCESSED_FOLDER, session_id) | |
os.makedirs(result_dir, exist_ok=True) | |
result_filepath = os.path.join(result_dir, f"{result_id}_clustering.json") | |
with open(result_filepath, 'w') as f: | |
json.dump(results, f, indent=2, default=str) | |
return jsonify({ | |
'resultId': result_id, | |
'results': results, | |
'analysisType': 'clustering', | |
'timestamp': datetime.now().isoformat() | |
}) | |
except Exception as e: | |
logger.error(f"Clustering error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
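# Illustrative request body (nClusters is optional; when omitted the elbow heuristic picks k):
#   POST /api/clustering
#   {"sessionId": "demo-session", "fileId": "<uuid>", "nClusters": 4}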
@app.route('/api/timeseries', methods=['POST'])
def run_timeseries():
"""Time series analysis endpoint""" | |
try: | |
data = request.get_json() | |
session_id = data.get('sessionId') | |
file_id = data.get('fileId') | |
date_column = data.get('dateColumn') | |
value_column = data.get('valueColumn') | |
if not all([session_id, file_id, date_column, value_column]): | |
return jsonify({'error': 'Session ID, File ID, date column, and value column required'}), 400 | |
if session_id not in file_storage or file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
if date_column not in df.columns or value_column not in df.columns: | |
return jsonify({'error': 'Date or value column not found'}), 400 | |
# Run time series analysis | |
results = analytics_engine.time_series_analysis(df, date_column, value_column) | |
# Save results | |
result_id = str(uuid.uuid4()) | |
result_dir = os.path.join(PROCESSED_FOLDER, session_id) | |
os.makedirs(result_dir, exist_ok=True) | |
result_filepath = os.path.join(result_dir, f"{result_id}_timeseries.json") | |
with open(result_filepath, 'w') as f: | |
json.dump(results, f, indent=2, default=str) | |
return jsonify({ | |
'resultId': result_id, | |
'results': results, | |
'analysisType': 'timeseries', | |
'timestamp': datetime.now().isoformat() | |
}) | |
except Exception as e: | |
logger.error(f"Time series error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
@app.route('/api/feature-engineering', methods=['POST'])  # path assumed; not listed in home()
def run_feature_engineering():
"""Feature engineering endpoint""" | |
try: | |
data = request.get_json() | |
session_id = data.get('sessionId') | |
file_id = data.get('fileId') | |
target_column = data.get('targetColumn') | |
if not all([session_id, file_id]): | |
return jsonify({'error': 'Session ID and File ID required'}), 400 | |
if session_id not in file_storage or file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
# Generate engineered features | |
engineered_df = analytics_engine.advanced_feature_engineering(df, target_column) | |
# Save engineered dataset | |
engineered_file_id = str(uuid.uuid4()) | |
engineered_filepath = os.path.join( | |
PROCESSED_FOLDER, session_id, f"{engineered_file_id}_engineered.csv" | |
) | |
os.makedirs(os.path.dirname(engineered_filepath), exist_ok=True) | |
# Combine original and engineered features | |
combined_df = pd.concat([df, engineered_df], axis=1) | |
combined_df.to_csv(engineered_filepath, index=False) | |
# Store engineered file info | |
if session_id not in file_storage: | |
file_storage[session_id] = {} | |
file_storage[session_id][engineered_file_id] = { | |
'filename': f"{file_info['filename'].split('.')[0]}_engineered.csv", | |
'filepath': engineered_filepath, | |
'size': os.path.getsize(engineered_filepath), | |
'timestamp': datetime.now().isoformat(), | |
'format': 'csv', | |
'status': 'engineered', | |
'parent_file': file_id | |
} | |
return jsonify({ | |
'engineeredFileId': engineered_file_id, | |
'originalFeatures': len(df.columns), | |
'engineeredFeatures': len(engineered_df.columns), | |
'totalFeatures': len(combined_df.columns), | |
'featureNames': engineered_df.columns.tolist(), | |
'message': 'Feature engineering completed successfully' | |
}) | |
except Exception as e: | |
logger.error(f"Feature engineering error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
@app.route('/api/advanced-visualization', methods=['POST'])
def create_advanced_visualization():
"""Advanced visualization endpoint with Plotly""" | |
try: | |
data = request.get_json() | |
session_id = data.get('sessionId') | |
file_id = data.get('fileId') | |
chart_type = data.get('chartType') | |
parameters = data.get('parameters', {}) | |
if not all([session_id, file_id, chart_type]): | |
return jsonify({'error': 'Session ID, File ID, and chart type required'}), 400 | |
if session_id not in file_storage or file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
# Create advanced visualizations using Plotly | |
if chart_type == 'correlation_heatmap': | |
numeric_df = df.select_dtypes(include=[np.number]) | |
corr_matrix = numeric_df.corr() | |
fig = px.imshow(corr_matrix, | |
title='Correlation Heatmap', | |
color_continuous_scale='RdBu_r', | |
aspect='auto') | |
elif chart_type == 'distribution_plots': | |
column = parameters.get('column') | |
if not column or column not in df.columns: | |
return jsonify({'error': 'Column not specified or not found'}), 400 | |
fig = px.histogram(df, x=column, | |
title=f'Distribution of {column}', | |
marginal='box') | |
elif chart_type == 'scatter_matrix': | |
columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:4]) | |
fig = px.scatter_matrix(df[columns], | |
title='Scatter Matrix', | |
dimensions=columns) | |
elif chart_type == 'parallel_coordinates': | |
columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:5]) | |
fig = px.parallel_coordinates(df, | |
dimensions=columns, | |
title='Parallel Coordinates Plot') | |
elif chart_type == 'box_plots': | |
columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:5]) | |
fig = px.box(df[columns], | |
title='Box Plots Comparison') | |
elif chart_type == '3d_scatter': | |
x_col = parameters.get('x_column') | |
y_col = parameters.get('y_column') | |
z_col = parameters.get('z_column') | |
if not all([x_col, y_col, z_col]): | |
return jsonify({'error': '3D scatter requires x, y, and z columns'}), 400 | |
fig = px.scatter_3d(df, x=x_col, y=y_col, z=z_col, | |
title=f'3D Scatter: {x_col} vs {y_col} vs {z_col}') | |
else: | |
return jsonify({'error': 'Unsupported chart type'}), 400 | |
# Convert to JSON | |
chart_json = json.dumps(fig, cls=PlotlyJSONEncoder) | |
return jsonify({ | |
'chart': chart_json, | |
'chartType': chart_type, | |
'timestamp': datetime.now().isoformat() | |
}) | |
except Exception as e: | |
logger.error(f"Visualization error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
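# Illustrative request bodies for the chart types handled above (placeholder values):
#   {"sessionId": "...", "fileId": "...", "chartType": "correlation_heatmap"}
#   {"sessionId": "...", "fileId": "...", "chartType": "distribution_plots", "parameters": {"column": "price"}}
#   {"sessionId": "...", "fileId": "...", "chartType": "3d_scatter",
#    "parameters": {"x_column": "a", "y_column": "b", "z_column": "c"}}
# The response's 'chart' field is a Plotly figure serialized to JSON, intended for
# client-side rendering (e.g. with Plotly.js).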
@app.route('/api/data-quality', methods=['POST'])
def assess_data_quality():
"""Data quality assessment endpoint""" | |
try: | |
data = request.get_json() | |
session_id = data.get('sessionId') | |
file_id = data.get('fileId') | |
if not all([session_id, file_id]): | |
return jsonify({'error': 'Session ID and File ID required'}), 400 | |
if session_id not in file_storage or file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
quality_report = { | |
'overall_score': 0, | |
'dimensions': { | |
'completeness': {}, | |
'consistency': {}, | |
'validity': {}, | |
'uniqueness': {}, | |
'accuracy': {} | |
}, | |
'issues': [], | |
'recommendations': [] | |
} | |
# Completeness assessment | |
total_cells = len(df) * len(df.columns) | |
missing_cells = df.isnull().sum().sum() | |
completeness_score = ((total_cells - missing_cells) / total_cells) * 100 | |
quality_report['dimensions']['completeness'] = { | |
'score': completeness_score, | |
'missing_values': df.isnull().sum().to_dict(), | |
'missing_percentage': (df.isnull().sum() / len(df) * 100).to_dict() | |
} | |
# Consistency assessment | |
consistency_issues = [] | |
for col in df.select_dtypes(include=['object']): | |
# Check for inconsistent formatting | |
values = df[col].dropna().astype(str) | |
if len(values) > 0: | |
# Mixed case issues | |
lowercase_values = set(v.lower() for v in values) | |
if len(lowercase_values) != len(set(values)): | |
consistency_issues.append(f"Column '{col}' has mixed case values") | |
# Leading/trailing spaces | |
if any(v != v.strip() for v in values): | |
consistency_issues.append(f"Column '{col}' has leading/trailing spaces") | |
consistency_score = max(0, 100 - len(consistency_issues) * 10) | |
quality_report['dimensions']['consistency'] = { | |
'score': consistency_score, | |
'issues': consistency_issues | |
} | |
# Validity assessment (basic data type validation) | |
validity_issues = [] | |
for col in df.columns: | |
if df[col].dtype == 'object': | |
# Check for potential numeric columns stored as strings | |
                try:
                    pd.to_numeric(df[col].dropna(), errors='raise')
                    validity_issues.append(f"Column '{col}' appears to be numeric but stored as text")
                except (ValueError, TypeError):
                    pass
validity_score = max(0, 100 - len(validity_issues) * 15) | |
quality_report['dimensions']['validity'] = { | |
'score': validity_score, | |
'issues': validity_issues | |
} | |
# Uniqueness assessment | |
uniqueness_scores = {} | |
for col in df.columns: | |
unique_ratio = df[col].nunique() / len(df) if len(df) > 0 else 0 | |
uniqueness_scores[col] = unique_ratio * 100 | |
avg_uniqueness = np.mean(list(uniqueness_scores.values())) | |
quality_report['dimensions']['uniqueness'] = { | |
'score': avg_uniqueness, | |
'column_scores': uniqueness_scores, | |
'duplicate_rows': df.duplicated().sum() | |
} | |
# Overall score calculation | |
dimension_scores = [ | |
completeness_score, | |
consistency_score, | |
validity_score, | |
avg_uniqueness | |
] | |
quality_report['overall_score'] = np.mean(dimension_scores) | |
# Generate recommendations | |
if completeness_score < 80: | |
quality_report['recommendations'].append({ | |
'type': 'completeness', | |
'priority': 'high', | |
'message': 'Consider imputing missing values or removing incomplete records' | |
}) | |
if consistency_score < 70: | |
quality_report['recommendations'].append({ | |
'type': 'consistency', | |
'priority': 'medium', | |
'message': 'Standardize text formatting and remove extra spaces' | |
}) | |
if validity_score < 80: | |
quality_report['recommendations'].append({ | |
'type': 'validity', | |
'priority': 'medium', | |
'message': 'Review data types and convert where appropriate' | |
}) | |
return jsonify(quality_report) | |
except Exception as e: | |
logger.error(f"Data quality error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
@app.route('/api/statistical-tests', methods=['POST'])
def run_statistical_tests():
"""Statistical hypothesis testing endpoint""" | |
try: | |
data = request.get_json() | |
session_id = data.get('sessionId') | |
file_id = data.get('fileId') | |
test_type = data.get('testType') | |
parameters = data.get('parameters', {}) | |
if not all([session_id, file_id, test_type]): | |
return jsonify({'error': 'Session ID, File ID, and test type required'}), 400 | |
if session_id not in file_storage or file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
results = {'test_type': test_type, 'results': {}} | |
if test_type == 'normality': | |
column = parameters.get('column') | |
if not column or column not in df.columns: | |
return jsonify({'error': 'Column not specified or not found'}), 400 | |
data_col = df[column].dropna() | |
# Shapiro-Wilk test | |
shapiro_stat, shapiro_p = stats.shapiro(data_col.sample(min(5000, len(data_col)))) | |
# Anderson-Darling test | |
anderson_result = stats.anderson(data_col) | |
results['results'] = { | |
'shapiro_wilk': { | |
'statistic': shapiro_stat, | |
'p_value': shapiro_p, | |
'is_normal': shapiro_p > 0.05 | |
}, | |
'anderson_darling': { | |
'statistic': anderson_result.statistic, | |
'critical_values': anderson_result.critical_values.tolist(), | |
'significance_levels': anderson_result.significance_level.tolist() | |
} | |
} | |
elif test_type == 'correlation_significance': | |
col1 = parameters.get('column1') | |
col2 = parameters.get('column2') | |
if not all([col1, col2]) or col1 not in df.columns or col2 not in df.columns: | |
return jsonify({'error': 'Both columns must be specified and exist'}), 400 | |
            # Drop rows where either column is missing so both series stay aligned
            paired = df[[col1, col2]].dropna()
            # Pearson correlation
            pearson_corr, pearson_p = stats.pearsonr(paired[col1], paired[col2])
            # Spearman correlation
            spearman_corr, spearman_p = stats.spearmanr(paired[col1], paired[col2])
results['results'] = { | |
'pearson': { | |
'correlation': pearson_corr, | |
'p_value': pearson_p, | |
'significant': pearson_p < 0.05 | |
}, | |
'spearman': { | |
'correlation': spearman_corr, | |
'p_value': spearman_p, | |
'significant': spearman_p < 0.05 | |
} | |
} | |
elif test_type == 'group_comparison': | |
group_col = parameters.get('groupColumn') | |
value_col = parameters.get('valueColumn') | |
if not all([group_col, value_col]): | |
return jsonify({'error': 'Group and value columns required'}), 400 | |
groups = [group for name, group in df.groupby(group_col)[value_col] if len(group) > 1] | |
if len(groups) == 2: | |
# Two-sample t-test | |
t_stat, t_p = stats.ttest_ind(groups[0], groups[1]) | |
# Mann-Whitney U test | |
u_stat, u_p = stats.mannwhitneyu(groups[0], groups[1]) | |
results['results'] = { | |
'two_sample_ttest': { | |
'statistic': t_stat, | |
'p_value': t_p, | |
'significant': t_p < 0.05 | |
}, | |
'mann_whitney_u': { | |
'statistic': u_stat, | |
'p_value': u_p, | |
'significant': u_p < 0.05 | |
} | |
} | |
elif len(groups) > 2: | |
# ANOVA | |
f_stat, f_p = stats.f_oneway(*groups) | |
# Kruskal-Wallis test | |
h_stat, h_p = stats.kruskal(*groups) | |
results['results'] = { | |
'anova': { | |
'statistic': f_stat, | |
'p_value': f_p, | |
'significant': f_p < 0.05 | |
}, | |
'kruskal_wallis': { | |
'statistic': h_stat, | |
'p_value': h_p, | |
'significant': h_p < 0.05 | |
} | |
} | |
else: | |
return jsonify({'error': 'Unsupported test type'}), 400 | |
return jsonify(results) | |
except Exception as e: | |
logger.error(f"Statistical test error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
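# Illustrative request bodies for the supported test types (placeholder values):
#   {"sessionId": "...", "fileId": "...", "testType": "normality",
#    "parameters": {"column": "price"}}
#   {"sessionId": "...", "fileId": "...", "testType": "correlation_significance",
#    "parameters": {"column1": "price", "column2": "sqft"}}
#   {"sessionId": "...", "fileId": "...", "testType": "group_comparison",
#    "parameters": {"groupColumn": "region", "valueColumn": "sales"}}
# Two groups trigger a t-test / Mann-Whitney U; three or more trigger ANOVA / Kruskal-Wallis.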
@app.route('/api/analysis-history/<session_id>', methods=['GET'])
def get_analysis_history(session_id):
"""Get analysis history for a session""" | |
try: | |
if session_id not in analysis_history: | |
return jsonify({'history': []}) | |
return jsonify({'history': list(analysis_history[session_id].values())}) | |
except Exception as e: | |
logger.error(f"History error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
@app.route('/api/export-report', methods=['POST'])
def export_analysis_report():
"""Export comprehensive analysis report""" | |
try: | |
data = request.get_json() | |
session_id = data.get('sessionId') | |
analyses = data.get('analyses', []) # List of analysis result IDs | |
if not session_id: | |
return jsonify({'error': 'Session ID required'}), 400 | |
# Compile report | |
report = { | |
'session_id': session_id, | |
'generated_at': datetime.now().isoformat(), | |
'analyses': [], | |
'summary': { | |
'total_analyses': len(analyses), | |
'data_files_processed': len(file_storage.get(session_id, {})), | |
'recommendations': [] | |
} | |
} | |
# Load each analysis result | |
for analysis_id in analyses: | |
try: | |
result_files = [ | |
f for f in os.listdir(os.path.join(PROCESSED_FOLDER, session_id)) | |
if f.startswith(analysis_id) | |
] | |
if result_files: | |
filepath = os.path.join(PROCESSED_FOLDER, session_id, result_files[0]) | |
with open(filepath, 'r') as f: | |
analysis_data = json.load(f) | |
report['analyses'].append({ | |
'id': analysis_id, | |
'type': result_files[0].split('_')[1].split('.')[0], | |
'data': analysis_data | |
}) | |
except Exception as e: | |
logger.error(f"Error loading analysis {analysis_id}: {str(e)}") | |
continue | |
# Generate summary recommendations | |
if report['analyses']: | |
report['summary']['recommendations'] = [ | |
"Review data quality scores and address high-priority issues", | |
"Consider feature engineering for improved model performance", | |
"Validate statistical assumptions before drawing conclusions", | |
"Monitor model performance with cross-validation results" | |
] | |
# Save report | |
report_id = str(uuid.uuid4()) | |
report_dir = os.path.join(PROCESSED_FOLDER, session_id) | |
os.makedirs(report_dir, exist_ok=True) | |
report_filepath = os.path.join(report_dir, f"{report_id}_report.json") | |
with open(report_filepath, 'w') as f: | |
json.dump(report, f, indent=2, default=str) | |
return jsonify({ | |
'reportId': report_id, | |
'message': 'Report generated successfully', | |
'downloadUrl': f'/api/download/{report_id}?sessionId={session_id}&format=json' | |
}) | |
except Exception as e: | |
logger.error(f"Report export error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
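# Note: the downloadUrl returned above points at /api/download/<report_id>, but no
# download route is defined in this module; the report JSON itself is written to
# PROCESSED_FOLDER/<session_id>/<report_id>_report.json.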
# Update existing endpoints with enhanced functionality | |
@app.route('/api/preview/<file_id>', methods=['GET'])
def preview_file(file_id):
try: | |
session_id = request.args.get('sessionId') | |
if not session_id or session_id not in file_storage: | |
return jsonify({'error': 'Invalid session'}), 400 | |
if file_id not in file_storage[session_id]: | |
return jsonify({'error': 'File not found'}), 404 | |
file_info = file_storage[session_id][file_id] | |
df = load_data_file(file_info['filepath'], file_info['filename']) | |
# Enhanced preview with data insights | |
preview_data = { | |
'basic_info': { | |
'columns': df.columns.tolist(), | |
'dtypes': df.dtypes.astype(str).to_dict(), | |
'shape': df.shape, | |
'memory_usage': df.memory_usage(deep=True).sum() | |
}, | |
'sample_data': { | |
'head': df.head(5).to_dict('records'), | |
'tail': df.tail(5).to_dict('records') | |
}, | |
'data_quality': { | |
'missing_values': df.isnull().sum().to_dict(), | |
'duplicate_rows': df.duplicated().sum(), | |
'unique_values': df.nunique().to_dict() | |
}, | |
'quick_stats': {} | |
} | |
# Quick statistics for numeric columns | |
numeric_cols = df.select_dtypes(include=[np.number]).columns | |
if len(numeric_cols) > 0: | |
preview_data['quick_stats']['numeric'] = df[numeric_cols].describe().to_dict() | |
# Quick statistics for categorical columns | |
categorical_cols = df.select_dtypes(include=['object']).columns | |
if len(categorical_cols) > 0: | |
preview_data['quick_stats']['categorical'] = {} | |
for col in categorical_cols[:5]: # Limit to first 5 categorical columns | |
preview_data['quick_stats']['categorical'][col] = { | |
'top_values': df[col].value_counts().head(5).to_dict() | |
} | |
return jsonify(preview_data) | |
except Exception as e: | |
logger.error(f"Preview error: {str(e)}") | |
return jsonify({'error': str(e)}), 500 | |
@app.route('/', methods=['GET'])
def home():
return jsonify({ | |
'message': 'Enterprise Data Analytics Platform', | |
'version': '2.0.0-enterprise', | |
'features': { | |
'core': ['data_profiling', 'quality_assessment', 'statistical_tests'], | |
'machine_learning': ['automl', 'clustering', 'feature_engineering'], | |
'time_series': ['trend_analysis', 'forecasting', 'anomaly_detection'], | |
'visualization': ['advanced_charts', 'interactive_plots', 'correlation_heatmaps'], | |
'enterprise': ['report_generation', 'analysis_history', 'data_governance'] | |
}, | |
'endpoints': { | |
'data_management': ['/api/upload', '/api/preview/<file_id>', '/api/profile/<file_id>'], | |
'analytics': ['/api/automl', '/api/clustering', '/api/timeseries'], | |
'quality': ['/api/data-quality', '/api/statistical-tests'], | |
'visualization': ['/api/advanced-visualization'], | |
'enterprise': ['/api/export-report', '/api/analysis-history/<session_id>'] | |
}, | |
'timestamp': datetime.now().isoformat() | |
}) | |
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, debug=False)  # Production ready
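# Note: app.run() starts Flask's built-in development server. For a production
# deployment this module would typically be served by a WSGI server instead,
# e.g. (assumed module name app.py): gunicorn -w 2 -b 0.0.0.0:7860 app:app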