Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import gradio as gr | |
import requests | |
import torch | |
import os | |
from transformers import MarianMTModel, MarianTokenizer, AutoTokenizer, AutoModelForSeq2SeqLM | |
from youtube_transcript_api import YouTubeTranscriptApi | |
from youtube_transcript_api.proxies import WebshareProxyConfig | |
from gtts import gTTS | |
# Build the module-wide YouTubeTranscriptApi client used by the transcript
# helpers below. Webshare proxy credentials are read from the environment;
# when both are present the client is created with a proxy configuration,
# otherwise a plain client is created.
proxy_username = os.environ.get('WEBSHARE_PROXY_UN')
proxy_password = os.environ.get('WEBSHARE_PROXY_PW')
ytt_api = None
try:
    if not (proxy_username and proxy_password):
        # One or both credentials are missing: no proxy.
        ytt_api = YouTubeTranscriptApi()
        print("Successfully connected to the Youtube API without proxy.")
    else:
        webshare_config = WebshareProxyConfig(
            proxy_username=proxy_username,
            proxy_password=proxy_password,
            filter_ip_locations=["us"],
        )
        ytt_api = YouTubeTranscriptApi(proxy_config=webshare_config)
        print("Successfully connected to the Youtube API with proxy.")
except Exception as init_error:
    print(f"A proxy error occurred in connecting to the Youtube API: {init_error}")
    # Fall back to a client without proxy if the setup above failed.
    ytt_api = YouTubeTranscriptApi()
def getEnglishTranscript(video_id):
    """Return the English transcript of a YouTube video as a single string.

    The transcripts listed for ``video_id`` are scanned for one whose
    language code is exactly ``'en'``. If none exists, the first transcript
    whose code starts with ``'en-'`` (a regional English variant, e.g.
    ``'en-US'``) is used instead, so videos that only carry a regional
    English track are no longer reported as having no English transcript.

    Args:
        video_id: The YouTube video ID to look up.

    Returns:
        The snippet texts joined by single spaces, or ``""`` when the API
        client is not initialized, no English transcript is available, or
        any error occurs (errors are printed, not raised).
    """
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        exact_match = None
        regional_match = None
        for transcript in ytt_api.list(video_id):
            code = transcript.language_code
            if code == 'en':
                exact_match = transcript
                break
            # Remember only the first regional variant; an exact 'en'
            # transcript found later still takes precedence.
            if regional_match is None and code.lower().startswith('en-'):
                regional_match = transcript

        chosen = exact_match if exact_match is not None else regional_match
        snippets = chosen.fetch() if chosen is not None else None
        if not snippets:
            print(f"No English transcript found for video ID: {video_id}")
            return ""
        # join() avoids rebuilding the string once per snippet.
        return " ".join(snippet.text for snippet in snippets).strip()
    except Exception as e:
        print(f"Error retrieving English transcript for video ID {video_id}: {e}")
        return ""
def getArabicTranscript(video_id):
    """Return an Arabic transcript for a YouTube video, translating if necessary.

    Lookup order:
      1. A transcript that is already Arabic (language code ``'ar'``) is
         fetched directly, so no translation is requested when it is not
         needed.
      2. Otherwise the first translatable transcript that offers Arabic as
         a translation target is translated. The target is recognised by
         its language code (``'ar'``) or by its display name (``'Arabic'``),
         so the lookup does not depend on the display name alone.

    Args:
        video_id: The YouTube video ID to look up.

    Returns:
        The snippet texts joined by single spaces, or ``""`` when the API
        client is not initialized, no Arabic transcript or translation is
        available, or any error occurs (errors are printed, not raised).
    """
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        # Materialize once: the listing is walked up to two times below.
        transcripts = list(ytt_api.list(video_id))

        arabic_snippets = None
        for transcript in transcripts:
            if transcript.language_code == 'ar':
                arabic_snippets = transcript.fetch()
                break

        if not arabic_snippets:
            for transcript in transcripts:
                if not transcript.is_translatable:
                    continue
                arabic_language_code = next(
                    (
                        lang.language_code
                        for lang in transcript.translation_languages
                        if lang.language_code == 'ar' or lang.language == 'Arabic'
                    ),
                    None,
                )
                if arabic_language_code:
                    print(f"\nTranslating to Arabic ({arabic_language_code})...")
                    arabic_snippets = transcript.translate(arabic_language_code).fetch()
                    print("Arabic Translation Found and Stored.")
                    break  # Stop at the first transcript that can be translated

        if not arabic_snippets:
            print(f"No translatable transcript found for Arabic for video ID: {video_id}")
            return ""
        # join() avoids rebuilding the string once per snippet.
        return " ".join(snippet.text for snippet in arabic_snippets).strip()
    except Exception as e:
        print(f"Error retrieving or translating Arabic transcript for video ID {video_id}: {e}")
        return ""
