whisperx-test / app.py
sdafd's picture
Update app.py
ed7cca2 verified
raw
history blame
15 kB
import gradio as gr
import whisperx
import torch
import librosa
import logging
import os
import time
import numpy as np
import requests
import random
import string
import json
import pathlib
import tempfile
# -------------------------------
# Vocal Extraction Function
# -------------------------------
def get_vocals(input_file, max_retries=5, retry_delay=5):
    """
    Upload an audio file to the remote audio-separator Space and return the URL
    of the separated output once the remote job completes.

    The function performs three steps against the Space's Gradio HTTP API:
    upload the file, join the processing queue, then follow the server-sent
    event stream until a ``process_completed`` message arrives.

    Parameters:
    input_file (str): Path of the local audio file to upload.
    max_retries (int): How many times to (re)connect to the event stream.
    retry_delay (float): Seconds to wait between stream connection attempts.
    Returns:
    str | None: The URL found at ``output.data[1].url`` of the completion
    message, or None if any step fails (this function is best-effort and
    never raises).
    """
    base_url = 'https://politrees-audio-separator-uvr.hf.space'
    api_url = f'{base_url}/gradio_api'
    # (connect, read) timeouts so a stalled remote host cannot hang the app forever.
    # The read timeout is generous because the remote job may take a while.
    request_timeout = (10, 300)
    id_alphabet = string.ascii_lowercase + string.digits
    try:
        session_hash = ''.join(random.choice(id_alphabet) for _ in range(11))
        file_id = ''.join(random.choice(id_alphabet) for _ in range(11))
        # Ask the filesystem for the size instead of reading the whole file into memory.
        file_len = pathlib.Path(input_file).stat().st_size

        # The context manager guarantees the upload handle is closed.
        with open(input_file, 'rb') as upload_handle:
            upload_response = requests.post(
                f'{api_url}/upload?upload_id={file_id}',
                files={'files': upload_handle},
                timeout=request_timeout,
            )
        upload_response.raise_for_status()
        json_data = upload_response.json()

        headers = {
            'accept': '*/*',
            'accept-language': 'en-US,en;q=0.5',
            'content-type': 'application/json',
            'origin': 'https://politrees-audio-separator-uvr.hf.space',
            'priority': 'u=1, i',
            'referer': 'https://politrees-audio-separator-uvr.hf.space/?__theme=system',
            'sec-ch-ua': '"Not(A:Brand";v="99", "Brave";v="133", "Chromium";v="133"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'sec-fetch-storage-access': 'none',
            'sec-gpc': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
        }
        params = {
            '__theme': 'system',
        }
        # The 'data' list is the positional argument list of the remote endpoint.
        # NOTE(review): the meaning of each position is defined by the remote
        # Space and is not visible from this file; values are kept verbatim.
        json_payload = {
            'data': [
                {
                    'path': json_data[0],
                    'url': f'{api_url}/file=' + json_data[0],
                    'orig_name': pathlib.Path(input_file).name,
                    'size': file_len,
                    'mime_type': 'audio/wav',
                    'meta': {
                        '_type': 'gradio.FileData',
                    },
                },
                'MelBand Roformer | Vocals by Kimberley Jensen',
                256,
                False,
                5,
                0,
                '/tmp/audio-separator-models/',
                'output',
                'wav',
                0.9,
                0,
                1,
                'NAME_(STEM)_MODEL',
                'NAME_(STEM)_MODEL',
                'NAME_(STEM)_MODEL',
                'NAME_(STEM)_MODEL',
                'NAME_(STEM)_MODEL',
                'NAME_(STEM)_MODEL',
                'NAME_(STEM)_MODEL',
            ],
            'event_data': None,
            'fn_index': 5,
            'trigger_id': 28,
            'session_hash': session_hash,
        }
        join_response = requests.post(
            f'{api_url}/queue/join',
            params=params,
            headers=headers,
            json=json_payload,
            timeout=request_timeout,
        )
        # If the job was never queued there is nothing to wait for.
        join_response.raise_for_status()

        for attempt in range(1, max_retries + 1):
            try:
                print(f"Connecting to stream... Attempt {attempt}")
                # The context manager releases the streamed connection on every exit path.
                with requests.get(
                    f'{api_url}/queue/data?session_hash={session_hash}',
                    stream=True,
                    timeout=request_timeout,
                ) as stream:
                    if stream.status_code != 200:
                        raise ConnectionError(f"Failed to connect: HTTP {stream.status_code}")
                    print("Connected successfully.")
                    for raw_line in stream.iter_lines():
                        if not raw_line:
                            continue
                        text = raw_line.decode('utf-8')
                        # Only "data:" lines of the event stream carry JSON; skip
                        # any other field (event names, comments, keep-alives).
                        if not text.startswith('data:'):
                            continue
                        json_resp = json.loads(text[len('data:'):].strip())
                        print(json_resp)
                        if json_resp.get('msg') != 'process_completed':
                            continue
                        print("Process completed.")
                        # A completed-but-failed job will not succeed on reconnect.
                        if not json_resp.get('success', True):
                            print("Remote processing reported a failure.")
                            return None
                        output_url = json_resp['output']['data'][1]['url']
                        print(f"Output URL: {output_url}")
                        return output_url
                print("Stream ended prematurely. Reconnecting...")
            except (requests.RequestException, ConnectionError, ValueError) as e:
                # Network problems and undecodable lines are worth another attempt.
                print(f"Error occurred: {e}. Retrying...")
            if attempt < max_retries:
                time.sleep(retry_delay)
        print("Max retries reached. Exiting.")
        return None
    except Exception as ex:
        # Best-effort boundary: callers fall back to the original audio on None.
        print(f"Unexpected error in get_vocals: {ex}")
        return None
