# (Removed stray Hugging Face Space status banner — "Spaces: Sleeping" —
#  that was pasted into the source; it is not Python code.)
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import pandas as pd
import os
import threading
import time
import re

app = Flask(__name__)
CORS(app)

# Processed files are written here; a background thread purges stale ones.
UPLOAD_FOLDER = "/tmp"
SESSION_KEY_PREFIX = "data_tool_session_id"
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024  # 512 MB
# === Cleanup Thread: delete files older than 60 minutes ===
def clean_old_files(folder=UPLOAD_FOLDER, max_age=60):
    """Start a daemon thread that periodically deletes stale files.

    Args:
        folder: Directory to sweep (defaults to UPLOAD_FOLDER).
        max_age: Maximum file age in minutes before a file is removed.

    The sweep runs every 10 minutes for the life of the process.  Brought
    in line with the hardened copy of this function later in the file:
    the folder-existence guard and the outer try/except prevent the loop
    from dying if the folder disappears or listdir races with a delete.
    """
    def cleanup_loop():
        while True:
            now = time.time()
            try:
                if os.path.exists(folder):
                    for f in os.listdir(folder):
                        path = os.path.join(folder, f)
                        if os.path.isfile(path):
                            if now - os.path.getmtime(path) > max_age * 60:
                                try:
                                    os.remove(path)
                                    print(f"[Cleanup] Deleted: {path}")
                                except Exception as e:
                                    print(f"[Cleanup Error] {e}")
            except Exception as e:
                # Never let the cleanup thread die on a transient FS error.
                print(f"[Cleanup Error] {e}")
            time.sleep(600)  # Every 10 minutes

    threading.Thread(target=cleanup_loop, daemon=True).start()

# Start cleanup thread
clean_old_files()
# === Instruction Parser ===
def apply_instruction(df, instruction):
    """Apply plain-English data-cleaning commands to *df*.

    NOTE(review): the original definition here was truncated mid-body (the
    paste cut off inside the "normalize column" branch and it never
    returned); it is also superseded at import time by the richer
    apply_instruction defined later in this file.  This completed version
    chains every matching command found in a single instruction.

    Args:
        df: pandas DataFrame to transform.
        instruction: free-text command, matched case-insensitively.

    Returns:
        (DataFrame, str): transformed frame plus a status message; failures
        are captured and returned as an "Error: ..." message.
    """
    instruction = instruction.lower()
    try:
        match = re.search(r"drop column (\w+)", instruction)
        if match:
            df = df.drop(columns=[match.group(1)])
        if "remove duplicates" in instruction:
            df = df.drop_duplicates()
        if "drop missing" in instruction or "remove null" in instruction:
            df = df.dropna()
        match = re.search(r"fill missing.*with ([\w\.]+)", instruction)
        if match:
            val = match.group(1)
            try:
                val = float(val)
            except ValueError:
                pass  # non-numeric fill values are used verbatim
            df = df.fillna(val)
        match = re.search(r"sort by (\w+)( descending| desc)?", instruction)
        if match:
            col = match.group(1)
            ascending = not bool(match.group(2))
            df = df.sort_values(by=col, ascending=ascending)
        match = re.search(r"rename column (\w+) to (\w+)", instruction)
        if match:
            df = df.rename(columns={match.group(1): match.group(2)})
        match = re.search(r"filter where (\w+) > (\d+)", instruction)
        if match:
            df = df[df[match.group(1)] > float(match.group(2))]
        match = re.search(r"group by (\w+) and sum (\w+)", instruction)
        if match:
            df = df.groupby(match.group(1))[match.group(2)].sum().reset_index()
        match = re.search(r"add column (\w+) as (\w+) \+ (\w+)", instruction)
        if match:
            df[match.group(1)] = df[match.group(2)] + df[match.group(3)]
        match = re.search(r"normalize column (\w+)", instruction)
        if match:
            col = match.group(1)
            # Completed from the truncated original: min-max scale,
            # skipping constant columns to avoid a 0/0 division.
            col_min, col_max = df[col].min(), df[col].max()
            if col_max != col_min:
                df[col] = (df[col] - col_min) / (col_max - col_min)
        return df, "Instruction processed"
    except Exception as e:
        return df, f"Error: {str(e)}"
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import pandas as pd
import os
import threading
import time
import re

