File size: 7,042 Bytes
1100e65
edaf161
249a3c0
 
b09f327
 
 
 
 
 
af532e7
7a13c00
1b493d6
170241f
edaf161
 
 
 
170241f
 
28b8cb5
 
 
 
 
 
a18a113
249a3c0
 
 
a18a113
249a3c0
b09f327
249a3c0
 
17ca647
b09f327
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
249a3c0
752e0a6
f1f904a
b0c825b
659b8b6
f1f904a
 
 
ae49d5b
b0c825b
ae49d5b
f1f904a
 
 
b0c825b
 
f1f904a
 
 
752e0a6
b0c825b
f1f904a
 
ae49d5b
 
 
 
752e0a6
 
 
a18a113
ae49d5b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a18a113
0cfb05e
 
 
 
 
af532e7
 
 
256795b
 
af532e7
249a3c0
af532e7
249a3c0
 
0cfb05e
249a3c0
256795b
0cfb05e
249a3c0
 
 
b09f327
 
 
 
0cfb05e
 
 
256795b
 
b09f327
 
 
 
 
 
 
edaf161
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import io
import re
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import requests
from bs4 import BeautifulSoup
import tempfile
import os
import soundfile as sf
from spellchecker import SpellChecker
from pydub import AudioSegment
import librosa
import numpy as np
from pyannote.audio import Pipeline
from pywebio import start_server, config, session
from pywebio.input import input, input_group
from pywebio.output import put_text, put_markdown, put_file
from pywebio.session import run_js

# Initialize the speaker diarization pipeline
try:
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")
    print("Speaker diarization pipeline initialized successfully")
except Exception as e:
    print(f"Error initializing speaker diarization pipeline: {str(e)}")
    pipeline = None

# Check if CUDA is available and set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Load the Whisper model and processor
model_name = "openai/whisper-small"
processor = WhisperProcessor.from_pretrained(model_name)
model = WhisperForConditionalGeneration.from_pretrained(model_name).to(device)

spell = SpellChecker()

def download_audio_from_url(url):
    try:
        if "share" in url:
            print("Processing shareable link...")
            response = requests.get(url)
            soup = BeautifulSoup(response.content, 'html.parser')
            video_tag = soup.find('video')
            if video_tag and 'src' in video_tag.attrs:
                video_url = video_tag['src']
                print(f"Extracted video URL: {video_url}")
            else:
                raise ValueError("Direct video URL not found in the shareable link.")
        else:
            video_url = url
        
        print(f"Downloading video from URL: {video_url}")
        response = requests.get(video_url)
        audio_bytes = response.content
        print(f"Successfully downloaded {len(audio_bytes)} bytes of data")
        return audio_bytes
    except Exception as e:
        print(f"Error in download_audio_from_url: {str(e)}")
        raise

def correct_spelling(text):
    words = text.split()
    corrected_words = [spell.correction(word) or word for word in words]
    return ' '.join(corrected_words)

def format_transcript(transcript):
    sentences = transcript.split('.')
    formatted_transcript = []
    current_speaker = None
    for sentence in sentences:
        if ':' in sentence:
            speaker, content = sentence.split(':', 1)
            if speaker != current_speaker:
                formatted_transcript.append(f"\n\n{speaker.strip()}:{content.strip()}.")
                current_speaker = speaker
            else:
                formatted_transcript.append(f"{content.strip()}.")
        else:
            formatted_transcript.append(sentence.strip() + '.')
    return ' '.join(formatted_transcript)

def transcribe_audio(audio_file):
    try:
        print("Loading audio file...")
        audio_input, sr = librosa.load(audio_file, sr=16000)
        audio_input = audio_input.astype(np.float32)
        print(f"Audio duration: {len(audio_input) / sr:.2f} seconds")

        chunk_length = 30 * sr
        overlap = 5 * sr
        transcriptions = []
        
        print("Starting transcription...")
        for i in range(0, len(audio_input), chunk_length - overlap):
            chunk = audio_input[i:i+chunk_length]
            input_features = processor(chunk, sampling_rate=16000, return_tensors="pt").input_features.to(device)
            predicted_ids = model.generate(input_features)
            transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)
            transcriptions.extend(transcription)
            print(f"Processed {i / sr:.2f} to {(i + chunk_length) / sr:.2f} seconds")

        full_transcription = " ".join(transcriptions)
        print(f"Transcription complete. Full transcription length: {len(full_transcription)} characters")

        print("Applying formatting and paragraph breaks...")
        formatted_transcription = format_transcript_with_breaks(full_transcription)

        return formatted_transcription
    except Exception as e:
        print(f"Error in transcribe_audio: {str(e)}")
        raise

def format_transcript_with_breaks(transcript):
    # Split into sentences
    sentences = re.split('(?<=[.!?]) +', transcript)
    paragraphs = []
    current_paragraph = []

    for sentence in sentences:
        current_paragraph.append(sentence)
        if len(current_paragraph) >= 3:  # Adjust this number to control paragraph size
            paragraphs.append(' '.join(current_paragraph))
            current_paragraph = []

    if current_paragraph:
        paragraphs.append(' '.join(current_paragraph))

    return '\n\n'.join(paragraphs)

def transcribe_video(url):
    try:
        print(f"Attempting to download audio from URL: {url}")
        audio_bytes = download_audio_from_url(url)
        print(f"Successfully downloaded {len(audio_bytes)} bytes of audio data")
        
        # Convert audio bytes to AudioSegment
        audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
        
        print(f"Audio duration: {len(audio) / 1000} seconds")
        
        # Save as WAV file
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
            audio.export(temp_audio.name, format="wav")
            temp_audio_path = temp_audio.name

        print("Starting audio transcription...")
        transcript = transcribe_audio(temp_audio_path)
        print(f"Transcription completed. Transcript length: {len(transcript)} characters")
        
        # Clean up the temporary file
        os.unlink(temp_audio_path)

        # Apply spelling correction and formatting
        transcript = correct_spelling(transcript)
        transcript = format_transcript(transcript)

        return transcript
    except Exception as e:
        error_message = f"An error occurred: {str(e)}"
        print(error_message)
        return error_message

def download_transcript(transcript):
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
        temp_file.write(transcript)
        temp_file_path = temp_file.name
    return temp_file_path

def pdf_compressor():
    put_markdown("# Video Transcription")
    video_url = input(label="Video URL")
    if video_url:
        put_text("Transcribing video...")
        transcript = transcribe_video(video_url)
        if transcript:
            put_text(transcript)
            download_link = download_transcript(transcript)
            put_file(download_link, label="Download Transcript")
        else:
            put_text("Failed to transcribe video.")

if __name__ == '__main__':
    config(title="Video Transcription", description="Transcribe audio from a video URL using Whisper and PyAnnote")
    start_server(pdf_compressor, host='0.0.0.0', port=7860, debug=True, enable_rate_limit=True, max_payload_size='200M')