import gradio as gr
import numpy as np
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
import librosa
from collections import deque
import spaces

# Model settings
MODEL_ID = "openai/whisper-small"
DEVICE = "cpu"  # Keep the model on CPU; ZeroGPU only attaches a GPU inside @spaces.GPU-decorated calls
WINDOW_SECONDS = 1.0  # Window size for transcription
OVERLAP_SECONDS = 0.5  # Overlap between windows
RATE = 16000  # Whisper expects 16 kHz audio

# Initialize Whisper model and processor
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    MODEL_ID, low_cpu_mem_usage=True, use_safetensors=True
).to(DEVICE)
processor = AutoProcessor.from_pretrained(MODEL_ID)

# Global state
audio_buffer = deque()
buffer_duration = 0.0
last_transcription = ""

def process_audio_chunk(audio_chunk):
    """Append a single audio chunk to the buffer and track its duration."""
    global audio_buffer, buffer_duration
    # Convert the chunk to a float32 numpy array if it is not one already
    if not isinstance(audio_chunk, np.ndarray):
        audio_array = np.array(audio_chunk, dtype=np.float32)
    else:
        audio_array = audio_chunk  # Already a numpy array with the correct type
    audio_buffer.append(audio_array)
    buffer_duration += len(audio_array) / RATE
    return audio_array

def transcribe_audio():
    """Process the audio buffer with a sliding window and yield transcriptions."""
    global audio_buffer, buffer_duration, last_transcription
    window_samples = int(WINDOW_SECONDS * RATE)
    overlap_samples = int(OVERLAP_SECONDS * RATE)
    step_samples = window_samples - overlap_samples  # Step size for the sliding window
    while buffer_duration >= WINDOW_SECONDS:
        # Concatenate the buffer and trim it to one window
        audio_window = np.concatenate(list(audio_buffer))
        audio_window = audio_window[:window_samples]
        # Process the window with Whisper
        try:
            # The window is already 16 kHz mono float32, so it can be fed
            # directly to the processor (librosa.load only accepts paths or
            # file-like objects, not arrays, so it cannot be used here)
            audio_window = audio_window.astype(np.float32)
            inputs = processor(audio_window, sampling_rate=RATE, return_tensors="pt").to(DEVICE)
            with torch.no_grad():
                predicted_ids = model.generate(inputs["input_features"])
            transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0].strip()
        except Exception as e:
            # Break instead of continue: continuing without sliding the
            # window would spin forever on a persistent error
            print(f"Error during transcription: {e}")
            break
        # Yield the transcription only if it differs from the last one
        if transcription and transcription != last_transcription:
            last_transcription = transcription
            yield transcription
        # Slide the window: drop samples up to the step size
        samples_to_remove = step_samples
        while samples_to_remove > 0 and audio_buffer:
            if len(audio_buffer[0]) > samples_to_remove:
                audio_buffer[0] = audio_buffer[0][samples_to_remove:]
                buffer_duration -= samples_to_remove / RATE
                break
            else:
                samples_to_remove -= len(audio_buffer[0])
                buffer_duration -= len(audio_buffer[0]) / RATE
                audio_buffer.popleft()

@spaces.GPU
def audio_stream(audio):
    """Handle streaming audio input from Gradio."""
    # Audio arrives as a (sample_rate, data) tuple from Gradio
    sample_rate, audio_data = audio
    # Gradio's microphone stream delivers int16 PCM; scale integer samples
    # to [-1, 1] floats so librosa and Whisper see normalized audio
    audio_data = np.asarray(audio_data)
    if np.issubdtype(audio_data.dtype, np.integer):
        audio_data = audio_data.astype(np.float32) / 32768.0
    else:
        audio_data = audio_data.astype(np.float32)
    # Resample to 16 kHz if needed
    if sample_rate != RATE:
        audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=RATE)
    # Buffer the chunk
    process_audio_chunk(audio_data)
    # Transcribe and yield results
    for transcription in transcribe_audio():
        yield transcription

# Initialize application state
def init_app():
    """Reset the application state."""
    global audio_buffer, buffer_duration, last_transcription
    audio_buffer = deque()
    buffer_duration = 0.0
    last_transcription = ""
    return "Transcription is active. Speak into the microphone."

# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("# Real-Time Speech-to-Text with Whisper")
    gr.Markdown("Record audio using the microphone and see transcriptions in real time. Hosted on Hugging Face Spaces with ZeroGPU.")
    audio_input = gr.Audio(sources=["microphone"], streaming=True, label="Speak Here")
    output_text = gr.Textbox(label="Transcription", value="Transcription is active. Speak into the microphone.", interactive=False)
    demo.load(init_app, outputs=output_text)
    audio_input.stream(audio_stream, inputs=audio_input, outputs=output_text)

# Launch the app
demo.launch()