"""Base class for translation provider implementations.""" import logging import re from abc import ABC, abstractmethod from typing import List, TYPE_CHECKING if TYPE_CHECKING: from ...domain.models.translation_request import TranslationRequest from ...domain.models.text_content import TextContent from ...domain.interfaces.translation import ITranslationService from ...domain.exceptions import TranslationFailedException logger = logging.getLogger(__name__) class TranslationProviderBase(ITranslationService, ABC): """Abstract base class for translation provider implementations.""" def __init__(self, provider_name: str, supported_languages: dict[str, list[str]] = None): """ Initialize the translation provider. Args: provider_name: Name of the translation provider supported_languages: Dict mapping source languages to supported target languages """ self.provider_name = provider_name self.supported_languages = supported_languages or {} self.max_chunk_length = 1000 # Default chunk size for text processing def translate(self, request: 'TranslationRequest') -> 'TextContent': """ Translate text from source language to target language. Args: request: The translation request Returns: TextContent: The translated text Raises: TranslationFailedException: If translation fails """ try: logger.info(f"Starting translation with {self.provider_name} provider") logger.info(f"Translating from {request.source_text.language} to {request.target_language}") self._validate_request(request) # Split text into chunks for processing text_chunks = self._chunk_text(request.source_text.text) logger.info(f"Split text into {len(text_chunks)} chunks for processing") # Translate each chunk translated_chunks = [] for i, chunk in enumerate(text_chunks): logger.info(f"Translating chunk {i+1}/{len(text_chunks)}") translated_chunk = self._translate_chunk( chunk, request.source_text.language, request.target_language ) translated_chunks.append(translated_chunk) # Reassemble translated text translated_text = self._reassemble_chunks(translated_chunks) # Create TextContent from translation result from ...domain.models.text_content import TextContent result = TextContent( text=translated_text, language=request.target_language, encoding='utf-8' ) logger.info(f"Translation completed successfully with {self.provider_name}") logger.info(f"Original length: {len(request.source_text.text)}, Translated length: {len(translated_text)}") return result except Exception as e: logger.error(f"Translation failed with {self.provider_name}: {str(e)}") raise TranslationFailedException(f"Translation failed: {str(e)}") from e @abstractmethod def _translate_chunk(self, text: str, source_language: str, target_language: str) -> str: """ Translate a single chunk of text using provider-specific implementation. Args: text: The text chunk to translate source_language: Source language code target_language: Target language code Returns: str: The translated text chunk """ pass @abstractmethod def is_available(self) -> bool: """ Check if the translation provider is available and ready to use. Returns: bool: True if provider is available, False otherwise """ pass @abstractmethod def get_supported_languages(self) -> dict[str, list[str]]: """ Get supported language pairs for this provider. Returns: dict: Mapping of source languages to supported target languages """ pass def _chunk_text(self, text: str) -> List[str]: """ Split text into chunks for translation processing. Args: text: The text to chunk Returns: List[str]: List of text chunks """ if len(text) <= self.max_chunk_length: return [text] chunks = [] current_chunk = "" # Split by sentences first to maintain context sentences = self._split_into_sentences(text) for sentence in sentences: # If adding this sentence would exceed chunk limit if len(current_chunk) + len(sentence) > self.max_chunk_length: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = "" # If single sentence is too long, split by words if len(sentence) > self.max_chunk_length: word_chunks = self._split_long_sentence(sentence) chunks.extend(word_chunks[:-1]) # Add all but last chunk current_chunk = word_chunks[-1] # Start new chunk with last piece else: current_chunk = sentence else: current_chunk += " " + sentence if current_chunk else sentence # Add remaining chunk if current_chunk.strip(): chunks.append(current_chunk.strip()) logger.info(f"Text chunked into {len(chunks)} pieces") return chunks def _split_into_sentences(self, text: str) -> List[str]: """ Split text into sentences using basic punctuation rules. Args: text: The text to split Returns: List[str]: List of sentences """ # Simple sentence splitting using regex # This handles basic cases - more sophisticated NLP libraries could be used sentence_endings = r'[.!?]+\s+' sentences = re.split(sentence_endings, text) # Filter out empty sentences and strip whitespace sentences = [s.strip() for s in sentences if s.strip()] return sentences def _split_long_sentence(self, sentence: str) -> List[str]: """ Split a long sentence into smaller chunks by words. Args: sentence: The sentence to split Returns: List[str]: List of word chunks """ words = sentence.split() chunks = [] current_chunk = "" for word in words: if len(current_chunk) + len(word) + 1 > self.max_chunk_length: if current_chunk: chunks.append(current_chunk.strip()) current_chunk = word else: # Single word is too long, just add it chunks.append(word) else: current_chunk += " " + word if current_chunk else word if current_chunk.strip(): chunks.append(current_chunk.strip()) return chunks def _reassemble_chunks(self, chunks: List[str]) -> str: """ Reassemble translated chunks into a single text. Args: chunks: List of translated text chunks Returns: str: Reassembled text """ # Simple reassembly with space separation # More sophisticated approaches could preserve original formatting return " ".join(chunk.strip() for chunk in chunks if chunk.strip()) def _validate_request(self, request: 'TranslationRequest') -> None: """ Validate the translation request. Args: request: The translation request to validate Raises: TranslationFailedException: If request is invalid """ if not request.source_text.text.strip(): raise TranslationFailedException("Source text cannot be empty") if request.source_text.language == request.target_language: raise TranslationFailedException("Source and target languages cannot be the same") # Check if language pair is supported if self.supported_languages: source_lang = request.source_text.language target_lang = request.target_language if source_lang not in self.supported_languages: raise TranslationFailedException( f"Source language {source_lang} not supported by {self.provider_name}. " f"Supported source languages: {list(self.supported_languages.keys())}" ) if target_lang not in self.supported_languages[source_lang]: raise TranslationFailedException( f"Translation from {source_lang} to {target_lang} not supported by {self.provider_name}. " f"Supported target languages for {source_lang}: {self.supported_languages[source_lang]}" ) def _preprocess_text(self, text: str) -> str: """ Preprocess text before translation. Args: text: The text to preprocess Returns: str: Preprocessed text """ # Basic text preprocessing # Remove excessive whitespace text = re.sub(r'\s+', ' ', text) # Strip leading/trailing whitespace text = text.strip() return text def _postprocess_text(self, text: str) -> str: """ Postprocess text after translation. Args: text: The text to postprocess Returns: str: Postprocessed text """ # Basic text postprocessing # Remove excessive whitespace text = re.sub(r'\s+', ' ', text) # Strip leading/trailing whitespace text = text.strip() # Fix common spacing issues around punctuation text = re.sub(r'\s+([.!?,:;])', r'\1', text) text = re.sub(r'([.!?])\s*([A-Z])', r'\1 \2', text) return text def _handle_provider_error(self, error: Exception, context: str = "") -> None: """ Handle provider-specific errors and convert to domain exceptions. Args: error: The original error context: Additional context about when the error occurred """ error_msg = f"{self.provider_name} error" if context: error_msg += f" during {context}" error_msg += f": {str(error)}" logger.error(error_msg, exception=error) raise TranslationFailedException(error_msg) from error def set_chunk_size(self, chunk_size: int) -> None: """ Set the maximum chunk size for text processing. Args: chunk_size: Maximum characters per chunk """ if chunk_size <= 0: raise ValueError("Chunk size must be positive") self.max_chunk_length = chunk_size logger.info(f"Chunk size set to {chunk_size} characters") def get_translation_stats(self, request: 'TranslationRequest') -> dict: """ Get statistics about a translation request. Args: request: The translation request Returns: dict: Translation statistics """ text = request.source_text.text chunks = self._chunk_text(text) return { 'provider': self.provider_name, 'source_language': request.source_text.language, 'target_language': request.target_language, 'text_length': len(text), 'word_count': len(text.split()), 'chunk_count': len(chunks), 'max_chunk_length': max(len(chunk) for chunk in chunks) if chunks else 0, 'avg_chunk_length': sum(len(chunk) for chunk in chunks) / len(chunks) if chunks else 0 }