import gradio as gr
import requests
import torch
import os
from transformers import MarianMTModel, MarianTokenizer, AutoTokenizer, AutoModelForSeq2SeqLM
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.proxies import WebshareProxyConfig
from gtts import gTTS
# Initialize YouTubeTranscriptApi
proxy_username = os.environ.get('WEBSHARE_PROXY_UN')
proxy_password = os.environ.get('WEBSHARE_PROXY_PW')
ytt_api = None
try:
if proxy_username and proxy_password:
ytt_api = YouTubeTranscriptApi(
proxy_config=WebshareProxyConfig(
proxy_username=proxy_username,
proxy_password=proxy_password,
filter_ip_locations=["us"],
)
)
print(f"Successfully connected to the Youtube API with proxy.")
else:
ytt_api = YouTubeTranscriptApi()
print(f"Successfully connected to the Youtube API without proxy.")
except Exception as e:
print(f"A proxy error occurred in connecting to the Youtube API: {e}")
ytt_api = YouTubeTranscriptApi() # Fallback if proxy fails
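# The proxy credentials above come from the environment. A minimal, illustrative
# setup (these are the variable names this script expects; values are placeholders):
#   export WEBSHARE_PROXY_UN="your-webshare-username"
#   export WEBSHARE_PROXY_PW="your-webshare-password"
# Without them, transcript requests go out directly, which YouTube may rate-limit
# on shared cloud IPs.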
def getEnglishTranscript(video_id):
"""Retrieves the English transcript for a given YouTube video ID."""
if not ytt_api:
print("YouTubeTranscriptApi not initialized.")
return ""
try:
transcript_list = ytt_api.list(video_id)
english_original = None
for transcript in transcript_list:
            if transcript.language_code == 'en':
english_original = transcript.fetch()
break
english_output = ""
if english_original:
for snippet in english_original:
english_output += snippet.text + " "
else:
print(f"No English transcript found for video ID: {video_id}")
return english_output.strip()
except Exception as e:
print(f"Error retrieving English transcript for video ID {video_id}: {e}")
return ""
def getArabicTranscript(video_id):
"""Retrieves the Arabic transcript for a given YouTube video ID, translating if necessary."""
if not ytt_api:
print("YouTubeTranscriptApi not initialized.")
return ""
try:
transcript_list = ytt_api.list(video_id)
arabic_translation = None
for transcript in transcript_list:
            if transcript.is_translatable:
arabic_language_code = None
for lang in transcript.translation_languages:
if lang.language == 'Arabic':
arabic_language_code = lang.language_code
break
if arabic_language_code:
print(f"\nTranslating to Arabic ({arabic_language_code})...")
arabic_translation = transcript.translate(arabic_language_code).fetch()
print("Arabic Translation Found and Stored.")
break # Exit after finding the first Arabic translation
arabic_output = ""
if arabic_translation:
for snippet in arabic_translation:
arabic_output += snippet.text + " "
else:
print(f"No translatable transcript found for Arabic for video ID: {video_id}")
return arabic_output.strip()
except Exception as e:
print(f"Error retrieving or translating Arabic transcript for video ID {video_id}: {e}")
return ""
def getFrenchTranscript(video_id):
"""Retrieves the French transcript for a given YouTube video ID, translating if necessary."""
if not ytt_api:
print("YouTubeTranscriptApi not initialized.")
return ""
try:
transcript_list = ytt_api.list(video_id)
french_translation = None
for transcript in transcript_list:
            if transcript.is_translatable:
french_language_code = None
for lang in transcript.translation_languages:
if lang.language == 'French':
french_language_code = lang.language_code
break
if french_language_code:
print(f"\nTranslating to French ({french_language_code})...")
french_translation = transcript.translate(french_language_code).fetch()
print("French Translation Found and Stored.")
break # Exit after finding the first French translation
french_output = ""
if french_translation:
for snippet in french_translation:
french_output += snippet.text + " "
else:
print(f"No translatable transcript found for French for video ID: {video_id}")
return french_output.strip()
except Exception as e:
print(f"Error retrieving or translating French transcript for video ID {video_id}: {e}")
return ""
model, tokenizer, device = None, None, None
formatted_language_code = ""
def setModelAndTokenizer(language_code):
"""Sets the appropriate translation model and tokenizer based on the target language code."""
global model, tokenizer, device, formatted_language_code
_MODEL_NAME = None
_readable_name = None
if language_code == 'ar':
_MODEL_NAME = "Helsinki-NLP/opus-mt-tc-big-en-ar"
_readable_name = "English to Arabic"
elif language_code == 'fr':
_MODEL_NAME = "Helsinki-NLP/opus-mt-tc-big-en-fr"
_readable_name = "English to French"
elif language_code == 'ha':
_MODEL_NAME = "facebook/nllb-200-distilled-600M"
_readable_name = "English to Hausa"
formatted_language_code = "hau_Latn"
elif language_code == 'fa':
_MODEL_NAME = "facebook/nllb-200-distilled-600M"
_readable_name = "English to Dari/Afghan Persian"
formatted_language_code = "pes_Arab"
elif language_code == 'ps':
_MODEL_NAME = "facebook/nllb-200-distilled-600M"
_readable_name = "English to Pashto"
formatted_language_code = "pbt_Arab"
else:
return f"Language code '{language_code}' not supported for translation model."
if model is not None and tokenizer is not None and hasattr(tokenizer, 'name_or_path') and tokenizer.name_or_path == _MODEL_NAME:
print(f"Model and tokenizer for {_readable_name} already loaded.")
return f"Model and tokenizer for {_readable_name} already loaded."
print(f"Loading model and tokenizer for {_readable_name}...")
if "Helsinki-NLP" in _MODEL_NAME:
try:
tokenizer = MarianTokenizer.from_pretrained(_MODEL_NAME)
model = MarianMTModel.from_pretrained(_MODEL_NAME)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Successfully loaded Helsinki-NLP model: {_MODEL_NAME}")
except Exception as e:
print(f"Error loading Helsinki-NLP model or tokenizer: {e}")
return "Error loading translation model."
elif "facebook" in _MODEL_NAME:
try:
            tokenizer = AutoTokenizer.from_pretrained(_MODEL_NAME)
            # device_map="auto" would hand placement to accelerate, which then rejects an
            # explicit .to(device); load normally and move the model ourselves instead.
            model = AutoModelForSeq2SeqLM.from_pretrained(_MODEL_NAME)
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model.to(device)
print(f"Successfully loaded Facebook NLLB model: {_MODEL_NAME}")
except Exception as e:
print(f"Error loading Facebook NLLB model or tokenizer: {e}")
return "Error loading translation model."
else:
return f"Unknown model type for {_MODEL_NAME}"
return f"Model and tokenizer set for {_readable_name}."
def chunk_text_by_tokens(text, tokenizer, max_tokens):
"""Splits text into chunks based on token count."""
words = text.split()
chunks = []
current_chunk = []
for word in words:
trial_chunk = current_chunk + [word]
# Use add_special_tokens=False to get token count of just the words
num_tokens = len(tokenizer(" ".join(trial_chunk), add_special_tokens=False).input_ids)
if num_tokens > max_tokens:
if current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = [word]
else:
current_chunk = trial_chunk
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
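# Rough illustration of the chunking behaviour (assuming one token per word for the
# sake of the example; real split points depend on the tokenizer):
#   chunk_text_by_tokens("one two three four five", tokenizer, max_tokens=3)
#   -> ["one two three", "four five"]
# Note that the trial chunk is re-tokenized for every word, which is quadratic in the
# chunk length; fine for transcript-sized inputs, slow for very long texts.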
def translate_me(text, language_code):
"""Translates the input text to the target language using the loaded model."""
global model, tokenizer, device, formatted_language_code
if model is None or tokenizer is None:
status = setModelAndTokenizer(language_code)
if "Error" in status or "not supported" in status:
print(status)
return f"Translation failed: {status}"
if text is None or text.strip() == "":
return "No text to translate."
try:
        if language_code in ['ar', 'fr']:
            # Marian models accept at most 512 source tokens; truncate so very long
            # transcripts do not overflow the model's position embeddings.
            inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True).to(device)
            translated = model.generate(**inputs)
            return tokenizer.decode(translated[0], skip_special_tokens=True)
elif language_code in ['ha','fa','ps']:
SAFE_CHUNK_SIZE = 900
tokenizer.src_lang = "eng_Latn" # English
bos_token_id = tokenizer.convert_tokens_to_ids([formatted_language_code])[0]
chunks = chunk_text_by_tokens(text, tokenizer, SAFE_CHUNK_SIZE)
translations = []
for chunk in chunks:
inputs = tokenizer(chunk, return_tensors="pt").to(device)
translated_tokens = model.generate(
**inputs,
forced_bos_token_id=bos_token_id,
max_length=512
)
translation = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
translations.append(translation)
return "\n".join(translations)
else:
return f"Translation not implemented for language code: {language_code}"
except Exception as e:
print(f"Error during translation: {e}")
return "Error during translation."
def say_it_api(text, _out_lang):
"""
Converts text to speech using gTTS and saves it to a temporary file.
Returns the file path.
"""
if text is None or text.strip() == "":
print("No text provided for gTTS speech generation.")
return None
try:
tts = gTTS(text=text, lang=_out_lang)
filename = "/tmp/gtts_audio.mp3"
tts.save(filename)
return filename
except Exception as e:
print(f"Error during gTTS speech generation: {e}")
return None
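# Example (sketch): gTTS takes a gTTS-supported language code and writes an MP3, e.g.
#   say_it_api("Bonjour tout le monde", "fr")  # -> "/tmp/gtts_audio.mp3"
# gTTS covers the 'ar' and 'fr' cases used here; the router below sends Hausa,
# Dari/Persian and Pashto to ElevenLabs instead.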
def speak_with_elevenlabs_api(text, language_code):
"""
Converts text to speech using ElevenLabs API and saves it to a temporary file.
Returns the file path.
"""
ELEVENLABS_API_KEY = os.environ.get('ELEVENLABS_API_KEY')
VOICE_ID = "EXAVITQu4vr4xnSDxMaL" # Rachel; see docs for voices
if not ELEVENLABS_API_KEY:
print("ElevenLabs API key not found in environment variables.")
return None
if text is None or text.strip() == "":
print("No text provided for ElevenLabs speech generation.")
return None
url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"
headers = {
"xi-api-key": ELEVENLABS_API_KEY,
"Content-Type": "application/json"
}
data = {
"text": text,
"model_id": "eleven_multilingual_v2",
"voice_settings": {
"stability": 0.5,
"similarity_boost": 0.5
}
}
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
filename = "/tmp/elevenlabs_audio.mp3"
with open(filename, 'wb') as f:
f.write(response.content)
return filename
else:
print(f"Error from ElevenLabs API: Status Code {response.status_code}, Response: {response.text}")
return None
except Exception as e:
print(f"Error calling ElevenLabs API: {e}")
return None
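# The request above follows the ElevenLabs text-to-speech REST API: voice ID in the URL,
# API key in the "xi-api-key" header, text plus model/voice settings in the JSON body.
# Note that language_code is currently unused; the fixed voice and the
# eleven_multilingual_v2 model handle the language of the input text.
# Example (sketch; requires ELEVENLABS_API_KEY to be set in the environment):
#   path = speak_with_elevenlabs_api("Salam", "ps")  # -> "/tmp/elevenlabs_audio.mp3" or None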
def speechRouter_api(text,language_code):
"""
Routes text-to-speech requests based on language code and returns the audio file path.
"""
if text is None or text.strip() == "":
return None # No text to speak
    if language_code in ['ar', 'fr']:
        return say_it_api(text, language_code)
elif language_code in ['ha', 'fa', 'ps']:
return speak_with_elevenlabs_api(text, language_code)
else:
print(f"Language code '{language_code}' not supported for speech generation.")
return None
def translate_and_speak_api_wrapper(video_id, out_lang):
"""
    Translates the English transcript of a YouTube video into the target language
    and generates speech for the translated text.
    Args:
        video_id: The YouTube video ID whose transcript is translated and spoken.
        out_lang: The target language code ('ar', 'fr', 'ha', 'fa', or 'ps').
Returns:
A tuple containing:
- translated_text (str): The translated text.
- audio_file_path (str or None): The path to the generated audio file, or None if speech generation failed.
"""
# Ensure model and tokenizer are loaded for the target language
model_status = setModelAndTokenizer(out_lang)
if "Error" in model_status or "not supported" in model_status:
return f"Translation failed: {model_status}", None
english_text = getEnglishTranscript(video_id)
if english_text == "":
return "No English transcript available to translate.", None
translated_text = ""
if out_lang == "ar":
translated_text = getArabicTranscript(video_id)
if translated_text.strip() == "": # If no direct Arabic transcript, translate English
print("No direct Arabic transcript found, translating from English.")
translated_text = translate_me(english_text,out_lang)
elif out_lang == "fr":
translated_text = getFrenchTranscript(video_id)
if translated_text.strip() == "": # If no direct French transcript, translate English
print("No direct French transcript found, translating from English.")
translated_text = translate_me(english_text,out_lang)
elif out_lang in ["ha", "fa", "ps"]:
translated_text = translate_me(english_text,out_lang)
else:
return f"Language code '{out_lang}' not supported for translation.", None
if translated_text is None or translated_text.strip() == "" or "Translation failed" in translated_text:
return f"Translation to {out_lang} failed.", None
# Generate speech using the API wrapper
audio_file_path = speechRouter_api(translated_text, out_lang)
return translated_text, audio_file_path
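# End-to-end example (sketch; "<VIDEO_ID>" is a placeholder):
#   translated, audio = translate_and_speak_api_wrapper("<VIDEO_ID>", "fr")
# If YouTube offers a French (or Arabic) auto-translated track it is used directly;
# otherwise the English transcript is machine-translated locally.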
# This function will serve as the API endpoint for Gradio.
def translate_and_speak_api(video_id: str, language_code: str):
"""
API endpoint to translate and speak YouTube video transcripts.
"""
print(f"Received request for video ID: {video_id}, language: {language_code}")
translated_text, audio_file_path = translate_and_speak_api_wrapper(video_id, language_code)
# Return the translated text and the audio file path (or an empty string if None)
# Returning an empty string instead of None for the audio output might resolve
# the TypeError when autoplay is True.
return translated_text, audio_file_path if audio_file_path is not None else ""
# Define input components
video_id_input = gr.Textbox(label="YouTube Video ID")
language_dropdown = gr.Dropdown(
label="Target Language",
choices=['ar', 'fr', 'ha', 'fa', 'ps'], # Supported language codes
value='ar' # Default value
)
# Define output components
translated_text_output = gr.Textbox(label="Translated Text")
audio_output = gr.Audio(label="Translated Speech", autoplay=True)
# Combine components and the translate_and_speak_api function into a Gradio interface
demo = gr.Interface(
fn=translate_and_speak_api, # Use the API endpoint function
inputs=[video_id_input, language_dropdown], # Inputs match the API function arguments
outputs=[translated_text_output, audio_output], # Outputs match the API function return values
title="YouTube Translator and Speaker",
description="Enter a YouTube video ID and select a language to get the translated transcript and speech."
)
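# Because gr.Interface exposes a predict endpoint, the Space can also be called
# programmatically. A minimal sketch with gradio_client (the Space name below is a
# placeholder, not the real repo id):
#   from gradio_client import Client
#   client = Client("user/space-name")
#   text, audio = client.predict("<VIDEO_ID>", "fr", api_name="/predict")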
# ---- Launch Gradio ----
if __name__ == "__main__":
demo.launch()