teachingAssistant / src /infrastructure /base /translation_provider_base.py
Michael Hu
add more logs
fdc056d
"""Base class for translation provider implementations."""
import logging
import re
from abc import ABC, abstractmethod
from typing import List, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    # Needed only for (quoted) type annotations; not imported at runtime.
    from ...domain.models.translation_request import TranslationRequest
    from ...domain.models.text_content import TextContent

from ...domain.interfaces.translation import ITranslationService
from ...domain.exceptions import TranslationFailedException
logger = logging.getLogger(__name__)
class TranslationProviderBase(ITranslationService, ABC):
    """Abstract base class for translation provider implementations.

    Implements the provider-independent part of a translation: request
    validation, splitting the source text into chunks, translating each
    chunk through the provider-specific ``_translate_chunk`` hook and
    joining the translated chunks back together.

    Subclasses must implement ``_translate_chunk``, ``is_available`` and
    ``get_supported_languages``.
    """

    def __init__(
        self,
        provider_name: str,
        supported_languages: Optional[dict[str, list[str]]] = None,
    ):
        """
        Initialize the translation provider.

        Args:
            provider_name: Name of the translation provider
            supported_languages: Dict mapping source languages to supported
                target languages. ``None`` (or an empty dict) disables the
                language-pair check in ``_validate_request``.
        """
        self.provider_name = provider_name
        self.supported_languages = supported_languages or {}
        self.max_chunk_length = 1000  # Default chunk size for text processing

    def translate(self, request: 'TranslationRequest') -> 'TextContent':
        """
        Translate text from source language to target language.

        The request is validated, the source text is split into chunks, each
        chunk is translated with ``_translate_chunk`` and the translated
        chunks are joined with single spaces.

        Args:
            request: The translation request

        Returns:
            TextContent: The translated text

        Raises:
            TranslationFailedException: If validation or translation fails.
                Every exception raised while processing (validation errors
                included) is wrapped in a new exception whose message starts
                with "Translation failed: ".
        """
        try:
            logger.info(f"Starting translation with {self.provider_name} provider")
            logger.info(f"Translating from {request.source_text.language} to {request.target_language}")

            self._validate_request(request)

            # Split text into chunks for processing
            text_chunks = self._chunk_text(request.source_text.text)
            total_chunks = len(text_chunks)
            logger.info(f"Split text into {total_chunks} chunks for processing")

            # Translate each chunk
            translated_chunks = []
            for index, chunk in enumerate(text_chunks, start=1):
                logger.info(f"Translating chunk {index}/{total_chunks}")
                translated_chunk = self._translate_chunk(
                    chunk,
                    request.source_text.language,
                    request.target_language
                )
                translated_chunks.append(translated_chunk)

            # Reassemble translated text
            translated_text = self._reassemble_chunks(translated_chunks)

            # Imported here because the module-level import of TextContent
            # is only performed under TYPE_CHECKING.
            from ...domain.models.text_content import TextContent
            result = TextContent(
                text=translated_text,
                language=request.target_language,
                encoding='utf-8'
            )

            logger.info(f"Translation completed successfully with {self.provider_name}")
            logger.info(f"Original length: {len(request.source_text.text)}, Translated length: {len(translated_text)}")
            return result

        except Exception as e:
            logger.error(f"Translation failed with {self.provider_name}: {str(e)}")
            raise TranslationFailedException(f"Translation failed: {str(e)}") from e

    @abstractmethod
    def _translate_chunk(self, text: str, source_language: str, target_language: str) -> str:
        """
        Translate a single chunk of text using provider-specific implementation.

        Args:
            text: The text chunk to translate
            source_language: Source language code
            target_language: Target language code

        Returns:
            str: The translated text chunk
        """
        pass

    @abstractmethod
    def is_available(self) -> bool:
        """
        Check if the translation provider is available and ready to use.

        Returns:
            bool: True if provider is available, False otherwise
        """
        pass

    @abstractmethod
    def get_supported_languages(self) -> dict[str, list[str]]:
        """
        Get supported language pairs for this provider.

        Returns:
            dict: Mapping of source languages to supported target languages
        """
        pass

    def _chunk_text(self, text: str) -> List[str]:
        """
        Split text into chunks for translation processing.

        Text that already fits into ``max_chunk_length`` is returned as a
        single, unmodified chunk. Longer text is split into sentences which
        are packed greedily into chunks; a sentence that is itself too long
        is split further on word boundaries.

        Args:
            text: The text to chunk

        Returns:
            List[str]: List of text chunks. A chunk only exceeds
            ``max_chunk_length`` when a single word is longer than the limit.
        """
        if len(text) <= self.max_chunk_length:
            return [text]

        chunks = []
        current_chunk = ""

        # Split by sentences first to maintain context
        sentences = self._split_into_sentences(text)

        for sentence in sentences:
            # Sentences are joined with one space, so that space has to be
            # counted too; otherwise a chunk could end up one character
            # longer than the limit.
            separator_length = 1 if current_chunk else 0
            projected_length = len(current_chunk) + separator_length + len(sentence)

            if projected_length > self.max_chunk_length:
                # The sentence does not fit: close the chunk in progress.
                if current_chunk:
                    chunks.append(current_chunk.strip())
                    current_chunk = ""

                # If single sentence is too long, split by words
                if len(sentence) > self.max_chunk_length:
                    word_chunks = self._split_long_sentence(sentence)
                    chunks.extend(word_chunks[:-1])  # Add all but last chunk
                    current_chunk = word_chunks[-1]  # Start new chunk with last piece
                else:
                    current_chunk = sentence
            else:
                current_chunk += " " + sentence if current_chunk else sentence

        # Add remaining chunk
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        logger.info(f"Text chunked into {len(chunks)} pieces")
        return chunks

    def _split_into_sentences(self, text: str) -> List[str]:
        """
        Split text into sentences using basic punctuation rules.

        A sentence boundary is whitespace that directly follows ``.``, ``!``
        or ``?``. The punctuation stays attached to its sentence so that it
        is not lost when chunks are translated and reassembled.

        Args:
            text: The text to split

        Returns:
            List[str]: List of non-empty, stripped sentences
        """
        # Simple sentence splitting using regex
        # This handles basic cases - more sophisticated NLP libraries could be used
        # The lookbehind matches only the whitespace, so the terminating
        # punctuation is kept instead of being consumed by the split.
        sentence_boundary = r'(?<=[.!?])\s+'
        pieces = re.split(sentence_boundary, text)

        # Filter out empty sentences and strip whitespace
        return [piece.strip() for piece in pieces if piece.strip()]

    def _split_long_sentence(self, sentence: str) -> List[str]:
        """
        Split a long sentence into smaller chunks by words.

        Words are separated on whitespace and packed greedily; a word is
        never broken up, so a single word longer than ``max_chunk_length``
        becomes a chunk of its own that exceeds the limit.

        Args:
            sentence: The sentence to split

        Returns:
            List[str]: List of word chunks
        """
        words = sentence.split()
        chunks = []
        current_chunk = ""

        for word in words:
            if len(current_chunk) + len(word) + 1 > self.max_chunk_length:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                    current_chunk = word
                else:
                    # Single word is too long, just add it
                    chunks.append(word)
            else:
                current_chunk += " " + word if current_chunk else word

        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks

    def _reassemble_chunks(self, chunks: List[str]) -> str:
        """
        Reassemble translated chunks into a single text.

        Args:
            chunks: List of translated text chunks

        Returns:
            str: The stripped, non-empty chunks joined by single spaces
        """
        # Simple reassembly with space separation
        # More sophisticated approaches could preserve original formatting
        return " ".join(chunk.strip() for chunk in chunks if chunk.strip())

    def _validate_request(self, request: 'TranslationRequest') -> None:
        """
        Validate the translation request.

        The language-pair check is skipped when ``supported_languages`` is
        empty.

        Args:
            request: The translation request to validate

        Raises:
            TranslationFailedException: If the source text is blank, the
                source and target languages are equal, or the language pair
                is not listed in ``supported_languages``
        """
        if not request.source_text.text.strip():
            raise TranslationFailedException("Source text cannot be empty")

        if request.source_text.language == request.target_language:
            raise TranslationFailedException("Source and target languages cannot be the same")

        # Check if language pair is supported
        if self.supported_languages:
            source_lang = request.source_text.language
            target_lang = request.target_language

            if source_lang not in self.supported_languages:
                raise TranslationFailedException(
                    f"Source language {source_lang} not supported by {self.provider_name}. "
                    f"Supported source languages: {list(self.supported_languages.keys())}"
                )

            if target_lang not in self.supported_languages[source_lang]:
                raise TranslationFailedException(
                    f"Translation from {source_lang} to {target_lang} not supported by {self.provider_name}. "
                    f"Supported target languages for {source_lang}: {self.supported_languages[source_lang]}"
                )

    def _preprocess_text(self, text: str) -> str:
        """
        Preprocess text before translation.

        Args:
            text: The text to preprocess

        Returns:
            str: Text with every whitespace run collapsed to one space and
            leading/trailing whitespace removed
        """
        # Basic text preprocessing
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Strip leading/trailing whitespace
        text = text.strip()

        return text

    def _postprocess_text(self, text: str) -> str:
        """
        Postprocess text after translation.

        Args:
            text: The text to postprocess

        Returns:
            str: Text with whitespace normalized and spacing around
            punctuation adjusted
        """
        # Basic text postprocessing
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Strip leading/trailing whitespace
        text = text.strip()

        # Fix common spacing issues around punctuation
        # Drop whitespace in front of punctuation marks ...
        text = re.sub(r'\s+([.!?,:;])', r'\1', text)
        # ... and put exactly one space between sentence-ending punctuation
        # and a following ASCII capital letter.
        text = re.sub(r'([.!?])\s*([A-Z])', r'\1 \2', text)

        return text

    def _handle_provider_error(self, error: Exception, context: str = "") -> None:
        """
        Handle provider-specific errors and convert to domain exceptions.

        The error is logged together with its traceback and then re-raised
        as a ``TranslationFailedException``; this method never returns
        normally.

        Args:
            error: The original error
            context: Additional context about when the error occurred

        Raises:
            TranslationFailedException: Always, chained to ``error``
        """
        error_msg = f"{self.provider_name} error"
        if context:
            error_msg += f" during {context}"
        error_msg += f": {str(error)}"

        # ``exc_info`` is the keyword the logging module accepts for
        # attaching an exception; any other keyword makes the logging call
        # itself raise TypeError and would mask the original error.
        logger.error(error_msg, exc_info=error)
        raise TranslationFailedException(error_msg) from error

    def set_chunk_size(self, chunk_size: int) -> None:
        """
        Set the maximum chunk size for text processing.

        Args:
            chunk_size: Maximum characters per chunk

        Raises:
            ValueError: If ``chunk_size`` is zero or negative
        """
        if chunk_size <= 0:
            raise ValueError("Chunk size must be positive")

        self.max_chunk_length = chunk_size
        logger.info(f"Chunk size set to {chunk_size} characters")

    def get_translation_stats(self, request: 'TranslationRequest') -> dict:
        """
        Get statistics about a translation request.

        The source text is chunked the same way ``translate`` chunks it, but
        nothing is translated.

        Args:
            request: The translation request

        Returns:
            dict: Translation statistics (provider, languages, text length,
            word count, chunk count, longest and average chunk length)
        """
        text = request.source_text.text
        chunks = self._chunk_text(text)
        chunk_lengths = [len(chunk) for chunk in chunks]

        return {
            'provider': self.provider_name,
            'source_language': request.source_text.language,
            'target_language': request.target_language,
            'text_length': len(text),
            'word_count': len(text.split()),
            'chunk_count': len(chunks),
            # Chunking can yield no chunks (e.g. long whitespace-only text),
            # so guard the aggregate values against an empty list.
            'max_chunk_length': max(chunk_lengths) if chunk_lengths else 0,
            'avg_chunk_length': sum(chunk_lengths) / len(chunk_lengths) if chunk_lengths else 0
        }