"""Web front end that transcribes the audio track of a video URL with Whisper.

The module downloads a video (optionally resolving a "share" page to the
underlying media URL), converts it to WAV, transcribes it in overlapping
30-second windows, applies spelling correction and light formatting, and
serves the result through a PyWebIO page.
"""

import io
import os
import re
import tempfile
from urllib.parse import urljoin

import librosa
import numpy as np
import requests
import soundfile as sf
import torch
from bs4 import BeautifulSoup
from pyannote.audio import Pipeline
from pydub import AudioSegment
from pywebio import start_server, config, session
from pywebio.input import input, input_group
from pywebio.output import put_text, put_markdown, put_file
from pywebio.session import run_js
from spellchecker import SpellChecker
from transformers import WhisperProcessor, WhisperForConditionalGeneration

# Initialize the speaker diarization pipeline.
# NOTE(review): none of the functions visible in this file use `pipeline`;
# it is kept because loading it is an existing module-level side effect.
try:
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")
    print("Speaker diarization pipeline initialized successfully")
except Exception as e:
    print(f"Error initializing speaker diarization pipeline: {str(e)}")
    pipeline = None

# Check if CUDA is available and set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Load the Whisper model and processor
model_name = "openai/whisper-small"
processor = WhisperProcessor.from_pretrained(model_name)
model = WhisperForConditionalGeneration.from_pretrained(model_name).to(device)

spell = SpellChecker()

# (connect, read) timeouts in seconds; without them a stalled server would
# block the request handler forever.
_REQUEST_TIMEOUT = (10, 60)

# A run of letters, optionally with inner apostrophes ("don't").
_WORD_RE = re.compile(r"[^\W\d_]+(?:['\u2019][^\W\d_]+)*")
# Whitespace that follows sentence-ending punctuation.
_SENTENCE_BOUNDARY_RE = re.compile(r"(?<=[.!?])\s+")
# One or more blank lines.
_PARAGRAPH_BREAK_RE = re.compile(r"\n\s*\n")
# A colon that is NOT sitting between two digits (so "3:30" is not a label).
_SPEAKER_COLON_RE = re.compile(r"(?<!\d):|:(?!\d)")
# Closing quotes/brackets that may trail the sentence terminator.
_TRAILING_CLOSERS = "\"')]\u201d\u2019"


def _find_video_src(soup):
    """Return the `src` of the first <video> (or its first <source>), or None."""
    video_tag = soup.find('video')
    if video_tag is None:
        return None
    if video_tag.get('src'):
        return video_tag['src']
    source_tag = video_tag.find('source')
    if source_tag is not None and source_tag.get('src'):
        return source_tag['src']
    return None


