|
import os |
|
import urllib.parse |
|
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify |
|
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user |
|
from werkzeug.security import check_password_hash, generate_password_hash |
|
from dotenv import load_dotenv |
|
from converter import converter |
|
|
|
|
|
# Run python-dotenv's loader first so the os.environ lookups below can see
# any values it provides.
load_dotenv()

# The WSGI application object; every route in this module is registered on it.
app = Flask(__name__)

# NOTE(review): when SECRET_KEY is not set, a fixed, publicly visible fallback
# is used to sign session cookies. Confirm every real deployment sets
# SECRET_KEY.
app.secret_key = os.environ.get('SECRET_KEY', 'starsky_secret_key')

# Flask-Login wiring. 'login' is the endpoint name of the login view defined
# in this module.
login_manager = LoginManager()
login_manager.login_view = 'login'
login_manager.init_app(app)
|
|
|
|
|
class User(UserMixin):
    """Account record kept in memory and handed to Flask-Login.

    The constructor stores its three arguments unchanged as the attributes
    ``id``, ``username`` and ``password_hash``.
    """

    def __init__(self, id: str, username: str, password_hash: str) -> None:
        # ``id`` shadows the builtin, but the parameter name is part of the
        # constructor's interface and is therefore kept as-is.
        self.id, self.username, self.password_hash = id, username, password_hash
|
|
|
|
|
# Credentials of the single built-in account, overridable via environment.
# NOTE(review): the fallback password 'admin123' is weak and publicly visible;
# confirm ADMIN_PASSWORD is always set outside local development.
default_username = os.environ.get('ADMIN_USERNAME', 'admin')
default_password = os.environ.get('ADMIN_PASSWORD', 'admin123')

# In-memory user table keyed by username. The username doubles as the user id,
# and only the password hash (never the plain password) is stored on the user.
users = {}
users[default_username] = User(
    id=default_username,
    username=default_username,
    password_hash=generate_password_hash(default_password),
)
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` stored under *user_id*, or ``None`` if unknown."""
    try:
        return users[user_id]
    except KeyError:
        return None
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and process submitted credentials.

    GET renders ``login.html``. POST reads ``username`` and ``password`` from
    the form; on a match the user is logged in and redirected to ``index``,
    otherwise an error message is flashed and the form is rendered again.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        # Default to an empty string: a request without a ``password`` field
        # would otherwise pass None to check_password_hash, which raises
        # instead of simply rejecting the login.
        password = request.form.get('password', '')

        user = users.get(username)
        if user is not None and check_password_hash(user.password_hash, password):
            login_user(user)
            return redirect(url_for('index'))

        flash('用户名或密码错误')

    return render_template('login.html')
|
|
|
@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the login page."""
    logout_user()
    login_page = url_for('login')
    return redirect(login_page)
|
|
|
@app.route('/')
@login_required
def index():
    """Render ``index.html`` for a logged-in user."""
    page = render_template('index.html')
    return page
|
|
|
@app.route('/convert', methods=['POST'])
@login_required
def convert():
    """Convert a subscription link and reply with a JSON status payload.

    Form fields:
        original_url: subscription link to convert (required).
        target: output format name handed to the converter; defaults to
            ``clash``.
        backend_url: third argument handed to ``converter.convert``; defaults
            to the ACL4SSR.ini URL below.

    Returns:
        A JSON response. When the converter's ``result`` is a string, the
        content is stored on the current user under a fresh token and the
        payload's ``result`` is a ``/download`` URL carrying that token.
        Converter errors and non-string results are passed through as-is.
        Any exception is reported as ``{"status": "error", ...}``.
    """
    backend_url = request.form.get(
        'backend_url',
        'https://raw.githubusercontent.com/yuanwangokk-1/subscribe/refs/heads/main/ACL4SSR/ACL4SSR.ini',
    )
    target = request.form.get('target', 'clash')
    original_url = request.form.get('original_url', '')

    if not original_url:
        return jsonify({"status": "error", "message": "订阅链接不能为空"})

    # The whole pipeline stays inside one try so that any failure, including
    # an unexpected converter return value, is reported to the client as JSON.
    try:
        result = converter.convert(original_url, target, backend_url)

        if result.get('status') == 'error':
            return jsonify(result)

        content = result.get('result')
        if not isinstance(content, str):
            return jsonify(result)

        # Generate the token exactly once. Every call yields a different
        # value, so the token in the URL must be the very same object that
        # keys the stored content, otherwise /download can never find it.
        token = _generate_download_token(original_url)

        store = getattr(current_user, 'conversion_results', None)
        if store is None:
            store = {}
            current_user.conversion_results = store
        # NOTE(review): entries are never evicted, so this grows with every
        # successful conversion for the lifetime of the user object.
        store[token] = {'content': content, 'target': target}

        # urlencode escapes a user-supplied target so that characters such as
        # '&' or '#' cannot corrupt the query string.
        query = urllib.parse.urlencode({'target': target, 'token': token})
        return jsonify({
            "status": "success",
            "message": "转换成功",
            "result": f"/download?{query}",
        })
    except Exception as e:
        return jsonify({"status": "error", "message": f"处理失败: {str(e)}"})
|
|
|
def _generate_download_token(url): |
|
"""生成下载令牌,简单实现""" |
|
import hashlib |
|
import time |
|
token = hashlib.md5(f"{url}:{time.time()}".encode()).hexdigest() |
|
return token |
|
|
|
@app.route('/download')
@login_required
def download():
    """Serve a previously converted configuration as a file attachment.

    Query parameters:
        target: format name; selects the file extension and MIME type
            (``clash`` -> YAML, anything else -> plain text). Defaults to
            ``clash``.
        token: key under which ``convert`` stored the content on the current
            user.

    Returns:
        The stored content as an attachment, or a 404 message when the
        current user has no content stored under the token.
    """
    import re

    from flask import Response

    target = request.args.get('target', 'clash')
    token = request.args.get('token', '')

    stored = getattr(current_user, 'conversion_results', None)
    if not stored or token not in stored:
        return "配置文件不存在或已过期", 404

    content = stored[token]['content']

    is_clash = target == 'clash'
    extension = 'yaml' if is_clash else 'txt'
    mimetype = 'application/x-yaml' if is_clash else 'text/plain'

    # ``target`` comes straight from the query string. Restrict it to a
    # conservative ASCII set before embedding it in the header, so quotes,
    # control characters or non-ASCII text cannot break Content-Disposition.
    safe_target = re.sub(r'[^A-Za-z0-9._-]', '_', target)
    filename = f"config_{safe_target}.{extension}"

    response = Response(content, mimetype=mimetype)
    response.headers['Content-Disposition'] = f'attachment; filename="{filename}"'
    return response
|
|
|
|
|
|
|
|
|
# Start the built-in server only when the module is executed directly AND the
# environment explicitly opts in with DEVELOPMENT=true.
if __name__ == '__main__':
    if os.environ.get('DEVELOPMENT') == 'true':
        # NOTE(review): debug=True together with host 0.0.0.0 exposes the
        # debug server on all interfaces; confirm this is only ever used on
        # a trusted development machine.
        app.run(host='0.0.0.0', port=7860, debug=True)
|
|