File size: 7,042 Bytes
1100e65 edaf161 249a3c0 b09f327 af532e7 7a13c00 1b493d6 170241f edaf161 170241f 28b8cb5 a18a113 249a3c0 a18a113 249a3c0 b09f327 249a3c0 17ca647 b09f327 249a3c0 752e0a6 f1f904a b0c825b 659b8b6 f1f904a ae49d5b b0c825b ae49d5b f1f904a b0c825b f1f904a 752e0a6 b0c825b f1f904a ae49d5b 752e0a6 a18a113 ae49d5b a18a113 0cfb05e af532e7 256795b af532e7 249a3c0 af532e7 249a3c0 0cfb05e 249a3c0 256795b 0cfb05e 249a3c0 b09f327 0cfb05e 256795b b09f327 edaf161 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
import io
import re
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import requests
from bs4 import BeautifulSoup
import tempfile
import os
import soundfile as sf
from spellchecker import SpellChecker
from pydub import AudioSegment
import librosa
import numpy as np
from pyannote.audio import Pipeline
from pywebio import start_server, config, session
from pywebio.input import input, input_group
from pywebio.output import put_text, put_markdown, put_file
from pywebio.session import run_js
# Initialize the speaker diarization pipeline
try:
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")
print("Speaker diarization pipeline initialized successfully")
except Exception as e:
print(f"Error initializing speaker diarization pipeline: {str(e)}")
pipeline = None
# Check if CUDA is available and set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
# Load the Whisper model and processor
model_name = "openai/whisper-small"
processor = WhisperProcessor.from_pretrained(model_name)
model = WhisperForConditionalGeneration.from_pretrained(model_name).to(device)
spell = SpellChecker()
def download_audio_from_url(url):
try:
if "share" in url:
print("Processing shareable link...")
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
video_tag = soup.find('video')
if video_tag and 'src' in video_tag.attrs:
video_url = video_tag['src']
print(f"Extracted video URL: {video_url}")
else:
raise ValueError("Direct video URL not found in the shareable link.")
else:
video_url = url
print(f"Downloading video from URL: {video_url}")
response = requests.get(video_url)
audio_bytes = response.content
print(f"Successfully downloaded {len(audio_bytes)} bytes of data")
return audio_bytes
except Exception as e:
print(f"Error in download_audio_from_url: {str(e)}")
raise
def correct_spelling(text):
words = text.split()
corrected_words = [spell.correction(word) or word for word in words]
return ' '.join(corrected_words)
def format_transcript(transcript):
sentences = transcript.split('.')
formatted_transcript = []
current_speaker = None
for sentence in sentences:
if ':' in sentence:
speaker, content = sentence.split(':', 1)
if speaker != current_speaker:
formatted_transcript.append(f"\n\n{speaker.strip()}:{content.strip()}.")
current_speaker = speaker
else:
formatted_transcript.append(f"{content.strip()}.")
else:
formatted_transcript.append(sentence.strip() + '.')
return ' '.join(formatted_transcript)
def transcribe_audio(audio_file):
try:
print("Loading audio file...")
audio_input, sr = librosa.load(audio_file, sr=16000)
audio_input = audio_input.astype(np.float32)
print(f"Audio duration: {len(audio_input) / sr:.2f} seconds")
chunk_length = 30 * sr
overlap = 5 * sr
transcriptions = []
print("Starting transcription...")
for i in range(0, len(audio_input), chunk_length - overlap):
chunk = audio_input[i:i+chunk_length]
input_features = processor(chunk, sampling_rate=16000, return_tensors="pt").input_features.to(device)
predicted_ids = model.generate(input_features)
transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)
transcriptions.extend(transcription)
print(f"Processed {i / sr:.2f} to {(i + chunk_length) / sr:.2f} seconds")
full_transcription = " ".join(transcriptions)
print(f"Transcription complete. Full transcription length: {len(full_transcription)} characters")
print("Applying formatting and paragraph breaks...")
formatted_transcription = format_transcript_with_breaks(full_transcription)
return formatted_transcription
except Exception as e:
print(f"Error in transcribe_audio: {str(e)}")
raise
def format_transcript_with_breaks(transcript):
# Split into sentences
sentences = re.split('(?<=[.!?]) +', transcript)
paragraphs = []
current_paragraph = []
for sentence in sentences:
current_paragraph.append(sentence)
if len(current_paragraph) >= 3: # Adjust this number to control paragraph size
paragraphs.append(' '.join(current_paragraph))
current_paragraph = []
if current_paragraph:
paragraphs.append(' '.join(current_paragraph))
return '\n\n'.join(paragraphs)
def transcribe_video(url):
try:
print(f"Attempting to download audio from URL: {url}")
audio_bytes = download_audio_from_url(url)
print(f"Successfully downloaded {len(audio_bytes)} bytes of audio data")
# Convert audio bytes to AudioSegment
audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
print(f"Audio duration: {len(audio) / 1000} seconds")
# Save as WAV file
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
audio.export(temp_audio.name, format="wav")
temp_audio_path = temp_audio.name
print("Starting audio transcription...")
transcript = transcribe_audio(temp_audio_path)
print(f"Transcription completed. Transcript length: {len(transcript)} characters")
# Clean up the temporary file
os.unlink(temp_audio_path)
# Apply spelling correction and formatting
transcript = correct_spelling(transcript)
transcript = format_transcript(transcript)
return transcript
except Exception as e:
error_message = f"An error occurred: {str(e)}"
print(error_message)
return error_message
def download_transcript(transcript):
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
temp_file.write(transcript)
temp_file_path = temp_file.name
return temp_file_path
def pdf_compressor():
put_markdown("# Video Transcription")
video_url = input(label="Video URL")
if video_url:
put_text("Transcribing video...")
transcript = transcribe_video(video_url)
if transcript:
put_text(transcript)
download_link = download_transcript(transcript)
put_file(download_link, label="Download Transcript")
else:
put_text("Failed to transcribe video.")
if __name__ == '__main__':
config(title="Video Transcription", description="Transcribe audio from a video URL using Whisper and PyAnnote")
start_server(pdf_compressor, host='0.0.0.0', port=7860, debug=True, enable_rate_limit=True, max_payload_size='200M') |