def getFrenchTranscript(video_id):
    """Return a French transcript for a YouTube video, translating if necessary.

    Lookup order:
      1. A transcript that is already French (language code ``'fr'``) is
         fetched directly, so no translation is requested when it is not
         needed.
      2. Otherwise the first translatable transcript that offers French as
         a translation target is translated. The target is recognised by
         its language code (``'fr'``) or by its display name (``'French'``),
         so the lookup does not depend on the display name alone.

    Args:
        video_id: The YouTube video ID to look up.

    Returns:
        The snippet texts joined by single spaces, or ``""`` when the API
        client is not initialized, no French transcript or translation is
        available, or any error occurs (errors are printed, not raised).
    """
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        # Materialize once: the listing is walked up to two times below.
        transcripts = list(ytt_api.list(video_id))

        french_snippets = None
        for transcript in transcripts:
            if transcript.language_code == 'fr':
                french_snippets = transcript.fetch()
                break

        if not french_snippets:
            for transcript in transcripts:
                if not transcript.is_translatable:
                    continue
                french_language_code = next(
                    (
                        lang.language_code
                        for lang in transcript.translation_languages
                        if lang.language_code == 'fr' or lang.language == 'French'
                    ),
                    None,
                )
                if french_language_code:
                    print(f"\nTranslating to French ({french_language_code})...")
                    french_snippets = transcript.translate(french_language_code).fetch()
                    print("French Translation Found and Stored.")
                    break  # Stop at the first transcript that can be translated

        if not french_snippets:
            print(f"No translatable transcript found for French for video ID: {video_id}")
            return ""
        # join() avoids rebuilding the string once per snippet.
        return " ".join(snippet.text for snippet in french_snippets).strip()
    except Exception as e:
        print(f"Error retrieving or translating French transcript for video ID {video_id}: {e}")
        return ""
# Currently loaded translation model, its tokenizer, and the torch device the
# model was moved to. All three are None until a model has been loaded.
model, tokenizer, device = None, None, None
# Target-language code passed to the NLLB model (e.g. "hau_Latn"); only
# updated when an NLLB-backed language is selected.
formatted_language_code = ""

# language code -> (model name, readable name, NLLB target code or None)
_TRANSLATION_MODELS = {
    'ar': ("Helsinki-NLP/opus-mt-tc-big-en-ar", "English to Arabic", None),
    'fr': ("Helsinki-NLP/opus-mt-tc-big-en-fr", "English to French", None),
    'ha': ("facebook/nllb-200-distilled-600M", "English to Hausa", "hau_Latn"),
    'fa': ("facebook/nllb-200-distilled-600M", "English to Dari/Afghan Persian", "pes_Arab"),
    'ps': ("facebook/nllb-200-distilled-600M", "English to Pashto", "pbt_Arab"),
}