# -------------------------------
# Advanced Normalization Function
# -------------------------------
def advanced_normalize_audio(audio, threshold_ratio=0.6, window_size=1024):
    """
    Gate an audio signal by its amplitude envelope.

    A moving-average envelope of the absolute signal is computed, and every
    sample whose envelope value is below ``threshold_ratio`` times the maximum
    envelope value is set to zero. Samples at or above the threshold are kept
    unchanged (no gain is applied).

    Parameters:
    audio (np.ndarray): One-dimensional input audio signal.
    threshold_ratio (float): Ratio (0-1) of the maximum envelope value below
    which samples are zeroed.
    window_size (int): Size of the moving window used to compute the envelope.
    It is clamped to the signal length so short signals are handled.
    Returns:
    np.ndarray: The gated audio signal, same shape and dtype as the input.
    """
    audio = np.asarray(audio)
    # Nothing to gate; np.convolve and np.max both reject empty input.
    if audio.size == 0:
        return audio.copy()
    # np.convolve(mode='same') returns max(len(signal), len(kernel)) samples, so
    # a kernel longer than the signal would yield a mask of the wrong length.
    window = max(1, min(int(window_size), len(audio)))
    # Compute moving-average envelope
    envelope = np.convolve(np.abs(audio), np.ones(window) / window, mode='same')
    threshold = threshold_ratio * np.max(envelope)
    # Create a mask: keep samples where the envelope meets or exceeds the threshold.
    mask = envelope >= threshold
    # Optionally, you might smooth the mask further to avoid abrupt cuts.
    return audio * mask.astype(audio.dtype)
# -------------------------------
# Logging and Model Setup
# -------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("whisperx_app")

# All inference runs on CPU with int8 compute type.
device = "cpu"
compute_type = "int8"
# os.cpu_count() may return None when the count cannot be determined;
# fall back to a single thread instead of passing None to torch.
torch.set_num_threads(os.cpu_count() or 1)

# Every selectable model is loaded once at import time, keyed by its size name.
# The key order is the order shown in the UI dropdown.
models = {
    size: whisperx.load_model(size, device, compute_type=compute_type, vad_method='silero')
    for size in ("tiny", "base", "small", "large", "large-v2", "large-v3")
}
def split_audio_by_pause(audio, sr, pause_threshold, top_db=30):
    """
    Partition the audio into speech spans separated by sufficiently long pauses.

    Non-silent intervals are obtained from librosa.effects.split. Neighbouring
    intervals are fused whenever the silence between them lasts less than
    pause_threshold seconds. When no non-silent interval is found, the whole
    signal is returned as a single span.
    Returns a list of (start_sample, end_sample) tuples.
    """
    voiced = librosa.effects.split(audio, top_db=top_db)
    if voiced.size == 0:
        return [(0, len(audio))]

    # Grow the last span in place while gaps stay short; open a new one otherwise.
    spans = [tuple(voiced[0])]
    for begin, finish in voiced[1:]:
        open_begin, open_finish = spans[-1]
        silence_seconds = (begin - open_finish) / sr
        if silence_seconds < pause_threshold:
            spans[-1] = (open_begin, finish)
        else:
            spans.append((begin, finish))
    return spans
