BT / app.py
Ethscriptions's picture
Update app.py
e8bd76b verified
raw
history blame
7.32 kB
import streamlit as st
import libtorrent as lt
import os
import threading
import time
import logging
from contextlib import contextmanager
# 配置日志
logging.basicConfig(level=logging.INFO)
# 初始化锁
session_lock = threading.Lock()
@contextmanager
def session_guard():
"""线程安全的session_state访问上下文管理器"""
with session_lock:
try:
yield
finally:
pass
class TorrentSession:
def __init__(self):
self.ses = lt.session()
self.ses.listen_on(6881, 6891)
self.alerts = []
self.last_alert_time = time.time()
def process_alerts(self):
self.alerts = []
while (alert := self.ses.wait_for_alert(1000)):
self.alerts.append(alert)
self.ses.pop_alert()
class DownloadState:
_instance = None
_lock = threading.Lock()
def __new__(cls):
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.reset()
return cls._instance
def reset(self):
self.progress = 0.0
self.is_downloading = False
self.complete = False
self.file_path = None
self.status = "等待开始"
self.info_hash = ""
# 关键改进点:强化初始化机制
def safe_init_session():
"""线程安全的session_state初始化"""
with session_guard():
if 'download' not in st.session_state:
state = DownloadState()
st.session_state.download = {
'progress': state.progress,
'is_downloading': state.is_downloading,
'complete': state.complete,
'file_path': state.file_path,
'status': state.status,
'info_hash': state.info_hash
}
logging.info("Session state initialized")
def safe_sync_session():
"""双重保障的状态同步机制"""
safe_init_session() # 同步前确保初始化
state = DownloadState()
with session_guard():
st.session_state.download.update({
'progress': state.progress,
'is_downloading': state.is_downloading,
'complete': state.complete,
'file_path': state.file_path,
'status': state.status,
'info_hash': state.info_hash
})
# 初始化session状态(主线程保障)
safe_init_session()
def download_worker(magnet_link, save_path):
try:
state = DownloadState()
ts = TorrentSession()
# 重置状态
with session_guard():
state.reset()
safe_sync_session()
# 使用新版API添加磁力链接
params = {
'save_path': save_path,
'storage_mode': lt.storage_mode_t.storage_mode_sparse,
'flags': lt.torrent_flags.duplicate_is_error | lt.torrent_flags.auto_managed
}
handle = lt.add_magnet_uri(ts.ses, magnet_link, params)
state.info_hash = str(handle.info_hash())
safe_sync_session()
# 事件驱动等待元数据
state.status = "等待元数据..."
safe_sync_session()
metadata_received = False
start_time = time.time()
while not metadata_received:
ts.process_alerts()
for alert in ts.alerts:
if isinstance(alert, lt.metadata_received_alert):
if alert.handle == handle:
metadata_received = True
break
if time.time() - start_time > 300: # 5分钟超时
raise TimeoutError("获取元数据超时")
time.sleep(0.5)
# 获取文件信息
ti = handle.get_torrent_info()
state.status = f"开始下载 {ti.name()}"
safe_sync_session()
# 启动下载
handle.set_sequential_download(True)
handle.resume()
# 下载进度监控
state.is_downloading = True
safe_sync_session()
while not handle.status().is_seeding:
status = handle.status()
state.progress = status.progress * 100
# 状态更新
if status.state == lt.torrent_status.downloading_metadata:
state.status = "获取元数据..."
elif status.state == lt.torrent_status.downloading:
dl = status.download_rate / 1000
up = status.upload_rate / 1000
peers = status.num_peers
state.status = f"下载中: {dl:.1f}kB/s ↑{up:.1f}kB/s ↔{peers} peers"
safe_sync_session()
time.sleep(1)
# 下载完成
state.is_downloading = False
state.complete = True
state.file_path = save_path
state.status = "下载完成"
safe_sync_session()
except Exception as e:
logging.error(f"下载错误: {str(e)}")
state.status = f"错误: {str(e)}"
state.is_downloading = False
safe_sync_session()
finally:
safe_sync_session()
# Streamlit界面
st.title("🚀 磁力链接下载器")
with st.form("magnet_form"):
magnet = st.text_input("磁力链接", placeholder="magnet:?xt=urn:btih:...")
submitted = st.form_submit_button("开始下载",
disabled=st.session_state.download['is_downloading'])
if submitted:
if not magnet.startswith("magnet:"):
st.error("无效的磁力链接格式")
else:
save_dir = "./downloads"
os.makedirs(save_dir, exist_ok=True)
# 启动前同步状态
DownloadState().reset()
safe_sync_session()
threading.Thread(
target=download_worker,
args=(magnet, save_dir),
daemon=True
).start()
# 实时状态显示
safe_sync_session()
if st.session_state.download['is_downloading']:
cols = st.columns([1,3])
with cols[0]:
st.metric("进度", f"{st.session_state.download['progress']:.1f}%")
with cols[1]:
st.progress(st.session_state.download['progress']/100)
st.info(st.session_state.download['status'])
if st.session_state.download['complete']:
st.success("下载完成!")
files = [f for f in os.listdir(st.session_state.download['file_path'])
if os.path.isfile(os.path.join(st.session_state.download['file_path'], f))]
if files:
with st.expander("下载文件"):
selected = st.selectbox("选择文件", files)
with open(os.path.join(st.session_state.download['file_path'], selected), "rb") as f:
st.download_button(
"下载文件",
f,
file_name=selected,
mime="application/octet-stream"
)
# 状态监控线程
if 'monitor' not in st.session_state:
def status_monitor():
while True:
safe_sync_session()
time.sleep(0.5)
threading.Thread(target=status_monitor, daemon=True).start()
st.session_state.monitor = True