"""Multilingual translation utilities (English <-> Hindi / Tamil).

Wraps Hugging Face ``transformers`` translation pipelines with lazy,
CPU-only model loading and a tagged-passthrough fallback when no model
is available.
"""

import logging
import re
from typing import Any, Dict, List, Optional

import torch  # noqa: F401  -- kept from original module (may be required by transformers backend)

# transformers is optional at import time: if it is missing, every
# translation request degrades to _fallback_translate() instead of the
# whole module failing on import. Behavior is identical when it is installed.
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM  # noqa: F401
except ImportError:  # pragma: no cover - exercised only without transformers
    pipeline = None
    AutoTokenizer = None
    AutoModelForSeq2SeqLM = None

logger = logging.getLogger(__name__)


class MultilingualTranslator:
    """Multilingual translation with support for Hindi and Tamil."""

    def __init__(self):
        # pair key ("en-hi") -> loaded HF pipeline; populated lazily.
        self.translators = {}
        self.language_codes = {
            'Hindi': 'hi',
            'Tamil': 'ta',
            'English': 'en'
        }
        # Supported translation pairs -> Hugging Face model names.
        # NOTE(review): the opus-mt-*-mul / mul-* models normally require a
        # target-language token such as ">>tam<<" prepended to the input;
        # this class never adds one -- confirm Tamil output quality.
        self.supported_pairs = {
            'en-hi': 'Helsinki-NLP/opus-mt-en-hi',
            'en-ta': 'Helsinki-NLP/opus-mt-en-mul',  # Multilingual model for Tamil
            'hi-en': 'Helsinki-NLP/opus-mt-hi-en',
            'ta-en': 'Helsinki-NLP/opus-mt-mul-en'
        }
        self._initialize_models()
        logger.info("MultilingualTranslator initialized")

    def _initialize_models(self):
        """Initialize translation models on-demand."""
        # Don't load all models at startup to save memory.
        # They will be loaded when first needed (see _load_translator).
        logger.info("Translation models will be loaded on-demand")

    def _load_translator(self, source_lang: str, target_lang: str) -> Optional[object]:
        """Load (and cache) the pipeline for one language pair.

        Args:
            source_lang: ISO 639-1 source code, e.g. ``'en'``.
            target_lang: ISO 639-1 target code, e.g. ``'hi'``.

        Returns:
            A translation pipeline, or ``None`` when the pair is
            unsupported, transformers is unavailable, or loading fails.
        """
        pair_key = f"{source_lang}-{target_lang}"
        if pair_key in self.translators:
            return self.translators[pair_key]

        if pipeline is None:
            # transformers not installed; callers fall back gracefully.
            logger.error(f"transformers unavailable; cannot load {pair_key}")
            return None

        try:
            model_name = self.supported_pairs.get(pair_key)
            if not model_name:
                logger.error(f"No model available for {source_lang} -> {target_lang}")
                return None

            # Use CPU for Hugging Face Spaces compatibility
            device = -1  # CPU only
            translator = pipeline(
                "translation",
                model=model_name,
                device=device,
                framework="pt"
            )
            self.translators[pair_key] = translator
            logger.info(f"Loaded translator for {source_lang} -> {target_lang}")
            return translator
        except Exception as e:
            logger.error(f"Failed to load translator {pair_key}: {str(e)}")
            return None

    def translate(self, text: str, target_lang: str, source_lang: str = 'English') -> str:
        """Translate ``text`` into ``target_lang``.

        Args:
            text: Source text (may be empty).
            target_lang: Language name ('Hindi') or code ('hi').
            source_lang: Language name of the input; defaults to English.

        Returns:
            Translated text, or a fallback-tagged / original string when
            no model is available or translation fails.
        """
        if not text or not text.strip():
            return ""

        # Resolve names to ISO codes; unknown targets fall back to the
        # first two letters of the lowercased name.
        source_code = self.language_codes.get(source_lang, 'en')
        target_code = self.language_codes.get(target_lang, target_lang.lower()[:2])

        # Identity translation: nothing to do.
        if source_code == target_code:
            return text

        try:
            translator = self._load_translator(source_code, target_code)
            if not translator:
                return self._fallback_translate(text, target_lang)

            cleaned_text = self._prepare_text_for_translation(text)
            if not cleaned_text:
                return text

            # Long inputs are split into sentence chunks to respect the
            # model's max sequence length.
            if len(cleaned_text.split()) > 200:
                return self._translate_long_text(cleaned_text, translator)
            return self._translate_chunk(cleaned_text, translator)

        except Exception as e:
            logger.error(f"Translation failed: {str(e)}")
            return self._fallback_translate(text, target_lang)

    def _translate_chunk(self, text: str, translator) -> str:
        """Translate a single chunk; return the input on failure."""
        try:
            result = translator(text, max_length=512)
            if result and len(result) > 0:
                translated = result[0].get('translation_text', text)
                return self._post_process_translation(translated)
            return text
        except Exception as e:
            logger.error(f"Chunk translation failed: {str(e)}")
            return text

    def _translate_long_text(self, text: str, translator) -> str:
        """Translate long text by accumulating sentences into ~150-word chunks."""
        try:
            sentences = self._split_into_sentences(text)
            if not sentences:
                return text

            translated_sentences = []
            current_chunk = ""
            for sentence in sentences:
                # Flush the current chunk before it grows past ~150 words.
                if len((current_chunk + " " + sentence).split()) > 150 and current_chunk:
                    translated_sentences.append(
                        self._translate_chunk(current_chunk, translator))
                    current_chunk = sentence
                elif current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence

            # Translate whatever remains.
            if current_chunk:
                translated_sentences.append(
                    self._translate_chunk(current_chunk, translator))

            return " ".join(translated_sentences)
        except Exception as e:
            logger.error(f"Long text translation failed: {str(e)}")
            return text

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text on sentence-final punctuation followed by whitespace."""
        try:
            sentences = re.split(r'[.!?]+\s+', text)
            return [s.strip() for s in sentences if s.strip()]
        except Exception as e:
            logger.error(f"Sentence splitting failed: {str(e)}")
            return [text]

    def _prepare_text_for_translation(self, text: str) -> str:
        """Strip URLs, emails, odd characters and excess whitespace."""
        if not text:
            return ""
        # Remove URLs
        text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
        # Remove email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '', text)
        # Clean excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters that might cause issues
        text = re.sub(r'[^\w\s.,!?;:\-\'"()/%$]', '', text)
        return text.strip()

    def _post_process_translation(self, text: str) -> str:
        """Normalize whitespace and capitalize the first letter."""
        if not text:
            return ""
        # Collapse whitespace, then strip BEFORE capitalizing -- the
        # original capitalized first, so a leading space made the
        # capitalization a silent no-op.
        text = re.sub(r'\s+', ' ', text).strip()
        # Capitalize first letter (no-op for Devanagari/Tamil scripts).
        if text and len(text) > 1:
            text = text[0].upper() + text[1:]
        return text

    def _fallback_translate(self, text: str, target_lang: str) -> str:
        """Fallback translation with basic text processing.

        Returns the original text tagged with the target language; in a
        production system this would call an alternate service.
        """
        logger.warning(f"Using fallback translation for {target_lang}")
        if target_lang.lower() in ['hindi', 'hi']:
            return f"[Hindi] {text}"
        elif target_lang.lower() in ['tamil', 'ta']:
            return f"[Tamil] {text}"
        return text

    def batch_translate(self, texts: List[str], target_lang: str,
                        source_lang: str = 'English') -> List[str]:
        """Translate each text independently; failures use the fallback."""
        translations = []
        for text in texts:
            try:
                translations.append(self.translate(text, target_lang, source_lang))
            except Exception as e:
                logger.error(f"Batch translation failed for one text: {str(e)}")
                translations.append(self._fallback_translate(text, target_lang))
        return translations

    def detect_language(self, text: str) -> str:
        """Detect language by Unicode script block (hi / ta / en)."""
        try:
            if not text:
                return 'en'
            # Check for Devanagari script (Hindi)
            if re.search(r'[\u0900-\u097F]', text):
                return 'hi'
            # Check for Tamil script
            if re.search(r'[\u0B80-\u0BFF]', text):
                return 'ta'
            return 'en'
        except Exception as e:
            logger.error(f"Language detection failed: {str(e)}")
            return 'en'

    def get_supported_languages(self) -> List[str]:
        """Get list of supported language names."""
        return list(self.language_codes.keys())

    def is_translation_available(self, source_lang: str, target_lang: str) -> bool:
        """Check whether a model exists for the source -> target pair."""
        source_code = self.language_codes.get(source_lang, source_lang.lower()[:2])
        target_code = self.language_codes.get(target_lang, target_lang.lower()[:2])
        return f"{source_code}-{target_code}" in self.supported_pairs

    def translate_with_confidence(self, text: str, target_lang: str,
                                  source_lang: str = 'English') -> Dict[str, Any]:
        """Translate and return a dict with a heuristic confidence score.

        Returns keys: original_text, translated_text, source_language,
        target_language, confidence, method (and 'error' on failure).
        """
        try:
            translated_text = self.translate(text, target_lang, source_lang)
            confidence = self._calculate_translation_confidence(
                text, translated_text, target_lang)

            # Fallback output is the original prefixed with "[Hindi] " /
            # "[Tamil] ", so it differs from `text`; the original check
            # (translated != text) mislabeled those results as neural.
            used_fallback = (
                translated_text == text
                or translated_text.startswith(('[Hindi] ', '[Tamil] '))
            )
            return {
                'original_text': text,
                'translated_text': translated_text,
                'source_language': source_lang,
                'target_language': target_lang,
                'confidence': confidence,
                'method': 'fallback' if used_fallback else 'neural_translation'
            }
        except Exception as e:
            logger.error(f"Translation with confidence failed: {str(e)}")
            return {
                'original_text': text,
                'translated_text': text,
                'source_language': source_lang,
                'target_language': target_lang,
                'confidence': 0.0,
                'method': 'error',
                'error': str(e)
            }

    def _calculate_translation_confidence(self, original: str, translated: str,
                                          target_lang: str) -> float:
        """Heuristic confidence in [0, 1] based on length similarity.

        Not a model probability -- purely a sanity score: unchanged output
        for a non-English target scores 0.2; very short inputs 0.7; large
        length mismatches reduce the score.
        """
        try:
            # Unchanged output means translation probably failed.
            if original == translated and target_lang != 'English':
                return 0.2
            # Very short text: moderate confidence.
            if len(original.split()) < 5:
                return 0.7
            original_len = len(original.split())
            translated_len = len(translated.split())
            # original_len >= 5 here, so the denominator is never zero.
            length_ratio = min(original_len, translated_len) / max(original_len, translated_len)
            if length_ratio < 0.5:
                return 0.6
            elif length_ratio < 0.7:
                return 0.8
            return 0.9
        except Exception as e:
            logger.error(f"Confidence calculation failed: {str(e)}")
            return 0.5


# Utility functions

def get_language_name(code: str) -> str:
    """Get full language name from an ISO code (unknown codes pass through)."""
    code_to_name = {
        'en': 'English',
        'hi': 'Hindi',
        'ta': 'Tamil'
    }
    return code_to_name.get(code.lower(), code)


def get_language_code(name: str) -> str:
    """Get ISO code from a language name (unknown names -> first two letters)."""
    name_to_code = {
        'english': 'en',
        'hindi': 'hi',
        'tamil': 'ta'
    }
    return name_to_code.get(name.lower(), name.lower()[:2])