# -------------------------------
# Main Transcription Function
# -------------------------------
def _format_word_timings(aligned_segments, offset=0.0):
    """Render aligned words as "[start-end] word" lines, shifting times by offset seconds."""
    lines = []
    for segment in aligned_segments:
        for word in segment.get("words", []):
            word_start = word.get("start")
            word_end = word.get("end")
            if word_start is None or word_end is None:
                # NOTE(review): the aligner appears to omit timings for words it
                # cannot align (e.g. digits/symbols) - confirm against whisperx.
                # Keep the word rather than failing the whole transcription.
                lines.append(f"[    n/a    ] {word['word']}\n")
                continue
            lines.append(
                f"[{word_start + offset:5.2f}s-{word_end + offset:5.2f}s] {word['word']}\n"
            )
    return lines


def transcribe(audio_file, model_size="base", debug=False, pause_threshold=0.0, vocal_extraction=False, language="en"):
    """
    Transcribe an audio file and return word-level timestamps as text.

    Parameters:
    audio_file (str): Path of the audio file to transcribe.
    model_size (str): Key into the module-level ``models`` dict.
    debug (bool): When True, also return the collected debug log.
    pause_threshold (float): When > 0, the audio is split at pauses of at
    least this many seconds and each part is transcribed separately.
    vocal_extraction (bool): When True, try to isolate vocals through the
    remote separation service first; on failure the original audio is used.
    language (str): Language code; when empty the language is detected.
    Returns:
    str: One "[start-end] word" line per word, or an error message.
    tuple[str, str]: (transcription, debug log) when ``debug`` is True.
    """
    start_time = time.time()
    final_result = ""
    debug_log = []
    extracted_path = None  # temporary file holding downloaded vocals, if any
    try:
        if not audio_file:
            raise ValueError("No audio file provided")
        # If vocal extraction is enabled, process the file first
        if vocal_extraction:
            debug_log.append("Vocal extraction enabled; processing input file for vocals...")
            extracted_url = get_vocals(audio_file)
            if extracted_url is not None:
                debug_log.append("Vocal extraction succeeded; downloading extracted audio...")
                payload = None
                try:
                    response = requests.get(extracted_url, timeout=(10, 300))
                    if response.status_code == 200:
                        payload = response.content
                except requests.RequestException as download_error:
                    # A network failure is treated like a bad status: fall back.
                    debug_log.append(f"Download error: {download_error}")
                if payload is not None:
                    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp:
                        tmp.write(payload)
                        extracted_path = tmp.name
                    audio_file = extracted_path
                    debug_log.append("Extracted audio downloaded and saved for transcription.")
                else:
                    debug_log.append("Failed to download extracted audio; proceeding with original file.")
            else:
                debug_log.append("Vocal extraction failed; proceeding with original audio.")
        # Load audio file at 16kHz
        audio, sr = librosa.load(audio_file, sr=16000)
        debug_log.append(f"Audio loaded: {len(audio)/sr:.2f} seconds long at {sr} Hz")
        # Gate only audio that really came from the extractor; gating the
        # original mix after a failed extraction would silence most of it.
        if extracted_path is not None:
            audio = advanced_normalize_audio(audio)
            debug_log.append("Advanced normalization applied to extracted audio to remove low-amplitude segments.")
        # Select the model and set batch size
        model = models[model_size]
        batch_size = 8 if model_size == "tiny" else 4
        # Use provided language if set; otherwise, use language detection.
        language = (language or "").strip()
        full_transcript = None
        if not language:
            # A full pass is only needed up front when the language must be detected.
            full_transcript = model.transcribe(audio, batch_size=batch_size)
            language = full_transcript.get("language", "unknown")
        # Load alignment model using the specified language
        model_a, metadata = whisperx.load_align_model(language_code=language, device=device)
        result_lines = []
        # If pause_threshold > 0, split audio and process segments individually
        if pause_threshold > 0:
            segments = split_audio_by_pause(audio, sr, pause_threshold)
            debug_log.append(f"Audio split into {len(segments)} segment(s) using a pause threshold of {pause_threshold}s")
            for seg_idx, (seg_start, seg_end) in enumerate(segments):
                audio_segment = audio[seg_start:seg_end]
                seg_offset = seg_start / sr
                seg_duration = (seg_end - seg_start) / sr
                debug_log.append(f"Segment {seg_idx+1}: start={seg_offset:.2f}s, duration={seg_duration:.2f}s")
                seg_transcript = model.transcribe(audio_segment, batch_size=batch_size, language=language)
                seg_aligned = whisperx.align(
                    seg_transcript["segments"], model_a, metadata, audio_segment, device
                )
                # Word times are relative to the segment; shift them to file time.
                result_lines.extend(_format_word_timings(seg_aligned["segments"], seg_offset))
        else:
            # Process the entire audio without splitting, reusing the
            # detection pass when one was already made.
            if full_transcript is None:
                full_transcript = model.transcribe(audio, batch_size=batch_size, language=language)
            aligned = whisperx.align(
                full_transcript["segments"], model_a, metadata, audio, device
            )
            result_lines.extend(_format_word_timings(aligned["segments"]))
        final_result = "".join(result_lines)
        debug_log.append(f"Language used: {language}")
        debug_log.append(f"Batch size: {batch_size}")
        debug_log.append(f"Processed in {time.time()-start_time:.2f}s")
    except Exception as e:
        # UI boundary: report the failure in the output instead of raising.
        logger.exception("Error during transcription:")
        final_result = "Error occurred during transcription"
        debug_log.append(f"ERROR: {str(e)}")
    finally:
        # The audio is already in memory, so the downloaded copy can be removed.
        if extracted_path is not None:
            try:
                os.remove(extracted_path)
            except OSError:
                logger.warning("Could not remove temporary file %s", extracted_path)
    if debug:
        return final_result, "\n".join(debug_log)
    return final_result
