|
import logging
import re
from typing import Any, Dict, List, Optional, Tuple

import torch
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
|
|
|
logger = logging.getLogger(__name__)


class MultilingualTranslator:
    """Multilingual translation with support for Hindi and Tamil.

    Translation models are loaded lazily through the ``transformers``
    ``pipeline`` factory and cached per language pair in ``self.translators``.
    The public methods are best-effort: failures are logged and a fallback
    value is returned instead of an exception being raised.
    """

    # Texts with more words than this are translated chunk by chunk.
    LONG_TEXT_WORD_LIMIT = 200
    # Maximum number of words packed into one chunk of a long text.
    CHUNK_WORD_LIMIT = 150
    # ``max_length`` forwarded to the pipeline for every chunk.
    MAX_OUTPUT_LENGTH = 512
    # Confidence reported when the text could not actually be translated.
    UNTRANSLATED_CONFIDENCE = 0.2

    # Multi-target OPUS-MT checkpoints (e.g. ``opus-mt-en-mul``) select the
    # output language from a sentence-initial ``>>id<<`` token (see the model
    # card); without it the target language is not controlled.
    TARGET_LANGUAGE_TOKENS = {'en-ta': '>>tam<<'}

    # Class-level default so instances created without ``__init__`` still work.
    device = -1

    _STATUS_NEURAL = 'neural'
    _STATUS_PASSTHROUGH = 'passthrough'
    _STATUS_FALLBACK = 'fallback'

    _URL_RE = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )
    _EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
    # ``\w`` does not match combining marks (vowel signs, virama), so the
    # Devanagari and Tamil blocks are whitelisted explicitly; otherwise Hindi
    # and Tamil source text would be corrupted before translation.
    _DISALLOWED_RE = re.compile(r'[^\w\s.,!?;:\-\'"()/%$\u0900-\u097F\u0B80-\u0BFF]')
    _WHITESPACE_RE = re.compile(r'\s+')
    # Split *after* the terminator so each sentence keeps its punctuation
    # (U+0964/U+0965 are the Devanagari danda and double danda).
    _SENTENCE_BREAK_RE = re.compile(r'(?<=[.!?\u0964\u0965])\s+')
    _DEVANAGARI_RE = re.compile(r'[\u0900-\u097F]')
    _TAMIL_RE = re.compile(r'[\u0B80-\u0BFF]')

    def __init__(self, device: int = -1):
        """Set up the language tables; no model is loaded here.

        Args:
            device: Device index forwarded to ``transformers.pipeline``.
                The default ``-1`` is the value the pipeline was always
                created with before this parameter existed.
        """
        self.device = device
        self.translators = {}
        self.language_codes = {
            'Hindi': 'hi',
            'Tamil': 'ta',
            'English': 'en',
        }
        self.supported_pairs = {
            'en-hi': 'Helsinki-NLP/opus-mt-en-hi',
            'en-ta': 'Helsinki-NLP/opus-mt-en-mul',
            'hi-en': 'Helsinki-NLP/opus-mt-hi-en',
            'ta-en': 'Helsinki-NLP/opus-mt-mul-en',
        }

        self._initialize_models()
        logger.info("MultilingualTranslator initialized")

    def _initialize_models(self):
        """Initialize translation models on-demand (only logs; nothing is loaded)."""
        logger.info("Translation models will be loaded on-demand")

    def _resolve_language_code(self, language: str, default: Optional[str] = None) -> str:
        """Map a language name or code to the code used in pair keys.

        Matching is case-insensitive and ignores surrounding whitespace, and
        accepts both names (``'Hindi'``) and codes (``'hi'``).

        Args:
            language: Language name or code supplied by the caller.
            default: Code returned for an unknown language. When ``None`` the
                first two lowercase characters of ``language`` are returned.
        """
        normalized = (language or "").strip().lower()
        for name, code in self.language_codes.items():
            if normalized in (name.lower(), code):
                return code
        if default is not None:
            return default
        return normalized[:2]

    def _load_translator(self, source_lang: str, target_lang: str) -> Optional[object]:
        """Load (or fetch from cache) the pipeline for one language pair.

        Args:
            source_lang: Source language code, e.g. ``'en'``.
            target_lang: Target language code, e.g. ``'hi'``.

        Returns:
            The cached or newly created pipeline, or ``None`` when the pair is
            not in ``supported_pairs`` or creating the pipeline failed.
        """
        pair_key = f"{source_lang}-{target_lang}"

        cached = self.translators.get(pair_key)
        if cached is not None:
            return cached

        model_name = self.supported_pairs.get(pair_key)
        if not model_name:
            logger.error(f"No model available for {source_lang} -> {target_lang}")
            return None

        try:
            translator = pipeline(
                "translation",
                model=model_name,
                device=self.device,
                framework="pt",
            )
        except Exception as e:
            # Best-effort: callers fall back instead of crashing.
            logger.error(f"Failed to load translator {pair_key}: {str(e)}")
            return None

        self.translators[pair_key] = translator
        logger.info(f"Loaded translator for {source_lang} -> {target_lang}")
        return translator

    def translate(self, text: str, target_lang: str, source_lang: str = 'English') -> str:
        """Translate text to target language.

        Args:
            text: Text to translate.
            target_lang: Target language name or code.
            source_lang: Source language name or code; an unrecognised value
                is treated as English.

        Returns:
            ``""`` for empty/blank input, ``text`` unchanged when source and
            target resolve to the same language, the translated text when a
            model is available, and the ``_fallback_translate`` result
            otherwise.
        """
        translated, _status = self._translate_with_status(text, target_lang, source_lang)
        return translated

    def _translate_with_status(self, text: str, target_lang: str, source_lang: str) -> Tuple[str, str]:
        """Translate ``text`` and report how the result was produced.

        Returns:
            ``(result, status)`` where ``status`` is one of the ``_STATUS_*``
            constants: a model produced the result, the input was passed
            through, or the fallback was used.
        """
        if not text or not text.strip():
            return "", self._STATUS_PASSTHROUGH

        source_code = self._resolve_language_code(source_lang, default='en')
        target_code = self._resolve_language_code(target_lang)

        if source_code == target_code:
            return text, self._STATUS_PASSTHROUGH

        try:
            translator = self._load_translator(source_code, target_code)
            if translator is None:
                return self._fallback_translate(text, target_lang), self._STATUS_FALLBACK

            cleaned_text = self._prepare_text_for_translation(text)
            if not cleaned_text:
                return text, self._STATUS_PASSTHROUGH

            # Added after cleaning because cleaning strips '<' and '>'.
            prefix = self.TARGET_LANGUAGE_TOKENS.get(f"{source_code}-{target_code}", "")

            if len(cleaned_text.split()) > self.LONG_TEXT_WORD_LIMIT:
                translated = self._translate_long_text(cleaned_text, translator, prefix)
            else:
                translated = self._translate_chunk(cleaned_text, translator, prefix)
            return translated, self._STATUS_NEURAL

        except Exception as e:
            logger.error(f"Translation failed: {str(e)}")
            return self._fallback_translate(text, target_lang), self._STATUS_FALLBACK

    def _translate_chunk(self, text: str, translator, prefix: str = "") -> str:
        """Translate a single chunk of text.

        Args:
            text: Cleaned text to translate.
            translator: Callable returning a list of dicts that carry the
                result under ``'translation_text'``.
            prefix: Optional target-language token placed before the text.

        Returns:
            The post-processed translation, or ``text`` unchanged when the
            call fails or yields nothing.
        """
        try:
            model_input = f"{prefix} {text}" if prefix else text
            result = translator(model_input, max_length=self.MAX_OUTPUT_LENGTH)

            if result:
                translated = result[0].get('translation_text', text)
                return self._post_process_translation(translated)

            return text

        except Exception as e:
            logger.error(f"Chunk translation failed: {str(e)}")
            return text

    def _translate_long_text(self, text: str, translator, prefix: str = "") -> str:
        """Translate long text by splitting into chunks.

        Sentences are packed greedily into chunks of at most
        ``CHUNK_WORD_LIMIT`` words (a single longer sentence forms its own
        chunk); each chunk is translated separately and the results are
        joined with single spaces.
        """
        try:
            sentences = self._split_into_sentences(text)
            if not sentences:
                return text

            chunks = []
            current = []
            current_words = 0
            for sentence in sentences:
                sentence_words = len(sentence.split())
                if current and current_words + sentence_words > self.CHUNK_WORD_LIMIT:
                    chunks.append(" ".join(current))
                    current = []
                    current_words = 0
                current.append(sentence)
                current_words += sentence_words

            if current:
                chunks.append(" ".join(current))

            return " ".join(
                self._translate_chunk(chunk, translator, prefix) for chunk in chunks
            )

        except Exception as e:
            logger.error(f"Long text translation failed: {str(e)}")
            return text

    def _split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences, keeping each sentence's end punctuation.

        Returns:
            Non-empty, stripped sentences; ``[text]`` if splitting fails.
        """
        try:
            pieces = self._SENTENCE_BREAK_RE.split(text)
            return [piece.strip() for piece in pieces if piece.strip()]

        except Exception as e:
            logger.error(f"Sentence splitting failed: {str(e)}")
            return [text]

    def _prepare_text_for_translation(self, text: str) -> str:
        """Prepare text for translation.

        Removes URLs and e-mail addresses, drops characters outside the
        allowed set (word characters, whitespace, common punctuation and the
        Devanagari and Tamil blocks), then collapses whitespace.
        """
        if not text:
            return ""

        text = self._URL_RE.sub('', text)
        text = self._EMAIL_RE.sub('', text)
        text = self._DISALLOWED_RE.sub('', text)
        # Collapse last so gaps left by the removals above do not survive.
        text = self._WHITESPACE_RE.sub(' ', text)

        return text.strip()

    def _post_process_translation(self, text: str) -> str:
        """Post-process translated text: normalise whitespace, capitalise the start."""
        if not text:
            return ""

        # Strip before capitalising; otherwise a leading space is what gets
        # "capitalised" and the first letter stays lowercase.
        text = self._WHITESPACE_RE.sub(' ', text).strip()
        if text:
            text = text[0].upper() + text[1:]

        return text

    def _fallback_translate(self, text: str, target_lang: str) -> str:
        """Fallback used when no model can translate: tag or return the text.

        Returns:
            ``text`` prefixed with ``[Hindi]`` or ``[Tamil]`` for those
            targets, otherwise ``text`` unchanged.
        """
        logger.warning(f"Using fallback translation for {target_lang}")

        normalized = (target_lang or "").strip().lower()
        if normalized in ('hindi', 'hi'):
            return f"[Hindi] {text}"
        if normalized in ('tamil', 'ta'):
            return f"[Tamil] {text}"
        return text

    def batch_translate(self, texts: List[str], target_lang: str, source_lang: str = 'English') -> List[str]:
        """Translate multiple texts, one result per input and in the same order."""
        translations = []

        for text in texts:
            try:
                translations.append(self.translate(text, target_lang, source_lang))
            except Exception as e:
                logger.error(f"Batch translation failed for one text: {str(e)}")
                translations.append(self._fallback_translate(text, target_lang))

        return translations

    def detect_language(self, text: str) -> str:
        """Simple script-based language detection.

        Returns:
            ``'hi'`` or ``'ta'`` according to whether Devanagari or Tamil
            characters are more frequent (Devanagari wins a tie), and ``'en'``
            when neither script occurs, the text is empty, or detection fails.
        """
        try:
            if not text:
                return 'en'

            devanagari_count = len(self._DEVANAGARI_RE.findall(text))
            tamil_count = len(self._TAMIL_RE.findall(text))

            if not devanagari_count and not tamil_count:
                return 'en'
            return 'hi' if devanagari_count >= tamil_count else 'ta'

        except Exception as e:
            logger.error(f"Language detection failed: {str(e)}")
            return 'en'

    def get_supported_languages(self) -> List[str]:
        """Get list of supported language names."""
        return list(self.language_codes)

    def is_translation_available(self, source_lang: str, target_lang: str) -> bool:
        """Check if a model is configured for the given language pair.

        Both arguments may be language names or codes.
        """
        source_code = self._resolve_language_code(source_lang)
        target_code = self._resolve_language_code(target_lang)
        return f"{source_code}-{target_code}" in self.supported_pairs

    def translate_with_confidence(self, text: str, target_lang: str, source_lang: str = 'English') -> Dict[str, Any]:
        """Translate text and return the result with confidence metrics.

        Returns:
            A dict with ``original_text``, ``translated_text``,
            ``source_language``, ``target_language``, ``confidence`` and
            ``method``. ``method`` is ``'neural_translation'`` only when a
            model produced a result different from the input, ``'fallback'``
            otherwise, and ``'error'`` (with an extra ``error`` key) when an
            unexpected exception occurred.
        """
        try:
            translated_text, status = self._translate_with_status(text, target_lang, source_lang)

            if status == self._STATUS_FALLBACK:
                # Tagged fallback text differs from the input, so it must not
                # be scored as if a model had translated it.
                confidence = self.UNTRANSLATED_CONFIDENCE
            else:
                confidence = self._calculate_translation_confidence(text, translated_text, target_lang)

            model_translated = status == self._STATUS_NEURAL and translated_text != text

            return {
                'original_text': text,
                'translated_text': translated_text,
                'source_language': source_lang,
                'target_language': target_lang,
                'confidence': confidence,
                'method': 'neural_translation' if model_translated else 'fallback',
            }

        except Exception as e:
            logger.error(f"Translation with confidence failed: {str(e)}")
            return {
                'original_text': text,
                'translated_text': text,
                'source_language': source_lang,
                'target_language': target_lang,
                'confidence': 0.0,
                'method': 'error',
                'error': str(e),
            }

    def _calculate_translation_confidence(self, original: str, translated: str, target_lang: str) -> float:
        """Calculate a simple heuristic confidence score for a translation.

        Returns:
            ``0.2`` when the text is unchanged and the target is not English,
            ``0.7`` for inputs shorter than five words, otherwise ``0.6``,
            ``0.8`` or ``0.9`` depending on how close the word counts of
            input and output are; ``0.5`` if the calculation itself fails.
        """
        try:
            target_is_english = (target_lang or "").strip().lower() in ('english', 'en')
            if original == translated and not target_is_english:
                return self.UNTRANSLATED_CONFIDENCE

            original_len = len(original.split())
            if original_len < 5:
                return 0.7

            translated_len = len(translated.split())
            length_ratio = min(original_len, translated_len) / max(original_len, translated_len)

            if length_ratio < 0.5:
                return 0.6
            if length_ratio < 0.7:
                return 0.8
            return 0.9

        except Exception as e:
            logger.error(f"Confidence calculation failed: {str(e)}")
            return 0.5
|
|
|
|
|
def get_language_name(code: str) -> str:
    """Get full language name from code.

    The lookup is case-insensitive and ignores surrounding whitespace.

    Args:
        code: Language code such as ``'en'``, ``'hi'`` or ``'ta'``.

    Returns:
        The language name, or ``code`` exactly as passed in when unknown.
    """
    code_to_name = {
        'en': 'English',
        'hi': 'Hindi',
        'ta': 'Tamil',
    }
    return code_to_name.get(code.strip().lower(), code)
|
|
|
def get_language_code(name: str) -> str:
    """Get language code from name.

    The lookup is case-insensitive and ignores surrounding whitespace.

    Args:
        name: Language name such as ``'English'``, ``'Hindi'`` or ``'Tamil'``.

    Returns:
        The matching code, or the first two lowercase characters of the
        stripped name when it is unknown.
    """
    name_to_code = {
        'english': 'en',
        'hindi': 'hi',
        'tamil': 'ta',
    }
    normalized = name.strip().lower()
    return name_to_code.get(normalized, normalized[:2])