# Sharelock / app.py
# mike23415's picture
# Update app.py
# 731c9bc verified
import base64
import hashlib
import html
import io
import json
import os
import random
import secrets
import string
import tempfile
import time
import uuid

import qrcode
import requests
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from PIL import Image
from werkzeug.utils import secure_filename
# Flask application object; every route below is registered on it.
app = Flask(__name__)
# CORS is applied with no options — presumably this allows cross-origin calls
# from any origin (flask-cors defaults); TODO confirm that is intended.
CORS(app)
# In-memory storage: plain module-level dicts, so all content lives only in
# this process and is gone when the process exits.
# SECRETS maps the full secret id (uuid4 string) to the record written by
# store(): data, file_data, file_type, file_name, expire_at, view_once,
# delay_seconds, theme, password_hint, created_at, creator_ip, access_count.
SECRETS = {} # { id: { data, file_data, file_type, expire_at, view_once, theme, analytics, etc. } }
# SHORT_LINKS maps a 6-character short id to the full secret id.
SHORT_LINKS = {} # { short_id: full_id }
# ANALYTICS maps a secret id to the list of entries appended by record_access().
ANALYTICS = {} # { secret_id: [analytics_entries] }
# Configuration
# Upper bound (in bytes) enforced on uploaded files by store().
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
# Category name -> lowercase file extensions accepted for that category.
# get_file_type() returns the category name, or 'unknown' when nothing matches.
ALLOWED_EXTENSIONS = {
    'image': ['png', 'jpg', 'jpeg', 'gif', 'webp', 'svg'],
    'video': ['mp4', 'avi', 'mov', 'wmv', 'flv', 'webm'],
    'audio': ['mp3', 'wav', 'ogg', 'aac', 'm4a'],
    'document': ['pdf', 'txt', 'doc', 'docx', 'rtf', 'odt']
}
def get_file_type(filename):
    """Classify ``filename`` by its extension.

    The text after the last '.' is lower-cased and looked up in
    ALLOWED_EXTENSIONS. The matching category name is returned; 'unknown' is
    returned for an empty/None filename, a name without a dot, or an
    extension that no category lists.
    """
    if not filename:
        return 'unknown'
    # rpartition yields an empty separator when the name contains no dot.
    _, dot, tail = filename.rpartition('.')
    suffix = tail.lower() if dot else ''
    return next(
        (category for category, known in ALLOWED_EXTENSIONS.items() if suffix in known),
        'unknown',
    )
def generate_short_id(length=6):
    """Generate a random short ID made of lowercase letters and digits.

    The short ID is accepted in place of the full secret ID when fetching a
    secret, so it is effectively an access token. It is therefore drawn from
    the ``secrets`` module (OS-level CSPRNG) rather than ``random``, whose
    output is predictable.

    Args:
        length: Number of characters to generate. Defaults to 6, the length
            the rest of the file relies on.

    Returns:
        A string of ``length`` characters from ``[a-z0-9]``. Uniqueness is
        NOT guaranteed here; the caller checks for collisions.
    """
    alphabet = string.ascii_lowercase + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
def get_client_ip(request):
    """Work out the caller's IP address from a request object.

    Lookup order:
      1. first comma-separated entry of the X-Forwarded-For header (stripped),
      2. the X-Real-IP header as-is,
      3. ``request.remote_addr``.

    Both headers are supplied by the client or an intermediary, so the value
    is only as trustworthy as whatever sits in front of this application.
    """
    headers = request.headers

    forwarded_chain = headers.get('X-Forwarded-For')
    if forwarded_chain:
        first_hop = forwarded_chain.split(',')[0]
        return first_hop.strip()

    real_ip = headers.get('X-Real-IP')
    if real_ip:
        return real_ip

    return request.remote_addr
def get_location_info(ip):
    """Return coarse location details for an IP address (mock implementation).

    No network lookup is performed. '127.0.0.1' and addresses starting with
    '192.168.' are reported as 'Local'; everything else, including ``None``
    or any non-string value, is reported as 'Unknown'.

    To go to production, replace the final return with a call to a real
    geolocation service (for example ``https://ipapi.co/<ip>/json/`` through
    ``requests.get`` with a timeout), and catch only that call's specific
    exceptions when falling back to 'Unknown'.

    Args:
        ip: The client IP as a string, or ``None`` when it is not known.

    Returns:
        A new dict with the keys 'country', 'city', 'region' and 'timezone'.
    """
    fields = ('country', 'city', 'region', 'timezone')

    # Explicit type guard instead of a bare ``except:``: a missing IP is an
    # expected input, and real failures should not be silently swallowed.
    if isinstance(ip, str) and (ip == '127.0.0.1' or ip.startswith('192.168.')):
        return dict.fromkeys(fields, 'Local')

    return dict.fromkeys(fields, 'Unknown')
