"""Base class for translation provider implementations."""
import logging
import re
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, List, Optional

from ...domain.exceptions import TranslationFailedException
from ...domain.interfaces.translation import ITranslationService

if TYPE_CHECKING:
    from ...domain.models.text_content import TextContent
    from ...domain.models.translation_request import TranslationRequest
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class TranslationProviderBase(ITranslationService, ABC):
    """Abstract base class for translation provider implementations.

    Implements the shared translation pipeline (validate the request, split
    the text into chunks, translate each chunk, reassemble the result) and
    leaves the provider-specific pieces -- ``_translate_chunk``,
    ``is_available`` and ``get_supported_languages`` -- to subclasses.
    """

    def __init__(self, provider_name: str, supported_languages: Optional[dict[str, list[str]]] = None):
        """
        Initialize the translation provider.

        Args:
            provider_name: Name of the translation provider
            supported_languages: Dict mapping source languages to supported
                target languages. ``None`` or an empty dict disables the
                language-pair check in ``_validate_request``.
        """
        self.provider_name = provider_name
        self.supported_languages = supported_languages or {}
        self.max_chunk_length = 1000  # Default chunk size for text processing

    def translate(self, request: 'TranslationRequest') -> 'TextContent':
        """
        Translate text from source language to target language.

        Args:
            request: The translation request

        Returns:
            TextContent: The translated text

        Raises:
            TranslationFailedException: If validation or translation fails.
                Any exception raised along the way is wrapped, with the
                original attached as ``__cause__``.
        """
        try:
            logger.info("Starting translation with %s provider", self.provider_name)
            logger.info(
                "Translating from %s to %s",
                request.source_text.language,
                request.target_language,
            )

            self._validate_request(request)

            # Split text into chunks for processing
            text_chunks = self._chunk_text(request.source_text.text)
            chunk_total = len(text_chunks)
            logger.info("Split text into %d chunks for processing", chunk_total)

            # Translate each chunk
            translated_chunks = []
            for position, chunk in enumerate(text_chunks, start=1):
                logger.info("Translating chunk %d/%d", position, chunk_total)
                translated_chunks.append(
                    self._translate_chunk(
                        chunk,
                        request.source_text.language,
                        request.target_language,
                    )
                )

            # Reassemble translated text
            translated_text = self._reassemble_chunks(translated_chunks)

            # TextContent is only imported under TYPE_CHECKING at module
            # level, so it has to be imported here for runtime use.
            from ...domain.models.text_content import TextContent

            result = TextContent(
                text=translated_text,
                language=request.target_language,
                encoding='utf-8',
            )

            logger.info("Translation completed successfully with %s", self.provider_name)
            logger.info(
                "Original length: %d, Translated length: %d",
                len(request.source_text.text),
                len(translated_text),
            )
            return result

        except Exception as e:
            logger.error("Translation failed with %s: %s", self.provider_name, e)
            raise TranslationFailedException(f"Translation failed: {str(e)}") from e

    @abstractmethod
    def _translate_chunk(self, text: str, source_language: str, target_language: str) -> str:
        """
        Translate a single chunk of text using provider-specific implementation.

        Subclasses must override this; ``translate`` calls it once per chunk.

        Args:
            text: The text chunk to translate
            source_language: Source language code
            target_language: Target language code

        Returns:
            str: The translated text chunk
        """

    @abstractmethod
    def is_available(self) -> bool:
        """
        Check if the translation provider is available and ready to use.

        Returns:
            bool: True if provider is available, False otherwise
        """

    @abstractmethod
    def get_supported_languages(self) -> dict[str, list[str]]:
        """
        Get supported language pairs for this provider.

        Returns:
            dict: Mapping of source languages to supported target languages
        """

    def _chunk_text(self, text: str) -> List[str]:
        """
        Split text into chunks for translation processing.

        Text that already fits within ``max_chunk_length`` is returned
        unchanged as a single chunk. Longer text is split into sentences
        that are packed greedily into chunks; a sentence that is itself too
        long is further split on word boundaries.

        Args:
            text: The text to chunk

        Returns:
            List[str]: List of text chunks
        """
        if len(text) <= self.max_chunk_length:
            return [text]

        chunks: List[str] = []
        current_chunk = ""

        # Split by sentences first to maintain context
        for sentence in self._split_into_sentences(text):
            # Account for the space that joins the sentence to the chunk,
            # otherwise a chunk could exceed the limit by one character.
            joined_length = len(current_chunk) + len(sentence) + (1 if current_chunk else 0)
            if joined_length <= self.max_chunk_length:
                current_chunk = f"{current_chunk} {sentence}" if current_chunk else sentence
                continue

            # The sentence does not fit: flush what has been collected so far.
            if current_chunk:
                chunks.append(current_chunk.strip())

            if len(sentence) > self.max_chunk_length:
                # A single sentence is too long: split it by words and keep
                # the last piece open so following sentences can join it.
                word_chunks = self._split_long_sentence(sentence)
                chunks.extend(word_chunks[:-1])
                current_chunk = word_chunks[-1]
            else:
                current_chunk = sentence

        # Add remaining chunk
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        logger.info("Text chunked into %d pieces", len(chunks))
        return chunks

    def _split_into_sentences(self, text: str) -> List[str]:
        """
        Split text into sentences using basic punctuation rules.

        The split happens on whitespace that follows ``.``, ``!`` or ``?``;
        the terminating punctuation stays attached to its sentence so it is
        not lost when chunks are reassembled.

        Args:
            text: The text to split

        Returns:
            List[str]: List of non-empty, stripped sentences
        """
        # Simple sentence splitting using regex. The lookbehind keeps the
        # terminator with the sentence instead of consuming it.
        # This handles basic cases - more sophisticated NLP libraries could be used
        pieces = re.split(r'(?<=[.!?])\s+', text)

        # Filter out empty sentences and strip whitespace
        return [piece.strip() for piece in pieces if piece.strip()]

    def _split_long_sentence(self, sentence: str) -> List[str]:
        """
        Split a long sentence into smaller chunks by words.

        Words are packed greedily up to ``max_chunk_length``. A single word
        longer than the limit is not broken up; it becomes a chunk of its
        own that exceeds the limit.

        Args:
            sentence: The sentence to split

        Returns:
            List[str]: List of word chunks
        """
        chunks: List[str] = []
        current_chunk = ""

        for word in sentence.split():
            if not current_chunk:
                current_chunk = word
            elif len(current_chunk) + 1 + len(word) <= self.max_chunk_length:
                current_chunk = f"{current_chunk} {word}"
            else:
                chunks.append(current_chunk)
                current_chunk = word

        if current_chunk:
            chunks.append(current_chunk)

        return chunks

    def _reassemble_chunks(self, chunks: List[str]) -> str:
        """
        Reassemble translated chunks into a single text.

        Args:
            chunks: List of translated text chunks

        Returns:
            str: The non-empty chunks, stripped and joined with single spaces
        """
        # Simple reassembly with space separation
        # More sophisticated approaches could preserve original formatting
        return " ".join(chunk.strip() for chunk in chunks if chunk.strip())

    def _validate_request(self, request: 'TranslationRequest') -> None:
        """
        Validate the translation request.

        Checks that the source text is not blank, that source and target
        languages differ and, when ``supported_languages`` is non-empty,
        that the language pair is listed there.

        Args:
            request: The translation request to validate

        Raises:
            TranslationFailedException: If request is invalid
        """
        if not request.source_text.text.strip():
            raise TranslationFailedException("Source text cannot be empty")

        if request.source_text.language == request.target_language:
            raise TranslationFailedException("Source and target languages cannot be the same")

        # Check if language pair is supported (skipped when no pairs are configured)
        if not self.supported_languages:
            return

        source_lang = request.source_text.language
        target_lang = request.target_language

        if source_lang not in self.supported_languages:
            raise TranslationFailedException(
                f"Source language {source_lang} not supported by {self.provider_name}. "
                f"Supported source languages: {list(self.supported_languages.keys())}"
            )

        if target_lang not in self.supported_languages[source_lang]:
            raise TranslationFailedException(
                f"Translation from {source_lang} to {target_lang} not supported by {self.provider_name}. "
                f"Supported target languages for {source_lang}: {self.supported_languages[source_lang]}"
            )

    def _preprocess_text(self, text: str) -> str:
        """
        Preprocess text before translation.

        Args:
            text: The text to preprocess

        Returns:
            str: Text with whitespace runs collapsed to single spaces and
                leading/trailing whitespace removed
        """
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Strip leading/trailing whitespace
        return text.strip()

    def _postprocess_text(self, text: str) -> str:
        """
        Postprocess text after translation.

        Args:
            text: The text to postprocess

        Returns:
            str: Text with normalized whitespace and punctuation spacing
        """
        # Remove excessive whitespace
        text = re.sub(r'\s+', ' ', text)

        # Strip leading/trailing whitespace
        text = text.strip()

        # Fix common spacing issues around punctuation:
        # drop whitespace before punctuation marks ...
        text = re.sub(r'\s+([.!?,:;])', r'\1', text)
        # ... and put exactly one space between a sentence terminator and a
        # following uppercase ASCII letter.
        text = re.sub(r'([.!?])\s*([A-Z])', r'\1 \2', text)

        return text

    def _handle_provider_error(self, error: Exception, context: str = "") -> None:
        """
        Handle provider-specific errors and convert to domain exceptions.

        Logs the error and always raises; it never returns normally.

        Args:
            error: The original error
            context: Additional context about when the error occurred

        Raises:
            TranslationFailedException: Always, chained from ``error``
        """
        error_msg = f"{self.provider_name} error"
        if context:
            error_msg += f" during {context}"
        error_msg += f": {str(error)}"

        # ``exc_info`` is the logging keyword for attaching an exception;
        # an unknown keyword such as ``exception`` raises TypeError.
        logger.error(error_msg, exc_info=error)
        raise TranslationFailedException(error_msg) from error

    def set_chunk_size(self, chunk_size: int) -> None:
        """
        Set the maximum chunk size for text processing.

        Args:
            chunk_size: Maximum characters per chunk

        Raises:
            ValueError: If chunk_size is not positive
        """
        if chunk_size <= 0:
            raise ValueError("Chunk size must be positive")

        self.max_chunk_length = chunk_size
        logger.info("Chunk size set to %s characters", chunk_size)

    def get_translation_stats(self, request: 'TranslationRequest') -> dict:
        """
        Get statistics about a translation request.

        The text is chunked the same way ``translate`` would chunk it, but
        nothing is translated.

        Args:
            request: The translation request

        Returns:
            dict: Translation statistics (provider, languages, text length,
                word count, and chunk count / max / average chunk length)
        """
        text = request.source_text.text
        chunk_lengths = [len(chunk) for chunk in self._chunk_text(text)]

        return {
            'provider': self.provider_name,
            'source_language': request.source_text.language,
            'target_language': request.target_language,
            'text_length': len(text),
            'word_count': len(text.split()),
            'chunk_count': len(chunk_lengths),
            'max_chunk_length': max(chunk_lengths) if chunk_lengths else 0,
            'avg_chunk_length': sum(chunk_lengths) / len(chunk_lengths) if chunk_lengths else 0,
        }