BT / app.py
Ethscriptions's picture
Update app.py
0582fea verified
raw
history blame
5.04 kB
import streamlit as st
import libtorrent as lt
import time
import threading
import os
# Session state initialisation: seed every key this script relies on, but only
# when it is missing, so values set by earlier runs of the script are kept.
_SESSION_DEFAULTS = {
    'download_progress': 0,      # percentage shown in the progress bar
    'downloading': False,        # True while a download is in flight
    'download_complete': False,  # True once the download has finished
    'download_filename': None,   # path of the downloaded file
    'error_message': None,       # status/error text shown to the user
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
def download_torrent(magnet_link, save_path):
    """Download a single-file torrent described by a magnet link.

    Intended to run in the worker thread created by ``start_download_thread``.
    It has no return value; all results are published through
    ``st.session_state``:

    * ``download_progress`` - percentage (0-100) of the download so far.
    * ``download_filename`` - ``save_path`` joined with the torrent's file path.
    * ``download_complete`` - set to True when the torrent is done.
    * ``downloading`` - set to False when the function stops for any reason
      other than a user cancel (the UI clears it itself in that case).
    * ``error_message`` - status text while waiting for metadata, or the
      failure reason.

    The UI cancels a download by setting ``st.session_state.downloading`` to
    False; this function polls that flag and returns as soon as it sees it.

    Args:
        magnet_link: Magnet URI handed to libtorrent.
        save_path: Directory the torrent payload is written to.
    """
    try:
        ses = lt.session()
        ses.listen_on(6881, 6891)
        params = {
            'save_path': save_path,
            # NOTE(review): raw numeric storage mode; what value 2 means depends
            # on the installed libtorrent version - confirm.
            'storage_mode': lt.storage_mode_t(2),
        }
        handle = lt.add_magnet_uri(ses, magnet_link, params)
        ses.start_dht()
        ses.start_lsd()
        ses.start_upnp()
        ses.start_natpmp()

        # Wait for the torrent metadata, checking the cancel flag once a second.
        st.session_state.error_message = "正在获取元数据..."
        while not handle.has_metadata():
            time.sleep(1)
            if not st.session_state.downloading:
                return

        # The metadata may arrive in the same instant the user cancels.
        # Re-check the flag here instead of forcing it back to True, so the
        # cancel (and its message) is never silently overridden.
        if not st.session_state.downloading:
            handle.pause()
            return

        # Inspect the file list; only single-file torrents are supported.
        torinfo = handle.get_torrent_info()
        files = torinfo.files()
        if files.num_files() > 1:
            st.session_state.error_message = "暂不支持多文件种子"
            st.session_state.downloading = False
            return

        file_path = files.file_path(0)
        st.session_state.download_filename = os.path.join(save_path, file_path)
        # Clear the "fetching metadata" status text now that the download starts.
        st.session_state.error_message = None

        # libtorrent also has a `finished` state; treat it as done too so the
        # loop cannot spin forever if the torrent settles there instead of
        # reaching `seeding`.
        done_states = (lt.torrent_status.finished, lt.torrent_status.seeding)
        while True:
            # One status snapshot per iteration keeps the completion test and
            # the reported progress consistent with each other.
            status = handle.status()
            if status.state in done_states:
                break
            if not st.session_state.downloading:
                handle.pause()
                return
            st.session_state.download_progress = status.progress * 100
            time.sleep(0.5)

        st.session_state.download_progress = 100
        st.session_state.download_complete = True
        st.session_state.downloading = False
    except Exception as e:
        # Top-level boundary of the worker thread: report the failure to the UI
        # instead of letting the thread die silently.
        st.session_state.error_message = f"下载出错:{str(e)}"
        st.session_state.downloading = False
        st.session_state.download_complete = False
def start_download_thread(magnet_link):
    """Start ``download_torrent`` for *magnet_link* in a background thread.

    The payload is saved under ``./downloads``, which is created if needed.

    Args:
        magnet_link: Magnet URI to download.
    """
    # Function-scope import so the file's import block stays untouched. The
    # helper lives in a Streamlit-internal module whose location has moved
    # between releases, so fall back to the previous behaviour (no context
    # attached) when it cannot be imported.
    try:
        from streamlit.runtime.scriptrunner import add_script_run_ctx
    except ImportError:
        add_script_run_ctx = None

    save_path = './downloads'
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(save_path, exist_ok=True)

    thread = threading.Thread(target=download_torrent, args=(magnet_link, save_path))
    # download_torrent reads and writes st.session_state. Streamlit resolves
    # the session from the calling thread's script-run context, so the worker
    # must inherit the context of the thread that is starting it.
    if add_script_run_ctx is not None:
        add_script_run_ctx(thread)
    thread.start()
# Page layout: title and the magnet-link submission form.
st.title("磁力链接下载器 🧲")
with st.form("magnet_form"):
    magnet_link = st.text_input("输入磁力链接:", placeholder="magnet:?xt=urn:btih:...")
    submitted = st.form_submit_button("开始下载")

if submitted:
    # Pasted links often carry stray whitespace; normalise before validating.
    magnet_link = (magnet_link or "").strip()
    if not magnet_link:
        # Previously an empty submission was ignored without any feedback.
        st.warning("请输入磁力链接")
    elif not magnet_link.lower().startswith("magnet:?"):
        # Reject obvious non-magnet input up front instead of flipping the
        # page into the "downloading" state and spawning a worker thread.
        st.error("请输入有效的磁力链接(以 'magnet:?' 开头)")
    elif st.session_state.downloading:
        st.warning("当前已有下载任务在进行中")
    else:
        # Reset the state left over from any previous download.
        st.session_state.download_progress = 0
        st.session_state.download_complete = False
        st.session_state.download_filename = None
        st.session_state.error_message = None
        # Mark the download as active before the worker starts: the worker
        # treats a False flag as a cancel request.
        st.session_state.downloading = True
        start_download_thread(magnet_link)
# Live status panel: progress bar plus a cancel button, shown only while a
# download is active.
if st.session_state.downloading:
    st.info("下载状态")
    progress_col, control_col = st.columns([4, 1])
    with progress_col:
        # st.progress takes an integer percentage; clamp it to 0-100 so an
        # out-of-range value can never make the widget raise.
        progress_percent = max(0, min(100, int(st.session_state.download_progress)))
        progress_bar = st.progress(progress_percent)
        st.write(f"当前进度:{st.session_state.download_progress:.1f}%")
    with control_col:
        if st.button("取消下载"):
            # The worker thread polls `downloading` and stops when it is False.
            st.session_state.downloading = False
            st.session_state.download_complete = False
            st.session_state.error_message = "下载已取消"
            # st.experimental_rerun is deprecated in favour of st.rerun;
            # prefer the new name and fall back for older installations.
            rerun = getattr(st, "rerun", None) or st.experimental_rerun
            rerun()

# Status or error text is shown whether or not a download is still active
# (e.g. after a cancel or a failure).
if st.session_state.error_message:
    st.error(st.session_state.error_message)
# Completion panel: offer the finished file to the browser.
if st.session_state.download_complete:
    st.success("下载完成!✅")
    finished_path = st.session_state.download_filename
    if not (finished_path and os.path.exists(finished_path)):
        # No recorded path, or the file is not on disk.
        st.error("文件不存在,可能下载失败")
    else:
        size_in_mb = os.path.getsize(finished_path) / 1024 / 1024
        st.write(f"文件大小:{size_in_mb:.2f} MB")
        # Hand the open binary file to the download button.
        with open(finished_path, "rb") as payload:
            st.download_button(
                label="下载文件",
                data=payload,
                file_name=os.path.basename(finished_path),
                mime="application/octet-stream",
            )
# Usage notes shown at the bottom of the page.
st.markdown("---")
st.info("""
**使用说明:**
1. 输入有效的磁力链接(必须以'magnet:?xt='开头)
2. 点击开始下载按钮
3. 下载完成后会出现文件下载按钮
4. 大文件下载可能需要较长时间,请保持页面开启
**注意:** 下载速度取决于种子可用性和网络环境
""")

# Streamlit only re-executes the script on user interaction, so without this
# the progress bar would stay frozen and the completion panel would never
# appear on its own. While a download is active, wait briefly and trigger a
# rerun. This must stay the last statement so the whole page renders first.
if st.session_state.downloading:
    time.sleep(1)
    # Prefer st.rerun; fall back to the older name on installations without it.
    _rerun = getattr(st, "rerun", None) or st.experimental_rerun
    _rerun()