# shipinxiazai / app.py
# Ethscriptions's picture
# Create app.py
# 315962b verified
import gradio as gr
import yt_dlp
import os
import re
import json
from pathlib import Path
import tempfile
import shutil
from urllib.parse import urlparse, parse_qs
import threading
from concurrent.futures import ThreadPoolExecutor
import time
# Host patterns for every supported platform, keyed by the display name that
# is shown in the UI. Each pattern is an optional scheme followed by the
# accepted host names.
SUPPORTED_PLATFORMS = {
    "抖音": r'(https?://)?(v\.douyin\.com|www\.douyin\.com)',
    "快手": r'(https?://)?(v\.kuaishou\.com|www\.kuaishou\.com)',
    "哔哩哔哩": r'(https?://)?(www\.bilibili\.com|b23\.tv)',
    "YouTube": r'(https?://)?(www\.youtube\.com|youtu\.be)',
    "小红书": r'(https?://)?(www\.xiaohongshu\.com|xhslink\.com)',
    "微博": r'(https?://)?(weibo\.com|t\.cn)',
    "西瓜视频": r'(https?://)?(www\.ixigua\.com)',
    "腾讯视频": r'(https?://)?(v\.qq\.com)'
}

# Boundaries wrapped around each host pattern so that it only matches whole
# host labels. The look-behind still allows a preceding "." (subdomains such
# as m.weibo.com keep working) but rejects a preceding letter, digit or
# hyphen ("notweibo.com", or the "t.cn" hidden inside "chat.cnn.com").
# The look-ahead rejects hosts that merely start with a supported name
# ("www.youtube.com.evil.example") while tolerating sentence punctuation.
_HOST_BOUNDARY_BEFORE = r'(?<![A-Za-z0-9-])'
_HOST_BOUNDARY_AFTER = r'(?![A-Za-z0-9-]|\.[A-Za-z0-9])'


def get_platform_from_url(url):
    """
    Identify which supported platform a URL belongs to.

    Args:
        url: The URL (or a text snippet containing one) to inspect.

    Returns:
        The matching key of SUPPORTED_PLATFORMS, or None when ``url`` is
        empty or no platform host is found. Platforms are tried in
        dictionary order and the first match wins.
    """
    if not url:
        return None
    for platform, pattern in SUPPORTED_PLATFORMS.items():
        anchored = f"{_HOST_BOUNDARY_BEFORE}(?:{pattern}){_HOST_BOUNDARY_AFTER}"
        # Host names are case-insensitive, so match them that way.
        if re.search(anchored, url, re.IGNORECASE):
            return platform
    return None
def get_platform_config(url, format_id=None):
    """
    Build the yt-dlp option dictionary for the platform that ``url`` belongs to.

    Args:
        url: Video URL; its platform is detected with get_platform_from_url.
        format_id: Optional yt-dlp format selector. When given it overrides
            the platform's default format.

    Returns:
        A fresh dict of yt-dlp options, or None when the platform of ``url``
        is not recognised. A new dict is returned on every call, so callers
        may mutate it freely.
    """
    platform = get_platform_from_url(url)
    if not platform:
        return None

    retry_delay_seconds = 2

    def fixed_delay(n):
        # yt-dlp calls the sleep function with the attempt number ``n``;
        # a constant back-off ignores it.
        return retry_delay_seconds

    # Shared defaults: fail fast on bad networks instead of hanging.
    config = {
        'format': 'best',
        'merge_output_format': 'mp4',
        'socket_timeout': 10,
        'retries': 2,
        'fragment_retries': 2,
        # The Python API has no 'retry_sleep' option (it is silently
        # ignored); the delay between retries is configured per retry type
        # through 'retry_sleep_functions'.
        'retry_sleep_functions': {
            'http': fixed_delay,
            'fragment': fixed_delay,
            'file_access': fixed_delay,
            'extractor': fixed_delay,
        },
        'concurrent_fragment_downloads': 8,
    }

    # Only the platforms that differ from the shared defaults are listed.
    platform_overrides = {
        "哔哩哔哩": {
            # Separate video/audio streams are merged into one file.
            'format': 'bestvideo+bestaudio/best',
            'concurrent_fragment_downloads': 16,
            'file_access_retries': 2,
            'extractor_retries': 2,
        },
        "YouTube": {
            'format': 'bestvideo+bestaudio/best',
        },
    }
    config.update(platform_overrides.get(platform, {}))

    # An explicit format request always wins over the platform default.
    if format_id:
        config['format'] = format_id
    return config
