Spaces:
Runtime error
Runtime error
import gradio as gr | |
import yt_dlp | |
import os | |
import re | |
import json | |
from pathlib import Path | |
import tempfile | |
import shutil | |
from urllib.parse import urlparse, parse_qs | |
import threading | |
from concurrent.futures import ThreadPoolExecutor | |
import time | |
SUPPORTED_PLATFORMS = { | |
"抖音": r'(https?://)?(v\.douyin\.com|www\.douyin\.com)', | |
"快手": r'(https?://)?(v\.kuaishou\.com|www\.kuaishou\.com)', | |
"哔哩哔哩": r'(https?://)?(www\.bilibili\.com|b23\.tv)', | |
"YouTube": r'(https?://)?(www\.youtube\.com|youtu\.be)', | |
"小红书": r'(https?://)?(www\.xiaohongshu\.com|xhslink\.com)', | |
"微博": r'(https?://)?(weibo\.com|t\.cn)', | |
"西瓜视频": r'(https?://)?(www\.ixigua\.com)', | |
"腾讯视频": r'(https?://)?(v\.qq\.com)' | |
} | |
def get_platform_from_url(url): | |
""" | |
自动识别URL所属平台 | |
""" | |
if not url: | |
return None | |
for platform, pattern in SUPPORTED_PLATFORMS.items(): | |
if re.search(pattern, url): | |
return platform | |
return None | |
def get_platform_config(url, format_id=None): | |
""" | |
根据URL返回对的配置 | |
""" | |
platform = get_platform_from_url(url) | |
if not platform: | |
return None | |
# 基础配置 | |
base_config = { | |
'format': format_id if format_id else 'best', | |
'merge_output_format': 'mp4', | |
# 网络相关设置 | |
'socket_timeout': 10, # 减少超时时间 | |
'retries': 2, # 减少重试次数 | |
'fragment_retries': 2, | |
'retry_sleep': 2, # 减少重试等待时间 | |
'concurrent_fragment_downloads': 8, | |
} | |
configs = { | |
"抖音": { | |
**base_config, | |
'format': format_id if format_id else 'best', | |
}, | |
"快手": { | |
**base_config, | |
'format': format_id if format_id else 'best', | |
}, | |
"哔哩哔哩": { | |
**base_config, | |
'format': format_id if format_id else 'bestvideo+bestaudio/best', | |
# B站特定设置 | |
'concurrent_fragment_downloads': 16, | |
'file_access_retries': 2, | |
'extractor_retries': 2, | |
'fragment_retries': 2, | |
'retry_sleep': 2, | |
}, | |
"YouTube": { | |
**base_config, | |
'format': format_id if format_id else 'bestvideo+bestaudio/best', | |
}, | |
"小红书": { | |
**base_config, | |
'format': format_id if format_id else 'best', | |
}, | |
"微博": { | |
**base_config, | |
'format': format_id if format_id else 'best', | |
}, | |
"西瓜视频": { | |
**base_config, | |
'format': format_id if format_id else 'best', | |
}, | |
"腾讯视频": { | |
**base_config, | |
'format': format_id if format_id else 'best', | |
} | |
} | |
return configs.get(platform) | |
def validate_url(url): | |
""" | |
验证URL是否符合支持的平台格式 | |
""" | |
if not url: | |
return False, "请输入视频链接" | |
platform = get_platform_from_url(url) | |
if not platform: | |
return False, "不支持的平台或链接格式不正确" | |
return True, f"识别为{platform}平台" | |
def format_filesize(bytes): | |
""" | |
格式化文件大小显示 | |
""" | |
if not bytes: | |
return "未知大小" | |
for unit in ['B', 'KB', 'MB', 'GB']: | |
if bytes < 1024: | |
return f"{bytes:.1f} {unit}" | |
bytes /= 1024 | |
return f"{bytes:.1f} TB" | |
def parse_video_info(url): | |
""" | |
解析视频信息 | |
""" | |
try: | |
# 验证URL | |
is_valid, message = validate_url(url) | |
if not is_valid: | |
return {"status": "error", "message": message} | |
# 获取平台特定配置 | |
ydl_opts = get_platform_config(url) | |
if not ydl_opts: | |
return {"status": "error", "message": "不支持的平台"} | |
ydl_opts.update({ | |
'quiet': True, | |
'no_warnings': True, | |
}) | |
with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
info = ydl.extract_info(url, download=False) | |
if not info: | |
return {"status": "error", "message": "无法获取视频信息"} | |
# 获取可用的格式 | |
formats = [] | |
seen_resolutions = set() # 用于去重 | |
if 'formats' in info: | |
# 过滤和排序格式 | |
video_formats = [] | |
for f in info['formats']: | |
# 过滤音频格式和没有视频编码的格式 | |
if f.get('vcodec') == 'none' or not f.get('vcodec'): | |
continue | |
# 获取分辨率 | |
width = f.get('width', 0) | |
height = f.get('height', 0) | |
resolution = f.get('resolution', 'unknown') | |
if width and height: | |
resolution = f"{width}x{height}" | |
# 获取格式说明 | |
format_note = f.get('format_note', '') | |
if not format_note and resolution != 'unknown': | |
if height: | |
format_note = f"{height}p" | |
# 创建唯一标识用于去重 | |
resolution_key = f"{height}_{width}" if height and width else resolution | |
# 如果这个分辨率已经存在,跳过 | |
if resolution_key in seen_resolutions: | |
continue | |
seen_resolutions.add(resolution_key) | |
# 创建格式信息 | |
format_info = { | |
'format_id': f.get('format_id', ''), | |
'ext': f.get('ext', ''), | |
'resolution': resolution, | |
'format_note': format_note, | |
'quality': height or 0 # 用于排序 | |
} | |
video_formats.append(format_info) | |
# 按质量排序 | |
video_formats.sort(key=lambda x: x['quality'], reverse=True) | |
formats = video_formats | |
# 获取��览图 | |
thumbnail = info.get('thumbnail', '') | |
if not thumbnail and 'thumbnails' in info: | |
thumbnails = info['thumbnails'] | |
if thumbnails: | |
thumbnail = thumbnails[-1]['url'] | |
platform = get_platform_from_url(url) | |
return { | |
"status": "success", | |
"message": "解析成功", | |
"platform": platform, | |
"title": info.get('title', '未知标题'), | |
"duration": info.get('duration', 0), | |
"formats": formats, | |
"thumbnail": thumbnail, | |
"description": info.get('description', ''), | |
"webpage_url": info.get('webpage_url', url), | |
} | |
except Exception as e: | |
return {"status": "error", "message": f"解析失败: {str(e)}"} | |
class DownloadProgress: | |
def __init__(self): | |
self.progress = 0 | |
self.status = "准备下载" | |
self.lock = threading.Lock() | |
def update(self, d): | |
with self.lock: | |
if d.get('status') == 'downloading': | |
total = d.get('total_bytes') | |
downloaded = d.get('downloaded_bytes') | |
if total and downloaded: | |
self.progress = (downloaded / total) * 100 | |
self.status = f"下载中: {d.get('_percent_str', '0%')} of {d.get('_total_bytes_str', 'unknown')}" | |
elif d.get('status') == 'finished': | |
self.progress = 100 | |
self.status = "下载完成,正在处理..." | |
def get_downloads_dir(): | |
""" | |
获取用户的下载目录 | |
""" | |
# 获取用户主目录 | |
home = str(Path.home()) | |
# 获取下载目录 | |
downloads_dir = os.path.join(home, "Downloads") | |
# 如果下载目录不存在,则创建 | |
if not os.path.exists(downloads_dir): | |
downloads_dir = home | |
return downloads_dir | |
def clean_filename(title, platform): | |
""" | |
清理并格式化文件名 | |
""" | |
# 移除非法字符 | |
illegal_chars = r'[<>:"/\\|?*\n\r\t]' | |
clean_title = re.sub(illegal_chars, '', title) | |
# 移除多余的空格和特殊符号 | |
clean_title = re.sub(r'\s+', ' ', clean_title).strip() | |
clean_title = re.sub(r'[,.,。!!@#$%^&*()()+=\[\]{};:]+', '', clean_title) | |
# 移除表情符号 | |
clean_title = re.sub(r'[\U0001F300-\U0001F9FF]', '', clean_title) | |
# 添加平台标识 | |
platform_suffix = { | |
"抖音": "抖音", | |
"快手": "快手", | |
"哔哩哔哩": "B站", | |
"YouTube": "YT", | |
"小红书": "XHS", | |
"微博": "微博", | |
"西瓜视频": "西瓜", | |
"腾讯视频": "腾讯" | |
} | |
# 限制标题长度(考虑到平台标识的长度) | |
max_length = 50 | |
if len(clean_title) > max_length: | |
clean_title = clean_title[:max_length-3] + '...' | |
# 添加时间戳和平台标识 | |
timestamp = time.strftime("%Y%m%d", time.localtime()) | |
suffix = platform_suffix.get(platform, "视频") | |
# 最终文件名格式:标题_时间_平台.mp4 | |
final_name = f"{clean_title}_{timestamp}_{suffix}" | |
return final_name | |
def download_single_video(url, format_id, progress_tracker): | |
""" | |
下载单个视频 | |
""" | |
try: | |
# 创建临时目录 | |
temp_dir = tempfile.mkdtemp() | |
# 获取平台信息 | |
platform = get_platform_from_url(url) | |
if not platform: | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
return {"status": "error", "message": "不支持的平台"} | |
# 获取视频信息 | |
with yt_dlp.YoutubeDL({'quiet': True}) as ydl: | |
info = ydl.extract_info(url, download=False) | |
# 清理并格式化文件名 | |
clean_title = clean_filename(info.get('title', 'video'), platform) | |
ydl_opts = get_platform_config(url, format_id) | |
if not ydl_opts: | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
return {"status": "error", "message": "不支持的平台"} | |
# 更新下载配置 | |
ydl_opts.update({ | |
'quiet': False, | |
'no_warnings': False, | |
'extract_flat': False, | |
'paths': {'home': temp_dir}, | |
'progress_hooks': [progress_tracker.update], | |
'outtmpl': clean_title + '.%(ext)s', # 不使用绝对路径 | |
'ignoreerrors': True, # 忽略部分错误继续下载 | |
'noprogress': False, # 显示进度 | |
'continuedl': True, # 支持断点续传 | |
'retries': float('inf'), # 无限重试 | |
'fragment_retries': float('inf'), # 片段无限重试 | |
'skip_unavailable_fragments': True, # 跳过不可用片段 | |
'no_abort_on_error': True, # 发生错误时不中止 | |
}) | |
with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
try: | |
info = ydl.extract_info(url, download=True) | |
if 'requested_downloads' in info: | |
file_path = info['requested_downloads'][0]['filepath'] | |
else: | |
file_path = os.path.join(temp_dir, f"{clean_title}.mp4") | |
if os.path.exists(file_path): | |
# 检查文件大小 | |
file_size = os.path.getsize(file_path) | |
if file_size == 0: | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
return {"status": "error", "message": "下载的文件大小为0,可能下载失败"} | |
# 创建一个新的临时文件 | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') | |
temp_file.close() | |
shutil.copy2(file_path, temp_file.name) | |
# 清理原始临时目录 | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
return { | |
"status": "success", | |
"file_path": temp_file.name, | |
"title": clean_title, | |
"ext": "mp4" | |
} | |
else: | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
return {"status": "error", "message": "下载文件不存在"} | |
except Exception as e: | |
error_msg = str(e) | |
# 如果是超时错误且进度不为0,继续下载 | |
if ("timed out" in error_msg or "timeout" in error_msg) and progress_tracker.progress > 0: | |
return { | |
"status": "success", | |
"file_path": file_path if 'file_path' in locals() else None, | |
"title": clean_title, | |
"ext": "mp4" | |
} | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
return {"status": "error", "message": f"下载过程中出错: {error_msg}"} | |
except Exception as e: | |
if 'temp_dir' in locals(): | |
shutil.rmtree(temp_dir, ignore_errors=True) | |
return {"status": "error", "message": str(e)} | |
def download_video(urls, format_id=None): | |
""" | |
下载视频并返回文件 | |
""" | |
if isinstance(urls, str): | |
urls = [url.strip() for url in urls.split('\n') if url.strip()] | |
if not urls: | |
return "请输入至少一个视频链接", None, 0, "未开始下载" | |
progress_tracker = DownloadProgress() | |
result = download_single_video(urls[0], format_id, progress_tracker) | |
if result["status"] == "success": | |
try: | |
# 返回文件路径供Gradio处理下载 | |
return "下载成功,正在传输...", result["file_path"], 100, "下载完成" | |
except Exception as e: | |
return f"文件处理失败: {str(e)}", None, 0, "下载失败" | |
else: | |
return f"下载失败: {result.get('message', '未知错误')}", None, 0, "下载失败" | |
# 创建Gradio界面 | |
with gr.Blocks(title="视频下载工具", theme=gr.themes.Soft()) as demo: | |
# 存储视频信息的状态变量 | |
video_info_state = gr.State({}) | |
with gr.Column(elem_id="header"): | |
gr.Markdown(""" | |
# 🎥 视频下载工具 | |
一键下载各大平台视频,支持以下平台: | |
""") | |
with gr.Row(): | |
for platform in SUPPORTED_PLATFORMS.keys(): | |
gr.Markdown(f"<span class='platform-badge'>{platform}</span>", elem_classes="platform") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
# 输入部分 | |
url_input = gr.Textbox( | |
label="视频链接", | |
placeholder="请输入视频链接,支持批量下载(每行一个链接)...", | |
lines=3, | |
info="支持多个平台的视频链接,自动识别平台类型" | |
) | |
parse_btn = gr.Button("解析视频", variant="secondary", size="lg") | |
# 视频信息显示(使用Accordion组件) | |
with gr.Accordion("视频详细信息", open=False, visible=False) as video_info_accordion: | |
video_info = gr.JSON(show_label=False) | |
format_choice = gr.Dropdown( | |
label="选择清晰度", | |
choices=[], | |
interactive=True, | |
visible=False | |
) | |
download_btn = gr.Button("开始下载", variant="primary", size="lg", interactive=False) | |
with gr.Column(scale=3): | |
# 预览和输出部分 | |
with gr.Row(): | |
preview_image = gr.Image(label="视频预览", visible=False) | |
with gr.Row(): | |
progress = gr.Slider( | |
minimum=0, | |
maximum=100, | |
value=0, | |
label="下载进度", | |
interactive=False | |
) | |
status = gr.Textbox( | |
label="状态信息", | |
value="等待开始下载...", | |
interactive=False | |
) | |
# 使用File组件来处理下载 | |
output_file = gr.File(label="下载文件") | |
# 添加自定义CSS | |
gr.Markdown(""" | |
<style> | |
#header { | |
text-align: center; | |
margin-bottom: 2rem; | |
} | |
.platform-badge { | |
display: inline-block; | |
padding: 0.5rem 1rem; | |
margin: 0.5rem; | |
border-radius: 2rem; | |
background-color: #2196F3; | |
color: white; | |
font-weight: bold; | |
} | |
.gradio-container { | |
max-width: 1200px !important; | |
} | |
.contain { | |
margin: 0 auto; | |
padding: 2rem; | |
} | |
.download-link { | |
display: inline-block; | |
padding: 0.8rem 1.5rem; | |
background-color: #4CAF50; | |
color: white; | |
text-decoration: none; | |
border-radius: 0.5rem; | |
margin-top: 1rem; | |
font-weight: bold; | |
transition: background-color 0.3s; | |
} | |
.download-link:hover { | |
background-color: #45a049; | |
} | |
</style> | |
""") | |
def update_video_info(url): | |
"""更新视频信息""" | |
# 只解析第一个链接 | |
first_url = url.split('\n')[0].strip() | |
info = parse_video_info(first_url) | |
if info["status"] == "success": | |
# 准备清晰度选项 | |
format_choices = [] | |
for fmt in info["formats"]: | |
# 构建格式标签 | |
label_parts = [] | |
if fmt['format_note']: | |
label_parts.append(fmt['format_note']) | |
if fmt['resolution'] != 'unknown': | |
label_parts.append(fmt['resolution']) | |
label = " - ".join(filter(None, label_parts)) | |
if not label: | |
label = f"格式 {fmt['format_id']}" | |
format_choices.append((label, fmt['format_id'])) | |
return [ | |
gr.update(visible=True, value=info), # video_info | |
gr.update(visible=True, choices=format_choices, value=format_choices[0][1] if format_choices else None), # format_choice | |
gr.update(interactive=True), # download_btn | |
gr.update(visible=True, value=info["thumbnail"]), # preview_image | |
f"解析成功: {info['title']} ({info['platform']})", # status | |
gr.update(visible=True) # video_info_accordion | |
] | |
else: | |
return [ | |
gr.update(visible=False), # video_info | |
gr.update(visible=False), # format_choice | |
gr.update(interactive=False), # download_btn | |
gr.update(visible=False), # preview_image | |
info["message"], # status | |
gr.update(visible=False) # video_info_accordion | |
] | |
# 绑定解析按钮事件 | |
parse_btn.click( | |
fn=update_video_info, | |
inputs=[url_input], | |
outputs=[video_info, format_choice, download_btn, preview_image, status, video_info_accordion] | |
) | |
# 绑定下载按钮事件 | |
download_btn.click( | |
fn=download_video, | |
inputs=[url_input, format_choice], | |
outputs=[status, output_file, progress, status] | |
) | |
# 启动应用 | |
if __name__ == "__main__": | |
demo.launch() |