def generate_qr_code(data):
    """Render ``data`` as a QR code and return it as a PNG data URI.

    The code is drawn black on white with error-correction level L,
    10-pixel boxes and a 4-box border. The PNG bytes are base64-encoded and
    prefixed with ``data:image/png;base64,``.
    """
    code = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    code.add_data(data)
    code.make(fit=True)
    picture = code.make_image(fill_color="black", back_color="white")

    # Serialize the image into memory rather than to a file on disk.
    png_stream = io.BytesIO()
    picture.save(png_stream, format='PNG')
    encoded_png = base64.b64encode(png_stream.getvalue()).decode()
    return "data:image/png;base64," + encoded_png
def record_access(secret_id, request):
    """Append one access record for ``secret_id`` to ANALYTICS and return it.

    The record holds the current timestamp, the client IP and its location
    info, the raw User-Agent, a 'mobile'/'desktop' device guess, and the
    Referer and Accept-Language headers (empty string when absent).
    """
    headers = request.headers
    client_ip = get_client_ip(request)
    where = get_location_info(client_ip)
    agent = headers.get('User-Agent', '')

    # Device guess: any of these markers in the User-Agent means "mobile".
    mobile_markers = ('mobile', 'android', 'iphone', 'ipad')
    agent_lowered = agent.lower()
    looks_mobile = any(marker in agent_lowered for marker in mobile_markers)

    record = {
        'timestamp': time.time(),
        'ip': client_ip,
        'location': where,
        'user_agent': agent,
        'device_type': 'mobile' if looks_mobile else 'desktop',
        'referer': headers.get('Referer', ''),
        'accept_language': headers.get('Accept-Language', ''),
    }
    ANALYTICS.setdefault(secret_id, []).append(record)
    return record
@app.route("/")
def index():
    """Health check: report service status, name, version and feature list."""
    advertised_features = [
        "End-to-end encryption",
        "File uploads (5MB max)",
        "QR code generation",
        "Analytics tracking",
        "Short URLs",
        "Self-destruct messages",
    ]
    payload = {
        "status": "running",
        "service": "Sharelock Backend",
        "version": "2.0.0",
        "features": advertised_features,
    }
    return jsonify(payload)