def validate_url(url):
    """
    Check that ``url`` is present and points at a supported platform.

    Returns:
        A ``(is_valid, message)`` tuple. ``message`` is a user-facing text
        that either names the detected platform or explains the rejection.
    """
    if url:
        detected = get_platform_from_url(url)
        if detected:
            return True, f"识别为{detected}平台"
        return False, "不支持的平台或链接格式不正确"
    return False, "请输入视频链接"
def format_filesize(bytes):
    """
    Render a byte count as a human-readable size string.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (TB) is reached, then printed with one decimal place. A falsy
    input (``None`` or ``0``) yields the "unknown size" text.
    """
    if not bytes:
        return "未知大小"
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    size = bytes
    index = 0
    last_index = len(units) - 1
    # Step up one unit at a time; TB is the cap and may show values >= 1024.
    while index < last_index and not (size < 1024):
        size = size / 1024
        index += 1
    return f"{size:.1f} {units[index]}"
def _summarize_video_formats(raw_formats):
    """Reduce yt-dlp format entries to one summary per resolution, best first."""
    summaries = []
    seen_keys = set()
    for entry in raw_formats:
        # Skip audio-only entries and entries without a video codec.
        codec = entry.get('vcodec')
        if not codec or codec == 'none':
            continue

        width = entry.get('width', 0)
        height = entry.get('height', 0)
        has_dimensions = bool(width and height)
        if has_dimensions:
            resolution = f"{width}x{height}"
        else:
            resolution = entry.get('resolution', 'unknown')

        # Fall back to an "<height>p" label when the entry carries no note.
        note = entry.get('format_note', '')
        if not note and resolution != 'unknown' and height:
            note = f"{height}p"

        # Only the first entry seen for each resolution is kept.
        dedupe_key = f"{height}_{width}" if has_dimensions else resolution
        if dedupe_key in seen_keys:
            continue
        seen_keys.add(dedupe_key)

        summaries.append({
            'format_id': entry.get('format_id', ''),
            'ext': entry.get('ext', ''),
            'resolution': resolution,
            'format_note': note,
            'quality': height or 0  # sort key
        })
    # Highest resolution first; ties keep their original order.
    return sorted(summaries, key=lambda item: item['quality'], reverse=True)


def _pick_thumbnail(info):
    """Return the main thumbnail URL, else the last entry of 'thumbnails'."""
    thumbnail = info.get('thumbnail', '')
    if not thumbnail:
        candidates = info.get('thumbnails')
        if candidates:
            thumbnail = candidates[-1]['url']
    return thumbnail


def parse_video_info(url):
    """
    Fetch metadata for ``url`` without downloading the video.

    Returns:
        A dict whose "status" is "success" or "error". On success it also
        carries the platform, title, duration, the de-duplicated list of
        video formats, thumbnail, description and page URL. On error it
        carries a user-facing "message". Any exception raised along the way
        is converted into an error result.
    """
    try:
        # Reject empty or unsupported links before touching the network.
        ok, reason = validate_url(url)
        if not ok:
            return {"status": "error", "message": reason}

        # Platform-specific options, silenced for metadata extraction.
        options = get_platform_config(url)
        if not options:
            return {"status": "error", "message": "不支持的平台"}
        options.update({
            'quiet': True,
            'no_warnings': True,
        })

        with yt_dlp.YoutubeDL(options) as ydl:
            info = ydl.extract_info(url, download=False)
        if not info:
            return {"status": "error", "message": "无法获取视频信息"}

        formats = []
        if 'formats' in info:
            formats = _summarize_video_formats(info['formats'])

        thumbnail = _pick_thumbnail(info)
        platform = get_platform_from_url(url)
        return {
            "status": "success",
            "message": "解析成功",
            "platform": platform,
            "title": info.get('title', '未知标题'),
            "duration": info.get('duration', 0),
            "formats": formats,
            "thumbnail": thumbnail,
            "description": info.get('description', ''),
            "webpage_url": info.get('webpage_url', url),
        }
    except Exception as e:
        return {"status": "error", "message": f"解析失败: {str(e)}"}
