import os
import uuid
import json
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
import threading
import time
import logging
from scipy import stats
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
from apscheduler.schedulers.background import BackgroundScheduler
import atexit

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Configuration
UPLOAD_FOLDER = '/tmp/uploads'
PROCESSED_FOLDER = '/tmp/processed'
MAX_FILE_SIZE = 512 * 1024 * 1024  # 512 MB
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls', 'json', 'parquet', 'tsv'}
FILE_EXPIRY_HOURS = 1

# Ensure directories exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PROCESSED_FOLDER, exist_ok=True)

# In-memory file storage to track sessions and their files
file_storage = {}


def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def get_file_age(filepath):
    """Get file age in hours"""
    if os.path.exists(filepath):
        file_time = os.path.getmtime(filepath)
        return (time.time() - file_time) / 3600
    return float('inf')


def cleanup_old_files():
    """Remove files older than FILE_EXPIRY_HOURS"""
    try:
        for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER]:
            for root, dirs, files in os.walk(folder):
                for file in files:
                    filepath = os.path.join(root, file)
                    if get_file_age(filepath) > FILE_EXPIRY_HOURS:
                        os.remove(filepath)
                        logger.info(f"Cleaned up old file: {filepath}")

        # Clean up file_storage entries
        current_time = datetime.now()
        sessions_to_remove = []
        for session_id, files in file_storage.items():
            files_to_remove = []
            for file_id, file_info in files.items():
                file_time = datetime.fromisoformat(file_info['timestamp'])
                if (current_time - file_time).total_seconds() > FILE_EXPIRY_HOURS * 3600:
                    files_to_remove.append(file_id)
            for file_id in files_to_remove:
                del files[file_id]
            if not files:
                sessions_to_remove.append(session_id)
        for session_id in sessions_to_remove:
            del file_storage[session_id]
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")


# Setup scheduler for automatic cleanup
scheduler = BackgroundScheduler()
scheduler.add_job(func=cleanup_old_files, trigger="interval", minutes=15)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())


def load_data_file(filepath, filename):
    """Load data from various file formats"""
    try:
        file_ext = filename.rsplit('.', 1)[1].lower()
        if file_ext == 'csv':
            return pd.read_csv(filepath)
        elif file_ext in ['xlsx', 'xls']:
            return pd.read_excel(filepath)
        elif file_ext == 'json':
            return pd.read_json(filepath)
        elif file_ext == 'parquet':
            return pd.read_parquet(filepath)
        elif file_ext == 'tsv':
            return pd.read_csv(filepath, sep='\t')
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}")


def perform_basic_statistics(df, columns=None):
    """Perform basic statistical analysis"""
    if columns:
        df = df[columns]

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(exclude=[np.number]).columns.tolist()

    result = {
        'numeric_summary': {},
        'categorical_summary': {},
        'general_info': {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'numeric_columns': len(numeric_cols),
            'categorical_columns': len(categorical_cols),
            # Cast numpy integers to plain ints so the counts are JSON serializable
            'missing_values': {col: int(n) for col, n in df.isnull().sum().items()}
        }
    }

    # Numeric statistics
    if numeric_cols:
        numeric_stats = df[numeric_cols].describe()
        result['numeric_summary'] = numeric_stats.to_dict()

    # Categorical statistics
    if categorical_cols:
        for col in categorical_cols:
            result['categorical_summary'][col] = {
                'unique_values': int(df[col].nunique()),
                'top_values': df[col].value_counts().head(10).to_dict(),
                'missing_count': int(df[col].isnull().sum())
            }

    return result
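
# Illustrative sketch (not part of the API): what perform_basic_statistics returns for a
# tiny DataFrame. The column names and values below are made up for the example.
#
#   demo = pd.DataFrame({'price': [10, 20, 30], 'city': ['A', 'A', 'B']})
#   perform_basic_statistics(demo)
#   # -> general_info: 3 rows, 2 columns, 1 numeric, 1 categorical, no missing values;
#   #    numeric_summary: describe() output for 'price' (count, mean, std, min, quartiles, max);
#   #    categorical_summary: 'city' has 2 unique values, top_values {'A': 2, 'B': 1}.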


def perform_groupby_analysis(df, group_column, target_column, operation='mean', filters=None):
    """Perform group by analysis"""
    # Apply filters if provided
    if filters:
        for f in filters:
            col, op, val = f['column'], f['operator'], f['value']
            if op == '>':
                df = df[df[col] > val]
            elif op == '<':
                df = df[df[col] < val]
            elif op == '==':
                df = df[df[col] == val]
            elif op == '!=':
                df = df[df[col] != val]
            elif op == '>=':
                df = df[df[col] >= val]
            elif op == '<=':
                df = df[df[col] <= val]

    # Perform groupby operation
    grouped = df.groupby(group_column)[target_column]
    if operation == 'mean':
        result = grouped.mean()
    elif operation == 'sum':
        result = grouped.sum()
    elif operation == 'count':
        result = grouped.count()
    elif operation == 'max':
        result = grouped.max()
    elif operation == 'min':
        result = grouped.min()
    elif operation == 'std':
        result = grouped.std()
    else:
        raise ValueError(f"Unsupported operation: {operation}")

    return {
        'result': result.to_dict(),
        'operation': operation,
        'group_column': group_column,
        'target_column': target_column,
        'total_groups': len(result)
    }
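
# Illustrative call (column names are made up): average price per city, restricted to
# rows where quantity is greater than 10. Each filter is a dict with 'column',
# 'operator' and 'value' keys, matching the filter handling above.
#
#   perform_groupby_analysis(
#       df, group_column='city', target_column='price', operation='mean',
#       filters=[{'column': 'quantity', 'operator': '>', 'value': 10}]
#   )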


def perform_correlation_analysis(df, columns=None, method='pearson'):
    """Perform correlation analysis"""
    if columns:
        df = df[columns]

    # Only numeric columns
    numeric_df = df.select_dtypes(include=[np.number])
    if numeric_df.empty:
        raise ValueError("No numeric columns found for correlation analysis")

    correlation_matrix = numeric_df.corr(method=method)

    return {
        'correlation_matrix': correlation_matrix.to_dict(),
        'method': method,
        'columns': numeric_df.columns.tolist()
    }
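
# Note: `method` is passed straight to pandas.DataFrame.corr, which accepts
# 'pearson', 'kendall' and 'spearman'; any other string raises inside pandas.
# Illustrative call with made-up column names:
#
#   perform_correlation_analysis(df, columns=['price', 'quantity'], method='spearman')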


def detect_outliers(df, columns=None, method='iqr'):
    """Detect outliers in numeric columns"""
    if columns:
        df = df[columns]

    numeric_df = df.select_dtypes(include=[np.number])
    outliers = {}

    for col in numeric_df.columns:
        if method == 'iqr':
            Q1 = numeric_df[col].quantile(0.25)
            Q3 = numeric_df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outlier_indices = numeric_df[(numeric_df[col] < lower_bound) |
                                         (numeric_df[col] > upper_bound)].index.tolist()
        elif method == 'zscore':
            # Compute z-scores on the non-null values only, and select outlier rows from
            # that same subset so the boolean mask stays aligned with the index.
            col_data = numeric_df[col].dropna()
            z_scores = np.abs(stats.zscore(col_data))
            outlier_indices = col_data[z_scores > 3].index.tolist()
        else:
            raise ValueError(f"Unsupported outlier detection method: {method}")

        outliers[col] = {
            'count': len(outlier_indices),
            'indices': outlier_indices[:100],  # Limit to first 100
            'percentage': (len(outlier_indices) / len(numeric_df)) * 100
        }

    return outliers
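
# Quick sanity check of the IQR rule used above (numbers are made up): if a column has
# Q1 = 10 and Q3 = 20, then IQR = 10 and values outside [10 - 15, 20 + 15] = [-5, 35]
# are flagged as outliers. The z-score rule instead flags values more than 3 standard
# deviations away from the column mean.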


def generate_visualization(df, chart_type, x_column, y_column=None, group_column=None):
    """Generate visualization and return base64 encoded image"""
    plt.figure(figsize=(10, 6))

    try:
        if chart_type == 'histogram':
            plt.hist(df[x_column], bins=30, alpha=0.7)
            plt.xlabel(x_column)
            plt.ylabel('Frequency')
            plt.title(f'Histogram of {x_column}')
        elif chart_type == 'scatter':
            if not y_column:
                raise ValueError("Y column required for scatter plot")
            plt.scatter(df[x_column], df[y_column], alpha=0.6)
            plt.xlabel(x_column)
            plt.ylabel(y_column)
            plt.title(f'{x_column} vs {y_column}')
        elif chart_type == 'bar':
            if group_column:
                grouped = (df.groupby(group_column)[x_column].mean()
                           if pd.api.types.is_numeric_dtype(df[x_column])
                           else df[group_column].value_counts())
            else:
                grouped = df[x_column].value_counts().head(20)
            grouped.plot(kind='bar')
            plt.xlabel(group_column or x_column)
            plt.ylabel('Count' if not pd.api.types.is_numeric_dtype(df[x_column]) else f'Mean {x_column}')
            plt.title('Bar Chart')
            plt.xticks(rotation=45)
        elif chart_type == 'line':
            if y_column:
                plt.plot(df[x_column], df[y_column])
                plt.xlabel(x_column)
                plt.ylabel(y_column)
            else:
                df[x_column].plot()
                plt.ylabel(x_column)
            plt.title('Line Chart')
        elif chart_type == 'box':
            if group_column:
                df.boxplot(column=x_column, by=group_column)
            else:
                df.boxplot(column=x_column)
            plt.title('Box Plot')
        else:
            # Fail loudly instead of silently returning an empty figure
            raise ValueError(f"Unsupported chart type: {chart_type}")

        plt.tight_layout()

        # Convert plot to base64 string
        img_buffer = io.BytesIO()
        plt.savefig(img_buffer, format='png', dpi=150, bbox_inches='tight')
        img_buffer.seek(0)
        img_base64 = base64.b64encode(img_buffer.getvalue()).decode()
        plt.close()

        return img_base64
    except Exception as e:
        plt.close()
        raise Exception(f"Error generating visualization: {str(e)}")
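
# The string returned above is the raw PNG image encoded with base64. A client can
# recover the image file like this (illustrative, not part of the API):
#
#   with open('chart.png', 'wb') as fh:
#       fh.write(base64.b64decode(img_base64))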


def parse_natural_language_query(query, df_columns):
    """Simple natural language query parser"""
    query_lower = query.lower()

    # Define operation keywords
    operations = {
        'average': 'mean', 'mean': 'mean', 'avg': 'mean',
        'sum': 'sum', 'total': 'sum',
        'count': 'count', 'number': 'count',
        'max': 'max', 'maximum': 'max', 'highest': 'max',
        'min': 'min', 'minimum': 'min', 'lowest': 'min'
    }

    # Find operation
    operation = 'mean'  # default
    for keyword, op in operations.items():
        if keyword in query_lower:
            operation = op
            break

    # Find columns mentioned in query
    mentioned_columns = [col for col in df_columns if col.lower() in query_lower]

    # Simple parsing patterns
    if 'by' in query_lower and len(mentioned_columns) >= 2:
        # Group by analysis
        target_col = mentioned_columns[0]
        group_col = mentioned_columns[-1]
        return {
            'analysisType': 'groupby',
            'parameters': {
                'groupByColumn': group_col,
                'targetColumn': target_col,
                'operation': operation
            }
        }
    elif 'correlation' in query_lower:
        return {
            'analysisType': 'correlation',
            'parameters': {
                'columns': mentioned_columns if mentioned_columns else None
            }
        }
    elif any(word in query_lower for word in ['chart', 'plot', 'graph', 'visualize']):
        chart_type = 'bar'  # default
        if 'scatter' in query_lower:
            chart_type = 'scatter'
        elif 'line' in query_lower:
            chart_type = 'line'
        elif 'histogram' in query_lower:
            chart_type = 'histogram'
        return {
            'analysisType': 'visualization',
            'parameters': {
                'chartType': chart_type,
                'xColumn': mentioned_columns[0] if mentioned_columns else None,
                'yColumn': mentioned_columns[1] if len(mentioned_columns) > 1 else None
            }
        }
    else:
        # Default to basic statistics
        return {
            'analysisType': 'statistics',
            'parameters': {
                'columns': mentioned_columns if mentioned_columns else None
            }
        }
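
# Illustrative example of the parser's behaviour (column names are made up): with
# df_columns = ['sales', 'region'], the query "average sales by region" matches the
# 'average' keyword and both columns, and parses to
#
#   {'analysisType': 'groupby',
#    'parameters': {'groupByColumn': 'region', 'targetColumn': 'sales', 'operation': 'mean'}}
#
# Note that mentioned columns are collected in df_columns order, not in the order they
# appear in the query, so the first matching column becomes the target and the last
# becomes the grouping column.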


# Route registrations. The paths mirror the endpoint map returned by home(); the HTTP
# methods (GET for reads, POST for upload/analyze, DELETE for file removal) are assumed.
@app.route('/api/health', methods=['GET'])
def health_check():
    return jsonify({'status': 'healthy', 'timestamp': datetime.now().isoformat()})


@app.route('/api/upload', methods=['POST'])
def upload_file():
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        session_id = request.form.get('sessionId')

        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        if not allowed_file(file.filename):
            return jsonify({'error': 'File type not supported'}), 400

        # Check file size
        file.seek(0, 2)  # Seek to end
        file_size = file.tell()
        file.seek(0)  # Reset to beginning
        if file_size > MAX_FILE_SIZE:
            return jsonify({'error': f'File too large. Maximum size is {MAX_FILE_SIZE // (1024*1024)}MB'}), 400

        # Generate unique file ID and secure filename
        file_id = str(uuid.uuid4())
        filename = secure_filename(file.filename)

        # Create session directory
        session_dir = os.path.join(UPLOAD_FOLDER, session_id)
        os.makedirs(session_dir, exist_ok=True)

        # Save file
        filepath = os.path.join(session_dir, f"{file_id}_{filename}")
        file.save(filepath)

        # Store file info
        if session_id not in file_storage:
            file_storage[session_id] = {}
        file_storage[session_id][file_id] = {
            'filename': filename,
            'filepath': filepath,
            'size': file_size,
            'timestamp': datetime.now().isoformat()
        }

        return jsonify({
            'fileId': file_id,
            'filename': filename,
            'size': file_size,
            'message': 'File uploaded successfully'
        })
    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/preview/<file_id>', methods=['GET'])
def preview_file(file_id):
    try:
        session_id = request.args.get('sessionId')
        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]

        # Load data and get preview
        df = load_data_file(file_info['filepath'], file_info['filename'])
        preview_data = {
            'columns': df.columns.tolist(),
            'dtypes': df.dtypes.astype(str).to_dict(),
            'shape': df.shape,
            'head': df.head(5).to_dict('records'),
            # Plain ints so the counts are JSON serializable
            'missing_values': {col: int(n) for col, n in df.isnull().sum().items()}
        }
        return jsonify(preview_data)
    except Exception as e:
        logger.error(f"Preview error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/analyze', methods=['POST'])
def analyze_data():
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        analysis_type = data.get('analysisType')
        parameters = data.get('parameters', {})
        natural_query = data.get('naturalQuery')

        if not all([session_id, file_id]):
            return jsonify({'error': 'Session ID and File ID required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])

        # Handle natural language query
        if natural_query and not analysis_type:
            parsed_query = parse_natural_language_query(natural_query, df.columns.tolist())
            analysis_type = parsed_query['analysisType']
            parameters = parsed_query['parameters']

        result = {}
        if analysis_type == 'statistics':
            result = perform_basic_statistics(df, parameters.get('columns'))
        elif analysis_type == 'groupby':
            result = perform_groupby_analysis(
                df,
                parameters.get('groupByColumn'),
                parameters.get('targetColumn'),
                parameters.get('operation', 'mean'),
                parameters.get('filters')
            )
        elif analysis_type == 'correlation':
            result = perform_correlation_analysis(
                df,
                parameters.get('columns'),
                parameters.get('method', 'pearson')
            )
        elif analysis_type == 'outliers':
            result = detect_outliers(
                df,
                parameters.get('columns'),
                parameters.get('method', 'iqr')
            )
        elif analysis_type == 'visualization':
            chart_base64 = generate_visualization(
                df,
                parameters.get('chartType', 'bar'),
                parameters.get('xColumn'),
                parameters.get('yColumn'),
                parameters.get('groupColumn')
            )
            result = {
                'chart': chart_base64,
                'chartType': parameters.get('chartType', 'bar')
            }
        else:
            return jsonify({'error': 'Invalid analysis type'}), 400

        # Save result to processed folder
        result_id = str(uuid.uuid4())
        result_dir = os.path.join(PROCESSED_FOLDER, session_id)
        os.makedirs(result_dir, exist_ok=True)
        result_filepath = os.path.join(result_dir, f"{result_id}_result.json")
        with open(result_filepath, 'w') as f:
            json.dump(result, f, indent=2, default=str)

        return jsonify({
            'resultId': result_id,
            'result': result,
            'analysisType': analysis_type,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/files/<session_id>', methods=['GET'])
def list_files(session_id):
    try:
        if session_id not in file_storage:
            return jsonify({'files': []})

        files = []
        for file_id, file_info in file_storage[session_id].items():
            # Check if file still exists
            if os.path.exists(file_info['filepath']):
                files.append({
                    'fileId': file_id,
                    'filename': file_info['filename'],
                    'size': file_info['size'],
                    'timestamp': file_info['timestamp']
                })
        return jsonify({'files': files})
    except Exception as e:
        logger.error(f"List files error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/file/<file_id>', methods=['DELETE'])
def delete_file(file_id):
    try:
        session_id = request.args.get('sessionId')
        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404

        file_info = file_storage[session_id][file_id]

        # Remove file from filesystem
        if os.path.exists(file_info['filepath']):
            os.remove(file_info['filepath'])

        # Remove from storage
        del file_storage[session_id][file_id]

        return jsonify({'message': 'File deleted successfully'})
    except Exception as e:
        logger.error(f"Delete error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/download/<result_id>', methods=['GET'])
def download_result(result_id):
    try:
        session_id = request.args.get('sessionId')
        format_type = request.args.get('format', 'json')

        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400

        result_filepath = os.path.join(PROCESSED_FOLDER, session_id, f"{result_id}_result.json")
        if not os.path.exists(result_filepath):
            return jsonify({'error': 'Result not found'}), 404

        if format_type == 'json':
            return send_file(result_filepath, as_attachment=True,
                             download_name=f"analysis_result_{result_id}.json")
        else:
            return jsonify({'error': 'Format not supported'}), 400
    except Exception as e:
        logger.error(f"Download error: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/', methods=['GET'])
def home():
    return jsonify({
        'message': 'Data Analytics API is running!',
        'version': '1.0.0',
        'endpoints': {
            'health': '/api/health',
            'upload': '/api/upload',
            'preview': '/api/preview/<file_id>',
            'analyze': '/api/analyze',
            'files': '/api/files/<session_id>',
            'delete': '/api/file/<file_id>',
            'download': '/api/download/<result_id>'
        },
        'timestamp': datetime.now().isoformat()
    })


@app.errorhandler(404)
def not_found(error):
    return jsonify({
        'error': 'Endpoint not found',
        'message': 'Please check the API documentation',
        'available_endpoints': [
            '/',
            '/api/health',
            '/api/upload',
            '/api/preview/<file_id>',
            '/api/analyze',
            '/api/files/<session_id>',
            '/api/file/<file_id>',
            '/api/download/<result_id>'
        ]
    }), 404


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=7860, debug=True)
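
# Illustrative client sketch (assumptions: the API is reachable at http://localhost:7860,
# 'data.csv' exists locally, and 'my-session' is an arbitrary client-chosen session ID):
#
#   import requests
#
#   base = 'http://localhost:7860'
#   with open('data.csv', 'rb') as fh:
#       up = requests.post(f'{base}/api/upload',
#                          files={'file': fh},
#                          data={'sessionId': 'my-session'}).json()
#
#   analysis = requests.post(f'{base}/api/analyze', json={
#       'sessionId': 'my-session',
#       'fileId': up['fileId'],
#       'naturalQuery': 'average sales by region'
#   }).json()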