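"""Video transcription app.

Downloads audio from a video URL, transcribes it with OpenAI's Whisper
(via Hugging Face transformers) in overlapping 30-second chunks, applies
spell correction and light speaker formatting, and serves the result
through a Gradio interface.
"""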
import io
import gradio as gr
import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration
import requests
from bs4 import BeautifulSoup
import tempfile
import os
from spellchecker import SpellChecker
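# pydub decodes via ffmpeg/libav, which must be installed on the system.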
from pydub import AudioSegment
import librosa
import numpy as np

# Check if CUDA is available and set the device
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# Load the Whisper model and processor
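# "openai/whisper-small" is a speed/accuracy compromise; other checkpoints
# (e.g. "openai/whisper-base", "openai/whisper-medium") can be swapped in.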
model_name = "openai/whisper-small"
processor = WhisperProcessor.from_pretrained(model_name)
model = WhisperForConditionalGeneration.from_pretrained(model_name).to(device)

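# SpellChecker() defaults to an English word list; word-by-word correction
# can alter names and technical terms, so this is a rough cleanup pass.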
spell = SpellChecker()

def download_audio_from_url(url):
    """Return the raw bytes of the video/audio behind `url`.

    Shareable page links are scraped for a <video> tag; direct links are
    downloaded as-is.
    """
    try:
        if "share" in url:
            print("Processing shareable link...")
            # Timeouts make bad links fail fast instead of hanging.
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            video_tag = soup.find('video')
            if video_tag and 'src' in video_tag.attrs:
                video_url = video_tag['src']
                print(f"Extracted video URL: {video_url}")
            else:
                raise ValueError("Direct video URL not found in the shareable link.")
        else:
            video_url = url

        print(f"Downloading video from URL: {video_url}")
        response = requests.get(video_url, timeout=60)
        response.raise_for_status()
        audio_bytes = response.content
        print(f"Successfully downloaded {len(audio_bytes)} bytes of data")
        return audio_bytes
    except Exception as e:
        print(f"Error in download_audio_from_url: {str(e)}")
        raise

def correct_spelling(text):
    # spell.correction() returns None for words it cannot fix, so fall back
    # to the original word in that case.
    words = text.split()
    corrected_words = [spell.correction(word) or word for word in words]
    return ' '.join(corrected_words)

def format_transcript(transcript):
    """Split the transcript into sentences and start a new paragraph whenever
    a 'Speaker:' prefix changes (a heuristic; Whisper itself does not emit
    speaker labels)."""
    sentences = transcript.split('.')
    formatted_transcript = []
    current_speaker = None
    for sentence in sentences:
        if not sentence.strip():
            continue  # skip empty fragments produced by the split
        if ':' in sentence:
            speaker, content = sentence.split(':', 1)
            speaker = speaker.strip()
            if speaker != current_speaker:
                formatted_transcript.append(f"\n\n{speaker}: {content.strip()}.")
                current_speaker = speaker
            else:
                formatted_transcript.append(f"{content.strip()}.")
        else:
            formatted_transcript.append(sentence.strip() + '.')
    return ' '.join(formatted_transcript)

def transcribe_audio(audio_file):
    try:
        # Load the entire audio file
        audio_input, sr = librosa.load(audio_file, sr=16000)
        
        # Convert to float32 numpy array
        audio_input = audio_input.astype(np.float32)

        # Process in chunks of 30 seconds with overlap
        chunk_length = 30 * sr
        overlap = 5 * sr  # 5 seconds overlap
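        # The overlapping regions are not de-duplicated, so words near chunk
        # boundaries may be transcribed twice in the joined output.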
        transcriptions = []

        for i in range(0, len(audio_input), chunk_length - overlap):
            chunk = audio_input[i:i+chunk_length]
            input_features = processor(chunk, sampling_rate=16000, return_tensors="pt").input_features.to(device)
            predicted_ids = model.generate(input_features)
            transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)
            transcriptions.extend(transcription)

        # Join all transcriptions
        full_transcription = " ".join(transcriptions)

        print(f"Full transcription length: {len(full_transcription)} characters")
        return full_transcription
    except Exception as e:
        print(f"Error in transcribe_audio: {str(e)}")
        raise

def transcribe_video(url):
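    """End-to-end pipeline: download, convert to WAV, transcribe, clean up."""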
    try:
        print(f"Attempting to download audio from URL: {url}")
        audio_bytes = download_audio_from_url(url)
        print(f"Successfully downloaded {len(audio_bytes)} bytes of audio data")
        
        # Convert audio bytes to AudioSegment
        audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
        
        print(f"Audio duration: {len(audio) / 1000} seconds")
        
        # Save as WAV file
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio:
            audio.export(temp_audio.name, format="wav")
            temp_audio_path = temp_audio.name

        print("Starting audio transcription...")
        transcript = transcribe_audio(temp_audio_path)
        print(f"Transcription completed. Transcript length: {len(transcript)} characters")
        
        # Clean up the temporary file
        os.unlink(temp_audio_path)

        # Apply spelling correction and formatting
        transcript = correct_spelling(transcript)
        transcript = format_transcript(transcript)

        return transcript
    except Exception as e:
        error_message = f"An error occurred: {str(e)}"
        print(error_message)
        return error_message

def download_transcript(transcript):
    # Write the transcript to a temp file that Gradio can serve for download.
    # The file is intentionally not deleted here, because Gradio reads it
    # after this function returns.
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_file:
        temp_file.write(transcript)
        temp_file_path = temp_file.name
    return temp_file_path

# Create the Gradio interface
with gr.Blocks(title="Video Transcription") as demo:
    gr.Markdown("# Video Transcription")
    video_url = gr.Textbox(label="Video URL")
    transcribe_button = gr.Button("Transcribe")
    transcript_output = gr.Textbox(label="Transcript", lines=20)
    download_button = gr.Button("Download Transcript")
    download_link = gr.File(label="Download Transcript")
    
    transcribe_button.click(fn=transcribe_video, inputs=video_url, outputs=transcript_output)
    download_button.click(fn=download_transcript, inputs=transcript_output, outputs=download_link)

demo.launch()
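# demo.launch() serves the app locally; demo.launch(share=True) would also
# create a temporary public Gradio link.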