Spaces:
Running
on
Zero
Running
on
Zero
File size: 10,711 Bytes
8935a60 93526af d36ea49 93526af 8935a60 93526af d36ea49 261c97f 8935a60 6c226f9 8935a60 261c97f 9d6fa91 a33f970 cf205ff 8935a60 93526af 38c2998 8935a60 93526af 8935a60 93526af 8935a60 93526af caa6c38 93526af cf205ff 93526af 8935a60 93526af 288d41b 93526af cf205ff e8ca4d1 93526af 8935a60 d36ea49 93526af 8935a60 d36ea49 e8ca4d1 d36ea49 8935a60 93526af 8935a60 e8ca4d1 8935a60 d36ea49 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 1a6e7e3 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af cf205ff 8935a60 288d41b 8935a60 93526af 8935a60 93526af 38c2998 8935a60 93526af e8ca4d1 93526af e8ca4d1 288d41b 2186009 93526af 288d41b 8935a60 93526af 288d41b 8935a60 93526af 8935a60 93526af 8935a60 93526af e8ca4d1 5085deb 93526af 8935a60 93526af 8935a60 93526af 8935a60 288d41b 93526af 8935a60 93526af 8935a60 93526af 8935a60 93526af 8935a60 2186009 93526af e8ca4d1 93526af 8935a60 1a6e7e3 93526af 8935a60 93526af 1a6e7e3 93526af 8935a60 93526af 8935a60 93526af e8ca4d1 6c226f9 93526af |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
import os, io, math, tempfile
from typing import List, Tuple
import numpy as np
import gradio as gr
import librosa
import torch
# scipy is optional: when available, its median filter is used to smooth
# the RMS envelope before silence detection (see _detect_silence_cuts).
try:
    from scipy.ndimage import median_filter
    _HAS_SCIPY = True
except Exception:
    _HAS_SCIPY = False
from transformers import pipeline
import spaces  # Required for ZeroGPU: provides the @spaces.GPU decorator.
# ================== Default parameters ==================
MODEL_NAME = "openai/whisper-large-v3"
BATCH_SIZE = 8
FILE_LIMIT_MB = 1000            # reject uploads larger than this (MB)
DEF_SILENCE_MIN_LEN = 0.45      # minimum silence duration (s) to place a cut
DEF_DB_DROP = 25.0              # silence threshold: this many dB below the RMS peak
DEF_PCTL_FLOOR = 20.0           # percentile floor for the silence threshold
DEF_MIN_SEG_DUR = 1.00          # segments shorter than this (s) are merged
DEF_FRAME_LEN_MS = 25           # RMS analysis frame length (ms)
DEF_HOP_LEN_MS = 10             # RMS analysis hop length (ms)
DEF_CUT_OFFSET_SEC = 0.00       # global time shift applied to all cuts (s)
DEF_CHUNK_LEN_S = 20            # Whisper pipeline chunk length (s)
DEF_STRIDE_LEN_S = 2            # Whisper pipeline stride/overlap (s)
SR_TARGET = 16000               # analysis sample rate (Hz)
# ================== Lazily-initialized globals ==================
_ASR = None         # cached transformers ASR pipeline (built on first use)
_ASR_DEVICE = None  # device the cached pipeline was built for
_ASR_DTYPE = None   # dtype the cached pipeline was built with
def _pick_device_dtype():
if torch.cuda.is_available():
return 0, torch.float16
elif torch.backends.mps.is_available():
return "mps", torch.float16
else:
return -1, torch.float32
def _get_asr():
    """Return the shared ASR pipeline, building (or rebuilding) it on demand.

    Under ZeroGPU the first call must happen inside a function decorated
    with @spaces.GPU so CUDA is actually attached; plain CPU or a regular
    GPU works as well. The pipeline is rebuilt whenever the chosen device
    differs from the cached one.
    """
    global _ASR, _ASR_DEVICE, _ASR_DTYPE
    device, dtype = _pick_device_dtype()
    if _ASR is None or _ASR_DEVICE != device:
        _ASR = pipeline(
            task="automatic-speech-recognition",
            model=MODEL_NAME,
            device=device,
            torch_dtype=dtype,
            return_timestamps="word",
            chunk_length_s=DEF_CHUNK_LEN_S,
            stride_length_s=DEF_STRIDE_LEN_S,
            ignore_warning=True,
        )
        _ASR_DEVICE, _ASR_DTYPE = device, dtype
        print(f"[ASR] Initialized on device={device} dtype={dtype}")
    return _ASR