def download_audio_from_url(url):
    """Download the media behind *url* and return its raw bytes.

    If *url* contains the substring "share", it is treated as an HTML page:
    the page is fetched and the media URL is taken from its <video> tag
    (relative URLs are resolved against the page URL). Otherwise *url* is
    downloaded directly.

    Raises:
        ValueError: the share page has no usable <video> source.
        requests.RequestException: network failure, timeout or HTTP error
            status.
    """
    try:
        if "share" in url:
            print("Processing shareable link...")
            response = requests.get(url, timeout=_REQUEST_TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            video_src = _find_video_src(soup)
            if not video_src:
                raise ValueError("Direct video URL not found in the shareable link.")
            # `src` may be relative to the page it was found on.
            video_url = urljoin(response.url, video_src)
            print(f"Extracted video URL: {video_url}")
        else:
            video_url = url

        print(f"Downloading video from URL: {video_url}")
        response = requests.get(video_url, timeout=_REQUEST_TIMEOUT)
        # Fail loudly instead of handing an HTML error page to the decoder.
        response.raise_for_status()
        audio_bytes = response.content
        print(f"Successfully downloaded {len(audio_bytes)} bytes of data")
        return audio_bytes
    except Exception as e:
        print(f"Error in download_audio_from_url: {str(e)}")
        raise


def _match_case(original, corrected):
    """Re-apply the capitalisation pattern of *original* to *corrected*."""
    if len(original) > 1 and original.isupper():
        return corrected.upper()
    if original[:1].isupper():
        return corrected[:1].upper() + corrected[1:]
    return corrected


def correct_spelling(text):
    """Return *text* with each word replaced by the spell checker's suggestion.

    Only runs of letters are sent to the spell checker; whitespace (including
    paragraph breaks), digits and punctuation are left exactly as they are.
    A word is kept unchanged when the checker returns no suggestion.
    """
    cache = {}  # each distinct word is corrected only once per call

    def _fix(match):
        word = match.group(0)
        if word not in cache:
            suggestion = spell.correction(word) or word
            cache[word] = _match_case(word, suggestion)
        return cache[word]

    return _WORD_RE.sub(_fix, text)


def format_transcript(transcript):
    """Group a transcript into paragraphs, starting a new one per speaker turn.

    Existing blank-line paragraph breaks are preserved. Within a paragraph the
    text is split into sentences; a sentence of the form "Label: text" is
    treated as a speaker turn. When the label differs from the previous one a
    new paragraph "Label: text" is started; when it is the same speaker only
    the text is appended. A colon between two digits (e.g. "3:30") is not
    treated as a label separator. A period is appended to a sentence only if
    it does not already end with '.', '!' or '?'.
    """
    paragraphs = []
    current_speaker = None

    for block in _PARAGRAPH_BREAK_RE.split(transcript):
        sentences = []
        for sentence in _SENTENCE_BOUNDARY_RE.split(block.strip()):
            sentence = sentence.strip()
            if not sentence:
                continue
            if not sentence.rstrip(_TRAILING_CLOSERS).endswith((".", "!", "?")):
                sentence += "."

            colon = _SPEAKER_COLON_RE.search(sentence)
            if colon is None:
                sentences.append(sentence)
                continue

            # Strip before comparing so " Alice" and "Alice" are one speaker.
            speaker = sentence[:colon.start()].strip()
            content = sentence[colon.end():].strip()
            if speaker == current_speaker:
                sentences.append(content)
                continue

            # Speaker changed: close the running paragraph, open a new one.
            if sentences:
                paragraphs.append(" ".join(sentences))
            sentences = [f"{speaker}: {content}"]
            current_speaker = speaker

        if sentences:
            paragraphs.append(" ".join(sentences))

    return "\n\n".join(paragraphs)


def transcribe_audio(audio_file):
    """Transcribe *audio_file* with Whisper and return paragraph-formatted text.

    The audio is loaded at 16 kHz and processed in 30-second windows that
    advance by 25 seconds (5 seconds of overlap). The per-window texts are
    joined with spaces and passed through `format_transcript_with_breaks`.

    NOTE(review): text spoken inside the 5-second overlap is transcribed by
    both neighbouring windows and is not de-duplicated here.

    Raises:
        Exception: anything raised while loading or decoding is logged and
            re-raised.
    """
    try:
        print("Loading audio file...")
        audio_input, sr = librosa.load(audio_file, sr=16000)
        audio_input = audio_input.astype(np.float32)
        total_samples = len(audio_input)
        print(f"Audio duration: {total_samples / sr:.2f} seconds")

        chunk_length = 30 * sr
        overlap = 5 * sr
        step = chunk_length - overlap
        transcriptions = []

        print("Starting transcription...")
        for start in range(0, total_samples, step):
            end = min(start + chunk_length, total_samples)
            chunk = audio_input[start:end]
            input_features = processor(
                chunk, sampling_rate=16000, return_tensors="pt"
            ).input_features.to(device)
            # Inference only: do not build an autograd graph.
            with torch.no_grad():
                predicted_ids = model.generate(input_features)
            transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)
            transcriptions.extend(transcription)
            print(f"Processed {start / sr:.2f} to {end / sr:.2f} seconds")
            # This window already reached the end of the audio; the next
            # start offset would only re-transcribe its tail.
            if end >= total_samples:
                break

        full_transcription = " ".join(transcriptions)
        print(f"Transcription complete. Full transcription length: {len(full_transcription)} characters")

        print("Applying formatting and paragraph breaks...")
        formatted_transcription = format_transcript_with_breaks(full_transcription)

        return formatted_transcription
    except Exception as e:
        print(f"Error in transcribe_audio: {str(e)}")
        raise


