|
import io
import os
import re
import tempfile
from urllib.parse import urljoin

import librosa
import numpy as np
import requests
import soundfile as sf
import torch
from bs4 import BeautifulSoup
from pyannote.audio import Pipeline
from pydub import AudioSegment
from pywebio import start_server, config, session
from pywebio.input import input, input_group
from pywebio.output import put_text, put_markdown, put_file
from pywebio.session import run_js
from spellchecker import SpellChecker
from transformers import WhisperProcessor, WhisperForConditionalGeneration
|
|
|
|
|
# Load the pyannote speaker-diarization pipeline once, at import time.
# A failure is reported on stdout and leaves `pipeline` set to None instead of
# aborting start-up.
try:
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")
except Exception as exc:
    print(f"Error initializing speaker diarization pipeline: {exc}")
    pipeline = None
else:
    print("Speaker diarization pipeline initialized successfully")
|
|
|
|
|
# Run inference on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using device: {device}")

# One Whisper checkpoint backs both the processor (feature extraction and
# token decoding) and the generation model, which is moved to `device`.
model_name = "openai/whisper-small"
processor = WhisperProcessor.from_pretrained(model_name)
model = WhisperForConditionalGeneration.from_pretrained(model_name)
model = model.to(device)

# Shared spell checker used by correct_spelling().
spell = SpellChecker()
|
|
|
def download_audio_from_url(url, timeout=(10, 120)):
    """Download the raw media bytes behind *url*.

    When the URL contains the substring "share" it is treated as an HTML
    page: the page is fetched and the media address is read from the first
    ``<video>`` tag (its ``src`` attribute, or the ``src`` of a nested
    ``<source>`` tag). Any other URL is downloaded directly.

    Args:
        url: Direct media URL or shareable page URL.
        timeout: Passed to ``requests.get``; the default is a
            (connect, read) pair in seconds so a stalled server cannot hang
            the request forever.

    Returns:
        The response body of the media URL as ``bytes``.

    Raises:
        ValueError: If a shareable page contains no usable video source.
        requests.RequestException: On connection errors, timeouts or HTTP
            error statuses.
        Any exception is printed and then re-raised unchanged.
    """
    try:
        if "share" in url:
            print("Processing shareable link...")
            page = requests.get(url, timeout=timeout)
            # Fail loudly instead of parsing an HTTP error page.
            page.raise_for_status()
            soup = BeautifulSoup(page.content, 'html.parser')
            video_tag = soup.find('video')

            src = None
            if video_tag is not None:
                src = video_tag.get('src')
                if not src:
                    # The address may live on a nested <source> element.
                    source_tag = video_tag.find('source')
                    if source_tag is not None:
                        src = source_tag.get('src')
            if not src:
                raise ValueError("Direct video URL not found in the shareable link.")

            # Resolve relative addresses against the final page URL;
            # absolute addresses pass through unchanged.
            video_url = urljoin(page.url, src)
            print(f"Extracted video URL: {video_url}")
        else:
            video_url = url

        print(f"Downloading video from URL: {video_url}")
        response = requests.get(video_url, timeout=timeout)
        # Without this check an error page would be returned as "audio".
        response.raise_for_status()
        audio_bytes = response.content
        print(f"Successfully downloaded {len(audio_bytes)} bytes of data")
        return audio_bytes
    except Exception as e:
        print(f"Error in download_audio_from_url: {str(e)}")
        raise
|
|
|
def correct_spelling(text):
    """Spell-correct the words of *text* while leaving its layout intact.

    Only runs of letters (optionally joined by apostrophes, e.g. "don't")
    are handed to the module-level ``spell`` checker. Whitespace (including
    paragraph breaks), digits and punctuation are copied through untouched.

    Args:
        text: Text to correct.

    Returns:
        *text* with each word replaced by the checker's suggestion when it
        offers a different word; otherwise the word is kept as written.
    """

    def _fix(match):
        word = match.group(0)
        suggestion = spell.correction(word)
        # No suggestion, or the same word up to letter case: keep the
        # author's spelling and casing.
        # NOTE(review): assumes the checker may return None or a lower-cased
        # form of a known word -- confirm against the pyspellchecker docs.
        if not suggestion or suggestion.lower() == word.lower():
            return word
        # Re-apply the original casing to the replacement.
        if len(word) > 1 and word.isupper():
            return suggestion.upper()
        if word[0].isupper():
            return suggestion[0].upper() + suggestion[1:]
        return suggestion

    # [^\W\d_] matches any Unicode letter.
    return re.sub(r"[^\W\d_]+(?:'[^\W\d_]+)*", _fix, text)
