File size: 5,210 Bytes
3d2357d 4f5d5ba 3d2357d 4f5d5ba 3d2357d 4f5d5ba 3d2357d 4f5d5ba 3d2357d 4f5d5ba 3d2357d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
import os
import urllib.parse
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import check_password_hash, generate_password_hash
from dotenv import load_dotenv
from converter import converter # 导入自定义转换器
# 加载环境变量
load_dotenv()
app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', 'starsky_secret_key')
# 登录管理配置
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
# 用户类
class User(UserMixin):
def __init__(self, id, username, password_hash):
self.id = id
self.username = username
self.password_hash = password_hash
# 默认用户
default_username = os.environ.get('ADMIN_USERNAME', 'admin')
default_password = os.environ.get('ADMIN_PASSWORD', 'admin123')
# 用户存储
users = {
default_username: User(
default_username,
default_username,
generate_password_hash(default_password)
)
}
@login_manager.user_loader
def load_user(user_id):
return users.get(user_id)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = users.get(username)
if user and check_password_hash(user.password_hash, password):
login_user(user)
return redirect(url_for('index'))
else:
flash('用户名或密码错误')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('login'))
@app.route('/')
@login_required
def index():
return render_template('index.html')
@app.route('/convert', methods=['POST'])
@login_required
def convert():
# 获取参数
backend_url = request.form.get('backend_url', 'https://raw.githubusercontent.com/yuanwangokk-1/subscribe/refs/heads/main/ACL4SSR/ACL4SSR.ini')
target = request.form.get('target', 'clash')
original_url = request.form.get('original_url', '')
if not original_url:
return jsonify({"status": "error", "message": "订阅链接不能为空"})
# 使用自定义转换器处理(处理整个转换逻辑)
try:
# 使用本地转换器进行转换
result = converter.convert(original_url, target, backend_url)
# 如果转换失败
if result.get('status') == 'error':
return jsonify(result)
# 如果返回的是字符串,说明是配置内容,我们需要将它放入临时文件并提供下载链接
# 但是在HF Space中,我们可以直接通过API返回内容
if isinstance(result.get('result'), str):
# 这里我们直接返回配置文件内容的下载链接
# 实际部署时,我们会添加一个下载路由
response_data = {
"status": "success",
"message": "转换成功",
"result": f"/download?target={target}&token={_generate_download_token(original_url)}"
}
# 保存转换结果到会话中,用于下载
if not hasattr(current_user, 'conversion_results'):
current_user.conversion_results = {}
current_user.conversion_results[_generate_download_token(original_url)] = {
'content': result.get('result'),
'target': target
}
return jsonify(response_data)
# 否则返回结果
return jsonify(result)
except Exception as e:
return jsonify({"status": "error", "message": f"处理失败: {str(e)}"})
def _generate_download_token(url):
"""生成下载令牌,简单实现"""
import hashlib
import time
token = hashlib.md5(f"{url}:{time.time()}".encode()).hexdigest()
return token
@app.route('/download')
@login_required
def download():
target = request.args.get('target', 'clash')
token = request.args.get('token', '')
# 获取保存的转换结果
if not hasattr(current_user, 'conversion_results') or token not in current_user.conversion_results:
return "配置文件不存在或已过期", 404
result = current_user.conversion_results[token]
content = result['content']
# 根据目标类型设置合适的文件名和MIME类型
filename = f"config_{target}.yaml" if target == 'clash' else f"config_{target}.txt"
mimetype = 'application/x-yaml' if target == 'clash' else 'text/plain'
# 返回文件下载
from flask import Response
response = Response(content, mimetype=mimetype)
response.headers['Content-Disposition'] = f'attachment; filename="{filename}"'
return response
# 不要在这里调用 app.run()
# 让 Hugging Face 的 WSGI 服务器来运行应用
# 仅在本地开发时使用以下代码
if __name__ == '__main__' and os.environ.get('DEVELOPMENT') == 'true':
app.run(host='0.0.0.0', port=7860, debug=True)
|