def format_transcript_with_breaks(transcript):
    """Insert a blank line after every third sentence of *transcript*.

    Sentences are split at spaces that follow '.', '!' or '?'. Groups of three
    sentences (the last group may be shorter) are joined with single spaces
    and the groups are joined with blank lines.
    """
    sentences_per_paragraph = 3  # Adjust this number to control paragraph size
    sentences = re.split('(?<=[.!?]) +', transcript)
    paragraphs = [
        ' '.join(sentences[i:i + sentences_per_paragraph])
        for i in range(0, len(sentences), sentences_per_paragraph)
    ]
    return '\n\n'.join(paragraphs)


def _transcribe_video_or_raise(url):
    """Download, convert, transcribe and post-process *url*; raise on failure."""
    print(f"Attempting to download audio from URL: {url}")
    audio_bytes = download_audio_from_url(url)
    print(f"Successfully downloaded {len(audio_bytes)} bytes of audio data")

    # Convert audio bytes to AudioSegment
    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
    print(f"Audio duration: {len(audio) / 1000} seconds")

    # Reserve a path, then close the handle before anything writes to it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
        temp_audio_path = temp_audio.name
    try:
        audio.export(temp_audio_path, format="wav")
        print("Starting audio transcription...")
        transcript = transcribe_audio(temp_audio_path)
        print(f"Transcription completed. Transcript length: {len(transcript)} characters")
    finally:
        # Clean up the temporary file even when export/transcription fails.
        os.unlink(temp_audio_path)

    # Apply spelling correction and formatting
    transcript = correct_spelling(transcript)
    transcript = format_transcript(transcript)
    return transcript


def transcribe_video(url):
    """Return the transcript for the video at *url*.

    This function never raises: on any failure it prints and returns a string
    of the form "An error occurred: <message>" instead of a transcript.
    """
    try:
        return _transcribe_video_or_raise(url)
    except Exception as e:
        error_message = f"An error occurred: {str(e)}"
        print(error_message)
        return error_message


def download_transcript(transcript):
    """Write *transcript* to a new UTF-8 ``.txt`` temp file and return its path.

    The file is created with ``delete=False``; the caller is responsible for
    removing it.
    """
    with tempfile.NamedTemporaryFile(
        mode='w', delete=False, suffix='.txt', encoding='utf-8'
    ) as temp_file:
        temp_file.write(transcript)
        temp_file_path = temp_file.name
    return temp_file_path


def pdf_compressor():
    """PyWebIO page: ask for a video URL, show its transcript, offer a download.

    On failure the error message is shown and no download link is offered.
    """
    put_markdown("# Video Transcription")

    video_url = input(label="Video URL")
    if not video_url:
        return

    put_text("Transcribing video...")
    try:
        transcript = _transcribe_video_or_raise(video_url)
    except Exception as e:  # UI boundary: report instead of crashing the session
        error_message = f"An error occurred: {str(e)}"
        print(error_message)
        put_text(error_message)
        return

    if not transcript:
        put_text("Failed to transcribe video.")
        return

    put_text(transcript)
    # put_file takes the download name and the file content as bytes; no
    # temporary file on the server is needed.
    put_file("transcript.txt", transcript.encode("utf-8"), "Download Transcript")


if __name__ == '__main__':
    config(title="Video Transcription", description="Transcribe audio from a video URL using Whisper and PyAnnote")
    start_server(pdf_compressor, host='0.0.0.0', port=7860, debug=True, enable_rate_limit=True, max_payload_size='200M')