|
|
|
def format_transcript(transcript):
    """Re-flow *transcript* sentence by sentence, grouping by speaker label.

    The text is split on '.'; each non-empty fragment is stripped and
    terminated with a single '.'. A fragment containing ':' is treated as
    ``speaker: content``: when the speaker differs from the previous
    labelled fragment it is emitted as ``"\\n\\n<speaker>:<content>."``,
    otherwise only the content is emitted. Fragments are joined with one
    space.

    Because every fragment is stripped, line and paragraph breaks present
    in the input are not preserved.

    Args:
        transcript: Text to format.

    Returns:
        The formatted text; an empty string when the input contains no
        sentence text.
    """
    formatted = []
    current_speaker = None
    for fragment in transcript.split('.'):
        sentence = fragment.strip()
        if not sentence:
            # Skips the empty piece after the final '.', which would
            # otherwise become a stray "." in the output.
            continue
        if ':' in sentence:
            speaker, content = (part.strip() for part in sentence.split(':', 1))
            # Compare stripped labels so " Bob" and "Bob" are one speaker.
            if speaker != current_speaker:
                formatted.append(f"\n\n{speaker}:{content}.")
                current_speaker = speaker
            else:
                formatted.append(f"{content}.")
        else:
            formatted.append(sentence + '.')
    return ' '.join(formatted)
|
|
|
def transcribe_audio(audio_file):
    """Transcribe an audio file with the module-level Whisper model.

    The audio is loaded at 16 kHz and processed in 30-second windows that
    start every 25 seconds (5 seconds of overlap). The decoded text of all
    windows is joined with spaces and passed through
    ``format_transcript_with_breaks``.

    Args:
        audio_file: Path (or other source accepted by ``librosa.load``) of
            the audio to transcribe.

    Returns:
        The transcription, split into paragraphs.

    Raises:
        Any exception raised while loading or transcribing is printed and
        re-raised unchanged.
    """
    try:
        print("Loading audio file...")
        audio_input, sr = librosa.load(audio_file, sr=16000)
        audio_input = audio_input.astype(np.float32)
        total_samples = len(audio_input)
        print(f"Audio duration: {total_samples / sr:.2f} seconds")

        # Presumably 30 s is chosen to match Whisper's input window -- confirm.
        chunk_length = 30 * sr
        overlap = 5 * sr
        step = chunk_length - overlap
        transcriptions = []

        print("Starting transcription...")
        # NOTE(review): consecutive windows share `overlap` samples and their
        # text is concatenated as-is, so words spoken in the shared region
        # may appear twice in the result.
        for start in range(0, total_samples, step):
            end = min(start + chunk_length, total_samples)
            chunk = audio_input[start:end]
            input_features = processor(
                chunk, sampling_rate=sr, return_tensors="pt"
            ).input_features.to(device)
            predicted_ids = model.generate(input_features)
            transcriptions.extend(
                processor.batch_decode(predicted_ids, skip_special_tokens=True)
            )
            print(f"Processed {start / sr:.2f} to {end / sr:.2f} seconds")
            if end == total_samples:
                # This window already reached the end of the audio; another
                # iteration would only re-transcribe its tail.
                break

        full_transcription = " ".join(transcriptions)
        print(f"Transcription complete. Full transcription length: {len(full_transcription)} characters")

        print("Applying formatting and paragraph breaks...")
        formatted_transcription = format_transcript_with_breaks(full_transcription)

        return formatted_transcription
    except Exception as e:
        print(f"Error in transcribe_audio: {str(e)}")
        raise
