# import torch
# import torchaudio
# from transformers import (
#     WhisperProcessor,
#     WhisperForConditionalGeneration,
#     pipeline
# )
# from pyannote.audio import Pipeline
# import librosa
# import numpy as np
# from pydub import AudioSegment
# import tempfile
# import os  # ADD THIS LINE - FIX FOR THE ERROR
#
# class SpeechProcessor:
#     def __init__(self):
#         # Load Whisper for ASR
#         self.whisper_processor = WhisperProcessor.from_pretrained(
#             "openai/whisper-medium"
#         )
#         self.whisper_model = WhisperForConditionalGeneration.from_pretrained(
#             "openai/whisper-medium"
#         )
#
#         # Load speaker diarization
#         try:
#             self.diarization_pipeline = Pipeline.from_pretrained(
#                 "pyannote/speaker-diarization-3.1",
#                 use_auth_token=os.environ.get("HF_TOKEN")  # Now os is imported
#             )
#         except Exception as e:
#             print(f"Warning: Could not load diarization model: {e}")
#             self.diarization_pipeline = None
#
#     def process_audio(self, audio_path, language="id"):
#         """
#         Process audio file for ASR and speaker diarization
#         """
#         # Convert to WAV if needed
#         audio_path = self._ensure_wav_format(audio_path)
#
#         # Load audio
#         waveform, sample_rate = torchaudio.load(audio_path)
#
#         # Speaker diarization
#         if self.diarization_pipeline:
#             try:
#                 diarization = self.diarization_pipeline(audio_path)
#
#                 # Process each speaker segment
#                 transcript_segments = []
#                 for turn, _, speaker in diarization.itertracks(yield_label=True):
#                     # Extract segment audio
#                     start_sample = int(turn.start * sample_rate)
#                     end_sample = int(turn.end * sample_rate)
#                     segment_waveform = waveform[:, start_sample:end_sample]
#
#                     # ASR on segment
#                     text = self._transcribe_segment(
#                         segment_waveform,
#                         sample_rate,
#                         language
#                     )
#                     transcript_segments.append({
#                         "start": round(turn.start, 2),
#                         "end": round(turn.end, 2),
#                         "speaker": speaker,
#                         "text": text
#                     })
#                 return self._merge_consecutive_segments(transcript_segments)
#             except Exception as e:
#                 print(f"Diarization failed, falling back to simple transcription: {e}")
#
#         # Fallback: simple transcription without diarization
#         return self._simple_transcription(waveform, sample_rate, language)
#
#     def _simple_transcription(self, waveform, sample_rate, language):
#         """Fallback transcription without speaker diarization"""
#         # Process in 30-second chunks
#         chunk_length = 30 * sample_rate
#         segments = []
#         for i in range(0, waveform.shape[1], chunk_length):
#             chunk = waveform[:, i:i + chunk_length]
#             text = self._transcribe_segment(chunk, sample_rate, language)
#             if text.strip():
#                 segments.append({
#                     "start": i / sample_rate,
#                     "end": min((i + chunk_length) / sample_rate, waveform.shape[1] / sample_rate),
#                     "speaker": "SPEAKER_01",
#                     "text": text
#                 })
#         return segments
#
#     def _transcribe_segment(self, waveform, sample_rate, language):
#         """
#         Transcribe audio segment using Whisper
#         """
#         # Resample if needed
#         if sample_rate != 16000:
#             resampler = torchaudio.transforms.Resample(sample_rate, 16000)
#             waveform = resampler(waveform)
#
#         # Prepare input
#         input_features = self.whisper_processor(
#             waveform.squeeze().numpy(),
#             sampling_rate=16000,
#             return_tensors="pt"
#         ).input_features
#
#         # Generate transcription
#         forced_decoder_ids = self.whisper_processor.get_decoder_prompt_ids(
#             language=language,
#             task="transcribe"
#         )
#         predicted_ids = self.whisper_model.generate(
#             input_features,
#             forced_decoder_ids=forced_decoder_ids,
#             max_length=448
#         )
#         transcription = self.whisper_processor.batch_decode(
#             predicted_ids,
#             skip_special_tokens=True
#         )[0]
#         return transcription.strip()
#
#     def _ensure_wav_format(self, audio_path):
#         """
#         Convert audio to WAV format if needed
#         """
#         if not audio_path.endswith('.wav'):
#             audio = AudioSegment.from_file(audio_path)
#             wav_path = tempfile.mktemp(suffix='.wav')
#             audio.export(wav_path, format='wav')
#             return wav_path
#         return audio_path
#
#     def _merge_consecutive_segments(self, segments):
#         """
#         Merge consecutive segments from same speaker
#         """
#         if not segments:
#             return segments
#
#         merged = [segments[0]]
#         for current in segments[1:]:
#             last = merged[-1]
#             # Merge if same speaker and close in time
#             if (last['speaker'] == current['speaker'] and
#                     current['start'] - last['end'] < 1.0):
#                 last['end'] = current['end']
#                 last['text'] += ' ' + current['text']
#             else:
#                 merged.append(current)
#         return merged
import torch
import torchaudio
from transformers import (
    WhisperProcessor,
    WhisperForConditionalGeneration,
    pipeline
)
import librosa
import numpy as np
from pydub import AudioSegment
import tempfile
import os

class SpeechProcessor:
    def __init__(self):
        # Load Whisper for ASR
        print("Loading Whisper model...")
        self.whisper_processor = WhisperProcessor.from_pretrained(
            "openai/whisper-small"  # Use small for HF Spaces
        )
        self.whisper_model = WhisperForConditionalGeneration.from_pretrained(
            "openai/whisper-small"
        )

        # No diarization in this version
        self.diarization_pipeline = None
        print("Speech processor initialized (without speaker diarization)")

    def process_audio(self, audio_path, language="id"):
        """
        Process audio file for ASR (without speaker diarization)
        """
        # Convert to WAV if needed
        audio_path = self._ensure_wav_format(audio_path)

        # Load audio
        waveform, sample_rate = torchaudio.load(audio_path)

        # Process audio in chunks
        return self._process_audio_chunks(waveform, sample_rate, language)

    def _process_audio_chunks(self, waveform, sample_rate, language):
        """Process audio in manageable chunks"""
        chunk_length = 30 * sample_rate  # 30-second chunks
        segments = []
        total_chunks = (waveform.shape[1] + chunk_length - 1) // chunk_length

        for i in range(0, waveform.shape[1], chunk_length):
            chunk_num = i // chunk_length + 1
            print(f"Processing chunk {chunk_num}/{total_chunks}...")

            chunk = waveform[:, i:i + chunk_length]

            # Skip very short chunks
            if chunk.shape[1] < sample_rate * 0.5:
                continue

            text = self._transcribe_segment(chunk, sample_rate, language)
            if text.strip():
                segments.append({
                    "start": round(i / sample_rate, 2),
                    "end": round(min((i + chunk_length) / sample_rate,
                                     waveform.shape[1] / sample_rate), 2),
                    "speaker": "SPEAKER_01",
                    "text": text
                })
        return segments

    def _transcribe_segment(self, waveform, sample_rate, language):
        """
        Transcribe audio segment using Whisper
        """
        # Downmix multi-channel audio to mono; Whisper expects a single channel
        if waveform.dim() > 1 and waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        # Resample if needed
        if sample_rate != 16000:
            resampler = torchaudio.transforms.Resample(sample_rate, 16000)
            waveform = resampler(waveform)

        # Prepare input
        input_features = self.whisper_processor(
            waveform.squeeze().numpy(),
            sampling_rate=16000,
            return_tensors="pt"
        ).input_features

        # Generate transcription
        forced_decoder_ids = self.whisper_processor.get_decoder_prompt_ids(
            language=language,
            task="transcribe"
        )
        with torch.no_grad():
            predicted_ids = self.whisper_model.generate(
                input_features,
                forced_decoder_ids=forced_decoder_ids,
                max_length=448
            )
        transcription = self.whisper_processor.batch_decode(
            predicted_ids,
            skip_special_tokens=True
        )[0]
        return transcription.strip()

    def _ensure_wav_format(self, audio_path):
        """
        Convert audio to WAV format if needed
        """
        if not audio_path.lower().endswith('.wav'):
            print("Converting audio to WAV format...")
            audio = AudioSegment.from_file(audio_path)
            # NamedTemporaryFile(delete=False) avoids the race-prone, deprecated tempfile.mktemp
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp:
                wav_path = tmp.name
            audio.export(wav_path, format='wav')
            return wav_path
        return audio_path
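

# Minimal usage sketch (illustrative, not part of the Space's entry point):
# assumes an audio file named "sample.wav" exists next to this script and
# that the dependencies imported above are installed.
if __name__ == "__main__":
    processor = SpeechProcessor()
    result = processor.process_audio("sample.wav", language="id")
    for seg in result:
        print(f"[{seg['start']:.2f}s - {seg['end']:.2f}s] {seg['speaker']}: {seg['text']}")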