Data-analytics / app.py
mike23415's picture
Update app.py
682de52 verified
raw
history blame
14.7 kB
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import pandas as pd
import os
import threading
import time
import re
# Flask application object and global configuration.
# NOTE(review): this setup (and most of what follows) is repeated verbatim
# later in the file — apparent paste duplication; confirm and deduplicate.
app = Flask(__name__)
CORS(app)  # allow cross-origin requests so a separate frontend can call the API
UPLOAD_FOLDER = "/tmp"  # ephemeral storage; files here are purged by the cleanup thread
SESSION_KEY_PREFIX = "data_tool_session_id"
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB
# === Cleanup Thread: delete files older than 60 minutes ===
def clean_old_files(folder=UPLOAD_FOLDER, max_age=60):
    """Start a daemon thread that purges stale files from `folder`.

    Every 10 minutes the loop removes regular files whose modification
    time is more than `max_age` minutes in the past.

    Args:
        folder: directory to scan (defaults to the upload folder).
        max_age: maximum file age in minutes before deletion.
    """
    def cleanup_loop():
        while True:
            now = time.time()
            # FIX: guard the whole scan — previously a missing folder or a
            # file vanishing between listdir() and getmtime() raised an
            # unhandled exception and silently killed the daemon thread.
            try:
                if os.path.exists(folder):
                    for f in os.listdir(folder):
                        path = os.path.join(folder, f)
                        if os.path.isfile(path):
                            if now - os.path.getmtime(path) > max_age * 60:
                                try:
                                    os.remove(path)
                                    print(f"[Cleanup] Deleted: {path}")
                                except Exception as e:
                                    print(f"[Cleanup Error] {e}")
            except Exception as e:
                print(f"[Cleanup Error] {e}")
            time.sleep(600)  # Every 10 minutes
    # Daemon thread: dies with the process, never blocks shutdown.
    threading.Thread(target=cleanup_loop, daemon=True).start()
# Start cleanup thread
clean_old_files()
# === Instruction Parser ===
def apply_instruction(df, instruction):
    """Apply one plain-English cleaning instruction to a DataFrame.

    Recognized patterns (case-insensitive): drop column X, remove
    duplicates, drop missing / remove null, fill missing ... with V,
    sort by X [desc], rename column A to B, filter where X > N,
    group by A and sum B, add column C as A + B, normalize column X.

    Returns the transformed DataFrame; on any error the original frame
    is returned unchanged.

    NOTE(review): the original text of this function was truncated
    mid-body (the try-block was never closed); the tail below restores
    the implied completion — verify against the fuller copy later in
    this file, which returns a (df, message) tuple instead.
    """
    instruction = instruction.lower()
    try:
        match = re.search(r"drop column (\w+)", instruction)
        if match:
            df = df.drop(columns=[match.group(1)])
        if "remove duplicates" in instruction:
            df = df.drop_duplicates()
        if "drop missing" in instruction or "remove null" in instruction:
            df = df.dropna()
        match = re.search(r"fill missing.*with ([\w\.]+)", instruction)
        if match:
            val = match.group(1)
            try:
                val = float(val)
            except ValueError:
                # FIX: was a bare except; only a failed numeric parse
                # should fall back to filling with the raw string.
                pass
            df = df.fillna(val)
        match = re.search(r"sort by (\w+)( descending| desc)?", instruction)
        if match:
            col = match.group(1)
            ascending = not bool(match.group(2))
            df = df.sort_values(by=col, ascending=ascending)
        match = re.search(r"rename column (\w+) to (\w+)", instruction)
        if match:
            df = df.rename(columns={match.group(1): match.group(2)})
        match = re.search(r"filter where (\w+) > (\d+)", instruction)
        if match:
            df = df[df[match.group(1)] > float(match.group(2))]
        match = re.search(r"group by (\w+) and sum (\w+)", instruction)
        if match:
            df = df.groupby(match.group(1))[match.group(2)].sum().reset_index()
        match = re.search(r"add column (\w+) as (\w+) \+ (\w+)", instruction)
        if match:
            df[match.group(1)] = df[match.group(2)] + df[match.group(3)]
        match = re.search(r"normalize column (\w+)", instruction)
        if match:
            col = match.group(1)
            # Min-max scaling to [0, 1].
            df[col] = (df[col] - df[col].min()) / (df[col].max() - df[col].min())
        return df
    except Exception as e:
        # Best-effort: never propagate; hand back the untouched frame.
        print(f"[Instruction Error] {e}")
        return df
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import pandas as pd
import os
import threading
import time
import re
# Flask application object and global configuration.
# NOTE(review): duplicates the setup earlier in this file (paste
# duplication from the file viewer); this second definition is the one
# that is effectively in force — confirm and deduplicate.
app = Flask(__name__)
CORS(app)  # allow cross-origin requests so a separate frontend can call the API
UPLOAD_FOLDER = "/tmp"  # ephemeral storage; files here are purged by the cleanup thread
SESSION_KEY_PREFIX = "data_tool_session_id"
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024 # 512 MB
# === Root Route (Required for Hugging Face) ===
@app.route("/", methods=["GET"])
def root():
    """Landing route: report service health and enumerate the endpoints."""
    info = {
        "message": "Data Processing API is running",
        "status": "healthy",
        "endpoints": {
            "POST /process": "Upload and process CSV/Excel files",
            "GET /download/<filename>": "Download processed file with session_id parameter",
            "GET /health": "Health check",
        },
        "version": "1.0",
    }
    return jsonify(info)
