File size: 10,343 Bytes
3b13b0e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

'''
@Project: NarratoAI
@File   : clip_video
@Author : 小林同学
@Date   : 2025/5/6 下午6:14
'''

import os
import subprocess
import json
import hashlib
from loguru import logger
from typing import Dict, List, Optional
from pathlib import Path

from app.utils import ffmpeg_utils


def parse_timestamp(timestamp: str) -> tuple:
    """
    解析时间戳字符串,返回开始和结束时间

    Args:
        timestamp: 格式为'HH:MM:SS-HH:MM:SS'或'HH:MM:SS,sss-HH:MM:SS,sss'的时间戳字符串

    Returns:
        tuple: (开始时间, 结束时间) 格式为'HH:MM:SS'或'HH:MM:SS,sss'
    """
    start_time, end_time = timestamp.split('-')
    return start_time, end_time


def calculate_end_time(start_time: str, duration: float, extra_seconds: float = 1.0) -> str:
    """
    根据开始时间和持续时间计算结束时间

    Args:
        start_time: 开始时间,格式为'HH:MM:SS'或'HH:MM:SS,sss'(带毫秒)
        duration: 持续时间,单位为秒
        extra_seconds: 额外添加的秒数,默认为1秒

    Returns:
        str: 计算后的结束时间,格式与输入格式相同
    """
    # 检查是否包含毫秒
    has_milliseconds = ',' in start_time
    milliseconds = 0

    if has_milliseconds:
        time_part, ms_part = start_time.split(',')
        h, m, s = map(int, time_part.split(':'))
        milliseconds = int(ms_part)
    else:
        h, m, s = map(int, start_time.split(':'))

    # 转换为总毫秒数
    total_milliseconds = ((h * 3600 + m * 60 + s) * 1000 + milliseconds +
                          int((duration + extra_seconds) * 1000))

    # 计算新的时、分、秒、毫秒
    ms_new = total_milliseconds % 1000
    total_seconds = total_milliseconds // 1000
    h_new = int(total_seconds // 3600)
    m_new = int((total_seconds % 3600) // 60)
    s_new = int(total_seconds % 60)

    # 返回与输入格式一致的时间字符串
    if has_milliseconds:
        return f"{h_new:02d}:{m_new:02d}:{s_new:02d},{ms_new:03d}"
    else:
        return f"{h_new:02d}:{m_new:02d}:{s_new:02d}"


def check_hardware_acceleration() -> Optional[str]:
    """
    检查系统支持的硬件加速选项

    Returns:
        Optional[str]: 硬件加速参数,如果不支持则返回None
    """
    # 使用集中式硬件加速检测
    return ffmpeg_utils.get_ffmpeg_hwaccel_type()


def clip_video(
        video_origin_path: str,
        tts_result: List[Dict],
        output_dir: Optional[str] = None,
        task_id: Optional[str] = None
) -> Dict[str, str]:
    """
    根据时间戳裁剪视频

    Args:
        video_origin_path: 原始视频的路径
        tts_result: 包含时间戳和持续时间信息的列表
        output_dir: 输出目录路径,默认为None时会自动生成
        task_id: 任务ID,用于生成唯一的输出目录,默认为None时会自动生成

    Returns:
        Dict[str, str]: 时间戳到裁剪后视频路径的映射
    """
    # 检查视频文件是否存在
    if not os.path.exists(video_origin_path):
        raise FileNotFoundError(f"视频文件不存在: {video_origin_path}")

    # 如果未提供task_id,则根据输入生成一个唯一ID
    if task_id is None:
        content_for_hash = f"{video_origin_path}_{json.dumps(tts_result)}"
        task_id = hashlib.md5(content_for_hash.encode()).hexdigest()

    # 设置输出目录
    if output_dir is None:
        output_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
            "storage", "temp", "clip_video", task_id
        )

    # 确保输出目录存在
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # 获取硬件加速支持
    hwaccel = check_hardware_acceleration()
    hwaccel_args = []
    if hwaccel:
        hwaccel_args = ffmpeg_utils.get_ffmpeg_hwaccel_args()

    # 存储裁剪结果
    result = {}

    for item in tts_result:
        _id = item.get("_id", item.get("timestamp", "unknown"))
        timestamp = item["timestamp"]
        start_time, _ = parse_timestamp(timestamp)

        # 根据持续时间计算真正的结束时间(加上1秒余量)
        duration = item["duration"]
        calculated_end_time = calculate_end_time(start_time, duration)

        # 转换为FFmpeg兼容的时间格式(逗号替换为点)
        ffmpeg_start_time = start_time.replace(',', '.')
        ffmpeg_end_time = calculated_end_time.replace(',', '.')

        # 格式化输出文件名(使用连字符替代冒号和逗号)
        safe_start_time = start_time.replace(':', '-').replace(',', '-')
        safe_end_time = calculated_end_time.replace(':', '-').replace(',', '-')
        output_filename = f"vid_{safe_start_time}@{safe_end_time}.mp4"
        output_path = os.path.join(output_dir, output_filename)

        # 构建FFmpeg命令
        ffmpeg_cmd = [
            "ffmpeg", "-y", *hwaccel_args,
            "-i", video_origin_path,
            "-ss", ffmpeg_start_time,
            "-to", ffmpeg_end_time,
            "-c:v", "h264_videotoolbox" if hwaccel == "videotoolbox" else "libx264",
            "-c:a", "aac",
            "-strict", "experimental",
            output_path
        ]

        # 执行FFmpeg命令
        try:
            logger.info(f"裁剪视频片段: {timestamp} -> {ffmpeg_start_time}{ffmpeg_end_time}")
            # logger.debug(f"执行命令: {' '.join(ffmpeg_cmd)}")

            # 在Windows系统上使用UTF-8编码处理输出,避免GBK编码错误
            is_windows = os.name == 'nt'
            if is_windows:
                process = subprocess.run(
                    ffmpeg_cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    encoding='utf-8',  # 明确指定编码为UTF-8
                    text=True,
                    check=True
                )
            else:
                process = subprocess.run(
                    ffmpeg_cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    check=True
                )

            result[_id] = output_path

        except subprocess.CalledProcessError as e:
            logger.error(f"裁剪视频片段失败: {timestamp}")
            logger.error(f"错误信息: {e.stderr}")
            raise RuntimeError(f"视频裁剪失败: {e.stderr}")

    return result


if __name__ == "__main__":
    video_origin_path = "/Users/apple/Desktop/home/NarratoAI/resource/videos/qyn2-2无片头片尾.mp4"

    tts_result = [{'timestamp': '00:00:00-00:01:15',
                   'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_00_00-00_01_15.mp3',
                   'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_00_00-00_01_15.srt',
                   'duration': 25.55,
                   'text': '好的各位,欢迎回到我的频道!《庆余年 2》刚开播就给了我们一个王炸!范闲在北齐"死"了?这怎么可能!上集片尾那个巨大的悬念,这一集就立刻揭晓了!范闲假死归来,他面临的第一个,也是最大的难关,就是如何面对他最敬爱的,同时也是最可怕的那个人——庆帝!'},
                  {'timestamp': '00:01:15-00:04:40',
                   'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_01_15-00_04_40.mp3',
                   'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_01_15-00_04_40.srt',
                   'duration': 13.488,
                   'text': '但我们都知道,他绝不可能就这么轻易退场!第二集一开场,范闲就已经秘密回到了京都。他的生死传闻,可不像我们想象中那样只是小范围流传,而是…'},
                  {'timestamp': '00:04:58-00:05:45',
                   'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_04_58-00_05_45.mp3',
                   'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_04_58-00_05_45.srt',
                   'duration': 21.363,
                   'text': '"欺君之罪"!在封建王朝,这可是抄家灭族的大罪!搁一般人,肯定脚底抹油溜之大吉了。但范闲是谁啊?他偏要反其道而行之!他竟然决定,直接去见庆帝!冒着天大的风险,用"假死"这个事实去赌庆帝的态度!'},
                  {'timestamp': '00:05:45-00:06:00',
                   'audio_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/audio_00_05_45-00_06_00.mp3',
                   'subtitle_file': '/Users/apple/Desktop/home/NarratoAI/storage/tasks/qyn2-2-demo/subtitle_00_05_45-00_06_00.srt',
                   'duration': 7.675, 'text': '但想见庆帝,哪有那么容易?范闲艺高人胆大,竟然选择了最激进的方式——闯宫!'}]
    subclip_path_videos = {
        '00:00:00-00:01:15': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-00-00-00-01-15.mp4',
        '00:01:15-00:04:40': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-01-15-00-04-40.mp4',
        '00:04:41-00:04:58': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-04-41-00-04-58.mp4',
        '00:04:58-00:05:45': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-04-58-00-05-45.mp4',
        '00:05:45-00:06:00': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-05-45-00-06-00.mp4',
        '00:06:00-00:06:03': '/Users/apple/Desktop/home/NarratoAI/storage/temp/clip_video/6e7e343c7592c7d6f9a9636b55000f23/vid-00-06-00-00-06-03.mp4',
    }

    # 使用方法示例
    try:
        result = clip_video(video_origin_path, tts_result, subclip_path_videos)
        print("裁剪结果:")
        print(json.dumps(result, indent=4, ensure_ascii=False))
    except Exception as e:
        print(f"发生错误: {e}")