import gradio as gr
import requests
import torch
import os
from transformers import MarianMTModel, MarianTokenizer, AutoTokenizer, AutoModelForSeq2SeqLM
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.proxies import WebshareProxyConfig
from gtts import gTTS

# Initialize YouTubeTranscriptApi
proxy_username = os.environ.get('WEBSHARE_PROXY_UN')
proxy_password = os.environ.get('WEBSHARE_PROXY_PW')

ytt_api = None
try:
    if proxy_username and proxy_password:
        ytt_api = YouTubeTranscriptApi(
            proxy_config=WebshareProxyConfig(
                proxy_username=proxy_username,
                proxy_password=proxy_password,
                filter_ip_locations=["us"],
            )
        )
        print("Successfully connected to the YouTube API with proxy.")
    else:
        ytt_api = YouTubeTranscriptApi()
        print("Successfully connected to the YouTube API without proxy.")
except Exception as e:
    print(f"A proxy error occurred in connecting to the YouTube API: {e}")
    ytt_api = YouTubeTranscriptApi()  # Fallback if proxy fails


def getEnglishTranscript(video_id):
    """Retrieves the English transcript for a given YouTube video ID."""
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        transcript_list = ytt_api.list(video_id)
        english_original = None
        for transcript in transcript_list:
            if transcript.language_code == 'en':
                english_original = transcript.fetch()
                break
        english_output = ""
        if english_original:
            for snippet in english_original:
                english_output += snippet.text + " "
        else:
            print(f"No English transcript found for video ID: {video_id}")
        return english_output.strip()
    except Exception as e:
        print(f"Error retrieving English transcript for video ID {video_id}: {e}")
        return ""


def getArabicTranscript(video_id):
    """Retrieves the Arabic transcript for a given YouTube video ID, translating if necessary."""
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        transcript_list = ytt_api.list(video_id)
        arabic_translation = None
        for transcript in transcript_list:
            if transcript.is_translatable:
                arabic_language_code = None
                for lang in transcript.translation_languages:
                    if lang.language == 'Arabic':
                        arabic_language_code = lang.language_code
                        break
                if arabic_language_code:
                    print(f"\nTranslating to Arabic ({arabic_language_code})...")
                    arabic_translation = transcript.translate(arabic_language_code).fetch()
                    print("Arabic Translation Found and Stored.")
                    break  # Exit after finding the first Arabic translation
        arabic_output = ""
        if arabic_translation:
            for snippet in arabic_translation:
                arabic_output += snippet.text + " "
        else:
            print(f"No translatable transcript found for Arabic for video ID: {video_id}")
        return arabic_output.strip()
    except Exception as e:
        print(f"Error retrieving or translating Arabic transcript for video ID {video_id}: {e}")
        return ""


def getFrenchTranscript(video_id):
    """Retrieves the French transcript for a given YouTube video ID, translating if necessary."""
    if not ytt_api:
        print("YouTubeTranscriptApi not initialized.")
        return ""
    try:
        transcript_list = ytt_api.list(video_id)
        french_translation = None
        for transcript in transcript_list:
            if transcript.is_translatable:
                french_language_code = None
                for lang in transcript.translation_languages:
                    if lang.language == 'French':
                        french_language_code = lang.language_code
                        break
                if french_language_code:
                    print(f"\nTranslating to French ({french_language_code})...")
                    french_translation = transcript.translate(french_language_code).fetch()
                    print("French Translation Found and Stored.")
                    break  # Exit after finding the first French translation
        french_output = ""
        if french_translation:
            for snippet in french_translation:
                french_output += snippet.text + " "
        else:
            print(f"No translatable transcript found for French for video ID: {video_id}")
        return french_output.strip()
    except Exception as e:
        print(f"Error retrieving or translating French transcript for video ID {video_id}: {e}")
        return ""


model, tokenizer, device = None, None, None
formatted_language_code = ""


def setModelAndTokenizer(language_code):
    """Sets the appropriate translation model and tokenizer based on the target language code."""
    global model, tokenizer, device, formatted_language_code
    _MODEL_NAME = None
    _readable_name = None
    if language_code == 'ar':
        _MODEL_NAME = "Helsinki-NLP/opus-mt-tc-big-en-ar"
        _readable_name = "English to Arabic"
    elif language_code == 'fr':
        _MODEL_NAME = "Helsinki-NLP/opus-mt-tc-big-en-fr"
        _readable_name = "English to French"
    elif language_code == 'ha':
        _MODEL_NAME = "facebook/nllb-200-distilled-600M"
        _readable_name = "English to Hausa"
        formatted_language_code = "hau_Latn"
    elif language_code == 'fa':
        _MODEL_NAME = "facebook/nllb-200-distilled-600M"
        _readable_name = "English to Dari/Afghan Persian"
        # FLORES-200 code for Western Persian; if Dari specifically is intended,
        # the corresponding code would be "prs_Arab".
        formatted_language_code = "pes_Arab"
    elif language_code == 'ps':
        _MODEL_NAME = "facebook/nllb-200-distilled-600M"
        _readable_name = "English to Pashto"
        formatted_language_code = "pbt_Arab"
    else:
        return f"Language code '{language_code}' not supported for translation model."

    # Skip reloading if the requested model is already in memory.
    if model is not None and tokenizer is not None and hasattr(tokenizer, 'name_or_path') and tokenizer.name_or_path == _MODEL_NAME:
        print(f"Model and tokenizer for {_readable_name} already loaded.")
        return f"Model and tokenizer for {_readable_name} already loaded."

    print(f"Loading model and tokenizer for {_readable_name}...")
    if "Helsinki-NLP" in _MODEL_NAME:
        try:
            tokenizer = MarianTokenizer.from_pretrained(_MODEL_NAME)
            model = MarianMTModel.from_pretrained(_MODEL_NAME)
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model.to(device)
            print(f"Successfully loaded Helsinki-NLP model: {_MODEL_NAME}")
        except Exception as e:
            print(f"Error loading Helsinki-NLP model or tokenizer: {e}")
            return "Error loading translation model."
    elif "facebook" in _MODEL_NAME:
        try:
            tokenizer = AutoTokenizer.from_pretrained(_MODEL_NAME)
            # Load without device_map="auto": a model dispatched by accelerate cannot be
            # moved again with .to(device), so load normally and place it explicitly.
            model = AutoModelForSeq2SeqLM.from_pretrained(_MODEL_NAME)
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            model.to(device)
            print(f"Successfully loaded Facebook NLLB model: {_MODEL_NAME}")
        except Exception as e:
            print(f"Error loading Facebook NLLB model or tokenizer: {e}")
            return "Error loading translation model."
    else:
        return f"Unknown model type for {_MODEL_NAME}"

    return f"Model and tokenizer set for {_readable_name}."
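

# Illustrative only (not executed at import time): because the loader above is idempotent,
# a warm-up call could be made once at startup to avoid a slow first request. The language
# code 'ha' below is just an example choice, not something the app requires.
# setModelAndTokenizer('ha')  # would load facebook/nllb-200-distilled-600M once for ha/fa/ps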
def chunk_text_by_tokens(text, tokenizer, max_tokens):
    """Splits text into chunks based on token count."""
    words = text.split()
    chunks = []
    current_chunk = []
    for word in words:
        trial_chunk = current_chunk + [word]
        # Use add_special_tokens=False to get token count of just the words
        num_tokens = len(tokenizer(" ".join(trial_chunk), add_special_tokens=False).input_ids)
        if num_tokens > max_tokens:
            if current_chunk:
                chunks.append(" ".join(current_chunk))
            current_chunk = [word]
        else:
            current_chunk = trial_chunk
    if current_chunk:
        chunks.append(" ".join(current_chunk))
    return chunks


def translate_me(text, language_code):
    """Translates the input text to the target language using the loaded model."""
    global model, tokenizer, device, formatted_language_code
    if model is None or tokenizer is None:
        status = setModelAndTokenizer(language_code)
        if "Error" in status or "not supported" in status:
            print(status)
            return f"Translation failed: {status}"
    if text is None or text.strip() == "":
        return "No text to translate."
    try:
        if language_code in ['ar', 'fr']:
            # Note: the whole transcript is passed in one call here; very long inputs
            # may exceed the Marian model's maximum input length.
            inputs = tokenizer(text, return_tensors="pt", padding=True).to(device)
            translated = model.generate(**inputs)
            return tokenizer.decode(translated[0], skip_special_tokens=True)
        elif language_code in ['ha', 'fa', 'ps']:
            SAFE_CHUNK_SIZE = 900
            tokenizer.src_lang = "eng_Latn"  # English
            bos_token_id = tokenizer.convert_tokens_to_ids([formatted_language_code])[0]
            chunks = chunk_text_by_tokens(text, tokenizer, SAFE_CHUNK_SIZE)
            translations = []
            for chunk in chunks:
                inputs = tokenizer(chunk, return_tensors="pt").to(device)
                translated_tokens = model.generate(
                    **inputs,
                    forced_bos_token_id=bos_token_id,
                    max_length=512  # caps the translated length of each chunk
                )
                translation = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
                translations.append(translation)
            return "\n".join(translations)
        else:
            return f"Translation not implemented for language code: {language_code}"
    except Exception as e:
        print(f"Error during translation: {e}")
        return "Error during translation."


def say_it_api(text, _out_lang):
    """
    Converts text to speech using gTTS and saves it to a temporary file.
    Returns the file path.
    """
    if text is None or text.strip() == "":
        print("No text provided for gTTS speech generation.")
        return None
    try:
        tts = gTTS(text=text, lang=_out_lang)
        filename = "/tmp/gtts_audio.mp3"
        tts.save(filename)
        return filename
    except Exception as e:
        print(f"Error during gTTS speech generation: {e}")
        return None


def speak_with_elevenlabs_api(text, language_code):
    """
    Converts text to speech using ElevenLabs API and saves it to a temporary file.
    Returns the file path.
    """
    ELEVENLABS_API_KEY = os.environ.get('ELEVENLABS_API_KEY')
    VOICE_ID = "EXAVITQu4vr4xnSDxMaL"  # Rachel; see docs for voices
    if not ELEVENLABS_API_KEY:
        print("ElevenLabs API key not found in environment variables.")
        return None
    if text is None or text.strip() == "":
        print("No text provided for ElevenLabs speech generation.")
        return None
    # language_code is currently unused; the multilingual model is expected to infer
    # the language from the text itself.
    url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}"
    headers = {
        "xi-api-key": ELEVENLABS_API_KEY,
        "Content-Type": "application/json"
    }
    data = {
        "text": text,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 0.5,
            "similarity_boost": 0.5
        }
    }
    try:
        response = requests.post(url, headers=headers, json=data)
        if response.status_code == 200:
            filename = "/tmp/elevenlabs_audio.mp3"
            with open(filename, 'wb') as f:
                f.write(response.content)
            return filename
        else:
            print(f"Error from ElevenLabs API: Status Code {response.status_code}, Response: {response.text}")
            return None
    except Exception as e:
        print(f"Error calling ElevenLabs API: {e}")
        return None


def speechRouter_api(text, language_code):
    """
    Routes text-to-speech requests based on language code and returns the audio file path.
    """
    if text is None or text.strip() == "":
        return None  # No text to speak
    if language_code == 'ar':
        return say_it_api(text, language_code)
    elif language_code == 'fr':
        return say_it_api(text, language_code)
    elif language_code in ['ha', 'fa', 'ps']:
        return speak_with_elevenlabs_api(text, language_code)
    else:
        print(f"Language code '{language_code}' not supported for speech generation.")
        return None


def translate_and_speak_api_wrapper(video_id, out_lang):
    """
    Translates the English transcript of a YouTube video to the target language
    and generates speech for the translated text.

    Args:
        video_id: The YouTube video ID to translate and speak.
        out_lang: The language to translate to.

    Returns:
        A tuple containing:
        - translated_text (str): The translated text.
        - audio_file_path (str or None): The path to the generated audio file,
          or None if speech generation failed.
    """
    # Ensure model and tokenizer are loaded for the target language
    model_status = setModelAndTokenizer(out_lang)
    if "Error" in model_status or "not supported" in model_status:
        return f"Translation failed: {model_status}", None

    english_text = getEnglishTranscript(video_id)
    if english_text == "":
        return "No English transcript available to translate.", None

    translated_text = ""
    if out_lang == "ar":
        translated_text = getArabicTranscript(video_id)
        if translated_text.strip() == "":
            # If no direct Arabic transcript, translate English
            print("No direct Arabic transcript found, translating from English.")
            translated_text = translate_me(english_text, out_lang)
    elif out_lang == "fr":
        translated_text = getFrenchTranscript(video_id)
        if translated_text.strip() == "":
            # If no direct French transcript, translate English
            print("No direct French transcript found, translating from English.")
            translated_text = translate_me(english_text, out_lang)
    elif out_lang in ["ha", "fa", "ps"]:
        translated_text = translate_me(english_text, out_lang)
    else:
        return f"Language code '{out_lang}' not supported for translation.", None

    if translated_text is None or translated_text.strip() == "" or "Translation failed" in translated_text:
        return f"Translation to {out_lang} failed.", None

    # Generate speech using the API wrapper
    audio_file_path = speechRouter_api(translated_text, out_lang)
    return translated_text, audio_file_path
# This function will serve as the API endpoint for Gradio.
def translate_and_speak_api(video_id: str, language_code: str):
    """
    API endpoint to translate and speak YouTube video transcripts.
    """
    print(f"Received request for video ID: {video_id}, language: {language_code}")
    translated_text, audio_file_path = translate_and_speak_api_wrapper(video_id, language_code)
    # Return the translated text and the audio file path (or an empty string if None)
    # Returning an empty string instead of None for the audio output might resolve
    # the TypeError when autoplay is True.
    return translated_text, audio_file_path if audio_file_path is not None else ""


# Define input components
video_id_input = gr.Textbox(label="YouTube Video ID")
language_dropdown = gr.Dropdown(
    label="Target Language",
    choices=['ar', 'fr', 'ha', 'fa', 'ps'],  # Supported language codes
    value='ar'  # Default value
)

# Define output components
translated_text_output = gr.Textbox(label="Translated Text")
audio_output = gr.Audio(label="Translated Speech", autoplay=True)

# Combine components and the translate_and_speak_api function into a Gradio interface
demo = gr.Interface(
    fn=translate_and_speak_api,  # Use the API endpoint function
    inputs=[video_id_input, language_dropdown],  # Inputs match the API function arguments
    outputs=[translated_text_output, audio_output],  # Outputs match the API function return values
    title="YouTube Translator and Speaker",
    description="Enter a YouTube video ID and select a language to get the translated transcript and speech."
)

# ---- Launch Gradio ----
if __name__ == "__main__":
    demo.launch()
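
# Illustrative client-side call (a sketch, assuming the app is running locally on the
# default Gradio port; the URL and api_name below are assumptions for this example):
#
#   from gradio_client import Client
#   client = Client("http://127.0.0.1:7860/")
#   text, audio = client.predict("VIDEO_ID_HERE", "fr", api_name="/predict")
#   print(text)
#   # `audio` is the path to the generated mp3, or "" if speech generation failed.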