# NOTE(review): these imports and the app/config setup duplicate the block
# at the top of the file (a paste artifact).  Re-running them is harmless —
# this later copy wins — but the earlier copy should eventually be removed.
app = Flask(__name__)
CORS(app)

UPLOAD_FOLDER = "/tmp"
SESSION_KEY_PREFIX = "data_tool_session_id"
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 512 * 1024 * 1024  # 512 MB
# === Root Route (Required for Hugging Face) ===
@app.route("/")  # restored: the comment and the endpoint listing show this is the landing route
def root():
    """Landing endpoint: report service health and advertise the API routes."""
    return jsonify({
        "message": "Data Processing API is running",
        "status": "healthy",
        "endpoints": {
            "POST /process": "Upload and process CSV/Excel files",
            "GET /download/<filename>": "Download processed file with session_id parameter",
            "GET /health": "Health check"
        },
        "version": "1.0"
    })
# === Health Check Route ===
@app.route("/health")  # restored: advertised as "GET /health" by the root route
def health_check():
    """Liveness probe: return a healthy status plus the server's epoch time."""
    return jsonify({"status": "healthy", "timestamp": time.time()})
# === Cleanup Thread: delete files older than 60 minutes ===
def clean_old_files(folder=UPLOAD_FOLDER, max_age=60):
    """Spawn a daemon thread that removes stale files from *folder*.

    Any regular file older than *max_age* minutes is deleted.  The sweep
    repeats every ten minutes, and errors are printed rather than allowed
    to kill the thread.
    """
    def cleanup_loop():
        cutoff_seconds = max_age * 60
        while True:
            now = time.time()
            try:
                if os.path.exists(folder):
                    for entry in os.listdir(folder):
                        path = os.path.join(folder, entry)
                        if not os.path.isfile(path):
                            continue
                        if now - os.path.getmtime(path) <= cutoff_seconds:
                            continue
                        try:
                            os.remove(path)
                            print(f"[Cleanup] Deleted: {path}")
                        except Exception as e:
                            print(f"[Cleanup Error] {e}")
            except Exception as e:
                # Swallow transient FS errors so the sweeper keeps running.
                print(f"[Cleanup Error] {e}")
            time.sleep(600)  # Every 10 minutes

    threading.Thread(target=cleanup_loop, daemon=True).start()

