import streamlit as st import libtorrent as lt import os import threading import time import logging from contextlib import contextmanager # 配置日志 logging.basicConfig(level=logging.INFO) # 初始化锁 session_lock = threading.Lock() @contextmanager def session_guard(): """线程安全的session_state访问上下文管理器""" with session_lock: try: yield finally: pass class TorrentSession: def __init__(self): self.ses = lt.session() self.ses.listen_on(6881, 6891) self.alerts = [] self.last_alert_time = time.time() def process_alerts(self): self.alerts = [] while (alert := self.ses.wait_for_alert(1000)): self.alerts.append(alert) self.ses.pop_alert() class DownloadState: _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance.reset() return cls._instance def reset(self): self.progress = 0.0 self.is_downloading = False self.complete = False self.file_path = None self.status = "等待开始" self.info_hash = "" # 关键改进点:强化初始化机制 def safe_init_session(): """线程安全的session_state初始化""" with session_guard(): if 'download' not in st.session_state: state = DownloadState() st.session_state.download = { 'progress': state.progress, 'is_downloading': state.is_downloading, 'complete': state.complete, 'file_path': state.file_path, 'status': state.status, 'info_hash': state.info_hash } logging.info("Session state initialized") def safe_sync_session(): """双重保障的状态同步机制""" safe_init_session() # 同步前确保初始化 state = DownloadState() with session_guard(): st.session_state.download.update({ 'progress': state.progress, 'is_downloading': state.is_downloading, 'complete': state.complete, 'file_path': state.file_path, 'status': state.status, 'info_hash': state.info_hash }) # 初始化session状态(主线程保障) safe_init_session() def download_worker(magnet_link, save_path): try: state = DownloadState() ts = TorrentSession() # 重置状态 with session_guard(): state.reset() safe_sync_session() # 使用新版API添加磁力链接 params = { 'save_path': save_path, 'storage_mode': lt.storage_mode_t.storage_mode_sparse, 'flags': lt.torrent_flags.duplicate_is_error | lt.torrent_flags.auto_managed } handle = lt.add_magnet_uri(ts.ses, magnet_link, params) state.info_hash = str(handle.info_hash()) safe_sync_session() # 事件驱动等待元数据 state.status = "等待元数据..." safe_sync_session() metadata_received = False start_time = time.time() while not metadata_received: ts.process_alerts() for alert in ts.alerts: if isinstance(alert, lt.metadata_received_alert): if alert.handle == handle: metadata_received = True break if time.time() - start_time > 300: # 5分钟超时 raise TimeoutError("获取元数据超时") time.sleep(0.5) # 获取文件信息 ti = handle.get_torrent_info() state.status = f"开始下载 {ti.name()}" safe_sync_session() # 启动下载 handle.set_sequential_download(True) handle.resume() # 下载进度监控 state.is_downloading = True safe_sync_session() while not handle.status().is_seeding: status = handle.status() state.progress = status.progress * 100 # 状态更新 if status.state == lt.torrent_status.downloading_metadata: state.status = "获取元数据..." elif status.state == lt.torrent_status.downloading: dl = status.download_rate / 1000 up = status.upload_rate / 1000 peers = status.num_peers state.status = f"下载中: {dl:.1f}kB/s ↑{up:.1f}kB/s ↔{peers} peers" safe_sync_session() time.sleep(1) # 下载完成 state.is_downloading = False state.complete = True state.file_path = save_path state.status = "下载完成" safe_sync_session() except Exception as e: logging.error(f"下载错误: {str(e)}") state.status = f"错误: {str(e)}" state.is_downloading = False safe_sync_session() finally: safe_sync_session() # Streamlit界面 st.title("🚀 磁力链接下载器") with st.form("magnet_form"): magnet = st.text_input("磁力链接", placeholder="magnet:?xt=urn:btih:...") submitted = st.form_submit_button("开始下载", disabled=st.session_state.download['is_downloading']) if submitted: if not magnet.startswith("magnet:"): st.error("无效的磁力链接格式") else: save_dir = "./downloads" os.makedirs(save_dir, exist_ok=True) # 启动前同步状态 DownloadState().reset() safe_sync_session() threading.Thread( target=download_worker, args=(magnet, save_dir), daemon=True ).start() # 实时状态显示 safe_sync_session() if st.session_state.download['is_downloading']: cols = st.columns([1,3]) with cols[0]: st.metric("进度", f"{st.session_state.download['progress']:.1f}%") with cols[1]: st.progress(st.session_state.download['progress']/100) st.info(st.session_state.download['status']) if st.session_state.download['complete']: st.success("下载完成!") files = [f for f in os.listdir(st.session_state.download['file_path']) if os.path.isfile(os.path.join(st.session_state.download['file_path'], f))] if files: with st.expander("下载文件"): selected = st.selectbox("选择文件", files) with open(os.path.join(st.session_state.download['file_path'], selected), "rb") as f: st.download_button( "下载文件", f, file_name=selected, mime="application/octet-stream" ) # 状态监控线程 if 'monitor' not in st.session_state: def status_monitor(): while True: safe_sync_session() time.sleep(0.5) threading.Thread(target=status_monitor, daemon=True).start() st.session_state.monitor = True