# -------------------------------
# Gradio Interface
# -------------------------------
# Two-column layout: inputs and options on the left, results on the right.
with gr.Blocks(title="WhisperX CPU Transcription") as demo:
    gr.Markdown("# WhisperX CPU Transcription with Vocal Extraction Option")
    with gr.Row():
        with gr.Column():
            audio_input = gr.Audio(
                label="Upload Audio File",
                type="filepath",
                sources=["upload", "microphone"],
                interactive=True,
            )
            model_selector = gr.Dropdown(
                choices=list(models.keys()),
                value="base",
                label="Model Size",
                interactive=True,
            )
            pause_threshold_slider = gr.Slider(
                minimum=0, maximum=5, step=0.1, value=0,
                label="Pause Threshold (seconds)",
                interactive=True,
                info="Set a pause duration threshold. Audio pauses longer than this will be used to split the audio into segments."
            )
            vocal_extraction_checkbox = gr.Checkbox(
                label="Extract Vocals (improves accuracy on noisy audio)",
                value=False
            )
            language_input = gr.Textbox(
                label="Language Code (e.g., en, es, fr)",
                placeholder="Enter language code",
                value="en"
            )
            debug_checkbox = gr.Checkbox(label="Enable Debug Mode", value=False)
            transcribe_btn = gr.Button("Transcribe", variant="primary")
        with gr.Column():
            output_text = gr.Textbox(
                label="Transcription Output",
                lines=20,
                placeholder="Transcription will appear here..."
            )
            debug_output = gr.Textbox(
                label="Debug Information",
                lines=10,
                placeholder="Debug logs will appear here...",
                visible=False,
            )

    def toggle_debug(debug_enabled):
        """Show the debug textbox only while debug mode is ticked."""
        return gr.update(visible=debug_enabled)

    def _run_transcription(audio_file, model_size, debug, pause_threshold, vocal_extraction, language):
        """
        Call transcribe() and always return one value per output component.

        transcribe() returns a single string when debug is off, but the click
        event below declares two outputs; pad the missing debug text with "".
        """
        result = transcribe(audio_file, model_size, debug, pause_threshold, vocal_extraction, language)
        if isinstance(result, tuple):
            return result
        return result, ""

    debug_checkbox.change(
        toggle_debug,
        inputs=[debug_checkbox],
        outputs=[debug_output]
    )
    transcribe_btn.click(
        _run_transcription,
        inputs=[audio_input, model_selector, debug_checkbox, pause_threshold_slider, vocal_extraction_checkbox, language_input],
        outputs=[output_text, debug_output]
    )
# -------------------------------
# Launch the App
# -------------------------------
if __name__ == "__main__":
    # Enable request queuing (at most 4 waiting requests), then start the server.
    queued_app = demo.queue(max_size=4)
    queued_app.launch()