import os
import uuid
import json
import pandas as pd
import numpy as np
from datetime import datetime
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
import time
import logging
from scipy import stats
import matplotlib
matplotlib.use('Agg')  # Use non-interactive backend
import matplotlib.pyplot as plt
import io
import base64
from apscheduler.schedulers.background import BackgroundScheduler
import atexit

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Configuration
UPLOAD_FOLDER = '/tmp/uploads'
PROCESSED_FOLDER = '/tmp/processed'
MAX_FILE_SIZE = 512 * 1024 * 1024  # 512MB
ALLOWED_EXTENSIONS = {'csv', 'xlsx', 'xls', 'json', 'parquet', 'tsv'}
FILE_EXPIRY_HOURS = 1

# Ensure directories exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PROCESSED_FOLDER, exist_ok=True)

# In-memory registry mapping session IDs to their uploaded files.
# Note: this lives in process memory, so it is lost on restart and is not
# shared across workers; a multi-worker deployment would need external
# storage (e.g. a database) and locking around concurrent mutation.
file_storage = {}

def allowed_file(filename):
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

def get_file_age(filepath):
    """Get file age in hours."""
    if os.path.exists(filepath):
        file_time = os.path.getmtime(filepath)
        return (time.time() - file_time) / 3600
    return float('inf')

def cleanup_old_files():
    """Remove files older than FILE_EXPIRY_HOURS and prune stale registry entries."""
    try:
        for folder in [UPLOAD_FOLDER, PROCESSED_FOLDER]:
            for root, dirs, files in os.walk(folder):
                for file in files:
                    filepath = os.path.join(root, file)
                    if get_file_age(filepath) > FILE_EXPIRY_HOURS:
                        os.remove(filepath)
                        logger.info(f"Cleaned up old file: {filepath}")
        # Clean up file_storage entries
        current_time = datetime.now()
        sessions_to_remove = []
        for session_id, files in file_storage.items():
            files_to_remove = []
            for file_id, file_info in files.items():
                file_time = datetime.fromisoformat(file_info['timestamp'])
                if (current_time - file_time).total_seconds() > FILE_EXPIRY_HOURS * 3600:
                    files_to_remove.append(file_id)
            for file_id in files_to_remove:
                del files[file_id]
            if not files:
                sessions_to_remove.append(session_id)
        for session_id in sessions_to_remove:
            del file_storage[session_id]
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")

# Set up a scheduler that runs the cleanup job every 15 minutes
scheduler = BackgroundScheduler()
scheduler.add_job(func=cleanup_old_files, trigger="interval", minutes=15)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

def load_data_file(filepath, filename):
    """Load data from various file formats."""
    try:
        file_ext = filename.rsplit('.', 1)[1].lower()
        if file_ext == 'csv':
            return pd.read_csv(filepath)
        elif file_ext in ['xlsx', 'xls']:
            return pd.read_excel(filepath)
        elif file_ext == 'json':
            return pd.read_json(filepath)
        elif file_ext == 'parquet':
            return pd.read_parquet(filepath)
        elif file_ext == 'tsv':
            return pd.read_csv(filepath, sep='\t')
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}")

def perform_basic_statistics(df, columns=None):
    """Perform basic statistical analysis."""
    if columns:
        df = df[columns]
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(exclude=[np.number]).columns.tolist()
    result = {
        'numeric_summary': {},
        'categorical_summary': {},
        'general_info': {
            'total_rows': len(df),
            'total_columns': len(df.columns),
            'numeric_columns': len(numeric_cols),
            'categorical_columns': len(categorical_cols),
            'missing_values': df.isnull().sum().to_dict()
        }
    }
    # Numeric statistics
    if numeric_cols:
        numeric_stats = df[numeric_cols].describe()
        result['numeric_summary'] = numeric_stats.to_dict()
    # Categorical statistics
    if categorical_cols:
        for col in categorical_cols:
            result['categorical_summary'][col] = {
                'unique_values': df[col].nunique(),
                'top_values': df[col].value_counts().head(10).to_dict(),
                # Cast to a plain int: Series.sum() returns np.int64, which
                # Flask's JSON encoder rejects
                'missing_count': int(df[col].isnull().sum())
            }
    return result
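
# Illustrative result shape (a sketch, assuming a frame with a numeric
# 'sales' column and a categorical 'region' column; values are hypothetical):
# {
#   'numeric_summary': {'sales': {'count': 100.0, 'mean': 101.7, ...}},
#   'categorical_summary': {'region': {'unique_values': 2,
#                                      'top_values': {'East': 60, 'West': 40},
#                                      'missing_count': 0}},
#   'general_info': {'total_rows': 100, 'total_columns': 2, ...}
# }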

def perform_groupby_analysis(df, group_column, target_column, operation='mean', filters=None):
    """Perform a group-by aggregation, optionally after filtering rows."""
    # Apply filters if provided
    if filters:
        for f in filters:
            col, op, val = f['column'], f['operator'], f['value']
            if op == '>':
                df = df[df[col] > val]
            elif op == '<':
                df = df[df[col] < val]
            elif op == '==':
                df = df[df[col] == val]
            elif op == '!=':
                df = df[df[col] != val]
            elif op == '>=':
                df = df[df[col] >= val]
            elif op == '<=':
                df = df[df[col] <= val]
    # Perform the group-by aggregation, validating against a whitelist
    allowed_operations = {'mean', 'sum', 'count', 'max', 'min', 'std'}
    if operation not in allowed_operations:
        raise ValueError(f"Unsupported operation: {operation}")
    result = df.groupby(group_column)[target_column].agg(operation)
    return {
        'result': result.to_dict(),
        'operation': operation,
        'group_column': group_column,
        'target_column': target_column,
        'total_groups': len(result)
    }
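
# Illustrative call (hypothetical 'region'/'sales' columns):
#   perform_groupby_analysis(df, 'region', 'sales', 'mean')
# might return:
#   {'result': {'East': 105.0, 'West': 98.5}, 'operation': 'mean',
#    'group_column': 'region', 'target_column': 'sales', 'total_groups': 2}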

def perform_correlation_analysis(df, columns=None, method='pearson'):
    """Perform correlation analysis."""
    if columns:
        df = df[columns]
    # Only numeric columns
    numeric_df = df.select_dtypes(include=[np.number])
    if numeric_df.empty:
        raise ValueError("No numeric columns found for correlation analysis")
    correlation_matrix = numeric_df.corr(method=method)
    return {
        'correlation_matrix': correlation_matrix.to_dict(),
        'method': method,
        'columns': numeric_df.columns.tolist()
    }

def detect_outliers(df, columns=None, method='iqr'):
    """Detect outliers in numeric columns using the IQR rule or z-scores."""
    if columns:
        df = df[columns]
    numeric_df = df.select_dtypes(include=[np.number])
    outliers = {}
    for col in numeric_df.columns:
        if method == 'iqr':
            Q1 = numeric_df[col].quantile(0.25)
            Q3 = numeric_df[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outlier_indices = numeric_df[(numeric_df[col] < lower_bound) |
                                         (numeric_df[col] > upper_bound)].index.tolist()
        elif method == 'zscore':
            # Compute z-scores on the non-null values and keep their original
            # index so the boolean mask lines up with the data it was computed
            # from (the previous version masked the full frame with a shorter
            # array, which raises for columns containing NaNs)
            col_data = numeric_df[col].dropna()
            z_scores = np.abs(stats.zscore(col_data))
            outlier_indices = col_data[z_scores > 3].index.tolist()
        else:
            raise ValueError(f"Unsupported outlier method: {method}")
        outliers[col] = {
            'count': len(outlier_indices),
            'indices': outlier_indices[:100],  # Limit to first 100
            'percentage': (len(outlier_indices) / len(numeric_df)) * 100
        }
    return outliers
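
# Quick sanity check of the IQR rule (hypothetical values): for the series
# [1, 2, 3, 4, 100], Q1 = 2, Q3 = 4, IQR = 2, so the fences are
# 2 - 1.5*2 = -1 and 4 + 1.5*2 = 7, and only 100 is flagged as an outlier.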

def generate_visualization(df, chart_type, x_column, y_column=None, group_column=None):
    """Generate a chart and return it as a base64-encoded PNG string."""
    plt.figure(figsize=(10, 6))
    try:
        if chart_type == 'histogram':
            plt.hist(df[x_column], bins=30, alpha=0.7)
            plt.xlabel(x_column)
            plt.ylabel('Frequency')
            plt.title(f'Histogram of {x_column}')
        elif chart_type == 'scatter':
            if not y_column:
                raise ValueError("Y column required for scatter plot")
            plt.scatter(df[x_column], df[y_column], alpha=0.6)
            plt.xlabel(x_column)
            plt.ylabel(y_column)
            plt.title(f'{x_column} vs {y_column}')
        elif chart_type == 'bar':
            if group_column:
                grouped = (df.groupby(group_column)[x_column].mean()
                           if pd.api.types.is_numeric_dtype(df[x_column])
                           else df[group_column].value_counts())
            else:
                grouped = df[x_column].value_counts().head(20)
            grouped.plot(kind='bar')
            plt.xlabel(group_column or x_column)
            plt.ylabel(f'Mean {x_column}' if pd.api.types.is_numeric_dtype(df[x_column]) else 'Count')
            plt.title('Bar Chart')
            plt.xticks(rotation=45)
        elif chart_type == 'line':
            if y_column:
                plt.plot(df[x_column], df[y_column])
                plt.xlabel(x_column)
                plt.ylabel(y_column)
            else:
                df[x_column].plot()
                plt.ylabel(x_column)
            plt.title('Line Chart')
        elif chart_type == 'box':
            # Draw on the current axes; without ax= pandas opens its own
            # figure when `by` is given, and the figure saved below stays blank
            if group_column:
                df.boxplot(column=x_column, by=group_column, ax=plt.gca())
            else:
                df.boxplot(column=x_column, ax=plt.gca())
            plt.title('Box Plot')
        else:
            raise ValueError(f"Unsupported chart type: {chart_type}")
        plt.tight_layout()
        # Convert plot to base64 string
        img_buffer = io.BytesIO()
        plt.savefig(img_buffer, format='png', dpi=150, bbox_inches='tight')
        img_buffer.seek(0)
        img_base64 = base64.b64encode(img_buffer.getvalue()).decode()
        plt.close()
        return img_base64
    except Exception as e:
        plt.close()
        raise Exception(f"Error generating visualization: {str(e)}")

def parse_natural_language_query(query, df_columns):
    """Simple keyword-based natural language query parser."""
    query_lower = query.lower()
    # Define operation keywords
    operations = {
        'average': 'mean', 'mean': 'mean', 'avg': 'mean',
        'sum': 'sum', 'total': 'sum',
        'count': 'count', 'number': 'count',
        'max': 'max', 'maximum': 'max', 'highest': 'max',
        'min': 'min', 'minimum': 'min', 'lowest': 'min'
    }
    # Find operation
    operation = 'mean'  # default
    for keyword, op in operations.items():
        if keyword in query_lower:
            operation = op
            break
    # Find columns mentioned in the query (in df_columns order)
    mentioned_columns = [col for col in df_columns if col.lower() in query_lower]
    # Simple parsing patterns
    if 'by' in query_lower and len(mentioned_columns) >= 2:
        # Group-by analysis
        target_col = mentioned_columns[0]
        group_col = mentioned_columns[-1]
        return {
            'analysisType': 'groupby',
            'parameters': {
                'groupByColumn': group_col,
                'targetColumn': target_col,
                'operation': operation
            }
        }
    elif 'correlation' in query_lower:
        return {
            'analysisType': 'correlation',
            'parameters': {
                'columns': mentioned_columns if mentioned_columns else None
            }
        }
    elif any(word in query_lower for word in ['chart', 'plot', 'graph', 'visualize']):
        chart_type = 'bar'  # default
        if 'scatter' in query_lower:
            chart_type = 'scatter'
        elif 'line' in query_lower:
            chart_type = 'line'
        elif 'histogram' in query_lower:
            chart_type = 'histogram'
        return {
            'analysisType': 'visualization',
            'parameters': {
                'chartType': chart_type,
                'xColumn': mentioned_columns[0] if mentioned_columns else None,
                'yColumn': mentioned_columns[1] if len(mentioned_columns) > 1 else None
            }
        }
    else:
        # Default to basic statistics
        return {
            'analysisType': 'statistics',
            'parameters': {
                'columns': mentioned_columns if mentioned_columns else None
            }
        }
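
# Illustrative parse (hypothetical frame whose column list has 'sales' before
# 'region'): "average sales by region" matches the 'average' keyword
# (operation 'mean') and both column names, and since the query contains 'by'
# it resolves to a groupby analysis with targetColumn='sales' and
# groupByColumn='region'. Note that mentioned_columns follows df_columns
# order, not the order of mention in the query.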

@app.route('/api/health', methods=['GET'])
def health_check():
    return jsonify({'status': 'healthy', 'timestamp': datetime.now().isoformat()})

@app.route('/api/upload', methods=['POST'])
def upload_file():
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400
        file = request.files['file']
        session_id = request.form.get('sessionId')
        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        if not allowed_file(file.filename):
            return jsonify({'error': 'File type not supported'}), 400
        # Check file size
        file.seek(0, 2)  # Seek to end
        file_size = file.tell()
        file.seek(0)  # Reset to beginning
        if file_size > MAX_FILE_SIZE:
            return jsonify({'error': f'File too large. Maximum size is {MAX_FILE_SIZE // (1024*1024)}MB'}), 400
        # Generate unique file ID and secure filename
        file_id = str(uuid.uuid4())
        filename = secure_filename(file.filename)
        # Create session directory
        session_dir = os.path.join(UPLOAD_FOLDER, session_id)
        os.makedirs(session_dir, exist_ok=True)
        # Save file
        filepath = os.path.join(session_dir, f"{file_id}_{filename}")
        file.save(filepath)
        # Store file info
        if session_id not in file_storage:
            file_storage[session_id] = {}
        file_storage[session_id][file_id] = {
            'filename': filename,
            'filepath': filepath,
            'size': file_size,
            'timestamp': datetime.now().isoformat()
        }
        return jsonify({
            'fileId': file_id,
            'filename': filename,
            'size': file_size,
            'message': 'File uploaded successfully'
        })
    except Exception as e:
        logger.error(f"Upload error: {str(e)}")
        return jsonify({'error': str(e)}), 500
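
# Illustrative client upload (a sketch, assuming the app runs locally on
# port 7860 and `requests` is installed; the session id is any
# client-generated string, and 'sales.csv' is a hypothetical file):
#
#   import requests
#   resp = requests.post(
#       'http://localhost:7860/api/upload',
#       data={'sessionId': 'demo-session-1'},
#       files={'file': open('sales.csv', 'rb')},
#   )
#   print(resp.json())  # {'fileId': '<uuid>', 'filename': 'sales.csv', ...}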

@app.route('/api/preview/<file_id>', methods=['GET'])
def preview_file(file_id):
    try:
        session_id = request.args.get('sessionId')
        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404
        file_info = file_storage[session_id][file_id]
        # Load data and get preview
        df = load_data_file(file_info['filepath'], file_info['filename'])
        preview_data = {
            'columns': df.columns.tolist(),
            'dtypes': df.dtypes.astype(str).to_dict(),
            'shape': df.shape,
            'head': df.head(5).to_dict('records'),
            'missing_values': df.isnull().sum().to_dict()
        }
        return jsonify(preview_data)
    except Exception as e:
        logger.error(f"Preview error: {str(e)}")
        return jsonify({'error': str(e)}), 500
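
# Illustrative preview response (hypothetical two-column CSV):
# {
#   "columns": ["region", "sales"],
#   "dtypes": {"region": "object", "sales": "float64"},
#   "shape": [100, 2],
#   "head": [{"region": "East", "sales": 105.0}, ...],
#   "missing_values": {"region": 0, "sales": 3}
# }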

@app.route('/api/analyze', methods=['POST'])
def analyze_data():
    try:
        data = request.get_json()
        session_id = data.get('sessionId')
        file_id = data.get('fileId')
        analysis_type = data.get('analysisType')
        parameters = data.get('parameters', {})
        natural_query = data.get('naturalQuery')
        if not all([session_id, file_id]):
            return jsonify({'error': 'Session ID and File ID required'}), 400
        if session_id not in file_storage or file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404
        file_info = file_storage[session_id][file_id]
        df = load_data_file(file_info['filepath'], file_info['filename'])
        # Handle natural language query
        if natural_query and not analysis_type:
            parsed_query = parse_natural_language_query(natural_query, df.columns.tolist())
            analysis_type = parsed_query['analysisType']
            parameters = parsed_query['parameters']
        result = {}
        if analysis_type == 'statistics':
            result = perform_basic_statistics(df, parameters.get('columns'))
        elif analysis_type == 'groupby':
            result = perform_groupby_analysis(
                df,
                parameters.get('groupByColumn'),
                parameters.get('targetColumn'),
                parameters.get('operation', 'mean'),
                parameters.get('filters')
            )
        elif analysis_type == 'correlation':
            result = perform_correlation_analysis(
                df,
                parameters.get('columns'),
                parameters.get('method', 'pearson')
            )
        elif analysis_type == 'outliers':
            result = detect_outliers(
                df,
                parameters.get('columns'),
                parameters.get('method', 'iqr')
            )
        elif analysis_type == 'visualization':
            chart_base64 = generate_visualization(
                df,
                parameters.get('chartType', 'bar'),
                parameters.get('xColumn'),
                parameters.get('yColumn'),
                parameters.get('groupColumn')
            )
            result = {
                'chart': chart_base64,
                'chartType': parameters.get('chartType', 'bar')
            }
        else:
            return jsonify({'error': 'Invalid analysis type'}), 400
        # Save result to processed folder
        result_id = str(uuid.uuid4())
        result_dir = os.path.join(PROCESSED_FOLDER, session_id)
        os.makedirs(result_dir, exist_ok=True)
        result_filepath = os.path.join(result_dir, f"{result_id}_result.json")
        with open(result_filepath, 'w') as f:
            json.dump(result, f, indent=2, default=str)
        return jsonify({
            'resultId': result_id,
            'result': result,
            'analysisType': analysis_type,
            'timestamp': datetime.now().isoformat()
        })
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}")
        return jsonify({'error': str(e)}), 500
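
# Illustrative analyze request body (hypothetical column names):
#
#   {
#     "sessionId": "demo-session-1",
#     "fileId": "<uuid from /api/upload>",
#     "analysisType": "groupby",
#     "parameters": {"groupByColumn": "region", "targetColumn": "sales",
#                    "operation": "mean"}
#   }
#
# Alternatively, omit analysisType and send a naturalQuery such as
# "average sales by region"; parse_natural_language_query then infers the
# analysis type and parameters.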

@app.route('/api/files/<session_id>', methods=['GET'])
def list_files(session_id):
    try:
        if session_id not in file_storage:
            return jsonify({'files': []})
        files = []
        for file_id, file_info in file_storage[session_id].items():
            # Check if file still exists
            if os.path.exists(file_info['filepath']):
                files.append({
                    'fileId': file_id,
                    'filename': file_info['filename'],
                    'size': file_info['size'],
                    'timestamp': file_info['timestamp']
                })
        return jsonify({'files': files})
    except Exception as e:
        logger.error(f"List files error: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/api/file/<file_id>', methods=['DELETE'])
def delete_file(file_id):
    try:
        session_id = request.args.get('sessionId')
        if not session_id or session_id not in file_storage:
            return jsonify({'error': 'Invalid session'}), 400
        if file_id not in file_storage[session_id]:
            return jsonify({'error': 'File not found'}), 404
        file_info = file_storage[session_id][file_id]
        # Remove file from filesystem
        if os.path.exists(file_info['filepath']):
            os.remove(file_info['filepath'])
        # Remove from storage
        del file_storage[session_id][file_id]
        return jsonify({'message': 'File deleted successfully'})
    except Exception as e:
        logger.error(f"Delete error: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/api/download/<result_id>', methods=['GET'])
def download_result(result_id):
    try:
        session_id = request.args.get('sessionId')
        format_type = request.args.get('format', 'json')
        if not session_id:
            return jsonify({'error': 'Session ID required'}), 400
        # Note: session_id and result_id come from the client and are joined
        # into a filesystem path; a hardened deployment should validate them
        # (e.g. require UUID format) to rule out path traversal
        result_filepath = os.path.join(PROCESSED_FOLDER, session_id, f"{result_id}_result.json")
        if not os.path.exists(result_filepath):
            return jsonify({'error': 'Result not found'}), 404
        if format_type == 'json':
            return send_file(result_filepath, as_attachment=True,
                             download_name=f"analysis_result_{result_id}.json")
        else:
            return jsonify({'error': 'Format not supported'}), 400
    except Exception as e:
        logger.error(f"Download error: {str(e)}")
        return jsonify({'error': str(e)}), 500

@app.route('/', methods=['GET'])
def home():
    return jsonify({
        'message': 'Data Analytics API is running!',
        'version': '1.0.0',
        'endpoints': {
            'health': '/api/health',
            'upload': '/api/upload',
            'preview': '/api/preview/<file_id>',
            'analyze': '/api/analyze',
            'files': '/api/files/<session_id>',
            'delete': '/api/file/<file_id>',
            'download': '/api/download/<result_id>'
        },
        'timestamp': datetime.now().isoformat()
    })

@app.errorhandler(404)
def not_found(error):
    return jsonify({
        'error': 'Endpoint not found',
        'message': 'Please check the API documentation',
        'available_endpoints': [
            '/',
            '/api/health',
            '/api/upload',
            '/api/preview/<file_id>',
            '/api/analyze',
            '/api/files/<session_id>',
            '/api/file/<file_id>',
            '/api/download/<result_id>'
        ]
    }), 404

if __name__ == '__main__':
    # debug=True is convenient during development; disable it for public
    # deployments, since the Werkzeug debugger allows arbitrary code execution
    app.run(host='0.0.0.0', port=7860, debug=True)
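
# End-to-end sketch of a client session (hypothetical file and ids, assuming
# the server is reachable at http://localhost:7860 and `requests` is
# installed):
#
#   import requests
#
#   BASE = 'http://localhost:7860'
#   session = {'sessionId': 'demo-session-1'}
#
#   # 1. Upload a dataset
#   up = requests.post(f'{BASE}/api/upload', data=session,
#                      files={'file': open('sales.csv', 'rb')}).json()
#
#   # 2. Preview it
#   prev = requests.get(f"{BASE}/api/preview/{up['fileId']}",
#                       params=session).json()
#
#   # 3. Run an analysis via natural language
#   res = requests.post(f'{BASE}/api/analyze', json={
#       **session, 'fileId': up['fileId'],
#       'naturalQuery': 'average sales by region',
#   }).json()
#
#   # 4. Download the stored result as JSON
#   dl = requests.get(f"{BASE}/api/download/{res['resultId']}",
#                     params={**session, 'format': 'json'})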