Spaces:
Sleeping
Sleeping
File size: 5,859 Bytes
31d1f71 7d040bf 31d1f71 0582fea 31d1f71 7d040bf 31d1f71 7d040bf 0582fea 7d040bf 0582fea 7d040bf 31d1f71 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 31d1f71 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea 7d040bf 0582fea |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import streamlit as st
import pyaria2
import time
import threading
import os
from pathlib import Path
# Aria2 RPC 配置
ARIA2_RPC_URL = "http://localhost:6800/rpc"
ARIA2_SECRET = "SECRET_KEY" # 建议修改为随机字符串
# 初始化 aria2 连接
def init_aria2():
try:
aria2 = pyaria2.Aria2(ARIA2_RPC_URL, secret=ARIA2_SECRET)
aria2.getVersion() # 测试连接
return aria2
except:
# 自动启动 aria2 进程(后台模式)
aria2_dir = os.path.expanduser("~/.aria2")
os.makedirs(aria2_dir, exist_ok=True)
cmd = f"aria2c --enable-rpc --rpc-listen-all=true --rpc-secret={ARIA2_SECRET} "
cmd += f"--dir={os.path.abspath('./downloads')} --daemon=true "
cmd += "--max-concurrent-downloads=3 --max-connection-per-server=16 "
cmd += "--split=16 --min-split-size=1M --file-allocation=none"
threading.Thread(target=lambda: os.system(cmd)).start()
time.sleep(2) # 等待服务启动
return pyaria2.Aria2(ARIA2_RPC_URL, secret=ARIA2_SECRET)
# Session 状态初始化
if 'download_progress' not in st.session_state:
st.session_state.download_progress = 0
if 'downloading' not in st.session_state:
st.session_state.downloading = False
if 'download_complete' not in st.session_state:
st.session_state.download_complete = False
if 'download_filename' not in st.session_state:
st.session_state.download_filename = None
if 'error_message' not in st.session_state:
st.session_state.error_message = None
if 'aria2' not in st.session_state:
st.session_state.aria2 = init_aria2()
def download_torrent(magnet_link):
try:
# 添加下载任务
options = {
"max-download-limit": "0", # 不限速
"seed-time": "0", # 下载完成后不做种
"bt-detach-seed-only": "true"
}
gid = st.session_state.aria2.addUri(
[magnet_link],
options=options
)
st.session_state.error_message = None
st.session_state.downloading = True
# 进度监控循环
while True:
status = st.session_state.aria2.tellStatus(gid)
total = int(status['totalLength'])
downloaded = int(status['completedLength'])
if total > 0:
progress = (downloaded / total) * 100
else:
progress = 0
st.session_state.download_progress = progress
# 完成判断
if status['status'] == 'complete':
st.session_state.download_complete = True
st.session_state.download_filename = status['files'][0]['path']
break
# 超时处理(10分钟无进度)
if time.time() - int(status['updateTime']) > 600:
raise TimeoutError("下载超时")
# 用户取消
if not st.session_state.downloading:
st.session_state.aria2.remove(gid)
break
time.sleep(1)
st.session_state.downloading = False
except Exception as e:
st.session_state.error_message = f"下载失败: {str(e)}"
st.session_state.downloading = False
st.session_state.download_complete = False
def start_download_thread(magnet_link):
thread = threading.Thread(target=download_torrent, args=(magnet_link,))
thread.start()
# 界面布局
st.title("磁力链接下载器 🧲 (Aria2版)")
with st.form("magnet_form"):
magnet_link = st.text_input("输入磁力链接:", placeholder="magnet:?xt=urn:btih:...")
submitted = st.form_submit_button("开始下载")
if submitted and magnet_link:
if st.session_state.downloading:
st.warning("当前已有下载任务在进行中")
else:
# 重置状态
st.session_state.download_progress = 0
st.session_state.download_complete = False
st.session_state.download_filename = None
st.session_state.error_message = None
st.session_state.downloading = True
start_download_thread(magnet_link)
if st.session_state.downloading:
st.info("下载状态")
progress_col, control_col = st.columns([4, 1])
with progress_col:
progress_bar = st.progress(int(st.session_state.download_progress))
st.write(f"当前进度:{st.session_state.download_progress:.1f}%")
with control_col:
if st.button("取消下载"):
st.session_state.downloading = False
st.session_state.download_complete = False
st.session_state.error_message = "下载已取消"
st.experimental_rerun()
if st.session_state.error_message:
st.error(st.session_state.error_message)
if st.session_state.download_complete:
st.success("下载完成!✅")
if st.session_state.download_filename and Path(st.session_state.download_filename).exists():
file_path = Path(st.session_state.download_filename)
file_size = file_path.stat().st_size
st.write(f"文件大小:{file_size/1024/1024:.2f} MB")
with open(file_path, "rb") as f:
st.download_button(
label="下载文件",
data=f,
file_name=file_path.name,
mime="application/octet-stream"
)
else:
st.error("文件不存在,可能下载失败")
# 注意事项
st.markdown("---")
st.info("""
**使用说明:**
1. 确保已安装 aria2:`brew install aria2` (macOS) 或 `sudo apt install aria2` (Linux)
2. 首次运行会自动启动 aria2 后台服务
3. 下载文件将保存到当前目录的 downloads 文件夹
""") |