class DownloadProgress:
    """
    Holder for the progress of one download, updated from a yt-dlp
    progress hook under a lock.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self):
        # Completion percentage in the range 0-100.
        self.progress = 0
        # User-facing status text.
        self.status = "准备下载"
        # Guards progress/status against concurrent hook invocations.
        self.lock = threading.Lock()

    def update(self, d):
        """
        Progress-hook callback: fold one yt-dlp status dict into this tracker.

        Args:
            d: The dict yt-dlp passes to progress hooks. Only ``status``,
                the byte counters and the pre-formatted ``_*_str`` fields
                are read; every one of them may be missing.
        """
        with self.lock:
            state = d.get('status')
            if state == 'downloading':
                # The exact size is often unknown for fragmented/HLS
                # downloads; fall back to the estimate so the progress
                # does not stay frozen at 0.
                if d.get('total_bytes'):
                    total = d['total_bytes']
                    total_text = d.get('_total_bytes_str', 'unknown')
                else:
                    total = d.get('total_bytes_estimate')
                    total_text = d.get('_total_bytes_estimate_str', 'unknown')

                downloaded = d.get('downloaded_bytes')
                if total and downloaded:
                    # An estimate can undershoot, so cap at 100 %.
                    self.progress = min((downloaded / total) * 100, 100)
                self.status = f"下载中: {d.get('_percent_str', '0%')} of {total_text}"
            elif state == 'finished':
                self.progress = 100
                self.status = "下载完成,正在处理..."
def get_downloads_dir():
    """
    Return the current user's download directory.

    The result is ``<home>/Downloads`` when that path exists; otherwise the
    home directory itself is returned. Nothing is created on disk.
    """
    home_dir = str(Path.home())
    candidate = os.path.join(home_dir, "Downloads")
    return candidate if os.path.exists(candidate) else home_dir
def clean_filename(title, platform):
    """
    Turn a video title into a safe file name stem (no extension).

    The result has the form ``<title>_<YYYYMMDD>_<platform tag>``.

    Args:
        title: Raw video title. ``None``, an empty string, or a title that
            consists only of removed characters falls back to "video".
        platform: Platform display name; unknown names get a generic tag.

    Returns:
        The cleaned file name stem.
    """
    clean_title = str(title) if title else ''

    # Characters that are illegal in file names, plus line breaks and tabs.
    illegal_chars = r'[<>:"/\\|?*\n\r\t]'
    clean_title = re.sub(illegal_chars, '', clean_title)
    # Punctuation and symbols ('%' matters: the name ends up in a yt-dlp
    # output template).
    clean_title = re.sub(r'[,.,。!!@#$%^&*()()+=\[\]{};:]+', '', clean_title)
    # Emoji.
    clean_title = re.sub(r'[\U0001F300-\U0001F9FF]', '', clean_title)
    # Collapse whitespace last, so gaps left behind by the removals above
    # do not survive as double or trailing spaces.
    clean_title = re.sub(r'\s+', ' ', clean_title).strip()

    # Never produce a name that starts with the "_" separator.
    if not clean_title:
        clean_title = 'video'

    # Short tag appended for each platform.
    platform_suffix = {
        "抖音": "抖音",
        "快手": "快手",
        "哔哩哔哩": "B站",
        "YouTube": "YT",
        "小红书": "XHS",
        "微博": "微博",
        "西瓜视频": "西瓜",
        "腾讯视频": "腾讯"
    }

    # Cap the title so the date and platform tag still fit comfortably.
    max_length = 50
    if len(clean_title) > max_length:
        clean_title = clean_title[:max_length - 3] + '...'

    timestamp = time.strftime("%Y%m%d", time.localtime())
    suffix = platform_suffix.get(platform, "视频")
    return f"{clean_title}_{timestamp}_{suffix}"
def _export_download(file_path):
    """Copy a finished download to a standalone temp file; return (path, ext)."""
    # Keep the real extension: not every download ends up as .mp4.
    suffix = os.path.splitext(file_path)[1] or '.mp4'
    exported = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    exported.close()
    try:
        shutil.copy2(file_path, exported.name)
    except Exception:
        # Do not leave an orphaned empty file behind when the copy fails.
        if os.path.exists(exported.name):
            os.remove(exported.name)
        raise
    return exported.name, suffix.lstrip('.')


def download_single_video(url, format_id, progress_tracker):
    """
    Download one video and hand it back as a temporary file.

    Args:
        url: Video URL on a supported platform.
        format_id: yt-dlp format selector, or None for the platform default.
        progress_tracker: Object whose ``update`` method is registered as a
            yt-dlp progress hook and whose ``progress`` attribute is read.

    Returns:
        On success ``{"status": "success", "file_path", "title", "ext"}``,
        where ``file_path`` is a temporary file the caller owns. Otherwise
        ``{"status": "error", "message"}``. No exception escapes.
    """
    platform = get_platform_from_url(url)
    if not platform:
        return {"status": "error", "message": "不支持的平台"}
    ydl_opts = get_platform_config(url, format_id)
    if not ydl_opts:
        return {"status": "error", "message": "不支持的平台"}

    # Scratch directory for yt-dlp; always removed in the ``finally`` below
    # because the finished file is copied out of it first.
    temp_dir = tempfile.mkdtemp()
    try:
        # First pass: metadata only, to derive the output file name.
        with yt_dlp.YoutubeDL({'quiet': True}) as ydl:
            info = ydl.extract_info(url, download=False)
        clean_title = clean_filename((info or {}).get('title') or 'video', platform)

        ydl_opts.update({
            'quiet': False,
            'no_warnings': False,
            'extract_flat': False,
            'paths': {'home': temp_dir},
            'progress_hooks': [progress_tracker.update],
            'outtmpl': clean_title + '.%(ext)s',  # relative to paths['home']
            'ignoreerrors': True,  # keep going on partial errors
            'noprogress': False,
            'continuedl': True,  # resume partial downloads
            # NOTE(review): unlimited retries can block this call forever
            # on a dead link; kept as originally configured.
            'retries': float('inf'),
            'fragment_retries': float('inf'),
            'skip_unavailable_fragments': True,
            'no_abort_on_error': True,
        })

        file_path = None
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
            # With 'ignoreerrors' a failed download yields None, not an error.
            downloads = (info or {}).get('requested_downloads')
            if downloads:
                file_path = downloads[0]['filepath']
        except Exception as e:
            error_msg = str(e)
            timed_out = "timed out" in error_msg or "timeout" in error_msg
            # A timeout after data has arrived is tolerated: whatever is on
            # disk is checked below instead of being reported blindly.
            if not (timed_out and progress_tracker.progress > 0):
                return {"status": "error", "message": f"下载过程中出错: {error_msg}"}

        if file_path is None:
            # yt-dlp did not report a path; fall back to the expected name.
            file_path = os.path.join(temp_dir, f"{clean_title}.mp4")

        if not os.path.exists(file_path):
            return {"status": "error", "message": "下载文件不存在"}
        if os.path.getsize(file_path) == 0:
            return {"status": "error", "message": "下载的文件大小为0,可能下载失败"}

        exported_path, ext = _export_download(file_path)
        return {
            "status": "success",
            "file_path": exported_path,
            "title": clean_title,
            "ext": ext
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
def download_video(urls, format_id=None):
    """
    Download the first URL in ``urls`` and report the outcome to the UI.

    Args:
        urls: Either a newline-separated string of links or a list of
            links. Blank entries are ignored; only the first remaining
            link is downloaded.
        format_id: Optional yt-dlp format selector.

    Returns:
        A 4-tuple ``(message, file_path_or_None, progress, status_text)``.
    """
    if isinstance(urls, str):
        urls = urls.split('\n')
    urls = [entry.strip() for entry in (urls or []) if entry and entry.strip()]
    if not urls:
        return "请输入至少一个视频链接", None, 0, "未开始下载"

    progress_tracker = DownloadProgress()
    result = download_single_video(urls[0], format_id, progress_tracker)

    if result.get("status") != "success":
        return f"下载失败: {result.get('message', '未知错误')}", None, 0, "下载失败"

    # A "success" result without a usable file must not be reported as a
    # finished download.
    file_path = result.get("file_path")
    if not file_path or not os.path.isfile(file_path):
        return "文件处理失败: 下载结果中没有可用的文件", None, 0, "下载失败"

    # Gradio serves the file from the returned path.
    return "下载成功,正在传输...", file_path, 100, "下载完成"
# Build the Gradio interface: a header with one badge per supported platform,
# an input column (URL box, parse button, details, quality selector, download
# button) and an output column (preview, progress, status, downloadable file).
with gr.Blocks(title="视频下载工具", theme=gr.themes.Soft()) as demo:
# State holder for the parsed video info; initialised to an empty dict.
video_info_state = gr.State({})
with gr.Column(elem_id="header"):
gr.Markdown("""
# 🎥 视频下载工具
一键下载各大平台视频,支持以下平台:
""")
with gr.Row():
for platform in SUPPORTED_PLATFORMS.keys():
gr.Markdown(f"<span class='platform-badge'>{platform}</span>", elem_classes="platform")
with gr.Row():
with gr.Column(scale=2):
# Input section
url_input = gr.Textbox(
label="视频链接",
placeholder="请输入视频链接,支持批量下载(每行一个链接)...",
lines=3,
info="支持多个平台的视频链接,自动识别平台类型"
)
parse_btn = gr.Button("解析视频", variant="secondary", size="lg")
# Video details, shown in an Accordion that starts hidden and collapsed
with gr.Accordion("视频详细信息", open=False, visible=False) as video_info_accordion:
video_info = gr.JSON(show_label=False)
format_choice = gr.Dropdown(
label="选择清晰度",
choices=[],
interactive=True,
visible=False
)
download_btn = gr.Button("开始下载", variant="primary", size="lg", interactive=False)
with gr.Column(scale=3):
# Preview and output section
with gr.Row():
preview_image = gr.Image(label="视频预览", visible=False)
with gr.Row():
progress = gr.Slider(
minimum=0,
maximum=100,
value=0,
label="下载进度",
interactive=False
)
status = gr.Textbox(
label="状态信息",
value="等待开始下载...",
interactive=False
)
# File component through which the downloaded file is offered
output_file = gr.File(label="下载文件")
# Custom CSS, injected through a Markdown component
gr.Markdown("""
<style>
#header {
text-align: center;
margin-bottom: 2rem;
}
.platform-badge {
display: inline-block;
padding: 0.5rem 1rem;
margin: 0.5rem;
border-radius: 2rem;
background-color: #2196F3;
color: white;
font-weight: bold;
}
.gradio-container {
max-width: 1200px !important;
}
.contain {
margin: 0 auto;
padding: 2rem;
}
.download-link {
display: inline-block;
padding: 0.8rem 1.5rem;
background-color: #4CAF50;
color: white;
text-decoration: none;
border-radius: 0.5rem;
margin-top: 1rem;
font-weight: bold;
transition: background-color 0.3s;
}
.download-link:hover {
background-color: #45a049;
}
</style>
""")
def update_video_info(url):
    """
    Parse the first link in the input box and refresh the UI accordingly.

    Args:
        url: Raw content of the URL text box (possibly several lines).

    Returns:
        A list of six component updates, in this order: video_info,
        format_choice, download_btn, preview_image, status,
        video_info_accordion.
    """
    # Use the first non-empty line, matching what download_video will
    # actually download; a leading blank line must not hide the link.
    first_url = next(
        (line.strip() for line in (url or '').split('\n') if line.strip()),
        ''
    )
    info = parse_video_info(first_url)

    if info["status"] != "success":
        return [
            gr.update(visible=False),  # video_info
            gr.update(visible=False),  # format_choice
            gr.update(interactive=False),  # download_btn
            gr.update(visible=False),  # preview_image
            info["message"],  # status
            gr.update(visible=False)  # video_info_accordion
        ]

    # Build the (label, value) pairs for the quality dropdown.
    format_choices = []
    for fmt in info["formats"]:
        label_parts = []
        if fmt['format_note']:
            label_parts.append(fmt['format_note'])
        if fmt['resolution'] != 'unknown':
            label_parts.append(fmt['resolution'])
        label = " - ".join(filter(None, label_parts))
        if not label:
            label = f"格式 {fmt['format_id']}"
        format_choices.append((label, fmt['format_id']))

    # Formats arrive sorted best-first, so the first one is the default.
    default_format = format_choices[0][1] if format_choices else None
    return [
        gr.update(visible=True, value=info),  # video_info
        gr.update(visible=True, choices=format_choices, value=default_format),  # format_choice
        gr.update(interactive=True),  # download_btn
        gr.update(visible=True, value=info["thumbnail"]),  # preview_image
        f"解析成功: {info['title']} ({info['platform']})",  # status
        gr.update(visible=True)  # video_info_accordion
    ]
# Wire the parse button: update_video_info returns six values, one for each
# component listed in `outputs`, in the same order.
parse_btn.click(
fn=update_video_info,
inputs=[url_input],
outputs=[video_info, format_choice, download_btn, preview_image, status, video_info_accordion]
)
# Wire the download button: download_video returns four values.
# NOTE(review): `status` is listed twice in `outputs`, so both the first and
# the last returned value target the same text box.
download_btn.click(
fn=download_video,
inputs=[url_input, format_choice],
outputs=[status, output_file, progress, status]
)
# Launch the app when this file is run as a script.
if __name__ == "__main__":
demo.launch()