Spaces:
Sleeping
Sleeping
import streamlit as st | |
import libtorrent as lt | |
import os | |
import threading | |
import time | |
import logging | |
from contextlib import contextmanager | |
# 配置日志 | |
logging.basicConfig(level=logging.INFO) | |
# 初始化锁 | |
session_lock = threading.Lock() | |
def session_guard(): | |
"""线程安全的session_state访问上下文管理器""" | |
with session_lock: | |
try: | |
yield | |
finally: | |
pass | |
class TorrentSession: | |
def __init__(self): | |
self.ses = lt.session() | |
self.ses.listen_on(6881, 6891) | |
self.alerts = [] | |
self.last_alert_time = time.time() | |
def process_alerts(self): | |
self.alerts = [] | |
while (alert := self.ses.wait_for_alert(1000)): | |
self.alerts.append(alert) | |
self.ses.pop_alert() | |
class DownloadState: | |
_instance = None | |
_lock = threading.Lock() | |
def __new__(cls): | |
with cls._lock: | |
if cls._instance is None: | |
cls._instance = super().__new__(cls) | |
cls._instance.reset() | |
return cls._instance | |
def reset(self): | |
self.progress = 0.0 | |
self.is_downloading = False | |
self.complete = False | |
self.file_path = None | |
self.status = "等待开始" | |
self.info_hash = "" | |
# 关键改进点:强化初始化机制 | |
def safe_init_session(): | |
"""线程安全的session_state初始化""" | |
with session_guard(): | |
if 'download' not in st.session_state: | |
state = DownloadState() | |
st.session_state.download = { | |
'progress': state.progress, | |
'is_downloading': state.is_downloading, | |
'complete': state.complete, | |
'file_path': state.file_path, | |
'status': state.status, | |
'info_hash': state.info_hash | |
} | |
logging.info("Session state initialized") | |
def safe_sync_session(): | |
"""双重保障的状态同步机制""" | |
safe_init_session() # 同步前确保初始化 | |
state = DownloadState() | |
with session_guard(): | |
st.session_state.download.update({ | |
'progress': state.progress, | |
'is_downloading': state.is_downloading, | |
'complete': state.complete, | |
'file_path': state.file_path, | |
'status': state.status, | |
'info_hash': state.info_hash | |
}) | |
# 初始化session状态(主线程保障) | |
safe_init_session() | |
def download_worker(magnet_link, save_path): | |
try: | |
state = DownloadState() | |
ts = TorrentSession() | |
# 重置状态 | |
with session_guard(): | |
state.reset() | |
safe_sync_session() | |
# 使用新版API添加磁力链接 | |
params = { | |
'save_path': save_path, | |
'storage_mode': lt.storage_mode_t.storage_mode_sparse, | |
'flags': lt.torrent_flags.duplicate_is_error | lt.torrent_flags.auto_managed | |
} | |
handle = lt.add_magnet_uri(ts.ses, magnet_link, params) | |
state.info_hash = str(handle.info_hash()) | |
safe_sync_session() | |
# 事件驱动等待元数据 | |
state.status = "等待元数据..." | |
safe_sync_session() | |
metadata_received = False | |
start_time = time.time() | |
while not metadata_received: | |
ts.process_alerts() | |
for alert in ts.alerts: | |
if isinstance(alert, lt.metadata_received_alert): | |
if alert.handle == handle: | |
metadata_received = True | |
break | |
if time.time() - start_time > 300: # 5分钟超时 | |
raise TimeoutError("获取元数据超时") | |
time.sleep(0.5) | |
# 获取文件信息 | |
ti = handle.get_torrent_info() | |
state.status = f"开始下载 {ti.name()}" | |
safe_sync_session() | |
# 启动下载 | |
handle.set_sequential_download(True) | |
handle.resume() | |
# 下载进度监控 | |
state.is_downloading = True | |
safe_sync_session() | |
while not handle.status().is_seeding: | |
status = handle.status() | |
state.progress = status.progress * 100 | |
# 状态更新 | |
if status.state == lt.torrent_status.downloading_metadata: | |
state.status = "获取元数据..." | |
elif status.state == lt.torrent_status.downloading: | |
dl = status.download_rate / 1000 | |
up = status.upload_rate / 1000 | |
peers = status.num_peers | |
state.status = f"下载中: {dl:.1f}kB/s ↑{up:.1f}kB/s ↔{peers} peers" | |
safe_sync_session() | |
time.sleep(1) | |
# 下载完成 | |
state.is_downloading = False | |
state.complete = True | |
state.file_path = save_path | |
state.status = "下载完成" | |
safe_sync_session() | |
except Exception as e: | |
logging.error(f"下载错误: {str(e)}") | |
state.status = f"错误: {str(e)}" | |
state.is_downloading = False | |
safe_sync_session() | |
finally: | |
safe_sync_session() | |
# Streamlit界面 | |
st.title("🚀 磁力链接下载器") | |
with st.form("magnet_form"): | |
magnet = st.text_input("磁力链接", placeholder="magnet:?xt=urn:btih:...") | |
submitted = st.form_submit_button("开始下载", | |
disabled=st.session_state.download['is_downloading']) | |
if submitted: | |
if not magnet.startswith("magnet:"): | |
st.error("无效的磁力链接格式") | |
else: | |
save_dir = "./downloads" | |
os.makedirs(save_dir, exist_ok=True) | |
# 启动前同步状态 | |
DownloadState().reset() | |
safe_sync_session() | |
threading.Thread( | |
target=download_worker, | |
args=(magnet, save_dir), | |
daemon=True | |
).start() | |
# 实时状态显示 | |
safe_sync_session() | |
if st.session_state.download['is_downloading']: | |
cols = st.columns([1,3]) | |
with cols[0]: | |
st.metric("进度", f"{st.session_state.download['progress']:.1f}%") | |
with cols[1]: | |
st.progress(st.session_state.download['progress']/100) | |
st.info(st.session_state.download['status']) | |
if st.session_state.download['complete']: | |
st.success("下载完成!") | |
files = [f for f in os.listdir(st.session_state.download['file_path']) | |
if os.path.isfile(os.path.join(st.session_state.download['file_path'], f))] | |
if files: | |
with st.expander("下载文件"): | |
selected = st.selectbox("选择文件", files) | |
with open(os.path.join(st.session_state.download['file_path'], selected), "rb") as f: | |
st.download_button( | |
"下载文件", | |
f, | |
file_name=selected, | |
mime="application/octet-stream" | |
) | |
# 状态监控线程 | |
if 'monitor' not in st.session_state: | |
def status_monitor(): | |
while True: | |
safe_sync_session() | |
time.sleep(0.5) | |
threading.Thread(target=status_monitor, daemon=True).start() | |
st.session_state.monitor = True |