""" | |
Text Extractor Module | |
This module is responsible for cleaning, normalizing, and chunking text | |
from various sources with complete NLP functionality. | |
Technologies: NLTK, spaCy, regex, langdetect | |
""" | |
import re
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Union
import unicodedata

# Import NLP libraries
try:
    import nltk
    from nltk.tokenize import sent_tokenize, word_tokenize
    from nltk.corpus import stopwords
    from nltk.stem import PorterStemmer
    import spacy
    from langdetect import detect
    from langdetect.lang_detect_exception import LangDetectException as LangDetectError

    # Download required NLTK data
    try:
        nltk.data.find("tokenizers/punkt")
    except LookupError:
        nltk.download("punkt", quiet=True)
    try:
        nltk.data.find("corpora/stopwords")
    except LookupError:
        nltk.download("stopwords", quiet=True)
except ImportError as e:
    logging.warning(f"Some NLP libraries are not installed: {e}")

from utils.error_handler import error_handler, ErrorType


class TextExtractor:
    """
    Cleans, normalizes, and chunks text from various sources with intelligent processing.

    Features:
    - Advanced text cleaning and normalization
    - Language detection
    - Intelligent sentence segmentation
    - Smart text chunking with overlap
    - Metadata preservation
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the TextExtractor with configuration.

        Args:
            config: Configuration dictionary with processing parameters
        """
        self.config = config or {}
        self.logger = logging.getLogger(__name__)

        # Configuration settings
        self.chunk_size = self.config.get("chunk_size", 1000)
        self.chunk_overlap = self.config.get("chunk_overlap", 200)
        self.min_chunk_size = self.config.get("min_chunk_size", 100)
        self.max_chunk_size = self.config.get("max_chunk_size", 2000)

        # NLP settings
        self.enable_language_detection = self.config.get(
            "enable_language_detection", True
        )
        self.preserve_formatting = self.config.get("preserve_formatting", False)
        self.remove_stopwords = self.config.get("remove_stopwords", False)

        # Initialize NLP components
        self.nlp = None
        self.stemmer = None
        self.stop_words = set()
        self._initialize_nlp_components()

    def _initialize_nlp_components(self):
        """Initialize NLP components with error handling."""
        try:
            # Load spaCy model for advanced processing
            self.nlp = spacy.load("en_core_web_sm")
            self.logger.info("spaCy model loaded successfully")
        except Exception as e:
            self.logger.warning(f"Could not load spaCy model: {str(e)}")

        try:
            # Initialize NLTK components
            self.stemmer = PorterStemmer()
            self.stop_words = set(stopwords.words("english"))
            self.logger.info("NLTK components initialized")
        except Exception as e:
            self.logger.warning(f"Could not initialize NLTK components: {str(e)}")

    def process_text(
        self,
        text: Union[str, List[str]],
        metadata: Optional[Dict[str, Any]] = None,
        preserve_structure: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Clean, normalize, and chunk text, attaching metadata to each chunk.

        Args:
            text: Raw text content (string or list of strings)
            metadata: Optional metadata to include with each chunk
            preserve_structure: Whether to preserve original text structure

        Returns:
            List of dictionaries containing processed text chunks and metadata
        """
        if not text:
            return []

        # Convert list to string if needed
        if isinstance(text, list):
            text = "\n".join(str(item) for item in text if item)
        if not text.strip():
            return []

        self.logger.info(f"Processing text: {len(text)} characters")

        # Detect language
        language = self._detect_language(text)

        # Clean and normalize the text
        cleaned_text = self._clean_text(text, preserve_structure)
        if len(cleaned_text.strip()) < self.min_chunk_size:
            self.logger.warning(
                f"Text too short after cleaning: {len(cleaned_text)} chars"
            )
            return []

        # Split text into chunks
        chunks = self._chunk_text(cleaned_text)

        # Prepare result with enhanced metadata
        result = []
        base_metadata = metadata.copy() if metadata else {}
        base_metadata.update(
            {
                "language": language,
                "original_length": len(text),
                "cleaned_length": len(cleaned_text),
                "chunk_count": len(chunks),
                "processing_time": datetime.now().isoformat(),
                "chunk_size_config": self.chunk_size,
                "chunk_overlap_config": self.chunk_overlap,
            }
        )

        for i, chunk in enumerate(chunks):
            chunk_metadata = base_metadata.copy()
            chunk_stats = self._analyze_chunk(chunk)
            chunk_metadata.update(
                {
                    "chunk_index": i,
                    "chunk_id": f"chunk_{i}_{hash(chunk) % 10000}",
                    **chunk_stats,
                }
            )
            result.append({"content": chunk, "metadata": chunk_metadata})

        self.logger.info(f"Processed text into {len(chunks)} chunks")
        return result

    def _detect_language(self, text: str) -> str:
        """
        Detect the language of the text.

        Args:
            text: Text to analyze

        Returns:
            Language code (e.g., 'en', 'es', 'fr')
        """
        if not self.enable_language_detection:
            return "en"  # Default to English

        try:
            # Use a sample of text for detection (first 1000 chars)
            sample = text[:1000].strip()
            if len(sample) < 50:  # Too short for reliable detection
                return "en"
            language = detect(sample)
            self.logger.info(f"Detected language: {language}")
            return language
        except Exception as e:
            # Covers LangDetectError as well as a missing langdetect install
            self.logger.warning(f"Language detection failed: {str(e)}")
            return "en"  # Default to English

    def _clean_text(self, text: str, preserve_structure: bool = False) -> str:
        """
        Clean and normalize text with advanced processing.

        Args:
            text: Raw text to clean
            preserve_structure: Whether to preserve formatting

        Returns:
            Cleaned and normalized text
        """
        # Unicode normalization
        text = unicodedata.normalize("NFKC", text)

        if not preserve_structure:
            # Basic cleaning operations
            # Remove excessive whitespace but preserve paragraph breaks
            text = re.sub(r"[ \t]+", " ", text)  # Multiple spaces/tabs to single space
            text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text)  # Multiple newlines to double

            # Remove or normalize special characters
            # Keep basic punctuation and common symbols
            text = re.sub(r'[^\w\s.,;:!?\'"\-()[\]{}/@#$%&*+=<>|\\~`\n]', " ", text)

            # Clean up whitespace again
            text = re.sub(r"[ \t]+", " ", text)
            text = re.sub(r"\n\s*\n+", "\n\n", text)

            # Remove common artifacts
            # Remove page numbers and headers/footers patterns
            text = re.sub(r"\n\s*\d+\s*\n", "\n", text)  # Standalone page numbers
            text = re.sub(r"\n\s*Page \d+.*?\n", "\n", text, flags=re.IGNORECASE)

            # Remove excessive punctuation
            text = re.sub(r"[.]{3,}", "...", text)  # Multiple dots
            text = re.sub(r"[-]{3,}", "---", text)  # Multiple dashes

        # Final cleanup
        text = text.strip()
        return text

    def _chunk_text(self, text: str) -> List[str]:
        """
        Split text into chunks with intelligent boundary detection.

        Args:
            text: Cleaned text to chunk

        Returns:
            List of text chunks
        """
        if len(text) <= self.chunk_size:
            return [text]

        # Try intelligent chunking with spaCy first
        if self.nlp:
            try:
                return self._chunk_with_spacy(text)
            except Exception as e:
                self.logger.warning(f"spaCy chunking failed: {str(e)}")

        # Fallback to NLTK sentence-based chunking
        try:
            return self._chunk_with_sentences(text)
        except Exception as e:
            self.logger.warning(f"Sentence chunking failed: {str(e)}")

        # Final fallback to character-based chunking
        return self._chunk_by_characters(text)

    def _chunk_with_spacy(self, text: str) -> List[str]:
        """
        Intelligent chunking using spaCy for better semantic boundaries.

        Args:
            text: Text to chunk

        Returns:
            List of text chunks
        """
        doc = self.nlp(text)
        chunks = []
        current_chunk = []
        current_size = 0

        for sent in doc.sents:
            sent_text = sent.text.strip()
            sent_size = len(sent_text)

            # 📏 Check if adding this sentence exceeds chunk size
            if current_size + sent_size > self.chunk_size and current_chunk:
                # 📦 Finalize current chunk
                chunk_text = " ".join(current_chunk)
                chunks.append(chunk_text)

                # Start new chunk with overlap
                overlap_chunk, overlap_size = self._create_overlap(current_chunk)
                current_chunk = overlap_chunk
                current_size = overlap_size

            current_chunk.append(sent_text)
            current_size += sent_size

        # 📦 Add the last chunk
        if current_chunk:
            chunk_text = " ".join(current_chunk)
            if len(chunk_text.strip()) >= self.min_chunk_size:
                chunks.append(chunk_text)

        return chunks

    def _chunk_with_sentences(self, text: str) -> List[str]:
        """
        Chunk text using NLTK sentence tokenization.

        Args:
            text: Text to chunk

        Returns:
            List of text chunks
        """
        sentences = sent_tokenize(text)
        chunks = []
        current_chunk = []
        current_size = 0

        for sentence in sentences:
            sentence = sentence.strip()
            sentence_size = len(sentence)

            # 📏 Check chunk size limit
            if current_size + sentence_size > self.chunk_size and current_chunk:
                # 📦 Finalize current chunk
                chunk_text = " ".join(current_chunk)
                chunks.append(chunk_text)

                # Create overlap
                overlap_chunk, overlap_size = self._create_overlap(current_chunk)
                current_chunk = overlap_chunk
                current_size = overlap_size

            current_chunk.append(sentence)
            current_size += sentence_size

        # 📦 Add final chunk
        if current_chunk:
            chunk_text = " ".join(current_chunk)
            if len(chunk_text.strip()) >= self.min_chunk_size:
                chunks.append(chunk_text)

        return chunks

    def _chunk_by_characters(self, text: str) -> List[str]:
        """
        Fallback character-based chunking with boundary detection.

        Args:
            text: Text to chunk

        Returns:
            List of text chunks
        """
        chunks = []
        start = 0

        while start < len(text):
            end = start + self.chunk_size

            # Try to find a good boundary
            if end < len(text):
                # Look for sentence boundaries first, then paragraph and line breaks
                for boundary in [". ", "! ", "? ", "\n\n", "\n"]:
                    boundary_pos = text.rfind(boundary, start, end)
                    if boundary_pos > start + self.min_chunk_size:
                        end = boundary_pos + len(boundary)
                        break

            chunk = text[start:end].strip()
            if len(chunk) >= self.min_chunk_size:
                chunks.append(chunk)

            # Move start position with overlap
            start = max(start + 1, end - self.chunk_overlap)

        return chunks

    def _create_overlap(self, sentences: List[str]) -> tuple:
        """
        Create overlap from previous chunk sentences.

        Args:
            sentences: List of sentences from previous chunk

        Returns:
            Tuple of (overlap_sentences, overlap_size)
        """
        overlap_sentences = []
        overlap_size = 0

        # Add sentences from the end for overlap
        for sentence in reversed(sentences):
            if overlap_size + len(sentence) <= self.chunk_overlap:
                overlap_sentences.insert(0, sentence)
                overlap_size += len(sentence)
            else:
                break

        return overlap_sentences, overlap_size

    def _analyze_chunk(self, chunk: str) -> Dict[str, Any]:
        """
        Analyze chunk statistics and properties.

        Args:
            chunk: Text chunk to analyze

        Returns:
            Dictionary with chunk statistics
        """
        words = chunk.split()
        stats = {
            "character_count": len(chunk),
            "word_count": len(words),
            "sentence_count": len(sent_tokenize(chunk)) if chunk else 0,
            "avg_word_length": (
                sum(len(word) for word in words) / len(words) if words else 0
            ),
        }

        # Advanced analysis with spaCy if available
        if self.nlp:
            try:
                doc = self.nlp(chunk)
                stats.update(
                    {
                        "entity_count": len(doc.ents),
                        "noun_count": len(
                            [token for token in doc if token.pos_ == "NOUN"]
                        ),
                        "verb_count": len(
                            [token for token in doc if token.pos_ == "VERB"]
                        ),
                    }
                )
            except Exception:
                pass  # Skip advanced analysis if it fails

        return stats

    def extract_keywords(self, text: str, max_keywords: int = 10) -> List[str]:
        """
        Extract keywords from text using NLP techniques.

        Args:
            text: Text to extract keywords from
            max_keywords: Maximum number of keywords to return

        Returns:
            List of extracted keywords
        """
        if not self.nlp:
            return []

        try:
            doc = self.nlp(text)

            # Extract keywords based on POS tags and frequency
            word_freq = {}
            for token in doc:
                if (
                    token.pos_ in ["NOUN", "PROPN", "ADJ"]
                    and not token.is_stop
                    and not token.is_punct
                    and len(token.text) > 2
                ):
                    word = token.lemma_.lower()
                    word_freq[word] = word_freq.get(word, 0) + 1

            # Sort by frequency and return top keywords
            sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
            keywords = [word for word, freq in sorted_words[:max_keywords]]
            return keywords
        except Exception as e:
            self.logger.warning(f"Keyword extraction failed: {str(e)}")
            return []

    def get_text_statistics(self, text: str) -> Dict[str, Any]:
        """
        Get comprehensive text statistics.

        Args:
            text: Text to analyze

        Returns:
            Dictionary with text statistics
        """
        words = text.split()
        sentences = sent_tokenize(text) if text else []

        stats = {
            "character_count": len(text),
            "word_count": len(words),
            "sentence_count": len(sentences),
            "paragraph_count": len([p for p in text.split("\n\n") if p.strip()]),
            "avg_words_per_sentence": len(words) / len(sentences) if sentences else 0,
            "avg_chars_per_word": (
                sum(len(word) for word in words) / len(words) if words else 0
            ),
            "language": self._detect_language(text),
        }
        return stats
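

# Minimal usage sketch (illustrative only, not part of the module itself): it
# assumes NLTK, langdetect, and spaCy's en_core_web_sm model are installed, and
# shows how process_text returns chunk dictionaries with per-chunk metadata.
if __name__ == "__main__":
    extractor = TextExtractor({"chunk_size": 500, "chunk_overlap": 100})
    sample = (
        "Natural language processing pipelines often need clean, evenly sized "
        "chunks of text before indexing or embedding. " * 20
    )
    for chunk in extractor.process_text(sample, metadata={"source": "example"}):
        meta = chunk["metadata"]
        print(meta["chunk_index"], meta["word_count"], chunk["content"][:60])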