# Start cleanup thread
clean_old_files()
# === Instruction Parser ===
def apply_instruction(df, instruction):
    """Apply one plain-English instruction to *df*.

    Supported commands: "drop column X", "remove duplicates",
    "drop missing"/"remove null", "fill missing ... with V",
    "sort by X [descending|desc]", "rename column A to B",
    "filter where X > N", "group by A and sum B",
    "add column C as A + B", "normalize column X",
    "standardize column X", "split column X by comma",
    "remove special characters from X".  Only the first matching command
    is applied; the function then returns immediately.

    Args:
        df: pandas DataFrame to transform.
        instruction: free-text command; matched case-insensitively.

    Returns:
        (DataFrame, str): the (possibly unchanged) frame and a human-readable
        status message.  Errors never raise; they come back in the message.

    NOTE(review): lowercasing the instruction means column names are matched
    in lowercase only — columns containing upper-case letters cannot be
    addressed by these commands.
    """
    instruction = instruction.lower().strip()
    if not instruction:
        return df, "No instruction provided"
    try:
        # Drop column
        match = re.search(r"drop column (\w+)", instruction)
        if match:
            col_name = match.group(1)
            if col_name in df.columns:
                df = df.drop(columns=[col_name])
                return df, f"Dropped column '{col_name}'"
            else:
                return df, f"Error: Column '{col_name}' not found"

        # Remove duplicates
        if "remove duplicates" in instruction:
            original_count = len(df)
            df = df.drop_duplicates()
            removed_count = original_count - len(df)
            return df, f"Removed {removed_count} duplicate rows"

        # Drop missing values
        if "drop missing" in instruction or "remove null" in instruction:
            original_count = len(df)
            df = df.dropna()
            removed_count = original_count - len(df)
            return df, f"Removed {removed_count} rows with missing values"

        # Fill missing values
        match = re.search(r"fill missing.*with ([\w\.]+)", instruction)
        if match:
            val = match.group(1)
            try:
                val = float(val)
            except ValueError:
                # Non-numeric fill values are used verbatim as strings
                # (was a bare except that could hide real errors).
                pass
            missing_count = df.isnull().sum().sum()
            df = df.fillna(val)
            return df, f"Filled {missing_count} missing values with '{val}'"

        # Sort by column
        match = re.search(r"sort by (\w+)( descending| desc)?", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            ascending = not bool(match.group(2))
            df = df.sort_values(by=col, ascending=ascending)
            order = "descending" if not ascending else "ascending"
            return df, f"Sorted by '{col}' in {order} order"

        # Rename column
        match = re.search(r"rename column (\w+) to (\w+)", instruction)
        if match:
            old_name, new_name = match.group(1), match.group(2)
            if old_name not in df.columns:
                return df, f"Error: Column '{old_name}' not found"
            df = df.rename(columns={old_name: new_name})
            return df, f"Renamed column '{old_name}' to '{new_name}'"

        # Filter rows
        match = re.search(r"filter where (\w+) > (\d+)", instruction)
        if match:
            col, val = match.group(1), float(match.group(2))
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            original_count = len(df)
            df = df[df[col] > val]
            kept_count = len(df)
            return df, f"Filtered data: kept {kept_count} rows where {col} > {val}"

        # Group by and sum
        match = re.search(r"group by (\w+) and sum (\w+)", instruction)
        if match:
            group_col, sum_col = match.group(1), match.group(2)
            if group_col not in df.columns:
                return df, f"Error: Column '{group_col}' not found"
            if sum_col not in df.columns:
                return df, f"Error: Column '{sum_col}' not found"
            df = df.groupby(group_col)[sum_col].sum().reset_index()
            return df, f"Grouped by '{group_col}' and summed '{sum_col}'"

        # Add column (sum of two columns)
        match = re.search(r"add column (\w+) as (\w+) \+ (\w+)", instruction)
        if match:
            new_col, col1, col2 = match.group(1), match.group(2), match.group(3)
            if col1 not in df.columns:
                return df, f"Error: Column '{col1}' not found"
            if col2 not in df.columns:
                return df, f"Error: Column '{col2}' not found"
            df[new_col] = df[col1] + df[col2]
            return df, f"Added column '{new_col}' as sum of '{col1}' and '{col2}'"

        # Normalize column
        match = re.search(r"normalize column (\w+)", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            if not pd.api.types.is_numeric_dtype(df[col]):
                return df, f"Error: Column '{col}' is not numeric"
            col_min, col_max = df[col].min(), df[col].max()
            if col_max == col_min:
                # A constant column would produce 0/0 -> NaN; report instead.
                return df, f"Error: Column '{col}' is constant; cannot normalize"
            df[col] = (df[col] - col_min) / (col_max - col_min)
            return df, f"Normalized column '{col}' using min-max scaling"

        # Standardize column
        match = re.search(r"standardize column (\w+)", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            if not pd.api.types.is_numeric_dtype(df[col]):
                return df, f"Error: Column '{col}' is not numeric"
            col_std = df[col].std()
            if col_std == 0 or pd.isna(col_std):
                # Zero/undefined spread would divide by zero; report instead.
                return df, f"Error: Column '{col}' is constant; cannot standardize"
            df[col] = (df[col] - df[col].mean()) / col_std
            return df, f"Standardized column '{col}' using z-score"

        # Split column by comma
        match = re.search(r"split column (\w+) by comma", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            # NOTE(review): assumes each value splits into exactly two parts;
            # more or fewer commas will raise and be reported via the handler.
            df[[f"{col}_1", f"{col}_2"]] = df[col].str.split(",", expand=True)
            return df, f"Split column '{col}' by comma into '{col}_1' and '{col}_2'"

        # Remove special characters
        match = re.search(r"remove special characters from (\w+)", instruction)
        if match:
            col = match.group(1)
            if col not in df.columns:
                return df, f"Error: Column '{col}' not found"
            df[col] = df[col].astype(str).str.replace(r"[^a-zA-Z0-9]", "", regex=True)
            return df, f"Removed special characters from column '{col}'"

        # If no instruction matched
        return df, f"Instruction '{instruction}' not recognized"
    except Exception as e:
        return df, f"Error: {str(e)}"
# === File Processor Endpoint ===
@app.route("/process", methods=["POST"])  # restored: advertised as "POST /process" by the root route
def process_file():
    """Accept an uploaded CSV/Excel file, apply one instruction, save result.

    Expects multipart form fields ``file``, ``instruction`` and
    ``session_id``.  Returns JSON with the status message from
    apply_instruction, a 5-row preview, row/column counts and a download
    URL for the processed CSV.
    """
    try:
        # Validate request
        if "file" not in request.files:
            return jsonify({"error": "No file provided"}), 400
        if "instruction" not in request.form:
            return jsonify({"error": "No instruction provided"}), 400
        if "session_id" not in request.form:
            return jsonify({"error": "No session_id provided"}), 400

        file = request.files["file"]
        instruction = request.form["instruction"]
        session_id = request.form["session_id"]

        if file.filename == '':
            return jsonify({"error": "No file selected"}), 400

        # Read file
        try:
            if file.filename.lower().endswith('.csv'):
                df = pd.read_csv(file)
            elif file.filename.lower().endswith(('.xlsx', '.xls')):
                df = pd.read_excel(file)
            else:
                return jsonify({"error": "Unsupported file format. Use CSV or Excel files."}), 400
        except Exception as e:
            return jsonify({"error": f"File reading error: {str(e)}"}), 400

        # Apply instruction
        df_processed, status = apply_instruction(df, instruction)

        # Save processed file.  Both names come from the client, so strip
        # path components and separators to keep the write inside
        # UPLOAD_FOLDER (prevents "../" traversal in the save path).
        original_name = os.path.basename(file.filename).rsplit('.', 1)[0]
        safe_session = re.sub(r"[^\w\-]", "", session_id)
        filename = f"processed_{safe_session}_{original_name}.csv"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        try:
            df_processed.to_csv(filepath, index=False)
        except Exception as e:
            return jsonify({"error": f"File saving error: {str(e)}"}), 500

        # Generate preview (first 5 rows)
        preview = df_processed.head(5).to_dict(orient="records")

        return jsonify({
            "success": True,
            "message": status,
            "preview": preview,
            # BUG FIX: the original emitted the literal placeholder
            # "/download/(unknown)"; point clients at the real file.
            "download_url": f"/download/{filename}",
            "original_rows": len(df),
            "processed_rows": len(df_processed),
            "columns": list(df_processed.columns),
            "filename": filename
        })
    except Exception as e:
        return jsonify({"error": f"Processing error: {str(e)}"}), 500
# === File Download with Session ID Verification ===
@app.route("/download/<filename>")  # restored: advertised as "GET /download/<filename>" by the root route
def download_file(filename):
    """Serve a processed file; requires the matching ``session_id`` query arg.

    The session check is a simple substring match against the filename
    pattern written by process_file ("processed_<session>_<name>.csv").
    """
    try:
        session_id = request.args.get("session_id")

        # Validate session
        if not session_id:
            return jsonify({"error": "session_id parameter required"}), 400
        if f"_{session_id}_" not in filename:
            return jsonify({"error": "Invalid session or unauthorized access"}), 403

        # Security: reject any path component tricks (e.g. encoded "../" or
        # absolute paths) so the join below cannot escape UPLOAD_FOLDER.
        if os.path.basename(filename) != filename:
            return jsonify({"error": "Invalid session or unauthorized access"}), 403

        # Check file exists
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        if not os.path.exists(filepath):
            return jsonify({"error": "File not found or expired"}), 404

        return send_file(filepath, as_attachment=True, download_name=filename)
    except Exception as e:
        return jsonify({"error": f"Download error: {str(e)}"}), 500
# === Error Handlers ===
@app.errorhandler(404)  # restored: returns JSON instead of Flask's HTML 404 page
def not_found(error):
    """JSON body for requests to unknown endpoints."""
    return jsonify({"error": "Endpoint not found"}), 404
@app.errorhandler(413)  # restored: raised when MAX_CONTENT_LENGTH (512 MB) is exceeded
def too_large(error):
    """JSON body for uploads that exceed the configured size limit."""
    return jsonify({"error": "File too large (max 512MB)"}), 413
@app.errorhandler(500)  # restored: keep error responses JSON across the API
def internal_error(error):
    """Generic JSON body for unhandled server errors."""
    return jsonify({"error": "Internal server error"}), 500
# === Run on Port 7860 for Hugging Face ===
if __name__ == "__main__":
    # The original banner strings contained mojibake ("π") from garbled
    # emoji; plain ASCII renders correctly in any console encoding.
    print("Starting Data Processing API on port 7860...")
    print("API Endpoints:")
    print("   POST /process - Process files")
    print("   GET /download/<filename> - Download processed files")
    print("   GET /health - Health check")
    app.run(host="0.0.0.0", port=7860, debug=False)