@app.route("/api/store", methods=["POST"])
def store():
    """Store a secret (plus an optional file) and return its access handles.

    Form fields:
        data: Required. The secret payload, stored as received.
        ttl: Lifetime in seconds (default 300). Must be a positive integer.
        view_once: "true" to delete the secret after its first counted fetch.
        delay_seconds: Non-negative integer passed back to the client on
            fetch (default 0).
        theme: Free-form theme name (default "default").
        password_hint: Free-form hint (default "").
        file: Optional multipart upload, at most MAX_FILE_SIZE bytes, with an
            extension listed in ALLOWED_EXTENSIONS.

    Returns:
        200 with JSON: id, short_id, short_url, qr_code, expires_at, has_file.
        400 with {"error": ...} when the input is invalid.
        500 with {"error": ...} on any unexpected failure.
    """
    try:
        form = request.form
        data = form.get("data")
        if not data:
            return jsonify({"error": "Data is required"}), 400

        # Malformed numbers are a client error: answer 400 here instead of
        # letting ValueError fall through to the generic 500 handler below.
        try:
            ttl = int(form.get("ttl", 300))
            delay_seconds = int(form.get("delay_seconds", 0))
        except ValueError:
            return jsonify({"error": "ttl and delay_seconds must be integers"}), 400
        if ttl <= 0:
            # A non-positive ttl would create a secret that is already expired.
            return jsonify({"error": "ttl must be a positive number of seconds"}), 400
        if delay_seconds < 0:
            return jsonify({"error": "delay_seconds must not be negative"}), 400

        view_once = form.get("view_once", "false").lower() == "true"
        theme = form.get("theme", "default")
        password_hint = form.get("password_hint", "")

        # Optional file attachment
        file_data = None
        file_type = None
        file_name = None
        upload = request.files.get('file')
        if upload and upload.filename:
            # Measure the upload by seeking to its end, then rewind for reading.
            upload.seek(0, os.SEEK_END)
            file_size = upload.tell()
            upload.seek(0)
            if file_size > MAX_FILE_SIZE:
                return jsonify({"error": f"File too large. Max size: {MAX_FILE_SIZE/1024/1024:.1f}MB"}), 400

            file_name = secure_filename(upload.filename)
            file_type = get_file_type(file_name)
            if file_type == 'unknown':
                return jsonify({"error": "File type not supported"}), 400

            # Stored base64-encoded so it can be returned inside a JSON body.
            file_data = base64.b64encode(upload.read()).decode('utf-8')

        # Generate IDs; regenerate the short one until it is unused.
        secret_id = str(uuid.uuid4())
        short_id = generate_short_id()
        while short_id in SHORT_LINKS:
            short_id = generate_short_id()

        now = time.time()
        SECRETS[secret_id] = {
            "data": data,
            "file_data": file_data,
            "file_type": file_type,
            "file_name": file_name,
            "expire_at": now + ttl,
            "view_once": view_once,
            "delay_seconds": delay_seconds,
            "theme": theme,
            "password_hint": password_hint,
            "created_at": now,
            "creator_ip": get_client_ip(request),
            "access_count": 0
        }
        SHORT_LINKS[short_id] = secret_id

        # The QR code encodes the full-id frontend URL, not the short URL.
        base_url = request.host_url.rstrip('/')
        secret_url = f"{base_url}/tools/sharelock?id={secret_id}"
        qr_code = generate_qr_code(secret_url)

        return jsonify({
            "id": secret_id,
            "short_id": short_id,
            "short_url": f"{base_url}/s/{short_id}",
            "qr_code": qr_code,
            "expires_at": SECRETS[secret_id]["expire_at"],
            "has_file": file_data is not None
        })
    except Exception as e:
        # Top-level boundary: keep the JSON error contract, but leave a
        # traceback in the server log so failures are diagnosable.
        app.logger.exception("Failed to store secret")
        return jsonify({"error": str(e)}), 500
