import streamlit as st import libtorrent as lt import time import os import tempfile def download_magnet_link(magnet_link): """ 使用libtorrent下载磁力链接,并显示下载进度。 参数: magnet_link (str): 磁力链接 返回: str: 下载文件的本地路径,如果下载失败则返回 None """ try: ses = lt.session() ses.listen_on(6881, 6891) # 监听端口范围,用于BT下载 params = lt.parse_magnet_uri(magnet_link) add_params = lt.add_torrent_params() # 创建 add_torrent_params 对象 add_params.set_params(params) # 将解析磁力链接得到的参数设置进去 temp_dir = tempfile.TemporaryDirectory() # 创建临时目录用于存放下载文件 add_params.save_path = temp_dir.name # 设置 save_path 为临时目录路径 handle = ses.add_torrent(add_params) # 使用 add_torrent_params 添加 torrent 任务 progress_bar = st.empty() # 创建一个空的占位符,用于显示进度条 progress_text = st.empty() # 创建一个空的占位符,用于显示进度文字 status = handle.status() while not status.has_metadata(): # 等待获取 torrent 元数据 status = handle.status() time.sleep(1) torrent_info = handle.get_torrent_info() file_name = torrent_info.files()[0].path # 假设只下载第一个文件,您可以根据需求修改逻辑 download_path = os.path.join(temp_dir.name, file_name) # 拼接下载文件的完整路径 while status.state != lt.torrent_status.seeding: # 循环直到下载完成 (seeding 状态) status = handle.status() progress_percent = int(status.progress * 100) # 计算下载进度百分比 progress_bar.progress(progress_percent / 100) # 更新进度条 progress_text.text(f"下载进度: {progress_percent}% - 速度: {status.download_payload_rate / 1000:.2f} KB/s - 已下载: {status.total_done / (1024 * 1024):.2f} MB / 总大小: {status.total / (1024 * 1024):.2f} MB") # 更新进度文字 time.sleep(0.5) # 每0.5秒更新一次进度 ses.remove_torrent(handle) # 下载完成后移除 torrent 任务 ses.pause() # 暂停 libtorrent 会话 return download_path # 返回下载文件的完整路径 except Exception as e: st.error(f"下载失败: {e}") # 显示错误信息 return None st.title("磁力链接下载器") # Streamlit 应用标题 magnet_link_input = st.text_input("请输入磁力链接:") # 用户输入磁力链接的输入框 if magnet_link_input: # 当用户输入磁力链接后 download_path = download_magnet_link(magnet_link_input) # 调用下载函数 if download_path: # 如果下载成功 st.success("下载完成!文件已保存在临时目录,您可以下载文件。") # 显示下载成功信息 try: with open(download_path, "rb") as f: # 以二进制读取下载的文件 file_data = f.read() # 读取文件内容 file_name = os.path.basename(download_path) # 获取文件名 st.download_button( # 创建下载按钮 label="下载文件", # 按钮标签 data=file_data, # 下载的数据,即文件内容 file_name=file_name, # 下载的文件名 mime="application/octet-stream" # 设置MIME类型,通用二进制流 ) except Exception as e: st.error(f"无法读取文件或创建下载链接: {e}") # 显示文件读取或下载链接创建失败的错误信息