# === Health Check Route ===
@app.route("/health", methods=["GET"])
def health_check():
    """Liveness probe: report healthy plus the current epoch time."""
    now = time.time()
    return jsonify({"status": "healthy", "timestamp": now})
# === Cleanup Thread: delete files older than 60 minutes ===
def clean_old_files(folder=UPLOAD_FOLDER, max_age=60):
    """Spawn a daemon thread that purges stale files from `folder`.

    Every 10 minutes the loop scans `folder` and removes regular files
    whose modification time is more than `max_age` minutes old. All
    filesystem errors are logged and swallowed so the loop never dies.

    Args:
        folder: directory to scan (defaults to the upload folder).
        max_age: maximum file age in minutes before deletion.
    """
    def cleanup_loop():
        while True:
            now = time.time()
            try:
                if os.path.exists(folder):
                    for entry in os.listdir(folder):
                        path = os.path.join(folder, entry)
                        if not os.path.isfile(path):
                            continue
                        # Compare file age (seconds) against the limit.
                        if now - os.path.getmtime(path) > max_age * 60:
                            try:
                                os.remove(path)
                                print(f"[Cleanup] Deleted: {path}")
                            except Exception as e:
                                print(f"[Cleanup Error] {e}")
            except Exception as e:
                print(f"[Cleanup Error] {e}")
            time.sleep(600)  # Every 10 minutes
    # Daemon thread: dies with the process, never blocks shutdown.
    threading.Thread(target=cleanup_loop, daemon=True).start()