def setModelAndTokenizer(language_code):
    """Load the translation model and tokenizer for a target language.

    The module-level ``model``, ``tokenizer``, ``device`` and
    ``formatted_language_code`` are updated. If the tokenizer that is
    already loaded belongs to the requested model, nothing is reloaded.

    A new tokenizer/model pair is loaded into local variables first and
    only committed to the globals once both loaded successfully. A failed
    load therefore leaves the previously loaded pair untouched instead of
    mixing a new tokenizer with an old model.

    Args:
        language_code: Target language code; one of 'ar', 'fr', 'ha', 'fa',
            'ps'.

    Returns:
        A human-readable status string. Callers in this file detect failure
        by looking for "Error" or "not supported" in it.
    """
    global model, tokenizer, device, formatted_language_code

    spec = _TRANSLATION_MODELS.get(language_code)
    if spec is None:
        return f"Language code '{language_code}' not supported for translation model."
    model_name, readable_name, nllb_code = spec

    already_loaded = (
        model is not None
        and tokenizer is not None
        and getattr(tokenizer, 'name_or_path', None) == model_name
    )
    if already_loaded:
        # Several languages share the NLLB model, so the target code must
        # still be refreshed on a cache hit.
        if nllb_code is not None:
            formatted_language_code = nllb_code
        print(f"Model and tokenizer for {readable_name} already loaded.")
        return f"Model and tokenizer for {readable_name} already loaded."

    print(f"Loading model and tokenizer for {readable_name}...")
    is_marian = "Helsinki-NLP" in model_name
    family = "Helsinki-NLP" if is_marian else "Facebook NLLB"
    try:
        if is_marian:
            new_tokenizer = MarianTokenizer.from_pretrained(model_name)
            new_model = MarianMTModel.from_pretrained(model_name)
        else:
            new_tokenizer = AutoTokenizer.from_pretrained(model_name)
            # NOTE(review): device_map="auto" followed by .to(device) is kept
            # from the original; confirm the combination is intended.
            new_model = AutoModelForSeq2SeqLM.from_pretrained(model_name, device_map="auto")
        new_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        new_model.to(new_device)
    except Exception as e:
        print(f"Error loading {family} model or tokenizer: {e}")
        return "Error loading translation model."

    # Commit only after everything succeeded.
    model, tokenizer, device = new_model, new_tokenizer, new_device
    if nllb_code is not None:
        formatted_language_code = nllb_code
    print(f"Successfully loaded {family} model: {model_name}")
    return f"Model and tokenizer set for {readable_name}."
def chunk_text_by_tokens(text, tokenizer, max_tokens):
    """Split ``text`` into whitespace-joined chunks bounded by token count.

    Words are added to the current chunk one at a time. Before each
    addition the candidate chunk is tokenized (without special tokens); if
    its token count would exceed ``max_tokens``, the current chunk is
    closed and the word starts a new one. A single word that is longer
    than ``max_tokens`` on its own still becomes a chunk by itself.

    Args:
        text: The text to split; it is split on whitespace.
        tokenizer: Callable accepting ``(text, add_special_tokens=False)``
            and returning an object with an ``input_ids`` sequence.
        max_tokens: Maximum token count allowed per chunk.

    Returns:
        A list of chunk strings, in the original word order.
    """
    def _token_count(word_list):
        # Count tokens of the words alone, without special tokens.
        encoded = tokenizer(" ".join(word_list), add_special_tokens=False)
        return len(encoded.input_ids)

    finished = []
    pending = []
    for next_word in text.split():
        candidate = [*pending, next_word]
        if _token_count(candidate) <= max_tokens:
            pending = candidate
            continue
        # The candidate is too long: close the pending chunk (if any) and
        # start a new one with the word that did not fit.
        if pending:
            finished.append(" ".join(pending))
        pending = [next_word]

    if pending:
        finished.append(" ".join(pending))
    return finished
def translate_me(text, language_code):
    """Translate English ``text`` into the language given by ``language_code``.

    The model for ``language_code`` is always requested through
    ``setModelAndTokenizer`` (which skips reloading when it is already
    loaded). Previously a model was only loaded when none was loaded at
    all, so a model left over from a different language could be used.

    Both model families translate the text chunk by chunk so that long
    transcripts are not fed to the model in a single pass.

    Args:
        text: English text to translate.
        language_code: Target language code ('ar', 'fr', 'ha', 'fa', 'ps').

    Returns:
        The translated text, or one of these status strings:
        "Translation failed: ..." (model could not be set up),
        "No text to translate." (empty input),
        "Translation not implemented for language code: ..." or
        "Error during translation.".
    """
    status = setModelAndTokenizer(language_code)
    if "Error" in status or "not supported" in status:
        print(status)
        return f"Translation failed: {status}"
    if text is None or text.strip() == "":
        return "No text to translate."
    try:
        if language_code in ('ar', 'fr'):
            # NOTE(review): 400 is chosen to stay below the 512-token input
            # limit assumed for these Marian models - confirm against the
            # model configuration.
            # NOTE(review): some multi-target OPUS-MT models expect a
            # ">>lang<<" prefix on the source text; confirm on the model
            # cards whether the Arabic model needs one.
            MARIAN_CHUNK_SIZE = 400
            translations = []
            for chunk in chunk_text_by_tokens(text, tokenizer, MARIAN_CHUNK_SIZE):
                inputs = tokenizer(chunk, return_tensors="pt", padding=True).to(device)
                translated = model.generate(**inputs)
                translations.append(tokenizer.decode(translated[0], skip_special_tokens=True))
            return " ".join(translations)
        elif language_code in ('ha', 'fa', 'ps'):
            # NOTE(review): chunks may hold up to 900 input tokens while
            # generation is capped at max_length=512; long chunks could be
            # cut short - confirm these two limits are intended.
            SAFE_CHUNK_SIZE = 900
            tokenizer.src_lang = "eng_Latn"  # English
            bos_token_id = tokenizer.convert_tokens_to_ids([formatted_language_code])[0]
            translations = []
            for chunk in chunk_text_by_tokens(text, tokenizer, SAFE_CHUNK_SIZE):
                inputs = tokenizer(chunk, return_tensors="pt").to(device)
                translated_tokens = model.generate(
                    **inputs,
                    forced_bos_token_id=bos_token_id,
                    max_length=512
                )
                translations.append(tokenizer.decode(translated_tokens[0], skip_special_tokens=True))
            return "\n".join(translations)
        else:
            return f"Translation not implemented for language code: {language_code}"
    except Exception as e:
        print(f"Error during translation: {e}")
        return "Error during translation."
