import io
import os

import gradio as gr
import requests
import torchaudio
import torchaudio.transforms as T
from bs4 import BeautifulSoup
from pydub import AudioSegment
from pydub.silence import split_on_silence
from transformers import pipeline

# facebook/wav2vec2-base-960h is an English ASR model: it expects 16 kHz mono input
# and emits uppercase text without punctuation.
transcription_pipeline = pipeline("automatic-speech-recognition", model="facebook/wav2vec2-base-960h")
TARGET_SAMPLE_RATE = 16000  # sampling rate expected by the Wav2Vec2 feature extractor

def download_audio_from_url(url):
    """Return raw media bytes from a direct URL or from a shareable page that embeds a <video> tag."""
    try:
        if "share" in url:
            print("Processing shareable link...")
            response = requests.get(url)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            video_tag = soup.find('video')
            if video_tag and 'src' in video_tag.attrs:
                video_url = video_tag['src']
                print(f"Extracted video URL: {video_url}")
            else:
                raise ValueError("Direct video URL not found in the shareable link.")
        else:
            video_url = url

        print(f"Downloading video from URL: {video_url}")
        response = requests.get(video_url)
        response.raise_for_status()
        audio_bytes = response.content
        print(f"Successfully downloaded {len(audio_bytes)} bytes of data")
        return audio_bytes
    except Exception as e:
        print(f"Error in download_audio_from_url: {str(e)}")
        raise

def transcribe_audio(audio_bytes):
    """Transcribe the audio and group the text into paragraphs based on pauses in speech."""
    audio = AudioSegment.from_file(io.BytesIO(audio_bytes))
    audio.export("temp_audio.wav", format="wav")
    waveform, sample_rate = torchaudio.load("temp_audio.wav")
    os.remove("temp_audio.wav")

    # Wav2Vec2 expects 16 kHz mono input: downmix to one channel and resample if needed.
    waveform = waveform.mean(dim=0, keepdim=True)
    if sample_rate != TARGET_SAMPLE_RATE:
        waveform = T.Resample(orig_freq=sample_rate, new_freq=TARGET_SAMPLE_RATE)(waveform)
    waveform_np = waveform.squeeze().numpy()

    # Full-pass transcription, used as a fallback if silence-based chunking yields nothing.
    result = transcription_pipeline(waveform_np, chunk_length_s=30)
    transcript = result['text']

    # Split on pauses (at least 500 ms below -40 dBFS) so each chunk roughly maps to a paragraph.
    chunks = split_on_silence(audio, min_silence_len=500, silence_thresh=-40)
    paragraphs = []
    current_paragraph = ""

    for chunk in chunks:
        chunk.export("temp_chunk.wav", format="wav")
        waveform_chunk, sample_rate_chunk = torchaudio.load("temp_chunk.wav")
        os.remove("temp_chunk.wav")

        waveform_chunk = waveform_chunk.mean(dim=0, keepdim=True)
        if sample_rate_chunk != TARGET_SAMPLE_RATE:
            waveform_chunk = T.Resample(orig_freq=sample_rate_chunk, new_freq=TARGET_SAMPLE_RATE)(waveform_chunk)
        waveform_chunk_np = waveform_chunk.squeeze().numpy()

        chunk_result = transcription_pipeline(waveform_chunk_np, chunk_length_s=30)
        chunk_transcript = chunk_result['text'].strip()

        if chunk_transcript:
            if current_paragraph:
                current_paragraph += " " + chunk_transcript
            else:
                current_paragraph = chunk_transcript
        else:
            # An empty chunk marks a paragraph boundary.
            if current_paragraph:
                paragraphs.append(current_paragraph)
                current_paragraph = ""

    if current_paragraph:
        paragraphs.append(current_paragraph)

    if not paragraphs:
        return transcript

    formatted_transcript = "\n\n".join(paragraphs)
    return formatted_transcript

def transcribe_video(url):
    try:
        print(f"Attempting to download audio from URL: {url}")
        audio_bytes = download_audio_from_url(url)
        print(f"Successfully downloaded {len(audio_bytes)} bytes of audio data")

        print("Starting audio transcription...")
        transcript = transcribe_audio(audio_bytes)
        print("Transcription completed successfully")

        return transcript
    except Exception as e:
        error_message = f"An error occurred: {str(e)}"
        print(error_message)
        return error_message


def download_transcript(transcript):
    # Write the transcript to a text file and return its path so gr.File can serve it.
    path = "transcript.txt"
    with open(path, "w", encoding="utf-8") as f:
        f.write(transcript)
    return path

with gr.Blocks(title="Video Transcription") as demo:
    gr.Markdown("# Video Transcription")
    video_url = gr.Textbox(label="Video URL")
    transcribe_button = gr.Button("Transcribe")
    transcript_output = gr.Textbox(label="Transcript", lines=20)
    download_button = gr.Button("Download Transcript")
    download_link = gr.File(label="Download Transcript")

    transcribe_button.click(fn=transcribe_video, inputs=video_url, outputs=transcript_output)
    download_button.click(fn=download_transcript, inputs=transcript_output, outputs=download_link)

demo.launch()
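# Note: demo.launch(share=True) can be used to get a temporary public link, and
# demo.launch(server_name="0.0.0.0") serves the app on the local network rather than localhost only.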