|
|
|
def format_transcript_with_breaks(transcript, sentences_per_paragraph=3):
    """Group the sentences of *transcript* into blank-line separated paragraphs.

    Sentences are delimited by '.', '!' or '?' followed by one or more
    spaces. Each sentence is stripped of surrounding whitespace and empty
    pieces are dropped, so leading or trailing spaces in the input do not
    produce empty sentences or a dangling paragraph break.

    Args:
        transcript: Text to split into paragraphs.
        sentences_per_paragraph: Maximum number of sentences per paragraph.

    Returns:
        The paragraphs joined by a blank line; an empty string when the
        input contains no sentence text.

    Raises:
        ValueError: If *sentences_per_paragraph* is smaller than 1.
    """
    if sentences_per_paragraph < 1:
        raise ValueError("sentences_per_paragraph must be at least 1")

    pieces = re.split(r'(?<=[.!?]) +', transcript)
    sentences = [piece.strip() for piece in pieces if piece.strip()]

    paragraphs = [
        ' '.join(sentences[i:i + sentences_per_paragraph])
        for i in range(0, len(sentences), sentences_per_paragraph)
    ]
    return '\n\n'.join(paragraphs)
|
|
|
def transcribe_video(url):
    """Download the media at *url* and return its cleaned-up transcript.

    Steps: download the bytes, decode them with pydub, export a temporary
    WAV file, transcribe it, then apply ``correct_spelling`` and
    ``format_transcript``. The temporary file is removed whether or not
    transcription succeeds.

    Args:
        url: Direct media URL or shareable page URL.

    Returns:
        The transcript on success. On any ``Exception`` the error is printed
        and a string of the form ``"An error occurred: <message>"`` is
        returned instead of raising.
    """
    try:
        print(f"Attempting to download audio from URL: {url}")
        audio_bytes = download_audio_from_url(url)
        print(f"Successfully downloaded {len(audio_bytes)} bytes of audio data")

        audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
        print(f"Audio duration: {len(audio) / 1000} seconds")

        # Only reserve a unique path here; the handle is closed before the
        # export so the file is not open twice.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
            temp_audio_path = temp_audio.name

        try:
            # export() hands back the open output file object; close it so
            # the file can be read and deleted afterwards.
            audio.export(temp_audio_path, format="wav").close()

            print("Starting audio transcription...")
            transcript = transcribe_audio(temp_audio_path)
            print(f"Transcription completed. Transcript length: {len(transcript)} characters")
        finally:
            # Always clean up, even when export or transcription fails.
            os.unlink(temp_audio_path)

        transcript = correct_spelling(transcript)
        transcript = format_transcript(transcript)

        return transcript
    except Exception as e:
        error_message = f"An error occurred: {str(e)}"
        print(error_message)
        return error_message
|
|
|
def download_transcript(transcript):
    """Write *transcript* to a new temporary ``.txt`` file.

    The file is written as UTF-8 so non-ASCII characters are stored the same
    way on every platform. It is created with ``delete=False``; removing it
    is the caller's responsibility.

    Args:
        transcript: Text to store.

    Returns:
        The path of the file that was written.
    """
    with tempfile.NamedTemporaryFile(
        mode='w', delete=False, suffix='.txt', encoding='utf-8'
    ) as temp_file:
        temp_file.write(transcript)
        temp_file_path = temp_file.name
    return temp_file_path
|
|
|
def pdf_compressor():
    """PyWebIO task: ask for a video URL, show its transcript, offer a download.

    The function name is kept because the server entry point registers it.
    Nothing further happens when the submitted URL is empty.
    """
    put_markdown("# Video Transcription")
    video_url = input(label="Video URL")
    if not video_url:
        return

    put_text("Transcribing video...")
    transcript = transcribe_video(video_url)
    # NOTE(review): transcribe_video returns an "An error occurred: ..."
    # string on failure, so this branch is only reached for an empty result.
    if not transcript:
        put_text("Failed to transcribe video.")
        return

    put_text(transcript)
    # put_file takes the download file name and the file content as bytes;
    # the transcript is sent directly, so no temporary file is left on disk.
    put_file(
        "transcript.txt",
        transcript.encode("utf-8"),
        label="Download Transcript",
    )
|
|
|
# Script entry point: set the page metadata, then serve the transcription
# task on all interfaces, port 7860.
if __name__ == '__main__':
    config(
        title="Video Transcription",
        description="Transcribe audio from a video URL using Whisper and PyAnnote",
    )
    start_server(
        pdf_compressor,
        host='0.0.0.0',
        port=7860,
        debug=True,
        enable_rate_limit=True,
        max_payload_size='200M',
    )