import gradio as gr
import requests
import torch
import os
import tempfile
from transformers import MarianMTModel, MarianTokenizer, AutoTokenizer, AutoModelForSeq2SeqLM
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.proxies import WebshareProxyConfig
from gtts import gTTS
# Initialize the YouTube transcript client, optionally routed through a Webshare proxy
proxy_username = os.environ.get('WEBSHARE_PROXY_UN')
proxy_password = os.environ.get('WEBSHARE_PROXY_PW')
ytt_api = None
try:
if proxy_username and proxy_password:
ytt_api = YouTubeTranscriptApi(
proxy_config=WebshareProxyConfig(
proxy_username=proxy_username,
proxy_password=proxy_password,
filter_ip_locations=["us"],
)
)
print(f"Successfully connected to the Youtube API with proxy.")
else:
ytt_api = YouTubeTranscriptApi()
print(f"Successfully connected to the Youtube API without proxy.")
except Exception as e:
print(f"A proxy error occurred in connecting to the Youtube API: {e}")
ytt_api = YouTubeTranscriptApi() # Fallback if proxy fails
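# A shell sketch for supplying the optional proxy credentials before launch
# (the values below are placeholders, not real credentials):
#
#   export WEBSHARE_PROXY_UN="your-webshare-username"
#   export WEBSHARE_PROXY_PW="your-webshare-password"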
def getEnglishTranscript(video_id):
"""Retrieves the English transcript for a given YouTube video ID."""
if not ytt_api:
print("YouTubeTranscriptApi not initialized.")
return ""
try:
transcript_list = ytt_api.list(video_id)
english_original = None
        for transcript in transcript_list:
            # Accept 'en' as well as regional variants such as 'en-US' or 'en-GB'
            if transcript.language_code.split('-')[0] == 'en':
                english_original = transcript.fetch()
                break
        if english_original is None:
            print(f"No English transcript found for video ID: {video_id}")
            return ""
        return " ".join(snippet.text for snippet in english_original).strip()
except Exception as e:
print(f"Error retrieving English transcript for video ID {video_id}: {e}")
return ""
def getYoutubeTranslatedTranscript(video_id, language_name):
    """Retrieves a YouTube-provided translation of the transcript into the named language."""
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        transcript_list = ytt_api.list(video_id)
        translation = None
        for transcript in transcript_list:
            if transcript.is_translatable:
                target_code = None
                for lang in transcript.translation_languages:
                    if lang.language == language_name:
                        target_code = lang.language_code
                        break
                if target_code:
                    print(f"\nTranslating to {language_name} ({target_code})...")
                    translation = transcript.translate(target_code).fetch()
                    print(f"{language_name} translation found and stored.")
                    break  # Exit after the first transcript offering this translation
        if translation is None:
            print(f"No translatable transcript found for {language_name} for video ID: {video_id}")
            return ""
        return " ".join(snippet.text for snippet in translation).strip()
    except Exception as e:
        print(f"Error retrieving or translating {language_name} transcript for video ID {video_id}: {e}")
        return ""
def getArabicTranscript(video_id):
    """Retrieves the Arabic transcript for a given YouTube video ID, translating if necessary."""
    return getYoutubeTranslatedTranscript(video_id, 'Arabic')
def getFrenchTranscript(video_id):
    """Retrieves the French transcript for a given YouTube video ID, translating if necessary."""
    return getYoutubeTranslatedTranscript(video_id, 'French')
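# Lazily-initialized translation state, shared across requests and swapped
# out by setModelAndTokenizer when the target language changes.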
model, tokenizer, device = None, None, None
formatted_language_code = ""
def setModelAndTokenizer(language_code):
"""Sets the appropriate translation model and tokenizer based on the target language code."""
global model, tokenizer, device, formatted_language_code
_MODEL_NAME = None
_readable_name = None
if language_code == 'ar':
_MODEL_NAME = "Helsinki-NLP/opus-mt-tc-big-en-ar"
_readable_name = "English to Arabic"
elif language_code == 'fr':
_MODEL_NAME = "Helsinki-NLP/opus-mt-tc-big-en-fr"
_readable_name = "English to French"
elif language_code == 'ha':
_MODEL_NAME = "facebook/nllb-200-distilled-600M"
_readable_name = "English to Hausa"
formatted_language_code = "hau_Latn"
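        # NLLB identifies languages with FLORES-200 codes (language + script),
        # hence "hau_Latn" here and "pes_Arab" / "pbt_Arab" below.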
elif language_code == 'fa':
_MODEL_NAME = "facebook/nllb-200-distilled-600M"
_readable_name = "English to Dari/Afghan Persian"
formatted_language_code = "pes_Arab"
elif language_code == 'ps':
_MODEL_NAME = "facebook/nllb-200-distilled-600M"
_readable_name = "English to Pashto"
formatted_language_code = "pbt_Arab"
else:
return f"Language code '{language_code}' not supported for translation model."
if model is not None and tokenizer is not None and hasattr(tokenizer, 'name_or_path') and tokenizer.name_or_path == _MODEL_NAME:
print(f"Model and tokenizer for {_readable_name} already loaded.")
return f"Model and tokenizer for {_readable_name} already loaded."
print(f"Loading model and tokenizer for {_readable_name}...")
if "Helsinki-NLP" in _MODEL_NAME:
try:
tokenizer = MarianTokenizer.from_pretrained(_MODEL_NAME)
model = MarianMTModel.from_pretrained(_MODEL_NAME)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Successfully loaded Helsinki-NLP model: {_MODEL_NAME}")
except Exception as e:
print(f"Error loading Helsinki-NLP model or tokenizer: {e}")
return "Error loading translation model."
elif "facebook" in _MODEL_NAME:
try:
tokenizer = AutoTokenizer.from_pretrained(_MODEL_NAME)
            # Don't combine device_map="auto" with a manual .to(device):
            # a model dispatched by accelerate must not be moved again.
            model = AutoModelForSeq2SeqLM.from_pretrained(_MODEL_NAME)
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model.to(device)
print(f"Successfully loaded Facebook NLLB model: {_MODEL_NAME}")
except Exception as e:
print(f"Error loading Facebook NLLB model or tokenizer: {e}")
return "Error loading translation model."
else:
return f"Unknown model type for {_MODEL_NAME}"
return f"Model and tokenizer set for {_readable_name}."
def chunk_text_by_tokens(text, tokenizer, max_tokens):
"""Splits text into chunks based on token count."""
words = text.split()
chunks = []
current_chunk = []
for word in words:
trial_chunk = current_chunk + [word]
# Use add_special_tokens=False to get token count of just the words
num_tokens = len(tokenizer(" ".join(trial_chunk), add_special_tokens=False).input_ids)
if num_tokens > max_tokens:
if current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = [word]
else:
current_chunk = trial_chunk
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
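# A quick illustrative check (a sketch, not executed at import time;
# `long_text` is a hypothetical placeholder): every chunk the helper
# returns should fit the token budget.
#
#   chunks = chunk_text_by_tokens(long_text, tokenizer, 900)
#   assert all(len(tokenizer(c, add_special_tokens=False).input_ids) <= 900
#              for c in chunks)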
def translate_me(text, language_code):
"""Translates the input text to the target language using the loaded model."""
global model, tokenizer, device, formatted_language_code
    # Always (re)check the loaded model: setModelAndTokenizer is a no-op when
    # the right model is already in memory, whereas checking only for None
    # would silently reuse a model loaded for a different language pair.
    status = setModelAndTokenizer(language_code)
    if "Error" in status or "not supported" in status:
        print(status)
        return f"Translation failed: {status}"
if text is None or text.strip() == "":
return "No text to translate."
try:
        if language_code in ['ar', 'fr']:
            # Marian models top out near 512 tokens; chunk long transcripts instead of truncating.
            translations = []
            for chunk in chunk_text_by_tokens(text, tokenizer, 400):
                inputs = tokenizer(chunk, return_tensors="pt", padding=True).to(device)
                translated = model.generate(**inputs)
                translations.append(tokenizer.decode(translated[0], skip_special_tokens=True))
            return " ".join(translations)
elif language_code in ['ha','fa','ps']:
SAFE_CHUNK_SIZE = 900
            tokenizer.src_lang = "eng_Latn"  # source language (FLORES-200 code for English)
            bos_token_id = tokenizer.convert_tokens_to_ids(formatted_language_code)
chunks = chunk_text_by_tokens(text, tokenizer, SAFE_CHUNK_SIZE)
translations = []
for chunk in chunks:
inputs = tokenizer(chunk, return_tensors="pt").to(device)
translated_tokens = model.generate(
**inputs,
forced_bos_token_id=bos_token_id,
max_length=512
)
translation = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
translations.append(translation)
return "\n".join(translations)
else:
return f"Translation not implemented for language code: {language_code}"
except Exception as e:
print(f"Error during translation: {e}")
return "Error during translation."
def say_it_api(text, _out_lang):
"""
Converts text to speech using gTTS and saves it to a temporary file.
Returns the file path.
"""
if text is None or text.strip() == "":
print("No text provided for gTTS speech generation.")
return None
try:
        tts = gTTS(text=text, lang=_out_lang)
        # Use a unique temp file so concurrent requests don't overwrite each other.
        with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
            filename = f.name
        tts.save(filename)
        return filename
except Exception as e:
print(f"Error during gTTS speech generation: {e}")
return None
def speak_with_elevenlabs_api(text, language_code):
"""
Converts text to speech using ElevenLabs API and saves it to a temporary file.
Returns the file path.
"""
ELEVENLABS_API_KEY = os.environ.get('ELEVENLABS_API_KEY')
VOICE_ID = "EXAVITQu4vr4xnSDxMaL" # Rachel; see docs for voices
if not ELEVENLABS_API_KEY:
print("ElevenLabs API key not found in environment variables.")
return None
if text is None or text.strip() == "":
print("No text provided for ElevenLabs speech generation.")
return None
url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"
headers = {
"xi-api-key": ELEVENLABS_API_KEY,
"Content-Type": "application/json"
}
data = {
"text": text,
"model_id": "eleven_multilingual_v2",
"voice_settings": {
"stability": 0.5,
"similarity_boost": 0.5
}
}
try:
        response = requests.post(url, headers=headers, json=data, timeout=60)
        if response.status_code == 200:
            # Write to a unique temp file so concurrent requests don't clobber each other.
            with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as f:
                f.write(response.content)
            return f.name
else:
print(f"Error from ElevenLabs API: Status Code {response.status_code}, Response: {response.text}")
return None
except Exception as e:
print(f"Error calling ElevenLabs API: {e}")
return None
def speechRouter_api(text,language_code):
"""
Routes text-to-speech requests based on language code and returns the audio file path.
"""
if text is None or text.strip() == "":
return None # No text to speak
    if language_code in ['ar', 'fr']:
        return say_it_api(text, language_code)
elif language_code in ['ha', 'fa', 'ps']:
return speak_with_elevenlabs_api(text, language_code)
else:
print(f"Language code '{language_code}' not supported for speech generation.")
return None
def translate_and_speak_api_wrapper(video_id, out_lang):
"""
    Translates the English transcript of a YouTube video into the target
    language and generates speech for the translated text.
    Args:
        video_id: The YouTube video ID to translate and speak.
        out_lang: The target language code to translate to.
Returns:
A tuple containing:
- translated_text (str): The translated text.
- audio_file_path (str or None): The path to the generated audio file, or None if speech generation failed.
"""
# Ensure model and tokenizer are loaded for the target language
model_status = setModelAndTokenizer(out_lang)
if "Error" in model_status or "not supported" in model_status:
return f"Translation failed: {model_status}", None
english_text = getEnglishTranscript(video_id)
if english_text == "":
return "No English transcript available to translate.", None
translated_text = ""
    if out_lang in ["ar", "fr"]:
        # Prefer YouTube's own translated transcript; fall back to local model translation.
        fetcher = getArabicTranscript if out_lang == "ar" else getFrenchTranscript
        translated_text = fetcher(video_id)
        if translated_text.strip() == "":
            print(f"No direct '{out_lang}' transcript found, translating from English.")
            translated_text = translate_me(english_text, out_lang)
elif out_lang in ["ha", "fa", "ps"]:
translated_text = translate_me(english_text,out_lang)
else:
return f"Language code '{out_lang}' not supported for translation.", None
if translated_text is None or translated_text.strip() == "" or "Translation failed" in translated_text:
return f"Translation to {out_lang} failed.", None
# Generate speech using the API wrapper
audio_file_path = speechRouter_api(translated_text, out_lang)
return translated_text, audio_file_path
# This function will serve as the API endpoint for Gradio.
def translate_and_speak_api(video_id: str, language_code: str):
"""
API endpoint to translate and speak YouTube video transcripts.
"""
print(f"Received request for video ID: {video_id}, language: {language_code}")
translated_text, audio_file_path = translate_and_speak_api_wrapper(video_id, language_code)
# Return the translated text and the audio file path (or an empty string if None)
# Returning an empty string instead of None for the audio output might resolve
# the TypeError when autoplay is True.
return translated_text, audio_file_path if audio_file_path is not None else ""
# Define input components
video_id_input = gr.Textbox(label="YouTube Video ID")
language_dropdown = gr.Dropdown(
label="Target Language",
choices=['ar', 'fr', 'ha', 'fa', 'ps'], # Supported language codes
value='ar' # Default value
)
# Define output components
translated_text_output = gr.Textbox(label="Translated Text")
audio_output = gr.Audio(label="Translated Speech", autoplay=True)
# Combine components and the translate_and_speak_api function into a Gradio interface
demo = gr.Interface(
fn=translate_and_speak_api, # Use the API endpoint function
inputs=[video_id_input, language_dropdown], # Inputs match the API function arguments
outputs=[translated_text_output, audio_output], # Outputs match the API function return values
title="YouTube Translator and Speaker",
description="Enter a YouTube video ID and select a language to get the translated transcript and speech."
)
# ---- Launch Gradio ----
if __name__ == "__main__":
demo.launch()
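# A minimal client-side sketch (names and IDs are placeholders): once the app
# is running, the same endpoint can be called programmatically with
# gradio_client, e.g.:
#
#   from gradio_client import Client
#   client = Client("http://127.0.0.1:7860")  # or the hosted Space URL
#   text, audio = client.predict("VIDEO_ID_HERE", "fr", api_name="/predict")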