@app.route("/api/fetch/<secret_id>")
def fetch(secret_id):
    """Return a stored secret by full id or short id.

    Query parameter ``verify_only=true`` skips analytics recording, the
    access-count increment and view-once deletion.

    Responses:
        200 JSON with data, theme, delay_seconds, password_hint and
            access_count, plus file_data/file_type/file_name when a file is
            attached.
        404 when the id is unknown; 410 when the secret has expired (it is
            removed at that point); 500 on any unexpected error.

    NOTE(review): with ``verify_only=true`` the full payload is still
    returned and a view-once secret is not consumed. Confirm this is what the
    frontend expects.
    """
    try:
        # A short id is accepted in place of the full id.
        secret_id = SHORT_LINKS.get(secret_id, secret_id)

        entry = SECRETS.get(secret_id)
        if not entry:
            return jsonify({"error": "Secret not found"}), 404

        # Expired: drop the secret and every short link that points at it.
        if time.time() > entry["expire_at"]:
            SECRETS.pop(secret_id, None)
            stale_links = [short for short, target in SHORT_LINKS.items() if target == secret_id]
            for short in stale_links:
                del SHORT_LINKS[short]
            return jsonify({"error": "Secret has expired"}), 410

        counted = request.args.get('verify_only', 'false').lower() != 'true'
        if counted:
            record_access(secret_id, request)
            entry["access_count"] += 1

        payload = {
            "data": entry["data"],
            "theme": entry.get("theme", "default"),
            "delay_seconds": entry.get("delay_seconds", 0),
            "password_hint": entry.get("password_hint", ""),
            "access_count": entry["access_count"]
        }

        attachment = entry.get("file_data")
        if attachment:
            payload["file_data"] = attachment
            payload["file_type"] = entry.get("file_type", "unknown")
            payload["file_name"] = entry.get("file_name", "unknown")

        # View-once secrets are removed after the first counted fetch.
        if entry["view_once"] and counted:
            del SECRETS[secret_id]
            linked = next(
                (short for short, target in SHORT_LINKS.items() if target == secret_id),
                None,
            )
            if linked is not None:
                del SHORT_LINKS[linked]

        return jsonify(payload)
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route("/api/analytics/<secret_id>")
def get_analytics(secret_id):
    """Return the recorded access entries for one secret.

    The id is accepted when it is present in SECRETS or in ANALYTICS;
    otherwise the response is 404. Unless ``verify_only=true`` is passed,
    this request is itself recorded as an access before the list is built.

    Returns JSON with secret_id, total_accesses and analytics (a list of
    timestamp, formatted local datetime, ip, location, device_type and a
    user agent cut to 100 characters plus "...").
    """
    def _clip(text):
        # Keep long user agents readable in the dashboard.
        return text[:100] + "..." if len(text) > 100 else text

    try:
        skip_recording = request.args.get('verify_only', 'false').lower() == 'true'

        is_known = secret_id in SECRETS or secret_id in ANALYTICS
        if not is_known:
            return jsonify({"error": "Secret not found"}), 404

        if not skip_recording:
            record_access(secret_id, request)

        rows = [
            {
                "timestamp": item["timestamp"],
                "datetime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(item["timestamp"])),
                "ip": item["ip"],
                "location": item["location"],
                "device_type": item["device_type"],
                "user_agent": _clip(item["user_agent"])
            }
            for item in ANALYTICS.get(secret_id, [])
        ]

        return jsonify({
            "secret_id": secret_id,
            "total_accesses": len(rows),
            "analytics": rows
        })
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route("/api/secrets")
def list_secrets():
    """List all secrets that have not yet expired (for the dashboard).

    Returns JSON with ``secrets`` (one summary per active secret: id,
    short_id, created_at, expires_at, view_once, has_file, file_type, theme,
    access_count and a preview of at most 100 characters of data followed by
    "...") and ``total``. Responds 500 with {"error": ...} on any unexpected
    error.

    NOTE(review): this function performs no authentication check, so every
    caller receives all secret ids and previews. Confirm the route is
    protected elsewhere.
    """
    try:
        current_time = time.time()

        # Invert SHORT_LINKS once instead of rescanning it for every secret.
        # setdefault keeps the first short id seen for a secret, matching the
        # original first-match scan.
        short_by_secret = {}
        for short_id, full_id in SHORT_LINKS.items():
            short_by_secret.setdefault(full_id, short_id)

        active_secrets = []
        for secret_id, secret in SECRETS.items():
            if current_time > secret["expire_at"]:
                continue
            body = secret["data"]
            active_secrets.append({
                "id": secret_id,
                "short_id": short_by_secret.get(secret_id),
                "created_at": secret["created_at"],
                "expires_at": secret["expire_at"],
                "view_once": secret["view_once"],
                "has_file": secret.get("file_data") is not None,
                "file_type": secret.get("file_type"),
                "theme": secret.get("theme", "default"),
                "access_count": secret.get("access_count", 0),
                "preview": body[:100] + "..." if len(body) > 100 else body
            })

        return jsonify({
            "secrets": active_secrets,
            "total": len(active_secrets)
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/delete/<secret_id>", methods=["DELETE"])
def delete_secret(secret_id):
    """Delete a secret by its full id, together with its short link.

    ``verify_only=true`` only suppresses the analytics record for this
    request; the secret is deleted either way. Responds 404 when the id is
    not in SECRETS and 500 on any unexpected error.
    """
    try:
        silent = request.args.get('verify_only', 'false').lower() == 'true'

        if secret_id not in SECRETS:
            return jsonify({"error": "Secret not found"}), 404

        if not silent:
            # The delete request itself is logged as an access.
            record_access(secret_id, request)

        del SECRETS[secret_id]

        # Remove the first short link that points at this secret, if any.
        linked = next(
            (short for short, target in SHORT_LINKS.items() if target == secret_id),
            None,
        )
        if linked is not None:
            del SHORT_LINKS[linked]

        return jsonify({"message": "Secret deleted successfully"})
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route("/s/<short_id>")
def redirect_short_link(short_id):
    """Serve an HTML page that forwards a short link to the frontend URL.

    Unknown short ids get a 404 JSON error. Otherwise the response is a
    small HTML document with a zero-delay meta refresh and a fallback link,
    both pointing at ``<host>/tools/sharelock?id=<secret_id>``.

    The target URL is built from ``request.host_url``, which comes from the
    incoming request, so it is HTML-escaped before being placed into the
    attribute values.
    """
    secret_id = SHORT_LINKS.get(short_id)
    if secret_id is None:
        return jsonify({"error": "Short link not found"}), 404

    base_url = request.host_url.rstrip('/')
    # quote=True also escapes quotes, which matters inside content="..." / href="...".
    target = html.escape(f"{base_url}/tools/sharelock?id={secret_id}", quote=True)
    return f"""
    <!DOCTYPE html>
    <html>
    <head>
    <title>Sharelock - Redirecting...</title>
    <meta http-equiv="refresh" content="0;url={target}">
    </head>
    <body>
    <p>Redirecting to secure message...</p>
    <p>If you are not redirected automatically, <a href="{target}">click here</a>.</p>
    </body>
    </html>
    """
@app.route("/api/qr/<secret_id>")
def get_qr_code(secret_id):
    """Return a QR code (PNG data URI) for an existing secret's frontend URL.

    Responds 404 when the id is not in SECRETS and 500 on any unexpected
    error.
    """
    try:
        if secret_id in SECRETS:
            origin = request.host_url.rstrip('/')
            link = f"{origin}/tools/sharelock?id={secret_id}"
            return jsonify({"qr_code": generate_qr_code(link)})
        return jsonify({"error": "Secret not found"}), 404
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
@app.route("/api/stats")
def get_stats():
    """Return overall counters for the service.

    Returns JSON with total_secrets, total_accesses (sum of all analytics
    entries), file_types (count per file category, with text-only secrets
    under "text"), themes (count per theme) and active_short_links.
    Responds 500 with {"error": ...} on any unexpected error.
    """
    try:
        total_secrets = len(SECRETS)
        total_accesses = sum(len(entries) for entries in ANALYTICS.values())

        file_types = {}
        themes = {}
        for secret in SECRETS.values():
            # store() always writes the "file_type" key and sets it to None
            # for text-only secrets, so a .get() default is never used. A None
            # dict key cannot be ordered against string keys when the JSON
            # output is key-sorted, so map it to "text" explicitly.
            file_type = secret.get("file_type") or "text"
            file_types[file_type] = file_types.get(file_type, 0) + 1

            theme = secret.get("theme", "default")
            themes[theme] = themes.get(theme, 0) + 1

        return jsonify({
            "total_secrets": total_secrets,
            "total_accesses": total_accesses,
            "file_types": file_types,
            "themes": themes,
            "active_short_links": len(SHORT_LINKS)
        })
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/api/cleanup", methods=["POST"])
def cleanup_expired():
    """Remove every expired secret and its short link; report how many.

    Entries in ANALYTICS are left untouched. Responds 500 with
    {"error": ...} on any unexpected error.
    """
    try:
        now = time.time()

        # Collect the ids first so SECRETS is not modified while iterating it.
        stale_ids = [sid for sid, item in SECRETS.items() if now > item["expire_at"]]

        for stale_id in stale_ids:
            del SECRETS[stale_id]
            linked = next(
                (short for short, target in SHORT_LINKS.items() if target == stale_id),
                None,
            )
            if linked is not None:
                del SHORT_LINKS[linked]

        removed = len(stale_ids)
        return jsonify({
            "message": f"Cleaned up {removed} expired secrets",
            "expired_count": removed
        })
    except Exception as exc:
        return jsonify({"error": str(exc)}), 500
# Error handlers
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON body instead of the default page."""
    body = jsonify({"error": "Endpoint not found"})
    return body, 404
@app.errorhandler(500)
def internal_error(error):
    """Answer 500 errors with a generic JSON body."""
    body = jsonify({"error": "Internal server error"})
    return body, 500
if __name__ == "__main__":
    # Startup banner (emoji restored from mis-decoded UTF-8 bytes).
    print("🔐 Sharelock Backend Starting...")
    print("📊 Features enabled:")
    print(" ✅ End-to-end encryption")
    print(" ✅ File uploads (5MB max)")
    print(" ✅ QR code generation")
    print(" ✅ Analytics tracking")
    print(" ✅ Short URLs")
    print(" ✅ Self-destruct messages")
    print(" ✅ Multiple themes")
    print(" ✅ Password hints")
    print(" ✅ verify_only parameter support")
    print("🚀 Server running on http://0.0.0.0:7860")
    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the server run code on it. Because the server listens on
    # all interfaces, debug is off unless FLASK_DEBUG is explicitly set.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=7860, debug=debug_enabled)