def say_it_api(text, _out_lang):
    """Synthesize speech for ``text`` with gTTS and save it as an MP3 file.

    Args:
        text: The text to speak.
        _out_lang: Language code passed to gTTS as ``lang``.

    Returns:
        The path of the written audio file ("/tmp/gtts_audio.mp3"), or
        None when ``text`` is empty/blank or speech generation fails
        (the error is printed, not raised).
    """
    has_text = text is not None and text.strip() != ""
    if not has_text:
        print("No text provided for gTTS speech generation.")
        return None

    output_path = "/tmp/gtts_audio.mp3"
    try:
        gTTS(text=text, lang=_out_lang).save(output_path)
    except Exception as tts_error:
        print(f"Error during gTTS speech generation: {tts_error}")
        return None
    return output_path
def speak_with_elevenlabs_api(text, language_code):
    """Synthesize speech for ``text`` through the ElevenLabs HTTP API.

    The API key is read from the ``ELEVENLABS_API_KEY`` environment
    variable. The request now carries a timeout so that a stalled
    connection cannot block the caller forever; a timeout is reported like
    any other request error.

    Args:
        text: The text to speak.
        language_code: Target language code. It is accepted for interface
            symmetry with the other speech helpers but is not sent to the
            API by this function.

    Returns:
        The path of the written audio file ("/tmp/elevenlabs_audio.mp3"),
        or None when the API key is missing, ``text`` is empty/blank, the
        API answers with a non-200 status, or the request fails (details
        are printed, not raised).
    """
    ELEVENLABS_API_KEY = os.environ.get('ELEVENLABS_API_KEY')
    # NOTE(review): the original comment labels this voice "Rachel";
    # verify the ID against the ElevenLabs voice list.
    VOICE_ID = "EXAVITQu4vr4xnSDxMaL"
    # (connect timeout, read timeout) in seconds. The read timeout is kept
    # generous because synthesis of long text can take a while.
    REQUEST_TIMEOUT = (10, 120)

    if not ELEVENLABS_API_KEY:
        print("ElevenLabs API key not found in environment variables.")
        return None
    if text is None or text.strip() == "":
        print("No text provided for ElevenLabs speech generation.")
        return None

    url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"
    headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
        "Content-Type": "application/json"
    }
    data = {
        "text": text,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.5
        }
    }
    try:
        response = requests.post(url, headers=headers, json=data, timeout=REQUEST_TIMEOUT)
        if response.status_code != 200:
            print(f"Error from ElevenLabs API: Status Code {response.status_code}, Response: {response.text}")
            return None
        filename = "/tmp/elevenlabs_audio.mp3"
        with open(filename, 'wb') as f:
            f.write(response.content)
        return filename
    except Exception as e:
        print(f"Error calling ElevenLabs API: {e}")
        return None
def speechRouter_api(text,language_code):
    """Pick the text-to-speech backend for a language and return the audio path.

    Arabic ('ar') and French ('fr') are spoken with gTTS via
    ``say_it_api``; Hausa ('ha'), Dari/Persian ('fa') and Pashto ('ps') go
    through ``speak_with_elevenlabs_api``.

    Args:
        text: The text to speak.
        language_code: Language code of ``text``.

    Returns:
        The audio file path returned by the chosen backend, or None when
        ``text`` is empty/blank or the language is not supported.
    """
    if text is None or not text.strip():
        return None  # No text to speak

    if language_code in ('ar', 'fr'):
        return say_it_api(text, language_code)
    if language_code in ('ha', 'fa', 'ps'):
        return speak_with_elevenlabs_api(text, language_code)

    print(f"Language code '{language_code}' not supported for speech generation.")
    return None