# Start cleanup thread
clean_old_files()
# === Instruction Parser ===
def apply_instruction(df, instruction):
    """Apply one plain-English data-cleaning instruction to a DataFrame.

    Supported patterns (matched case-insensitively; the first matching
    rule wins and returns immediately): drop column X, remove duplicates,
    drop missing / remove null, fill missing ... with V, sort by X [desc],
    rename column A to B, filter where X > N, group by A and sum B,
    add column C as A + B, normalize column X, standardize column X,
    split column X by comma, remove special characters from X.

    Args:
        df: input pandas DataFrame.
        instruction: free-text command from the user.

    Returns:
        (DataFrame, str): the (possibly new) frame plus a human-readable
        status message. On failure the original frame is returned with an
        "Error: ..." message — callers never see an exception.
    """
    # NOTE: lowercasing the whole instruction means column names are
    # matched in lowercase; uppercase DataFrame columns report "not found".
    instruction = instruction.lower().strip()
    if not instruction:
        return df, "No instruction provided"
    try:
        # Drop column
        match = re.search(r"drop column (\w+)", instruction)
        if match:
            col_name = match.group(1)
            if col_name in df.columns:
                df = df.drop(columns=[col_name])
                return df, f"Dropped column '{col_name}'"
            else:
                return df, f"Error: Column '{col_name}' not found"
        # Remove duplicates
        if "remove duplicates" in instruction:
            original_count = len(df)
            df = df.drop_duplicates()
            removed_count = original_count - len(df)
            return df, f"Removed {removed_count} duplicate rows"
        # Drop missing values
        if "drop missing" in instruction or "remove null" in instruction:
            original_count = len(df)
            df = df.dropna()
            removed_count = original_count - len(df)
            return df, f"Removed {removed_count} rows with missing values"
        # Fill missing values
        match = re.search(r"fill missing.*with ([\w\.]+)", instruction)
        if match:
            val = match.group(1)
            try:
                val = float(val)
            except ValueError:
                # FIX: was a bare `except:` (also traps SystemExit /
                # KeyboardInterrupt); only a failed numeric parse should
                # fall back to filling with the raw string.
                pass
            missing_count = df.isnull().sum().sum()
            df = df.fillna(val)
            return df, f"Filled {missing_count} missing values with '{val}'"
        # Sort by column
        match = re.search(r"sort by (\w+)( descending| desc)?", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            # Group 2 captures " descending"/" desc"; present => descending.
            ascending = not bool(match.group(2))
            df = df.sort_values(by=col, ascending=ascending)
            order = "descending" if not ascending else "ascending"
            return df, f"Sorted by '{col}' in {order} order"
        # Rename column
        match = re.search(r"rename column (\w+) to (\w+)", instruction)
        if match:
            old_name, new_name = match.group(1), match.group(2)
            if old_name not in df.columns:
                return df, f"Error: Column '{old_name}' not found"
            df = df.rename(columns={old_name: new_name})
            return df, f"Renamed column '{old_name}' to '{new_name}'"
        # Filter rows (only the "greater than integer" form is supported)
        match = re.search(r"filter where (\w+) > (\d+)", instruction)
        if match:
            col, val = match.group(1), float(match.group(2))
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            original_count = len(df)
            df = df[df[col] > val]
            kept_count = len(df)
            return df, f"Filtered data: kept {kept_count} rows where {col} > {val}"
        # Group by and sum
        match = re.search(r"group by (\w+) and sum (\w+)", instruction)
        if match:
            group_col, sum_col = match.group(1), match.group(2)
            if group_col not in df.columns:
                return df, f"Error: Column '{group_col}' not found"
            if sum_col not in df.columns:
                return df, f"Error: Column '{sum_col}' not found"
            df = df.groupby(group_col)[sum_col].sum().reset_index()
            return df, f"Grouped by '{group_col}' and summed '{sum_col}'"
        # Add column (sum of two columns)
        match = re.search(r"add column (\w+) as (\w+) \+ (\w+)", instruction)
        if match:
            new_col, col1, col2 = match.group(1), match.group(2), match.group(3)
            if col1 not in df.columns:
                return df, f"Error: Column '{col1}' not found"
            if col2 not in df.columns:
                return df, f"Error: Column '{col2}' not found"
            df[new_col] = df[col1] + df[col2]
            return df, f"Added column '{new_col}' as sum of '{col1}' and '{col2}'"
        # Normalize column (min-max to [0, 1]; a constant column yields NaN)
        match = re.search(r"normalize column (\w+)", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            if not pd.api.types.is_numeric_dtype(df[col]):
                return df, f"Error: Column '{col}' is not numeric"
            df[col] = (df[col] - df[col].min()) / (df[col].max() - df[col].min())
            return df, f"Normalized column '{col}' using min-max scaling"
        # Standardize column (z-score; single-row input yields NaN)
        match = re.search(r"standardize column (\w+)", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            if not pd.api.types.is_numeric_dtype(df[col]):
                return df, f"Error: Column '{col}' is not numeric"
            df[col] = (df[col] - df[col].mean()) / df[col].std()
            return df, f"Standardized column '{col}' using z-score"
        # Split column by comma (expects exactly two parts per value;
        # other counts raise and are reported via the outer handler)
        match = re.search(r"split column (\w+) by comma", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            df[[f"{col}_1", f"{col}_2"]] = df[col].str.split(",", expand=True)
            return df, f"Split column '{col}' by comma into '{col}_1' and '{col}_2'"
        # Remove special characters (keep only ASCII letters and digits)
        match = re.search(r"remove special characters from (\w+)", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            df[col] = df[col].astype(str).str.replace(r"[^a-zA-Z0-9]", "", regex=True)
            return df, f"Removed special characters from column '{col}'"
        # If no instruction matched
        return df, f"Instruction '{instruction}' not recognized"
    except Exception as e:
        # Catch-all boundary: a malformed instruction must never 500 the
        # /process endpoint; report the failure as a status message.
        return df, f"Error: {str(e)}"
# === File Processor Endpoint ===
@app.route("/process", methods=["POST"])
def process_file():
    """Accept an uploaded CSV/Excel file plus an instruction, apply it,
    persist the result, and return a JSON summary.

    Expects multipart form fields: file, instruction, session_id.
    Returns 400 on bad input, 500 on save/processing failure, otherwise
    a JSON object with a preview, row counts, and a download URL.
    """
    try:
        # Validate request
        if "file" not in request.files:
            return jsonify({"error": "No file provided"}), 400
        if "instruction" not in request.form:
            return jsonify({"error": "No instruction provided"}), 400
        if "session_id" not in request.form:
            return jsonify({"error": "No session_id provided"}), 400
        file = request.files["file"]
        instruction = request.form["instruction"]
        session_id = request.form["session_id"]
        if file.filename == '':
            return jsonify({"error": "No file selected"}), 400
        # Read file (dispatch on extension)
        try:
            if file.filename.lower().endswith('.csv'):
                df = pd.read_csv(file)
            elif file.filename.lower().endswith(('.xlsx', '.xls')):
                df = pd.read_excel(file)
            else:
                return jsonify({"error": "Unsupported file format. Use CSV or Excel files."}), 400
        except Exception as e:
            return jsonify({"error": f"File reading error: {str(e)}"}), 400
        # Apply instruction
        df_processed, status = apply_instruction(df, instruction)
        # Save processed file. SECURITY: the client controls file.filename,
        # so strip any directory components before it enters the stored path.
        safe_name = os.path.basename(file.filename)
        original_name = safe_name.rsplit('.', 1)[0]  # Remove extension
        filename = f"processed_{session_id}_{original_name}.csv"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        try:
            df_processed.to_csv(filepath, index=False)
        except Exception as e:
            return jsonify({"error": f"File saving error: {str(e)}"}), 500
        # Generate preview (first 5 rows)
        preview = df_processed.head(5).to_dict(orient="records")
        return jsonify({
            "success": True,
            "message": status,
            # BUG FIX: this previously emitted the literal "/download/(unknown)";
            # it must point at the file just written.
            "download_url": f"/download/{filename}",
            "original_rows": len(df),
            "processed_rows": len(df_processed),
            "columns": list(df_processed.columns),
            "filename": filename
        })
    except Exception as e:
        return jsonify({"error": f"Processing error: {str(e)}"}), 500
# === File Download with Session ID Verification ===
@app.route("/download/<filename>", methods=["GET"])
def download_file(filename):
    """Serve a previously processed file, gated by the caller's session_id.

    The session_id query parameter must appear inside the stored filename
    (pattern "processed_<session>_<name>.csv") or access is refused.
    """
    try:
        # SECURITY: `filename` is caller-controlled; strip any directory
        # components so the join below cannot point outside UPLOAD_FOLDER.
        filename = os.path.basename(filename)
        session_id = request.args.get("session_id")
        # Validate session
        if not session_id:
            return jsonify({"error": "session_id parameter required"}), 400
        if f"_{session_id}_" not in filename:
            return jsonify({"error": "Invalid session or unauthorized access"}), 403
        # Check file exists (it may already have been swept by the cleanup thread)
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if not os.path.exists(filepath):
            return jsonify({"error": "File not found or expired"}), 404
        return send_file(filepath, as_attachment=True, download_name=filename)
    except Exception as e:
        return jsonify({"error": f"Download error: {str(e)}"}), 500
# === Error Handlers ===
@app.errorhandler(404)
def not_found(error):
    """Return JSON (not Flask's HTML page) for unknown routes."""
    payload = {"error": "Endpoint not found"}
    return jsonify(payload), 404
@app.errorhandler(413)
def too_large(error):
    """JSON response when an upload exceeds MAX_CONTENT_LENGTH."""
    payload = {"error": "File too large (max 512MB)"}
    return jsonify(payload), 413
@app.errorhandler(500)
def internal_error(error):
    """Generic JSON body for unhandled server-side failures."""
    payload = {"error": "Internal server error"}
    return jsonify(payload), 500
# === Run on Port 7860 for Hugging Face ===
if __name__ == "__main__":
    # Startup banner written to the container logs.
    print("🚀 Starting Data Processing API on port 7860...")
    print("📊 API Endpoints:")
    print(" POST /process - Process files")
    print(" GET /download/<filename> - Download processed files")
    print(" GET /health - Health check")
    # Bind on all interfaces; 7860 is the port Hugging Face Spaces expects.
    # debug=False: never expose the Werkzeug debugger on a public host.
    app.run(host="0.0.0.0", port=7860, debug=False)