|
from flask import Flask, request, jsonify, send_file |
|
from flask_cors import CORS |
|
from werkzeug.utils import secure_filename |
|
import tempfile |
|
import uuid |
|
import os |
|
import io |
|
import base64 |
|
import time |
|
import json |
|
import hashlib |
|
import qrcode |
|
from PIL import Image |
|
import requests |
|
import random |
|
import string |
|
|
|
app = Flask(__name__) |
|
CORS(app) |
|
|
|
|
|
SECRETS = {} |
|
SHORT_LINKS = {} |
|
ANALYTICS = {} |
|
|
|
|
|
MAX_FILE_SIZE = 5 * 1024 * 1024 |
|
ALLOWED_EXTENSIONS = { |
|
'image': ['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg'], |
|
'video': ['mp4', 'avi', 'mov', 'wmv', 'flv', 'webm'], |
|
'audio': ['mp3', 'wav', 'ogg', 'aac', 'm4a'], |
|
'document': ['pdf', 'txt', 'doc', 'docx', 'rtf', 'odt'] |
|
} |
|
|
|
def get_file_type(filename): |
|
"""Determine file type based on extension""" |
|
if not filename: |
|
return 'unknown' |
|
|
|
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else '' |
|
|
|
for file_type, extensions in ALLOWED_EXTENSIONS.items(): |
|
if ext in extensions: |
|
return file_type |
|
|
|
return 'unknown' |
|
|
|
def generate_short_id(): |
|
"""Generate a short, unique ID""" |
|
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=6)) |
|
|
|
def get_client_ip(request): |
|
"""Get client IP address""" |
|
if request.headers.get('X-Forwarded-For'): |
|
return request.headers.get('X-Forwarded-For').split(',')[0].strip() |
|
elif request.headers.get('X-Real-IP'): |
|
return request.headers.get('X-Real-IP') |
|
else: |
|
return request.remote_addr |
|
|
|
def get_location_info(ip): |
|
"""Get location information from IP (mock implementation)""" |
|
|
|
try: |
|
|
|
if ip == '127.0.0.1' or ip.startswith('192.168.'): |
|
return { |
|
'country': 'Local', |
|
'city': 'Local', |
|
'region': 'Local', |
|
'timezone': 'Local' |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return { |
|
'country': 'Unknown', |
|
'city': 'Unknown', |
|
'region': 'Unknown', |
|
'timezone': 'Unknown' |
|
} |
|
except: |
|
return { |
|
'country': 'Unknown', |
|
'city': 'Unknown', |
|
'region': 'Unknown', |
|
'timezone': 'Unknown' |
|
} |
|
|
|
def generate_qr_code(data): |
|
"""Generate QR code for the given data""" |
|
qr = qrcode.QRCode( |
|
version=1, |
|
error_correction=qrcode.constants.ERROR_CORRECT_L, |
|
box_size=10, |
|
border=4, |
|
) |
|
qr.add_data(data) |
|
qr.make(fit=True) |
|
|
|
img = qr.make_image(fill_color="black", back_color="white") |
|
|
|
|
|
buffer = io.BytesIO() |
|
img.save(buffer, format='PNG') |
|
img_str = base64.b64encode(buffer.getvalue()).decode() |
|
|
|
return f"data:image/png;base64,{img_str}" |
|
|
|
def record_access(secret_id, request): |
|
"""Record access analytics""" |
|
ip = get_client_ip(request) |
|
location = get_location_info(ip) |
|
user_agent = request.headers.get('User-Agent', '') |
|
|
|
|
|
device_type = 'desktop' |
|
if any(mobile in user_agent.lower() for mobile in ['mobile', 'android', 'iphone', 'ipad']): |
|
device_type = 'mobile' |
|
|
|
analytics_entry = { |
|
'timestamp': time.time(), |
|
'ip': ip, |
|
'location': location, |
|
'user_agent': user_agent, |
|
'device_type': device_type, |
|
'referer': request.headers.get('Referer', ''), |
|
'accept_language': request.headers.get('Accept-Language', '') |
|
} |
|
|
|
if secret_id not in ANALYTICS: |
|
ANALYTICS[secret_id] = [] |
|
|
|
ANALYTICS[secret_id].append(analytics_entry) |
|
|
|
return analytics_entry |
|
|
|
@app.route("/") |
|
def index(): |
|
"""Health check endpoint""" |
|
return jsonify({ |
|
"status": "running", |
|
"service": "Sharelock Backend", |
|
"version": "2.0.0", |
|
"features": [ |
|
"End-to-end encryption", |
|
"File uploads (5MB max)", |
|
"QR code generation", |
|
"Analytics tracking", |
|
"Short URLs", |
|
"Self-destruct messages" |
|
] |
|
}) |
|
|
|
@app.route("/api/store", methods=["POST"]) |
|
def store(): |
|
"""Store encrypted secret with enhanced features""" |
|
try: |
|
form = request.form |
|
data = form.get("data") |
|
|
|
if not data: |
|
return jsonify({"error": "Data is required"}), 400 |
|
|
|
|
|
ttl = int(form.get("ttl", 300)) |
|
view_once = form.get("view_once", "false").lower() == "true" |
|
delay_seconds = int(form.get("delay_seconds", 0)) |
|
theme = form.get("theme", "default") |
|
password_hint = form.get("password_hint", "") |
|
|
|
|
|
file_data = None |
|
file_type = None |
|
file_name = None |
|
|
|
if 'file' in request.files: |
|
file = request.files['file'] |
|
if file and file.filename: |
|
|
|
file.seek(0, os.SEEK_END) |
|
file_size = file.tell() |
|
file.seek(0) |
|
|
|
if file_size > MAX_FILE_SIZE: |
|
return jsonify({"error": f"File too large. Max size: {MAX_FILE_SIZE/1024/1024:.1f}MB"}), 400 |
|
|
|
|
|
file_name = secure_filename(file.filename) |
|
file_type = get_file_type(file_name) |
|
|
|
if file_type == 'unknown': |
|
return jsonify({"error": "File type not supported"}), 400 |
|
|
|
|
|
file_content = file.read() |
|
file_data = base64.b64encode(file_content).decode('utf-8') |
|
|
|
|
|
secret_id = str(uuid.uuid4()) |
|
short_id = generate_short_id() |
|
|
|
|
|
while short_id in SHORT_LINKS: |
|
short_id = generate_short_id() |
|
|
|
|
|
SECRETS[secret_id] = { |
|
"data": data, |
|
"file_data": file_data, |
|
"file_type": file_type, |
|
"file_name": file_name, |
|
"expire_at": time.time() + ttl, |
|
"view_once": view_once, |
|
"delay_seconds": delay_seconds, |
|
"theme": theme, |
|
"password_hint": password_hint, |
|
"created_at": time.time(), |
|
"creator_ip": get_client_ip(request), |
|
"access_count": 0 |
|
} |
|
|
|
|
|
SHORT_LINKS[short_id] = secret_id |
|
|
|
|
|
base_url = request.host_url.rstrip('/') |
|
secret_url = f"{base_url}/tools/sharelock?id={secret_id}" |
|
qr_code = generate_qr_code(secret_url) |
|
|
|
return jsonify({ |
|
"id": secret_id, |
|
"short_id": short_id, |
|
"short_url": f"{base_url}/s/{short_id}", |
|
"qr_code": qr_code, |
|
"expires_at": SECRETS[secret_id]["expire_at"], |
|
"has_file": file_data is not None |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/fetch/<secret_id>") |
|
def fetch(secret_id): |
|
"""Fetch and decrypt secret with analytics - MODIFIED TO HANDLE verify_only""" |
|
try: |
|
|
|
if secret_id in SHORT_LINKS: |
|
secret_id = SHORT_LINKS[secret_id] |
|
|
|
secret = SECRETS.get(secret_id) |
|
if not secret: |
|
return jsonify({"error": "Secret not found"}), 404 |
|
|
|
|
|
if time.time() > secret["expire_at"]: |
|
|
|
if secret_id in SECRETS: |
|
del SECRETS[secret_id] |
|
|
|
for short_id, full_id in list(SHORT_LINKS.items()): |
|
if full_id == secret_id: |
|
del SHORT_LINKS[short_id] |
|
return jsonify({"error": "Secret has expired"}), 410 |
|
|
|
|
|
verify_only = request.args.get('verify_only', 'false').lower() == 'true' |
|
|
|
|
|
if not verify_only: |
|
|
|
analytics_entry = record_access(secret_id, request) |
|
|
|
|
|
secret["access_count"] += 1 |
|
|
|
|
|
response = { |
|
"data": secret["data"], |
|
"theme": secret.get("theme", "default"), |
|
"delay_seconds": secret.get("delay_seconds", 0), |
|
"password_hint": secret.get("password_hint", ""), |
|
"access_count": secret["access_count"] |
|
} |
|
|
|
|
|
if secret.get("file_data"): |
|
response["file_data"] = secret["file_data"] |
|
response["file_type"] = secret.get("file_type", "unknown") |
|
response["file_name"] = secret.get("file_name", "unknown") |
|
|
|
|
|
if secret["view_once"] and not verify_only: |
|
|
|
del SECRETS[secret_id] |
|
|
|
|
|
for short_id, full_id in list(SHORT_LINKS.items()): |
|
if full_id == secret_id: |
|
del SHORT_LINKS[short_id] |
|
break |
|
|
|
return jsonify(response) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/analytics/<secret_id>") |
|
def get_analytics(secret_id): |
|
"""Get analytics for a specific secret - MODIFIED TO HANDLE verify_only""" |
|
try: |
|
|
|
verify_only = request.args.get('verify_only', 'false').lower() == 'true' |
|
|
|
|
|
if secret_id not in SECRETS and secret_id not in ANALYTICS: |
|
return jsonify({"error": "Secret not found"}), 404 |
|
|
|
|
|
if not verify_only: |
|
|
|
record_access(secret_id, request) |
|
|
|
analytics_data = ANALYTICS.get(secret_id, []) |
|
|
|
|
|
formatted_analytics = [] |
|
for entry in analytics_data: |
|
formatted_analytics.append({ |
|
"timestamp": entry["timestamp"], |
|
"datetime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(entry["timestamp"])), |
|
"ip": entry["ip"], |
|
"location": entry["location"], |
|
"device_type": entry["device_type"], |
|
"user_agent": entry["user_agent"][:100] + "..." if len(entry["user_agent"]) > 100 else entry["user_agent"] |
|
}) |
|
|
|
return jsonify({ |
|
"secret_id": secret_id, |
|
"total_accesses": len(formatted_analytics), |
|
"analytics": formatted_analytics |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/secrets") |
|
def list_secrets(): |
|
"""List all active secrets (for dashboard)""" |
|
try: |
|
current_time = time.time() |
|
active_secrets = [] |
|
|
|
for secret_id, secret in SECRETS.items(): |
|
if current_time <= secret["expire_at"]: |
|
|
|
short_id = None |
|
for s_id, full_id in SHORT_LINKS.items(): |
|
if full_id == secret_id: |
|
short_id = s_id |
|
break |
|
|
|
active_secrets.append({ |
|
"id": secret_id, |
|
"short_id": short_id, |
|
"created_at": secret["created_at"], |
|
"expires_at": secret["expire_at"], |
|
"view_once": secret["view_once"], |
|
"has_file": secret.get("file_data") is not None, |
|
"file_type": secret.get("file_type"), |
|
"theme": secret.get("theme", "default"), |
|
"access_count": secret.get("access_count", 0), |
|
"preview": secret["data"][:100] + "..." if len(secret["data"]) > 100 else secret["data"] |
|
}) |
|
|
|
return jsonify({ |
|
"secrets": active_secrets, |
|
"total": len(active_secrets) |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/delete/<secret_id>", methods=["DELETE"]) |
|
def delete_secret(secret_id): |
|
"""Manually delete a secret - MODIFIED TO HANDLE verify_only""" |
|
try: |
|
|
|
verify_only = request.args.get('verify_only', 'false').lower() == 'true' |
|
|
|
if secret_id not in SECRETS: |
|
return jsonify({"error": "Secret not found"}), 404 |
|
|
|
|
|
if not verify_only: |
|
|
|
record_access(secret_id, request) |
|
|
|
|
|
del SECRETS[secret_id] |
|
|
|
|
|
for short_id, full_id in list(SHORT_LINKS.items()): |
|
if full_id == secret_id: |
|
del SHORT_LINKS[short_id] |
|
break |
|
|
|
return jsonify({"message": "Secret deleted successfully"}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/s/<short_id>") |
|
def redirect_short_link(short_id): |
|
"""Redirect short link to full URL""" |
|
if short_id not in SHORT_LINKS: |
|
return jsonify({"error": "Short link not found"}), 404 |
|
|
|
secret_id = SHORT_LINKS[short_id] |
|
base_url = request.host_url.rstrip('/') |
|
return f""" |
|
<!DOCTYPE html> |
|
<html> |
|
<head> |
|
<title>Sharelock - Redirecting...</title> |
|
<meta http-equiv="refresh" content="0;url={base_url}/tools/sharelock?id={secret_id}"> |
|
</head> |
|
<body> |
|
<p>Redirecting to secure message...</p> |
|
<p>If you are not redirected automatically, <a href="{base_url}/tools/sharelock?id={secret_id}">click here</a>.</p> |
|
</body> |
|
</html> |
|
""" |
|
|
|
@app.route("/api/qr/<secret_id>") |
|
def get_qr_code(secret_id): |
|
"""Generate QR code for a secret""" |
|
try: |
|
if secret_id not in SECRETS: |
|
return jsonify({"error": "Secret not found"}), 404 |
|
|
|
base_url = request.host_url.rstrip('/') |
|
secret_url = f"{base_url}/tools/sharelock?id={secret_id}" |
|
qr_code = generate_qr_code(secret_url) |
|
|
|
return jsonify({"qr_code": qr_code}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/stats") |
|
def get_stats(): |
|
"""Get overall statistics""" |
|
try: |
|
total_secrets = len(SECRETS) |
|
total_accesses = sum(len(analytics) for analytics in ANALYTICS.values()) |
|
|
|
|
|
file_types = {} |
|
for secret in SECRETS.values(): |
|
file_type = secret.get("file_type", "text") |
|
file_types[file_type] = file_types.get(file_type, 0) + 1 |
|
|
|
|
|
themes = {} |
|
for secret in SECRETS.values(): |
|
theme = secret.get("theme", "default") |
|
themes[theme] = themes.get(theme, 0) + 1 |
|
|
|
return jsonify({ |
|
"total_secrets": total_secrets, |
|
"total_accesses": total_accesses, |
|
"file_types": file_types, |
|
"themes": themes, |
|
"active_short_links": len(SHORT_LINKS) |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
@app.route("/api/cleanup", methods=["POST"]) |
|
def cleanup_expired(): |
|
"""Clean up expired secrets""" |
|
try: |
|
current_time = time.time() |
|
expired_count = 0 |
|
|
|
|
|
expired_secrets = [] |
|
for secret_id, secret in SECRETS.items(): |
|
if current_time > secret["expire_at"]: |
|
expired_secrets.append(secret_id) |
|
|
|
|
|
for secret_id in expired_secrets: |
|
del SECRETS[secret_id] |
|
expired_count += 1 |
|
|
|
|
|
for short_id, full_id in list(SHORT_LINKS.items()): |
|
if full_id == secret_id: |
|
del SHORT_LINKS[short_id] |
|
break |
|
|
|
return jsonify({ |
|
"message": f"Cleaned up {expired_count} expired secrets", |
|
"expired_count": expired_count |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
|
|
@app.errorhandler(404) |
|
def not_found(error): |
|
return jsonify({"error": "Endpoint not found"}), 404 |
|
|
|
@app.errorhandler(500) |
|
def internal_error(error): |
|
return jsonify({"error": "Internal server error"}), 500 |
|
|
|
if __name__ == "__main__": |
|
print("π Sharelock Backend Starting...") |
|
print("π Features enabled:") |
|
print(" β
End-to-end encryption") |
|
print(" β
File uploads (5MB max)") |
|
print(" β
QR code generation") |
|
print(" β
Analytics tracking") |
|
print(" β
Short URLs") |
|
print(" β
Self-destruct messages") |
|
print(" β
Multiple themes") |
|
print(" β
Password hints") |
|
print(" β
verify_only parameter support") |
|
print("π Server running on http://0.0.0.0:7860") |
|
|
|
app.run(host="0.0.0.0", port=7860, debug=True) |