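"""Multilingual translation utilities for English, Hindi, and Tamil.

Wraps Hugging Face Helsinki-NLP/opus-mt translation pipelines with on-demand
model loading, chunked translation of long text, script-based language
detection, and a simple heuristic confidence score.
"""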
import logging
import re
from typing import Any, Dict, List, Optional

import torch
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM

logger = logging.getLogger(__name__)

class MultilingualTranslator:
    """Multilingual translation with support for Hindi and Tamil"""

    def __init__(self):
        self.translators = {}
        self.language_codes = {
            'Hindi': 'hi',
            'Tamil': 'ta',
            'English': 'en'
        }
        # Supported translation pairs
        self.supported_pairs = {
            'en-hi': 'Helsinki-NLP/opus-mt-en-hi',
            'en-ta': 'Helsinki-NLP/opus-mt-en-mul',  # Multilingual model for Tamil
            'hi-en': 'Helsinki-NLP/opus-mt-hi-en',
            'ta-en': 'Helsinki-NLP/opus-mt-mul-en'
        }
        self._initialize_models()
        logger.info("MultilingualTranslator initialized")

    def _initialize_models(self):
        """Initialize translation models on-demand"""
        # Don't load all models at startup to save memory
        # They will be loaded when first needed
        logger.info("Translation models will be loaded on-demand")

    def _load_translator(self, source_lang: str, target_lang: str) -> Optional[object]:
        """Load a specific translator model"""
        pair_key = f"{source_lang}-{target_lang}"
        if pair_key in self.translators:
            return self.translators[pair_key]
        try:
            model_name = self.supported_pairs.get(pair_key)
            if not model_name:
                logger.error(f"No model available for {source_lang} -> {target_lang}")
                return None
            # Use CPU for Hugging Face Spaces compatibility
            device = -1  # CPU only
            translator = pipeline(
                "translation",
                model=model_name,
                device=device,
                framework="pt"
            )
            self.translators[pair_key] = translator
            logger.info(f"Loaded translator for {source_lang} -> {target_lang}")
            return translator
        except Exception as e:
            logger.error(f"Failed to load translator {pair_key}: {str(e)}")
            return None

    def translate(self, text: str, target_lang: str, source_lang: str = 'English') -> str:
        """Translate text to target language"""
        if not text or not text.strip():
            return ""
        # Get language codes
        source_code = self.language_codes.get(source_lang, 'en')
        target_code = self.language_codes.get(target_lang, target_lang.lower()[:2])
        # If source and target are the same, return original text
        if source_code == target_code:
            return text
        try:
            # Load the appropriate translator
            translator = self._load_translator(source_code, target_code)
            if not translator:
                return self._fallback_translate(text, target_lang)
            # Clean and prepare text
            cleaned_text = self._prepare_text_for_translation(text)
            if not cleaned_text:
                return text
            # Split long text into chunks for translation
            if len(cleaned_text.split()) > 200:
                return self._translate_long_text(cleaned_text, translator)
            else:
                return self._translate_chunk(cleaned_text, translator)
        except Exception as e:
            logger.error(f"Translation failed: {str(e)}")
            return self._fallback_translate(text, target_lang)

    def _translate_chunk(self, text: str, translator) -> str:
        """Translate a single chunk of text"""
        try:
            result = translator(text, max_length=512)
            if result and len(result) > 0:
                translated = result[0].get('translation_text', text)
                return self._post_process_translation(translated)
            return text
        except Exception as e:
            logger.error(f"Chunk translation failed: {str(e)}")
            return text

    def _translate_long_text(self, text: str, translator) -> str:
        """Translate long text by splitting into chunks"""
        try:
            # Split by sentences
            sentences = self._split_into_sentences(text)
            if not sentences:
                return text
            translated_sentences = []
            current_chunk = ""
            for sentence in sentences:
                # If adding this sentence would make the chunk too long, translate the current chunk
                if len((current_chunk + " " + sentence).split()) > 150 and current_chunk:
                    translated = self._translate_chunk(current_chunk, translator)
                    translated_sentences.append(translated)
                    current_chunk = sentence
                else:
                    if current_chunk:
                        current_chunk += " " + sentence
                    else:
                        current_chunk = sentence
            # Translate remaining chunk
            if current_chunk:
                translated = self._translate_chunk(current_chunk, translator)
                translated_sentences.append(translated)
            return " ".join(translated_sentences)
        except Exception as e:
            logger.error(f"Long text translation failed: {str(e)}")
            return text

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences"""
        try:
            # Simple sentence splitting
            sentences = re.split(r'[.!?]+\s+', text)
            sentences = [s.strip() for s in sentences if s.strip()]
            return sentences
        except Exception as e:
            logger.error(f"Sentence splitting failed: {str(e)}")
            return [text]

    def _prepare_text_for_translation(self, text: str) -> str:
        """Prepare text for translation"""
        if not text:
            return ""
        # Remove URLs
        text = re.sub(r'https?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
        # Remove email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '', text)
        # Clean excessive whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters that might cause issues
        text = re.sub(r'[^\w\s.,!?;:\-\'"()/%$]', '', text)
        return text.strip()

    def _post_process_translation(self, text: str) -> str:
        """Post-process translated text"""
        if not text:
            return ""
        # Clean up extra spaces
        text = re.sub(r'\s+', ' ', text)
        # Capitalize first letter if it's a sentence
        if text and len(text) > 1:
            text = text[0].upper() + text[1:]
        return text.strip()

    def _fallback_translate(self, text: str, target_lang: str) -> str:
        """Fallback translation with basic text processing"""
        logger.warning(f"Using fallback translation for {target_lang}")
        # For demonstration purposes, we'll return the original text with a note
        # In a production system, you might use a different translation service
        if target_lang.lower() in ['hindi', 'hi']:
            return f"[Hindi] {text}"
        elif target_lang.lower() in ['tamil', 'ta']:
            return f"[Tamil] {text}"
        else:
            return text

    def batch_translate(self, texts: List[str], target_lang: str, source_lang: str = 'English') -> List[str]:
        """Translate multiple texts"""
        translations = []
        for text in texts:
            try:
                translation = self.translate(text, target_lang, source_lang)
                translations.append(translation)
            except Exception as e:
                logger.error(f"Batch translation failed for one text: {str(e)}")
                translations.append(self._fallback_translate(text, target_lang))
        return translations

    def detect_language(self, text: str) -> str:
        """Simple language detection (basic implementation)"""
        try:
            # Basic detection using character patterns
            if not text:
                return 'en'
            # Check for Devanagari script (Hindi)
            if re.search(r'[\u0900-\u097F]', text):
                return 'hi'
            # Check for Tamil script
            if re.search(r'[\u0B80-\u0BFF]', text):
                return 'ta'
            # Default to English
            return 'en'
        except Exception as e:
            logger.error(f"Language detection failed: {str(e)}")
            return 'en'

    def get_supported_languages(self) -> List[str]:
        """Get list of supported languages"""
        return list(self.language_codes.keys())

    def is_translation_available(self, source_lang: str, target_lang: str) -> bool:
        """Check if translation is available between two languages"""
        source_code = self.language_codes.get(source_lang, source_lang.lower()[:2])
        target_code = self.language_codes.get(target_lang, target_lang.lower()[:2])
        pair_key = f"{source_code}-{target_code}"
        return pair_key in self.supported_pairs

    def translate_with_confidence(self, text: str, target_lang: str, source_lang: str = 'English') -> Dict[str, Any]:
        """Translate text and return result with confidence metrics"""
        try:
            translated_text = self.translate(text, target_lang, source_lang)
            # Simple confidence calculation based on text characteristics
            confidence = self._calculate_translation_confidence(text, translated_text, target_lang)
            return {
                'original_text': text,
                'translated_text': translated_text,
                'source_language': source_lang,
                'target_language': target_lang,
                'confidence': confidence,
                'method': 'neural_translation' if translated_text != text else 'fallback'
            }
        except Exception as e:
            logger.error(f"Translation with confidence failed: {str(e)}")
            return {
                'original_text': text,
                'translated_text': text,
                'source_language': source_lang,
                'target_language': target_lang,
                'confidence': 0.0,
                'method': 'error',
                'error': str(e)
            }

    def _calculate_translation_confidence(self, original: str, translated: str, target_lang: str) -> float:
        """Calculate a simple confidence score for translation"""
        try:
            # If translation failed (same as original), low confidence
            if original == translated and target_lang != 'English':
                return 0.2
            # If text is very short, moderate confidence
            if len(original.split()) < 5:
                return 0.7
            # If translation is significantly different in length, lower confidence
            original_len = len(original.split())
            translated_len = len(translated.split())
            length_ratio = min(original_len, translated_len) / max(original_len, translated_len)
            if length_ratio < 0.5:
                return 0.6
            elif length_ratio < 0.7:
                return 0.8
            else:
                return 0.9
        except Exception as e:
            logger.error(f"Confidence calculation failed: {str(e)}")
            return 0.5

# Utility functions
def get_language_name(code: str) -> str:
    """Get full language name from code"""
    code_to_name = {
        'en': 'English',
        'hi': 'Hindi',
        'ta': 'Tamil'
    }
    return code_to_name.get(code.lower(), code)


def get_language_code(name: str) -> str:
    """Get language code from name"""
    name_to_code = {
        'english': 'en',
        'hindi': 'hi',
        'tamil': 'ta'
    }
    return name_to_code.get(name.lower(), name.lower()[:2])
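

if __name__ == "__main__":
    # Illustrative usage sketch (not exercised by the module itself): assumes
    # the transformers and torch dependencies are installed and that the
    # Helsinki-NLP models can be downloaded on first use; if a model cannot be
    # loaded, the fallback path is exercised instead.
    logging.basicConfig(level=logging.INFO)

    translator = MultilingualTranslator()
    print("Supported languages:", translator.get_supported_languages())
    print("en -> hi available:", translator.is_translation_available('English', 'Hindi'))

    sample = "The weather forecast predicts heavy rain tomorrow."
    result = translator.translate_with_confidence(sample, target_lang='Hindi')
    print(result['translated_text'], f"(confidence: {result['confidence']:.2f})")

    # Script-based detection on the translated output
    detected = translator.detect_language(result['translated_text'])
    print("Detected language:", get_language_name(detected))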