# NOTE: The following banner ("Spaces: Sleeping") was captured from the
# Hugging Face Spaces UI when this file was exported; it is not code.
import logging
import re
from typing import Any, Dict, List, Optional

import torch
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
# Module-level logger; handlers and level are configured by the host application.
logger = logging.getLogger(__name__)
class MultilingualTranslator: | |
"""Multilingual translation with support for Hindi and Tamil""" | |
def __init__(self): | |
self.translators = {} | |
self.language_codes = { | |
'Hindi': 'hi', | |
'Tamil': 'ta', | |
'English': 'en' | |
} | |
# Supported translation pairs | |
self.supported_pairs = { | |
'en-hi': 'Helsinki-NLP/opus-mt-en-hi', | |
'en-ta': 'Helsinki-NLP/opus-mt-en-mul', # Multilingual model for Tamil | |
'hi-en': 'Helsinki-NLP/opus-mt-hi-en', | |
'ta-en': 'Helsinki-NLP/opus-mt-mul-en' | |
} | |
self._initialize_models() | |
logger.info("MultilingualTranslator initialized") | |
def _initialize_models(self): | |
"""Initialize translation models on-demand""" | |
# Don't load all models at startup to save memory | |
# They will be loaded when first needed | |
logger.info("Translation models will be loaded on-demand") | |
def _load_translator(self, source_lang: str, target_lang: str) -> Optional[object]: | |
"""Load a specific translator model""" | |
pair_key = f"{source_lang}-{target_lang}" | |
if pair_key in self.translators: | |
return self.translators[pair_key] | |
try: | |
model_name = self.supported_pairs.get(pair_key) | |
if not model_name: | |
logger.error(f"No model available for {source_lang} -> {target_lang}") | |
return None | |
# Use CPU for Hugging Face Spaces compatibility | |
device = -1 # CPU only | |
translator = pipeline( | |
"translation", | |
model=model_name, | |
device=device, | |
framework="pt" | |
) | |
self.translators[pair_key] = translator | |
logger.info(f"Loaded translator for {source_lang} -> {target_lang}") | |
return translator | |
except Exception as e: | |
logger.error(f"Failed to load translator {pair_key}: {str(e)}") | |
return None | |
def translate(self, text: str, target_lang: str, source_lang: str = 'English') -> str: | |
"""Translate text to target language""" | |
if not text or not text.strip(): | |
return "" | |
# Get language codes | |
source_code = self.language_codes.get(source_lang, 'en') | |
target_code = self.language_codes.get(target_lang, target_lang.lower()[:2]) | |
# If source and target are the same, return original text | |
if source_code == target_code: | |
return text | |
try: | |
# Load the appropriate translator | |
translator = self._load_translator(source_code, target_code) | |
if not translator: | |
return self._fallback_translate(text, target_lang) | |
# Clean and prepare text | |
cleaned_text = self._prepare_text_for_translation(text) | |
if not cleaned_text: | |
return text | |
# Split long text into chunks for translation | |
if len(cleaned_text.split()) > 200: | |
return self._translate_long_text(cleaned_text, translator) | |
else: | |
return self._translate_chunk(cleaned_text, translator) | |
except Exception as e: | |
logger.error(f"Translation failed: {str(e)}") | |
return self._fallback_translate(text, target_lang) | |
def _translate_chunk(self, text: str, translator) -> str: | |
"""Translate a single chunk of text""" | |
try: | |
result = translator(text, max_length=512) | |
if result and len(result) > 0: | |
translated = result[0].get('translation_text', text) | |
return self._post_process_translation(translated) | |
return text | |
except Exception as e: | |
logger.error(f"Chunk translation failed: {str(e)}") | |
return text | |
def _translate_long_text(self, text: str, translator) -> str: | |
"""Translate long text by splitting into chunks""" | |
try: | |
# Split by sentences | |
sentences = self._split_into_sentences(text) | |
if not sentences: | |
return text | |
translated_sentences = [] | |
current_chunk = "" | |
for sentence in sentences: | |
# If adding this sentence would make chunk too long, translate current chunk | |
if len((current_chunk + " " + sentence).split()) > 150 and current_chunk: | |
translated = self._translate_chunk(current_chunk, translator) | |
translated_sentences.append(translated) | |
current_chunk = sentence | |
else: | |
if current_chunk: | |
current_chunk += " " + sentence | |
else: | |
current_chunk = sentence | |
# Translate remaining chunk | |
if current_chunk: | |
translated = self._translate_chunk(current_chunk, translator) | |
translated_sentences.append(translated) | |
return " ".join(translated_sentences) | |
except Exception as e: | |
logger.error(f"Long text translation failed: {str(e)}") | |
return text | |
def _split_into_sentences(self, text: str) -> List[str]: | |
"""Split text into sentences""" | |
try: | |
# Simple sentence splitting | |
sentences = re.split(r'[.!?]+\s+', text) | |
sentences = [s.strip() for s in sentences if s.strip()] | |
return sentences | |
except Exception as e: | |
logger.error(f"Sentence splitting failed: {str(e)}") | |
return [text] | |
def _prepare_text_for_translation(self, text: str) -> str: | |
"""Prepare text for translation""" | |
if not text: | |
return "" | |
# Remove URLs | |
text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text) | |
# Remove email addresses | |
text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '', text) | |
# Clean excessive whitespace | |
text = re.sub(r'\s+', ' ', text) | |
# Remove special characters that might cause issues | |
text = re.sub(r'[^\w\s.,!?;:\-\'"()/%$]', '', text) | |
return text.strip() | |
def _post_process_translation(self, text: str) -> str: | |
"""Post-process translated text""" | |
if not text: | |
return "" | |
# Clean up extra spaces | |
text = re.sub(r'\s+', ' ', text) | |
# Capitalize first letter if it's a sentence | |
if text and len(text) > 1: | |
text = text[0].upper() + text[1:] | |
return text.strip() | |
def _fallback_translate(self, text: str, target_lang: str) -> str: | |
"""Fallback translation with basic text processing""" | |
logger.warning(f"Using fallback translation for {target_lang}") | |
# For demonstration purposes, we'll return the original text with a note | |
# In a production system, you might use a different translation service | |
if target_lang.lower() in ['hindi', 'hi']: | |
return f"[Hindi] {text}" | |
elif target_lang.lower() in ['tamil', 'ta']: | |
return f"[Tamil] {text}" | |
else: | |
return text | |
def batch_translate(self, texts: List[str], target_lang: str, source_lang: str = 'English') -> List[str]: | |
"""Translate multiple texts""" | |
translations = [] | |
for text in texts: | |
try: | |
translation = self.translate(text, target_lang, source_lang) | |
translations.append(translation) | |
except Exception as e: | |
logger.error(f"Batch translation failed for one text: {str(e)}") | |
translations.append(self._fallback_translate(text, target_lang)) | |
return translations | |
def detect_language(self, text: str) -> str: | |
"""Simple language detection (basic implementation)""" | |
try: | |
# Basic detection using character patterns | |
if not text: | |
return 'en' | |
# Check for Devanagari script (Hindi) | |
if re.search(r'[\u0900-\u097F]', text): | |
return 'hi' | |
# Check for Tamil script | |
if re.search(r'[\u0B80-\u0BFF]', text): | |
return 'ta' | |
# Default to English | |
return 'en' | |
except Exception as e: | |
logger.error(f"Language detection failed: {str(e)}") | |
return 'en' | |
def get_supported_languages(self) -> List[str]: | |
"""Get list of supported languages""" | |
return list(self.language_codes.keys()) | |
def is_translation_available(self, source_lang: str, target_lang: str) -> bool: | |
"""Check if translation is available between two languages""" | |
source_code = self.language_codes.get(source_lang, source_lang.lower()[:2]) | |
target_code = self.language_codes.get(target_lang, target_lang.lower()[:2]) | |
pair_key = f"{source_code}-{target_code}" | |
return pair_key in self.supported_pairs | |
def translate_with_confidence(self, text: str, target_lang: str, source_lang: str = 'English') -> Dict[str, any]: | |
"""Translate text and return result with confidence metrics""" | |
try: | |
translated_text = self.translate(text, target_lang, source_lang) | |
# Simple confidence calculation based on text characteristics | |
confidence = self._calculate_translation_confidence(text, translated_text, target_lang) | |
return { | |
'original_text': text, | |
'translated_text': translated_text, | |
'source_language': source_lang, | |
'target_language': target_lang, | |
'confidence': confidence, | |
'method': 'neural_translation' if translated_text != text else 'fallback' | |
} | |
except Exception as e: | |
logger.error(f"Translation with confidence failed: {str(e)}") | |
return { | |
'original_text': text, | |
'translated_text': text, | |
'source_language': source_lang, | |
'target_language': target_lang, | |
'confidence': 0.0, | |
'method': 'error', | |
'error': str(e) | |
} | |
def _calculate_translation_confidence(self, original: str, translated: str, target_lang: str) -> float: | |
"""Calculate a simple confidence score for translation""" | |
try: | |
# If translation failed (same as original), low confidence | |
if original == translated and target_lang != 'English': | |
return 0.2 | |
# If text is very short, moderate confidence | |
if len(original.split()) < 5: | |
return 0.7 | |
# If translation is significantly different in length, lower confidence | |
original_len = len(original.split()) | |
translated_len = len(translated.split()) | |
length_ratio = min(original_len, translated_len) / max(original_len, translated_len) | |
if length_ratio < 0.5: | |
return 0.6 | |
elif length_ratio < 0.7: | |
return 0.8 | |
else: | |
return 0.9 | |
except Exception as e: | |
logger.error(f"Confidence calculation failed: {str(e)}") | |
return 0.5 | |
# Utility functions | |
def get_language_name(code: str) -> str:
    """Get full language name from code.

    Unknown codes are returned unchanged.
    """
    names_by_code = {
        'en': 'English',
        'hi': 'Hindi',
        'ta': 'Tamil'
    }
    return names_by_code.get(code.lower(), code)
def get_language_code(name: str) -> str:
    """Get language code from name.

    Unknown names fall back to their first two lowercase characters.
    """
    normalized = name.lower()
    codes_by_name = {
        'english': 'en',
        'hindi': 'hi',
        'tamil': 'ta'
    }
    return codes_by_name.get(normalized, normalized[:2])