import os
import uuid
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
import threading
import time
import logging
from scipy import stats
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder, MinMaxScaler
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier, GradientBoostingRegressor, GradientBoostingClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression, Ridge, Lasso
from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error, r2_score, classification_report, confusion_matrix
from sklearn.feature_selection import SelectKBest, f_classif, f_regression, mutual_info_regression
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.utils import PlotlyJSONEncoder
import io
import base64
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
import warnings
warnings.filterwarnings('ignore')
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
CORS(app)
# Configuration
UPLOAD_FOLDER = '/tmp/uploads'
PROCESSED_FOLDER = '/tmp/processed'
MODELS_FOLDER = '/tmp/models'
MAX_FILE_SIZE = 1024 * 1024 * 1024 # 1GB for enterprise
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls', 'json', 'parquet', 'tsv', 'feather'}
FILE_EXPIRY_HOURS = 24 # Extended for enterprise use
# Ensure directories exist
for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER, MODELS_FOLDER]:
os.makedirs(folder, exist_ok=True)
# Enhanced file storage with metadata
file_storage = {}
model_storage = {}
analysis_history = {}
class EnterpriseAnalytics:
"""Enterprise-grade analytics engine"""
def __init__(self):
self.scaler = StandardScaler()
self.models = {}
def advanced_data_profiling(self, df):
"""Comprehensive data profiling like enterprise tools"""
profile = {
'dataset_overview': {
'rows': len(df),
'columns': len(df.columns),
'memory_usage': df.memory_usage(deep=True).sum(),
'duplicate_rows': df.duplicated().sum()
},
'column_analysis': {},
'data_quality': {},
'relationships': {},
'recommendations': []
}
for col in df.columns:
col_data = df[col]
col_profile = {
'dtype': str(col_data.dtype),
'missing_count': col_data.isnull().sum(),
'missing_percentage': (col_data.isnull().sum() / len(df)) * 100,
'unique_values': col_data.nunique(),
'cardinality': col_data.nunique() / len(df) if len(df) > 0 else 0
}
if pd.api.types.is_numeric_dtype(col_data):
col_profile.update({
'statistics': {
'mean': col_data.mean(),
'median': col_data.median(),
'std': col_data.std(),
'min': col_data.min(),
'max': col_data.max(),
'q25': col_data.quantile(0.25),
'q75': col_data.quantile(0.75),
'skewness': stats.skew(col_data.dropna()),
'kurtosis': stats.kurtosis(col_data.dropna())
},
'distribution': 'normal' if abs(stats.skew(col_data.dropna())) < 0.5 else 'skewed'
})
else:
col_profile.update({
'top_categories': col_data.value_counts().head(10).to_dict(),
'category_distribution': 'uniform' if col_data.value_counts().std() < col_data.value_counts().mean() * 0.5 else 'imbalanced'
})
profile['column_analysis'][col] = col_profile
# Data quality assessment
profile['data_quality'] = {
'completeness_score': (1 - df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100,
'uniqueness_score': (df.nunique().sum() / (len(df) * len(df.columns))) * 100,
'consistency_score': self._calculate_consistency_score(df)
}
# Generate recommendations
profile['recommendations'] = self._generate_recommendations(df, profile)
return profile
def _calculate_consistency_score(self, df):
"""Calculate data consistency score"""
score = 100
for col in df.select_dtypes(include=['object']):
# Check for inconsistent formatting
values = df[col].dropna().astype(str)
if len(values) > 0:
# Check for mixed case
if len(set([v.lower() for v in values])) != len(set(values)):
score -= 5
# Check for leading/trailing spaces
if any(v != v.strip() for v in values):
score -= 5
return max(0, score)
def _generate_recommendations(self, df, profile):
"""Generate actionable recommendations"""
recommendations = []
# High missing value columns
for col, analysis in profile['column_analysis'].items():
if analysis['missing_percentage'] > 20:
recommendations.append({
'type': 'data_quality',
'priority': 'high',
'message': f"Column '{col}' has {analysis['missing_percentage']:.1f}% missing values. Consider imputation or removal.",
'action': 'handle_missing_values'
})
# High cardinality categorical columns
for col, analysis in profile['column_analysis'].items():
if analysis.get('cardinality', 0) > 0.8 and df[col].dtype == 'object':
recommendations.append({
'type': 'feature_engineering',
'priority': 'medium',
'message': f"Column '{col}' has high cardinality. Consider feature encoding or dimensionality reduction.",
'action': 'encode_categorical'
})
# Skewed distributions
for col, analysis in profile['column_analysis'].items():
if 'statistics' in analysis and abs(analysis['statistics']['skewness']) > 2:
recommendations.append({
'type': 'data_transformation',
'priority': 'medium',
'message': f"Column '{col}' is highly skewed. Consider log transformation or scaling.",
'action': 'transform_distribution'
})
return recommendations
def advanced_feature_engineering(self, df, target_column=None):
"""Enterprise-level feature engineering"""
engineered_features = {}
# Numeric feature engineering
numeric_cols = df.select_dtypes(include=[np.number]).columns
for col in numeric_cols:
if col != target_column:
# Polynomial features
engineered_features[f'{col}_squared'] = df[col] ** 2
engineered_features[f'{col}_log'] = np.log1p(df[col].abs())
# Binning
engineered_features[f'{col}_binned'] = pd.cut(df[col], bins=5, labels=False)
# Rolling statistics (if data has time component)
if len(df) > 10:
engineered_features[f'{col}_rolling_mean'] = df[col].rolling(window=min(5, len(df)//2)).mean()
# Categorical feature engineering
categorical_cols = df.select_dtypes(include=['object']).columns
for col in categorical_cols:
if col != target_column:
# Frequency encoding
freq_map = df[col].value_counts().to_dict()
engineered_features[f'{col}_frequency'] = df[col].map(freq_map)
                # Target encoding (only when a numeric target is provided; mean() is undefined for text targets)
                if target_column and target_column in df.columns and pd.api.types.is_numeric_dtype(df[target_column]):
                    target_mean = df.groupby(col)[target_column].mean()
                    engineered_features[f'{col}_target_encoded'] = df[col].map(target_mean)
# Interaction features
if len(numeric_cols) >= 2:
col_pairs = [(numeric_cols[i], numeric_cols[j])
for i in range(len(numeric_cols))
for j in range(i+1, min(i+3, len(numeric_cols)))] # Limit combinations
for col1, col2 in col_pairs:
if col1 != target_column and col2 != target_column:
engineered_features[f'{col1}_{col2}_interaction'] = df[col1] * df[col2]
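                    # The small epsilon in the denominator below guards the ratio feature against division by zero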
engineered_features[f'{col1}_{col2}_ratio'] = df[col1] / (df[col2] + 1e-8)
return pd.DataFrame(engineered_features, index=df.index)
def automated_ml_pipeline(self, df, target_column, problem_type='auto'):
"""Enterprise AutoML pipeline"""
results = {
'preprocessing': {},
'feature_selection': {},
'models': {},
'best_model': {},
'predictions': {},
'feature_importance': {}
}
# Determine problem type
if problem_type == 'auto':
if df[target_column].dtype in ['object', 'category'] or df[target_column].nunique() < 10:
problem_type = 'classification'
else:
problem_type = 'regression'
# Preprocessing
feature_cols = [col for col in df.columns if col != target_column]
X = df[feature_cols].copy()
y = df[target_column].copy()
# Handle missing values
X_numeric = X.select_dtypes(include=[np.number])
X_categorical = X.select_dtypes(include=['object'])
if not X_numeric.empty:
X_numeric = X_numeric.fillna(X_numeric.median())
if not X_categorical.empty:
X_categorical = X_categorical.fillna(X_categorical.mode().iloc[0] if not X_categorical.mode().empty else 'Unknown')
# Encode categorical variables
if not X_categorical.empty:
le = LabelEncoder()
for col in X_categorical.columns:
X_categorical[col] = le.fit_transform(X_categorical[col].astype(str))
X_processed = pd.concat([X_numeric, X_categorical], axis=1)
# Handle target variable for classification
if problem_type == 'classification' and y.dtype == 'object':
le_target = LabelEncoder()
y = le_target.fit_transform(y)
# Feature selection
if len(X_processed.columns) > 10:
            score_func = f_classif if problem_type == 'classification' else f_regression
            selector = SelectKBest(score_func, k=min(10, len(X_processed.columns)))
X_selected = selector.fit_transform(X_processed, y)
selected_features = X_processed.columns[selector.get_support()].tolist()
X_processed = pd.DataFrame(X_selected, columns=selected_features)
results['feature_selection']['selected_features'] = selected_features
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X_processed, y, test_size=0.2, random_state=42
)
# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Model selection based on problem type
if problem_type == 'regression':
models = {
'Linear Regression': LinearRegression(),
'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
'Ridge Regression': Ridge()
}
else:
models = {
'Logistic Regression': LogisticRegression(random_state=42),
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42),
                'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42)
}
# Train and evaluate models
best_score = -np.inf if problem_type == 'regression' else 0
best_model_name = None
for name, model in models.items():
try:
# Cross-validation
if problem_type == 'regression':
scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='r2')
score = scores.mean()
else:
scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring='accuracy')
score = scores.mean()
# Train final model
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
if problem_type == 'regression':
test_score = r2_score(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
results['models'][name] = {
'cv_score': score,
'test_r2': test_score,
'test_mse': mse,
'predictions': y_pred.tolist()
}
else:
test_score = model.score(X_test_scaled, y_test)
results['models'][name] = {
'cv_score': score,
'test_accuracy': test_score,
'predictions': y_pred.tolist()
}
# Track best model
if score > best_score:
best_score = score
best_model_name = name
# Feature importance
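                # Note: this dict is overwritten on each loop pass, so it reflects the last model that exposes feature_importances_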
if hasattr(model, 'feature_importances_'):
importance = dict(zip(X_processed.columns, model.feature_importances_))
results['feature_importance'] = dict(sorted(importance.items(), key=lambda x: x[1], reverse=True))
except Exception as e:
logger.error(f"Error training {name}: {str(e)}")
continue
results['best_model'] = {
'name': best_model_name,
'score': best_score,
'problem_type': problem_type
}
results['preprocessing'] = {
'numeric_features': X_numeric.columns.tolist() if not X_numeric.empty else [],
'categorical_features': X_categorical.columns.tolist() if not X_categorical.empty else [],
'scaling_applied': True,
'missing_values_handled': True
}
return results
def advanced_clustering_analysis(self, df, n_clusters=None):
"""Enterprise clustering with multiple algorithms"""
# Prepare data
numeric_df = df.select_dtypes(include=[np.number])
if numeric_df.empty:
raise ValueError("No numeric columns for clustering")
# Handle missing values
numeric_df = numeric_df.fillna(numeric_df.median())
# Scale data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(numeric_df)
results = {
'algorithms': {},
'optimal_clusters': {},
'silhouette_scores': {},
'recommendations': []
}
# Determine optimal number of clusters if not provided
if n_clusters is None:
# Elbow method for K-means
inertias = []
k_range = range(2, min(11, len(numeric_df) // 2))
for k in k_range:
kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
kmeans.fit(X_scaled)
inertias.append(kmeans.inertia_)
# Find elbow point (simplified)
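            # Heuristic: treat the k with the largest second difference of inertia as the "knee" of the elbow curve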
if len(inertias) > 2:
diffs = np.diff(inertias)
second_diffs = np.diff(diffs)
n_clusters = k_range[np.argmax(second_diffs) + 1] if len(second_diffs) > 0 else 3
else:
n_clusters = 3
# Apply multiple clustering algorithms
algorithms = {
'K-Means': KMeans(n_clusters=n_clusters, random_state=42, n_init=10),
'Hierarchical': AgglomerativeClustering(n_clusters=n_clusters),
'DBSCAN': DBSCAN(eps=0.5, min_samples=5)
}
for name, algo in algorithms.items():
try:
if name == 'DBSCAN':
labels = algo.fit_predict(X_scaled)
n_clusters_found = len(set(labels)) - (1 if -1 in labels else 0)
else:
labels = algo.fit_predict(X_scaled)
n_clusters_found = n_clusters
# Calculate silhouette score
if len(set(labels)) > 1:
from sklearn.metrics import silhouette_score
sil_score = silhouette_score(X_scaled, labels)
else:
sil_score = 0
results['algorithms'][name] = {
'labels': labels.tolist(),
'n_clusters': n_clusters_found,
'silhouette_score': sil_score
}
results['silhouette_scores'][name] = sil_score
except Exception as e:
logger.error(f"Error in {name} clustering: {str(e)}")
continue
# PCA for visualization
if len(numeric_df.columns) > 2:
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)
results['pca_components'] = X_pca.tolist()
results['pca_explained_variance'] = pca.explained_variance_ratio_.tolist()
        # Generate recommendations (only if at least one clustering algorithm succeeded)
        if results['silhouette_scores']:
            best_algo = max(results['silhouette_scores'].items(), key=lambda x: x[1])[0]
            results['recommendations'].append({
                'type': 'clustering',
                'message': f"Best clustering algorithm: {best_algo} with silhouette score: {results['silhouette_scores'][best_algo]:.3f}",
                'optimal_clusters': results['algorithms'][best_algo]['n_clusters']
            })
return results
def time_series_analysis(self, df, date_column, value_column):
"""Advanced time series analysis"""
# Convert date column
df[date_column] = pd.to_datetime(df[date_column])
df = df.sort_values(date_column)
# Set date as index
ts_df = df.set_index(date_column)[value_column]
results = {
'trend_analysis': {},
'seasonality': {},
'forecasting': {},
'anomalies': {},
'statistics': {}
}
# Basic statistics
results['statistics'] = {
'mean': ts_df.mean(),
'std': ts_df.std(),
'min': ts_df.min(),
'max': ts_df.max(),
'trend': 'increasing' if ts_df.iloc[-1] > ts_df.iloc[0] else 'decreasing'
}
# Trend analysis using linear regression
X = np.arange(len(ts_df)).reshape(-1, 1)
y = ts_df.values
lr = LinearRegression()
lr.fit(X, y)
trend_slope = lr.coef_[0]
results['trend_analysis'] = {
'slope': trend_slope,
'direction': 'increasing' if trend_slope > 0 else 'decreasing',
'strength': abs(trend_slope)
}
# Simple anomaly detection using IQR
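        # Points more than 1.5 * IQR outside the quartiles are flagged as outliers (Tukey's fences)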
Q1 = ts_df.quantile(0.25)
Q3 = ts_df.quantile(0.75)
IQR = Q3 - Q1
anomalies = ts_df[(ts_df < Q1 - 1.5 * IQR) | (ts_df > Q3 + 1.5 * IQR)]
results['anomalies'] = {
'count': len(anomalies),
'dates': anomalies.index.strftime('%Y-%m-%d').tolist(),
'values': anomalies.values.tolist()
}
# Simple forecasting (moving average)
window = min(7, len(ts_df) // 4)
if window > 0:
forecast_periods = min(10, len(ts_df) // 4)
last_values = ts_df.tail(window).mean()
results['forecasting'] = {
'method': 'moving_average',
'forecast_periods': forecast_periods,
'forecast_values': [last_values] * forecast_periods
}
return results
# Initialize analytics engine
analytics_engine = EnterpriseAnalytics()
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def cleanup_old_files():
"""Enhanced cleanup with model cleanup"""
try:
        # Remove files older than FILE_EXPIRY_HOURS from all working folders
for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER, MODELS_FOLDER]:
for root, dirs, files in os.walk(folder):
for file in files:
filepath = os.path.join(root, file)
if get_file_age(filepath) > FILE_EXPIRY_HOURS:
os.remove(filepath)
logger.info(f"Cleaned up old file: {filepath}")
# Clean up storage entries
current_time = datetime.now()
for storage in [file_storage, model_storage, analysis_history]:
sessions_to_remove = []
for session_id, session_data in storage.items():
if isinstance(session_data, dict):
items_to_remove = []
for item_id, item_info in session_data.items():
if 'timestamp' in item_info:
item_time = datetime.fromisoformat(item_info['timestamp'])
if (current_time - item_time).total_seconds() > FILE_EXPIRY_HOURS * 3600:
items_to_remove.append(item_id)
for item_id in items_to_remove:
del session_data[item_id]
if not session_data:
sessions_to_remove.append(session_id)
for session_id in sessions_to_remove:
del storage[session_id]
except Exception as e:
logger.error(f"Error during cleanup: {str(e)}")
def get_file_age(filepath):
"""Get file age in hours"""
if os.path.exists(filepath):
file_time = os.path.getmtime(filepath)
return (time.time() - file_time) / 3600
return float('inf')
def load_data_file(filepath, filename):
"""Enhanced data loading with more formats"""
try:
file_ext = filename.rsplit('.', 1)[1].lower()
if file_ext == 'csv':
return pd.read_csv(filepath)
elif file_ext in ['xlsx', 'xls']:
return pd.read_excel(filepath)
elif file_ext == 'json':
return pd.read_json(filepath)
elif file_ext == 'parquet':
return pd.read_parquet(filepath)
elif file_ext == 'tsv':
return pd.read_csv(filepath, sep='\t')
elif file_ext == 'feather':
return pd.read_feather(filepath)
else:
raise ValueError(f"Unsupported file format: {file_ext}")
except Exception as e:
raise Exception(f"Error loading file: {str(e)}")
# Setup enhanced scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(func=cleanup_old_files, trigger="interval", hours=1)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())
# API Endpoints
@app.route('/api/health', methods=['GET'])
def health_check():
return jsonify({
'status': 'healthy',
'version': '2.0.0-enterprise',
'features': ['advanced_profiling', 'automl', 'clustering', 'time_series'],
'timestamp': datetime.now().isoformat()
})
@app.route('/api/upload', methods=['POST'])
def upload_file():
try:
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
session_id = request.form.get('sessionId')
if not session_id:
return jsonify({'error': 'Session ID required'}), 400
if file.filename == '':
return jsonify({'error': 'No file selected'}), 400
if not allowed_file(file.filename):
return jsonify({'error': 'File type not supported'}), 400
# Check file size
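        # seek(0, 2) jumps to end-of-stream so tell() reports the size without reading the upload into memory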
file.seek(0, 2)
file_size = file.tell()
file.seek(0)
if file_size > MAX_FILE_SIZE:
return jsonify({'error': f'File too large. Maximum size is {MAX_FILE_SIZE // (1024*1024)}MB'}), 400
# Generate unique file ID and secure filename
file_id = str(uuid.uuid4())
filename = secure_filename(file.filename)
# Create session directory
session_dir = os.path.join(UPLOAD_FOLDER, session_id)
os.makedirs(session_dir, exist_ok=True)
# Save file
filepath = os.path.join(session_dir, f"{file_id}_{filename}")
file.save(filepath)
# Enhanced file metadata
if session_id not in file_storage:
file_storage[session_id] = {}
file_storage[session_id][file_id] = {
'filename': filename,
'filepath': filepath,
'size': file_size,
'timestamp': datetime.now().isoformat(),
'format': filename.rsplit('.', 1)[1].lower(),
'status': 'uploaded'
}
return jsonify({
'fileId': file_id,
'filename': filename,
'size': file_size,
'format': filename.rsplit('.', 1)[1].lower(),
'message': 'File uploaded successfully'
})
except Exception as e:
logger.error(f"Upload error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/profile/<file_id>', methods=['GET'])
def profile_data(file_id):
"""Advanced data profiling endpoint"""
try:
session_id = request.args.get('sessionId')
if not session_id or session_id not in file_storage:
return jsonify({'error': 'Invalid session'}), 400
if file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
# Perform advanced profiling
profile = analytics_engine.advanced_data_profiling(df)
return jsonify(profile)
except Exception as e:
logger.error(f"Profiling error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/automl', methods=['POST'])
def run_automl():
"""Automated ML pipeline endpoint"""
try:
data = request.get_json()
session_id = data.get('sessionId')
file_id = data.get('fileId')
target_column = data.get('targetColumn')
problem_type = data.get('problemType', 'auto')
if not all([session_id, file_id, target_column]):
return jsonify({'error': 'Session ID, File ID, and target column required'}), 400
if session_id not in file_storage or file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
if target_column not in df.columns:
return jsonify({'error': f'Target column {target_column} not found'}), 400
# Run AutoML pipeline
results = analytics_engine.automated_ml_pipeline(df, target_column, problem_type)
# Save results
result_id = str(uuid.uuid4())
result_dir = os.path.join(PROCESSED_FOLDER, session_id)
os.makedirs(result_dir, exist_ok=True)
result_filepath = os.path.join(result_dir, f"{result_id}_automl.json")
with open(result_filepath, 'w') as f:
json.dump(results, f, indent=2, default=str)
return jsonify({
'resultId': result_id,
'results': results,
'analysisType': 'automl',
'timestamp': datetime.now().isoformat()
})
except Exception as e:
logger.error(f"AutoML error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/clustering', methods=['POST'])
def run_clustering():
"""Advanced clustering analysis endpoint"""
try:
data = request.get_json()
session_id = data.get('sessionId')
file_id = data.get('fileId')
n_clusters = data.get('nClusters')
if not all([session_id, file_id]):
return jsonify({'error': 'Session ID and File ID required'}), 400
if session_id not in file_storage or file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
# Run clustering analysis
results = analytics_engine.advanced_clustering_analysis(df, n_clusters)
# Save results
result_id = str(uuid.uuid4())
result_dir = os.path.join(PROCESSED_FOLDER, session_id)
os.makedirs(result_dir, exist_ok=True)
result_filepath = os.path.join(result_dir, f"{result_id}_clustering.json")
with open(result_filepath, 'w') as f:
json.dump(results, f, indent=2, default=str)
return jsonify({
'resultId': result_id,
'results': results,
'analysisType': 'clustering',
'timestamp': datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Clustering error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/timeseries', methods=['POST'])
def run_timeseries():
"""Time series analysis endpoint"""
try:
data = request.get_json()
session_id = data.get('sessionId')
file_id = data.get('fileId')
date_column = data.get('dateColumn')
value_column = data.get('valueColumn')
if not all([session_id, file_id, date_column, value_column]):
return jsonify({'error': 'Session ID, File ID, date column, and value column required'}), 400
if session_id not in file_storage or file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
if date_column not in df.columns or value_column not in df.columns:
return jsonify({'error': 'Date or value column not found'}), 400
# Run time series analysis
results = analytics_engine.time_series_analysis(df, date_column, value_column)
# Save results
result_id = str(uuid.uuid4())
result_dir = os.path.join(PROCESSED_FOLDER, session_id)
os.makedirs(result_dir, exist_ok=True)
result_filepath = os.path.join(result_dir, f"{result_id}_timeseries.json")
with open(result_filepath, 'w') as f:
json.dump(results, f, indent=2, default=str)
return jsonify({
'resultId': result_id,
'results': results,
'analysisType': 'timeseries',
'timestamp': datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Time series error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/feature-engineering', methods=['POST'])
def run_feature_engineering():
"""Feature engineering endpoint"""
try:
data = request.get_json()
session_id = data.get('sessionId')
file_id = data.get('fileId')
target_column = data.get('targetColumn')
if not all([session_id, file_id]):
return jsonify({'error': 'Session ID and File ID required'}), 400
if session_id not in file_storage or file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
# Generate engineered features
engineered_df = analytics_engine.advanced_feature_engineering(df, target_column)
# Save engineered dataset
engineered_file_id = str(uuid.uuid4())
engineered_filepath = os.path.join(
PROCESSED_FOLDER, session_id, f"{engineered_file_id}_engineered.csv"
)
os.makedirs(os.path.dirname(engineered_filepath), exist_ok=True)
# Combine original and engineered features
combined_df = pd.concat([df, engineered_df], axis=1)
combined_df.to_csv(engineered_filepath, index=False)
# Store engineered file info
if session_id not in file_storage:
file_storage[session_id] = {}
file_storage[session_id][engineered_file_id] = {
'filename': f"{file_info['filename'].split('.')[0]}_engineered.csv",
'filepath': engineered_filepath,
'size': os.path.getsize(engineered_filepath),
'timestamp': datetime.now().isoformat(),
'format': 'csv',
'status': 'engineered',
'parent_file': file_id
}
return jsonify({
'engineeredFileId': engineered_file_id,
'originalFeatures': len(df.columns),
'engineeredFeatures': len(engineered_df.columns),
'totalFeatures': len(combined_df.columns),
'featureNames': engineered_df.columns.tolist(),
'message': 'Feature engineering completed successfully'
})
except Exception as e:
logger.error(f"Feature engineering error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/advanced-visualization', methods=['POST'])
def create_advanced_visualization():
"""Advanced visualization endpoint with Plotly"""
try:
data = request.get_json()
session_id = data.get('sessionId')
file_id = data.get('fileId')
chart_type = data.get('chartType')
parameters = data.get('parameters', {})
if not all([session_id, file_id, chart_type]):
return jsonify({'error': 'Session ID, File ID, and chart type required'}), 400
if session_id not in file_storage or file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
# Create advanced visualizations using Plotly
if chart_type == 'correlation_heatmap':
numeric_df = df.select_dtypes(include=[np.number])
corr_matrix = numeric_df.corr()
fig = px.imshow(corr_matrix,
title='Correlation Heatmap',
color_continuous_scale='RdBu_r',
aspect='auto')
elif chart_type == 'distribution_plots':
column = parameters.get('column')
if not column or column not in df.columns:
return jsonify({'error': 'Column not specified or not found'}), 400
fig = px.histogram(df, x=column,
title=f'Distribution of {column}',
marginal='box')
elif chart_type == 'scatter_matrix':
columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:4])
fig = px.scatter_matrix(df[columns],
title='Scatter Matrix',
dimensions=columns)
elif chart_type == 'parallel_coordinates':
columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:5])
fig = px.parallel_coordinates(df,
dimensions=columns,
title='Parallel Coordinates Plot')
elif chart_type == 'box_plots':
columns = parameters.get('columns', df.select_dtypes(include=[np.number]).columns[:5])
fig = px.box(df[columns],
title='Box Plots Comparison')
elif chart_type == '3d_scatter':
x_col = parameters.get('x_column')
y_col = parameters.get('y_column')
z_col = parameters.get('z_column')
if not all([x_col, y_col, z_col]):
return jsonify({'error': '3D scatter requires x, y, and z columns'}), 400
fig = px.scatter_3d(df, x=x_col, y=y_col, z=z_col,
title=f'3D Scatter: {x_col} vs {y_col} vs {z_col}')
else:
return jsonify({'error': 'Unsupported chart type'}), 400
# Convert to JSON
chart_json = json.dumps(fig, cls=PlotlyJSONEncoder)
return jsonify({
'chart': chart_json,
'chartType': chart_type,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
logger.error(f"Visualization error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/data-quality', methods=['POST'])
def assess_data_quality():
"""Data quality assessment endpoint"""
try:
data = request.get_json()
session_id = data.get('sessionId')
file_id = data.get('fileId')
if not all([session_id, file_id]):
return jsonify({'error': 'Session ID and File ID required'}), 400
if session_id not in file_storage or file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
quality_report = {
'overall_score': 0,
'dimensions': {
'completeness': {},
'consistency': {},
'validity': {},
'uniqueness': {},
'accuracy': {}
},
'issues': [],
'recommendations': []
}
# Completeness assessment
total_cells = len(df) * len(df.columns)
missing_cells = df.isnull().sum().sum()
completeness_score = ((total_cells - missing_cells) / total_cells) * 100
quality_report['dimensions']['completeness'] = {
'score': completeness_score,
'missing_values': df.isnull().sum().to_dict(),
'missing_percentage': (df.isnull().sum() / len(df) * 100).to_dict()
}
# Consistency assessment
consistency_issues = []
for col in df.select_dtypes(include=['object']):
# Check for inconsistent formatting
values = df[col].dropna().astype(str)
if len(values) > 0:
# Mixed case issues
lowercase_values = set(v.lower() for v in values)
if len(lowercase_values) != len(set(values)):
consistency_issues.append(f"Column '{col}' has mixed case values")
# Leading/trailing spaces
if any(v != v.strip() for v in values):
consistency_issues.append(f"Column '{col}' has leading/trailing spaces")
consistency_score = max(0, 100 - len(consistency_issues) * 10)
quality_report['dimensions']['consistency'] = {
'score': consistency_score,
'issues': consistency_issues
}
# Validity assessment (basic data type validation)
validity_issues = []
for col in df.columns:
if df[col].dtype == 'object':
# Check for potential numeric columns stored as strings
try:
pd.to_numeric(df[col].dropna(), errors='raise')
validity_issues.append(f"Column '{col}' appears to be numeric but stored as text")
                except (ValueError, TypeError):
                    pass
validity_score = max(0, 100 - len(validity_issues) * 15)
quality_report['dimensions']['validity'] = {
'score': validity_score,
'issues': validity_issues
}
# Uniqueness assessment
uniqueness_scores = {}
for col in df.columns:
unique_ratio = df[col].nunique() / len(df) if len(df) > 0 else 0
uniqueness_scores[col] = unique_ratio * 100
avg_uniqueness = np.mean(list(uniqueness_scores.values()))
quality_report['dimensions']['uniqueness'] = {
'score': avg_uniqueness,
'column_scores': uniqueness_scores,
'duplicate_rows': df.duplicated().sum()
}
# Overall score calculation
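        # Note: the 'accuracy' dimension is a placeholder and is not included in the overall score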
dimension_scores = [
completeness_score,
consistency_score,
validity_score,
avg_uniqueness
]
quality_report['overall_score'] = np.mean(dimension_scores)
# Generate recommendations
if completeness_score < 80:
quality_report['recommendations'].append({
'type': 'completeness',
'priority': 'high',
'message': 'Consider imputing missing values or removing incomplete records'
})
if consistency_score < 70:
quality_report['recommendations'].append({
'type': 'consistency',
'priority': 'medium',
'message': 'Standardize text formatting and remove extra spaces'
})
if validity_score < 80:
quality_report['recommendations'].append({
'type': 'validity',
'priority': 'medium',
'message': 'Review data types and convert where appropriate'
})
return jsonify(quality_report)
except Exception as e:
logger.error(f"Data quality error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/statistical-tests', methods=['POST'])
def run_statistical_tests():
"""Statistical hypothesis testing endpoint"""
try:
data = request.get_json()
session_id = data.get('sessionId')
file_id = data.get('fileId')
test_type = data.get('testType')
parameters = data.get('parameters', {})
if not all([session_id, file_id, test_type]):
return jsonify({'error': 'Session ID, File ID, and test type required'}), 400
if session_id not in file_storage or file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
results = {'test_type': test_type, 'results': {}}
if test_type == 'normality':
column = parameters.get('column')
if not column or column not in df.columns:
return jsonify({'error': 'Column not specified or not found'}), 400
data_col = df[column].dropna()
# Shapiro-Wilk test
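            # Shapiro-Wilk is slow and unreliable on very large samples, so test at most 5000 observations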
shapiro_stat, shapiro_p = stats.shapiro(data_col.sample(min(5000, len(data_col))))
# Anderson-Darling test
anderson_result = stats.anderson(data_col)
results['results'] = {
'shapiro_wilk': {
'statistic': shapiro_stat,
'p_value': shapiro_p,
'is_normal': shapiro_p > 0.05
},
'anderson_darling': {
'statistic': anderson_result.statistic,
'critical_values': anderson_result.critical_values.tolist(),
'significance_levels': anderson_result.significance_level.tolist()
}
}
elif test_type == 'correlation_significance':
col1 = parameters.get('column1')
col2 = parameters.get('column2')
if not all([col1, col2]) or col1 not in df.columns or col2 not in df.columns:
return jsonify({'error': 'Both columns must be specified and exist'}), 400
            # Drop rows where either column is missing so both series stay aligned
            paired = df[[col1, col2]].dropna()
            # Pearson correlation
            pearson_corr, pearson_p = stats.pearsonr(paired[col1], paired[col2])
            # Spearman correlation
            spearman_corr, spearman_p = stats.spearmanr(paired[col1], paired[col2])
results['results'] = {
'pearson': {
'correlation': pearson_corr,
'p_value': pearson_p,
'significant': pearson_p < 0.05
},
'spearman': {
'correlation': spearman_corr,
'p_value': spearman_p,
'significant': spearman_p < 0.05
}
}
elif test_type == 'group_comparison':
group_col = parameters.get('groupColumn')
value_col = parameters.get('valueColumn')
if not all([group_col, value_col]):
return jsonify({'error': 'Group and value columns required'}), 400
groups = [group for name, group in df.groupby(group_col)[value_col] if len(group) > 1]
if len(groups) == 2:
# Two-sample t-test
t_stat, t_p = stats.ttest_ind(groups[0], groups[1])
# Mann-Whitney U test
u_stat, u_p = stats.mannwhitneyu(groups[0], groups[1])
results['results'] = {
'two_sample_ttest': {
'statistic': t_stat,
'p_value': t_p,
'significant': t_p < 0.05
},
'mann_whitney_u': {
'statistic': u_stat,
'p_value': u_p,
'significant': u_p < 0.05
}
}
elif len(groups) > 2:
# ANOVA
f_stat, f_p = stats.f_oneway(*groups)
# Kruskal-Wallis test
h_stat, h_p = stats.kruskal(*groups)
results['results'] = {
'anova': {
'statistic': f_stat,
'p_value': f_p,
'significant': f_p < 0.05
},
'kruskal_wallis': {
'statistic': h_stat,
'p_value': h_p,
'significant': h_p < 0.05
}
}
else:
return jsonify({'error': 'Unsupported test type'}), 400
return jsonify(results)
except Exception as e:
logger.error(f"Statistical test error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/analysis-history/<session_id>', methods=['GET'])
def get_analysis_history(session_id):
"""Get analysis history for a session"""
try:
if session_id not in analysis_history:
return jsonify({'history': []})
return jsonify({'history': list(analysis_history[session_id].values())})
except Exception as e:
logger.error(f"History error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/export-report', methods=['POST'])
def export_analysis_report():
"""Export comprehensive analysis report"""
try:
data = request.get_json()
session_id = data.get('sessionId')
analyses = data.get('analyses', []) # List of analysis result IDs
if not session_id:
return jsonify({'error': 'Session ID required'}), 400
# Compile report
report = {
'session_id': session_id,
'generated_at': datetime.now().isoformat(),
'analyses': [],
'summary': {
'total_analyses': len(analyses),
'data_files_processed': len(file_storage.get(session_id, {})),
'recommendations': []
}
}
# Load each analysis result
for analysis_id in analyses:
try:
result_files = [
f for f in os.listdir(os.path.join(PROCESSED_FOLDER, session_id))
if f.startswith(analysis_id)
]
if result_files:
filepath = os.path.join(PROCESSED_FOLDER, session_id, result_files[0])
with open(filepath, 'r') as f:
analysis_data = json.load(f)
report['analyses'].append({
'id': analysis_id,
'type': result_files[0].split('_')[1].split('.')[0],
'data': analysis_data
})
except Exception as e:
logger.error(f"Error loading analysis {analysis_id}: {str(e)}")
continue
# Generate summary recommendations
if report['analyses']:
report['summary']['recommendations'] = [
"Review data quality scores and address high-priority issues",
"Consider feature engineering for improved model performance",
"Validate statistical assumptions before drawing conclusions",
"Monitor model performance with cross-validation results"
]
# Save report
report_id = str(uuid.uuid4())
report_dir = os.path.join(PROCESSED_FOLDER, session_id)
os.makedirs(report_dir, exist_ok=True)
report_filepath = os.path.join(report_dir, f"{report_id}_report.json")
with open(report_filepath, 'w') as f:
json.dump(report, f, indent=2, default=str)
return jsonify({
'reportId': report_id,
'message': 'Report generated successfully',
'downloadUrl': f'/api/download/{report_id}?sessionId={session_id}&format=json'
})
except Exception as e:
logger.error(f"Report export error: {str(e)}")
return jsonify({'error': str(e)}), 500
# Enhanced preview endpoint with data-quality insights
@app.route('/api/preview/<file_id>', methods=['GET'])
def preview_file(file_id):
try:
session_id = request.args.get('sessionId')
if not session_id or session_id not in file_storage:
return jsonify({'error': 'Invalid session'}), 400
if file_id not in file_storage[session_id]:
return jsonify({'error': 'File not found'}), 404
file_info = file_storage[session_id][file_id]
df = load_data_file(file_info['filepath'], file_info['filename'])
# Enhanced preview with data insights
preview_data = {
'basic_info': {
'columns': df.columns.tolist(),
'dtypes': df.dtypes.astype(str).to_dict(),
'shape': df.shape,
'memory_usage': df.memory_usage(deep=True).sum()
},
'sample_data': {
'head': df.head(5).to_dict('records'),
'tail': df.tail(5).to_dict('records')
},
'data_quality': {
'missing_values': df.isnull().sum().to_dict(),
'duplicate_rows': df.duplicated().sum(),
'unique_values': df.nunique().to_dict()
},
'quick_stats': {}
}
# Quick statistics for numeric columns
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
preview_data['quick_stats']['numeric'] = df[numeric_cols].describe().to_dict()
# Quick statistics for categorical columns
categorical_cols = df.select_dtypes(include=['object']).columns
if len(categorical_cols) > 0:
preview_data['quick_stats']['categorical'] = {}
for col in categorical_cols[:5]: # Limit to first 5 categorical columns
preview_data['quick_stats']['categorical'][col] = {
'top_values': df[col].value_counts().head(5).to_dict()
}
return jsonify(preview_data)
except Exception as e:
logger.error(f"Preview error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/', methods=['GET'])
def home():
return jsonify({
'message': 'Enterprise Data Analytics Platform',
'version': '2.0.0-enterprise',
'features': {
'core': ['data_profiling', 'quality_assessment', 'statistical_tests'],
'machine_learning': ['automl', 'clustering', 'feature_engineering'],
'time_series': ['trend_analysis', 'forecasting', 'anomaly_detection'],
'visualization': ['advanced_charts', 'interactive_plots', 'correlation_heatmaps'],
'enterprise': ['report_generation', 'analysis_history', 'data_governance']
},
'endpoints': {
'data_management': ['/api/upload', '/api/preview/<file_id>', '/api/profile/<file_id>'],
'analytics': ['/api/automl', '/api/clustering', '/api/timeseries'],
'quality': ['/api/data-quality', '/api/statistical-tests'],
'visualization': ['/api/advanced-visualization'],
'enterprise': ['/api/export-report', '/api/analysis-history/<session_id>']
},
'timestamp': datetime.now().isoformat()
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7860, debug=False) # Production ready