def translate_and_speak_api_wrapper(video_id, out_lang):
    """Translate a YouTube video's English transcript and generate speech for it.

    For Arabic and French, a transcript obtained from YouTube is tried
    first; if that yields nothing, the English transcript is translated
    with the local model. Hausa, Dari/Persian and Pashto are always
    translated with the local model.

    Every status string that ``translate_me`` can return instead of a
    translation is treated as a failure here. Previously only
    "Translation failed" was recognised, so messages such as
    "Error during translation." were passed on to speech generation as if
    they were translated text.

    Args:
        video_id: The Youtube video ID to translate and speak.
        out_lang: The language code to translate to.

    Returns:
        A tuple containing:
        - translated_text (str): The translated text, or a message
          describing why translation was not possible.
        - audio_file_path (str or None): The path to the generated audio
          file, or None if translation or speech generation failed.
    """
    # Ensure model and tokenizer are loaded for the target language
    model_status = setModelAndTokenizer(out_lang)
    if "Error" in model_status or "not supported" in model_status:
        return f"Translation failed: {model_status}", None

    english_text = getEnglishTranscript(video_id)
    if english_text == "":
        return "No English transcript available to translate.", None

    # Languages for which a transcript can be requested from YouTube first.
    youtube_sources = {
        "ar": (getArabicTranscript, "Arabic"),
        "fr": (getFrenchTranscript, "French"),
    }

    if out_lang in youtube_sources:
        fetch_transcript, language_name = youtube_sources[out_lang]
        translated_text = fetch_transcript(video_id)
        if translated_text.strip() == "":
            print(f"No direct {language_name} transcript found, translating from English.")
            translated_text = translate_me(english_text, out_lang)
    elif out_lang in ("ha", "fa", "ps"):
        translated_text = translate_me(english_text, out_lang)
    else:
        return f"Language code '{out_lang}' not supported for translation.", None

    translation_failed = (
        translated_text is None
        or translated_text.strip() == ""
        or "Translation failed" in translated_text
        or translated_text.startswith("Translation not implemented")
        or translated_text in ("Error during translation.", "No text to translate.")
    )
    if translation_failed:
        return f"Translation to {out_lang} failed.", None

    # Generate speech using the API wrapper
    audio_file_path = speechRouter_api(translated_text, out_lang)
    return translated_text, audio_file_path
# Entry point wired into the Gradio interface.
def translate_and_speak_api(video_id: str, language_code: str):
    """Translate a YouTube transcript and produce speech for the Gradio UI.

    Args:
        video_id: The YouTube video ID.
        language_code: The target language code.

    Returns:
        A tuple ``(translated_text, audio_file_path)``. When no audio file
        was produced, the second element is an empty string rather than
        None.
    """
    print(f"Received request for video ID: {video_id}, language: {language_code}")
    result = translate_and_speak_api_wrapper(video_id, language_code)
    translated_text, audio_file_path = result
    # An empty string is returned in place of None for the audio output; per
    # the original author's note this might avoid a TypeError when autoplay
    # is enabled.
    if audio_file_path is None:
        audio_file_path = ""
    return translated_text, audio_file_path
# Inputs: the video ID as free text and the target language as a dropdown
# of the supported language codes (Arabic is preselected).
video_id_input = gr.Textbox(label="YouTube Video ID")
language_dropdown = gr.Dropdown(
    choices=['ar', 'fr', 'ha', 'fa', 'ps'],
    value='ar',
    label="Target Language",
)

# Outputs: the translated text and an audio player that starts automatically.
translated_text_output = gr.Textbox(label="Translated Text")
audio_output = gr.Audio(label="Translated Speech", autoplay=True)

# Wire the components to translate_and_speak_api; the input and output
# order matches that function's arguments and return values.
demo = gr.Interface(
    title="YouTube Translator and Speaker",
    description="Enter a YouTube video ID and select a language to get the translated transcript and speech.",
    fn=translate_and_speak_api,
    inputs=[video_id_input, language_dropdown],
    outputs=[translated_text_output, audio_output],
)

# ---- Launch Gradio ----
if __name__ == "__main__":
    demo.launch()