# ================== Audio & utility helpers ==================
def _load_audio(path: str, sr: int = SR_TARGET):
    """Load *path* as a mono waveform resampled to *sr* Hz; return (samples, rate)."""
    samples, rate = librosa.load(path, sr=sr, mono=True)
    return samples, rate
def _to_db(rms: np.ndarray):
ref = np.maximum(np.max(rms), 1e-10)
return 20.0 * np.log10(np.maximum(rms, 1e-10) / ref)
def _fmt_ts(sec: float) -> str:
if sec < 0: sec = 0.0
h = int(sec // 3600)
m = int((sec % 3600) // 60)
s = int(sec % 60)
ms = int(round((sec - math.floor(sec)) * 1000.0))
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def _extract_word_stream(chunks) -> List[Tuple[str, float, float]]:
out = []
if not chunks:
return out
for ch in chunks:
txt = (ch.get("text") or "").strip()
ts = ch.get("timestamp", ch.get("timestamps", None))
if ts is None:
s = ch.get("start", ch.get("time_start", None))
e = ch.get("end", ch.get("time_end", None))
if s is not None and e is not None and txt:
s = float(s); e = float(e)
if e < s: e = s
out.append((txt, s, e))
continue
if isinstance(ts, (list, tuple)) and len(ts) == 2 and txt:
s = float(ts[0] or 0.0); e = float(ts[1] or 0.0)
if e < s: e = s
out.append((txt, s, e))
return out
def _detect_silence_cuts(
    y: np.ndarray,
    sr: int,
    silence_min_len: float,
    db_drop: float,
    pctl_floor: float,
    frame_len_ms: int,
    hop_len_ms: int,
):
    """Locate cut points at the quietest frame of each long-enough silence.

    The RMS envelope (in dB relative to its peak) is thresholded at
    ``peak - db_drop``, but never below the ``pctl_floor`` percentile of
    the envelope. Each run of sub-threshold frames lasting at least
    ``silence_min_len`` seconds contributes one cut at its minimum-energy
    frame. Returns ``(cut_times, total_duration)``; cuts are kept at
    least 50 ms away from both ends of the audio.
    """
    frame_length = max(256, int(sr * frame_len_ms / 1000))
    hop_length = max(64, int(sr * hop_len_ms / 1000))
    envelope = librosa.feature.rms(
        y=y, frame_length=frame_length, hop_length=hop_length, center=True
    )[0]
    env_db = _to_db(envelope)
    if _HAS_SCIPY:
        # Light median smoothing suppresses single-frame dips/spikes.
        env_db = median_filter(env_db, size=5)
    peak_db = float(np.max(env_db))
    floor_db = float(np.percentile(env_db, pctl_floor))
    threshold = max(peak_db - db_drop, floor_db)
    quiet = env_db <= threshold

    min_frames = max(1, int(silence_min_len * sr / hop_length))
    cuts = []
    n_frames = len(quiet)
    start = 0
    while start < n_frames:
        if not quiet[start]:
            start += 1
            continue
        # Extend to the end of this silent run.
        stop = start + 1
        while stop < n_frames and quiet[stop]:
            stop += 1
        if stop - start >= min_frames:
            # Cut at the quietest frame inside the run.
            quietest = start + int(np.argmin(env_db[start:stop]))
            cuts.append(quietest * hop_length / sr)
        start = stop
    total = float(len(y) / sr)
    cuts = sorted({t for t in cuts if 0.05 <= t <= total - 0.05})
    return cuts, total
def _snap_to_word_bounds(cuts: List[float], words: List[Tuple[str, float, float]], max_dist=0.25):
if not cuts or not words: return cuts
bounds = sorted({b for _, s, e in words for b in (s, e)})
snapped = []
for t in cuts:
idx = min(range(len(bounds)), key=lambda i: abs(bounds[i]-t))
snapped.append(bounds[idx] if abs(bounds[idx]-t) <= max_dist else t)
snapped = sorted(set(snapped))
out = []
for t in snapped:
if not out or (t - out[-1]) >= 0.12:
out.append(t)
return out
def _segment(words: List[Tuple[str,float,float]], cuts: List[float], total: float, min_seg: float):
    """Group word triples into subtitle segments delimited by the cut times.

    Words overlapping each window between consecutive cuts are joined
    into one segment; segments shorter than *min_seg* are then merged
    into a neighbor. Returns a list of ``(start, end, text)`` tuples.
    """
    if not words:
        # No recognized words: one empty segment spanning the whole audio.
        return [(0.0, total, "")]
    # Window boundaries: start of audio, interior cuts, end of audio.
    bnds = [0.0] + [t for t in cuts if 0.0 < t < total] + [total]
    segs = []
    wi, W = 0, len(words)
    for i in range(len(bnds)-1):
        L, R = bnds[i], bnds[i+1]
        texts, starts, ends = [], [], []
        # Skip words that end at or before this window's left edge.
        while wi < W and words[wi][2] <= L:
            wi += 1
        wj = wi
        # Collect every word overlapping the (L, R) window.
        while wj < W and words[wj][1] < R:
            txt, s, e = words[wj]
            if e > L and s < R:
                texts.append(txt); starts.append(s); ends.append(e)
            wj += 1
        if texts:
            # Hug the words' actual times, clamped inside the window.
            st, en = max(min(starts), L), min(max(ends), R)
            segs.append([float(st), float(en), " ".join(texts).strip()])
        elif (R - L) >= max(0.25, min_seg * 0.5):
            # Keep a sizeable wordless window as an empty placeholder.
            segs.append([L, R, ""])
    def has_punc(t): return any(p in t for p in ",。!?,.!?;;::")
    # Merge pass: fold too-short segments into a neighboring segment.
    i = 0
    while i < len(segs):
        st, en, tx = segs[i]
        if (en - st) < min_seg and len(segs) > 1:
            cand = []
            if i + 1 < len(segs): cand.append(i + 1)
            if i - 1 >= 0: cand.append(i - 1)
            # Prefer the neighbor whose text CONTAINS punctuation
            # (False sorts first), then the closer one.
            cand.sort(key=lambda j: (not has_punc(segs[j][2]), abs(j - i)))
            t = cand[0]
            nst, nen = min(segs[t][0], st), max(segs[t][1], en)
            # Join texts in chronological order of the two segments.
            ntx = (" ".join([segs[t][2], tx]) if t < i else " ".join([tx, segs[t][2]])).strip()
            keep, drop = (t, i) if t < i else (i, t)
            segs[keep] = [nst, nen, ntx]
            del segs[drop]
            # Re-scan from just before the merged segment.
            i = max(0, keep - 1); continue
        i += 1
    # Drop degenerate slivers shorter than 120 ms.
    return [(st, en, tx.strip()) for st, en, tx in segs if (en - st) >= 0.12]
def _build_srt(segs: List[Tuple[float,float,str]]) -> str:
    """Render ``(start, end, text)`` segments as SRT text, numbered from 1."""
    entries = [
        f"{n}\n{_fmt_ts(start)} --> {_fmt_ts(end)}\n{text}\n"
        for n, (start, end, text) in enumerate(segs, start=1)
    ]
    # Blank line between entries; exactly one trailing newline.
    return "\n".join(entries).strip() + "\n"
# ================== Inference core (runs on the GPU) ==================
@spaces.GPU  # ZeroGPU entry point: a GPU is attached while this function runs.
def transcribe_and_split(
    audio_path: str,
    silence_min_len: float = DEF_SILENCE_MIN_LEN,
    db_drop: float = DEF_DB_DROP,
    pctl_floor: float = DEF_PCTL_FLOOR,
    min_seg_dur: float = DEF_MIN_SEG_DUR,
    frame_len_ms: int = DEF_FRAME_LEN_MS,
    hop_len_ms: int = DEF_HOP_LEN_MS,
    cut_offset_sec: float = DEF_CUT_OFFSET_SEC,
):
    """Transcribe *audio_path* and build an SRT split at silence valleys.

    Returns ``(srt_text, srt_file_path)`` for the Gradio preview box and
    the download widget.

    Raises:
        gr.Error: when no audio is provided or the file exceeds
            ``FILE_LIMIT_MB``.
    """
    if not audio_path:
        raise gr.Error("请先上传或录制音频。")
    # Enforce the upload size limit. Only the stat itself is best-effort;
    # the original wrapped the raise in `except Exception: pass`, which
    # swallowed the gr.Error and silently disabled the check.
    try:
        size_mb = os.path.getsize(audio_path) / (1024 * 1024)
    except OSError:
        size_mb = 0.0
    if size_mb > FILE_LIMIT_MB:
        raise gr.Error(f"文件过大,超过 {FILE_LIMIT_MB} MB。")
    asr = _get_asr()  # First call builds the pipeline on the GPU.
    result = asr(
        audio_path,
        return_timestamps="word",
        chunk_length_s=DEF_CHUNK_LEN_S,
        stride_length_s=DEF_STRIDE_LEN_S,
        batch_size=BATCH_SIZE,
    )
    text = (result.get("text") or "").strip()
    words = _extract_word_stream(result.get("chunks") or [])
    # Energy-based silence detection on the resampled mono waveform.
    y, sr = _load_audio(audio_path, sr=SR_TARGET)
    cuts, total = _detect_silence_cuts(
        y, sr,
        silence_min_len=silence_min_len,
        db_drop=db_drop,
        pctl_floor=pctl_floor,
        frame_len_ms=frame_len_ms,
        hop_len_ms=hop_len_ms,
    )
    if abs(cut_offset_sec) > 1e-6:
        # Apply the user-chosen global shift, clamped into [0, total].
        cuts = [max(0.0, min(total, t + cut_offset_sec)) for t in cuts]
    cuts = _snap_to_word_bounds(cuts, words, max_dist=0.25)
    segs = _segment(words, cuts, total, min_seg_dur)
    if not segs:
        # Fall back to a single segment covering the whole transcript.
        segs = [(0.0, total, text)]
    srt = _build_srt(segs)
    # Persist the SRT so Gradio's File component can offer a download.
    tmpf = tempfile.NamedTemporaryFile(delete=False, suffix=".srt")
    try:
        tmpf.write(srt.encode("utf-8"))
        tmpf.flush()
    finally:
        tmpf.close()
    return srt, tmpf.name
# Expose a @spaces.GPU entry point for the Spaces startup check
# (optional; it never has to be called).
@spaces.GPU
def gpu_warmup():
    """Trivial GPU-decorated function so the ZeroGPU startup probe succeeds."""
    return "ok"
# ================== UI ==================
with gr.Blocks(title="Whisper Large V3 · 智能切分 SRT", theme=gr.themes.Soft()) as demo:
    gr.Markdown("### 🎧 Whisper Large V3 · 更稳的 SRT 切分\n"
                "- 词级时间戳 + 能量最低点切分 + 词边界吸附\n"
                "- 片段过短自动合并,SRT 含序号行\n")
    audio = gr.Audio(sources=["upload", "microphone"], type="filepath", label="音频(上传或录制)")
    # Tuning knobs for silence detection and segment merging.
    with gr.Accordion("高级参数", open=False):
        with gr.Row():
            silence_min_len = gr.Slider(0.1, 1.0, value=DEF_SILENCE_MIN_LEN, step=0.05, label="静音最短时长 (s)")
            db_drop = gr.Slider(10, 40, value=DEF_DB_DROP, step=1.0, label="相对峰值下落 (dB)")
            pctl_floor = gr.Slider(0, 50, value=DEF_PCTL_FLOOR, step=1.0, label="能量分位下限 (dB)")
        with gr.Row():
            min_seg_dur = gr.Slider(0.3, 3.0, value=DEF_MIN_SEG_DUR, step=0.05, label="最短片段时长 (s)")
            frame_len_ms = gr.Slider(10, 50, value=DEF_FRAME_LEN_MS, step=1, label="帧长 (ms)")
            hop_len_ms = gr.Slider(5, 25, value=DEF_HOP_LEN_MS, step=1, label="帧移 (ms)")
            cut_offset_sec = gr.Slider(-0.20, 0.20, value=DEF_CUT_OFFSET_SEC, step=0.01, label="切分整体偏移 (s)")
    btn = gr.Button("开始识别并生成 SRT", variant="primary")
    srt_preview = gr.Textbox(lines=16, label="SRT 预览", show_copy_button=True)
    srt_file = gr.File(label="下载 SRT 文件", file_count="single")
    btn.click(
        fn=transcribe_and_split,  # Note: the bound handler is the @spaces.GPU function.
        inputs=[audio, silence_min_len, db_drop, pctl_floor, min_seg_dur, frame_len_ms, hop_len_ms, cut_offset_sec],
        outputs=[srt_preview, srt_file],
    )

if __name__ == "__main__":
    demo.launch()