File size: 5,545 Bytes
3b13b0e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import yt_dlp
import os
from typing import List, Dict, Optional, Tuple
from loguru import logger
from uuid import uuid4

from app.utils import utils
from app.services import video as VideoService


class YoutubeService:
    def __init__(self):
        self.supported_formats = ['mp4', 'mkv', 'webm', 'flv', 'avi']

    def _get_video_formats(self, url: str) -> List[Dict]:
        """获取视频可用的格式列表"""
        ydl_opts = {
            'quiet': True,
            'no_warnings': True
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
                formats = info.get('formats', [])

                format_list = []
                for f in formats:
                    format_info = {
                        'format_id': f.get('format_id', 'N/A'),
                        'ext': f.get('ext', 'N/A'),
                        'resolution': f.get('format_note', 'N/A'),
                        'filesize': f.get('filesize', 'N/A'),
                        'vcodec': f.get('vcodec', 'N/A'),
                        'acodec': f.get('acodec', 'N/A')
                    }
                    format_list.append(format_info)

                return format_list
        except Exception as e:
            logger.error(f"获取视频格式失败: {str(e)}")
            raise

    def _validate_format(self, output_format: str) -> None:
        """验证输出格式是否支持"""
        if output_format.lower() not in self.supported_formats:
            raise ValueError(
                f"不支持的视频格式: {output_format}。"
                f"支持的格式: {', '.join(self.supported_formats)}"
            )

    async def download_video(
            self,
            url: str,
            resolution: str,
            output_format: str = 'mp4',
            rename: Optional[str] = None
    ) -> Tuple[str, str, str]:
        """
        下载指定分辨率的视频
        
        Args:
            url: YouTube视频URL
            resolution: 目标分辨率 ('2160p', '1440p', '1080p', '720p' etc.)
                       注意:对于类似'1080p60'的输入会被处理为'1080p'
            output_format: 输出视频格式
            rename: 可选的重命名
            
        Returns:
            Tuple[str, str, str]: (task_id, output_path, filename)
        """
        try:
            task_id = str(uuid4())
            self._validate_format(output_format)

            # 标准化分辨率格式
            base_resolution = resolution.split('p')[0] + 'p'
            
            # 获取所有可用格式
            formats = self._get_video_formats(url)

            # 查找指定分辨率的最佳视频格式
            target_format = None
            for fmt in formats:
                fmt_resolution = fmt['resolution']
                # 将格式的分辨率也标准化后进行比较
                if fmt_resolution != 'N/A':
                    fmt_base_resolution = fmt_resolution.split('p')[0] + 'p'
                    if fmt_base_resolution == base_resolution and fmt['vcodec'] != 'none':
                        target_format = fmt
                        break

            if target_format is None:
                # 收集可用分辨率时也进行标准化
                available_resolutions = set(
                    fmt['resolution'].split('p')[0] + 'p'
                    for fmt in formats
                    if fmt['resolution'] != 'N/A' and fmt['vcodec'] != 'none'
                )
                raise ValueError(
                    f"未找到 {base_resolution} 分辨率的视频。"
                    f"可用分辨率: {', '.join(sorted(available_resolutions))}"
                )

            # 创建输出目录
            output_dir = utils.video_dir()
            os.makedirs(output_dir, exist_ok=True)

            # 设置下载选项
            if rename:
                # 如果指定了重命名,直接使用新名字
                filename = f"{rename}.{output_format}"
                output_template = os.path.join(output_dir, filename)
            else:
                # 否则使用任务ID和原标题
                output_template = os.path.join(output_dir, f'{task_id}_%(title)s.%(ext)s')

            ydl_opts = {
                'format': f"{target_format['format_id']}+bestaudio[ext=m4a]/best",
                'outtmpl': output_template,
                'merge_output_format': output_format.lower(),
                'postprocessors': [{
                    'key': 'FFmpegVideoConvertor',
                    'preferedformat': output_format.lower(),
                }]
            }

            # 执行下载
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                if rename:
                    # 如果指定了重命名,使用新文件名
                    output_path = output_template
                    filename = os.path.basename(output_path)
                else:
                    # 否则使用原始标题
                    video_title = info.get('title', task_id)
                    filename = f"{task_id}_{video_title}.{output_format}"
                    output_path = os.path.join(output_dir, filename)

            logger.info(f"视频下载成功: {output_path}")
            return task_id, output_path, filename

        except Exception as e:
            logger.exception("下载视频失败")
            raise