|
""" |
|
Semantic chunking for intelligent context segmentation. |
|
""" |
|
|
|
import logging |
|
import uuid |
|
from typing import List, Dict, Any, Optional, Tuple |
|
|
|
from efficient_context.chunking.base import BaseChunker, Chunk |
|
from efficient_context.utils.text import split_into_sentences, calculate_text_overlap |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
class SemanticChunker(BaseChunker):
    """Chunker that creates chunks based on semantic boundaries.

    This chunker aims to keep semantically related content together, unlike
    simple token-based chunking that might split content mid-thought.
    """

    def __init__(
        self,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        respect_paragraphs: bool = True,
        min_chunk_size: int = 100,
        max_chunk_size: int = 1024,
    ):
        """Initialize the SemanticChunker.

        Args:
            chunk_size: Target size for chunks in tokens (words)
            chunk_overlap: Number of tokens to overlap between chunks
            respect_paragraphs: Whether to avoid breaking paragraphs across chunks
            min_chunk_size: Minimum chunk size in tokens
            max_chunk_size: Maximum chunk size in tokens
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.respect_paragraphs = respect_paragraphs
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size

        logger.info(
            "SemanticChunker initialized with target size: %d tokens, overlap: %d tokens",
            chunk_size,
            chunk_overlap,
        )
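    # Note: with the defaults above, chunk_overlap (50) is smaller than
    # min_chunk_size (100), so leftover overlap text at the end of a document
    # is filtered out by the final min-size check in _create_semantic_chunks
    # rather than emitted as its own near-duplicate chunk.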
|
    def _estimate_tokens(self, text: str) -> int:
        """Estimate the number of tokens in text.

        Args:
            text: Text to estimate tokens for

        Returns:
            token_count: Estimated number of tokens
        """
        # Whitespace-delimited word count is used as a cheap token estimate.
        return len(text.split())
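    # e.g. _estimate_tokens("the quick brown fox") -> 4. Punctuation is not
    # counted separately, so this is usually a slight undercount relative to
    # subword tokenizers.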
|
    def _identify_paragraphs(self, content: str) -> List[str]:
        """Split content into paragraphs.

        Args:
            content: Content to split

        Returns:
            paragraphs: List of paragraphs
        """
        # First split on blank lines, the conventional paragraph separator.
        paragraphs = [p.strip() for p in content.split("\n\n")]

        # Re-join wrapped lines within each paragraph, and split again on any
        # blank lines left over from runs of three or more newlines.
        result = []
        current = ""

        for p in paragraphs:
            if not p:
                continue

            lines = p.split("\n")
            for line in lines:
                if not line.strip():
                    if current:
                        result.append(current)
                        current = ""
                else:
                    if current:
                        current += " " + line.strip()
                    else:
                        current = line.strip()

            if current:
                result.append(current)
                current = ""

        # Fall back to the whole content if no paragraphs were found.
        return result if result else [content]
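    # Example (hypothetical input): "Line one\nline two\n\n\nNext para." yields
    # ["Line one line two", "Next para."] -- wrapped lines are re-joined with
    # spaces and empty segments are dropped.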
|
    def _create_semantic_chunks(
        self,
        paragraphs: List[str],
        document_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Chunk]:
        """Create chunks from paragraphs respecting semantic boundaries.

        Args:
            paragraphs: List of paragraphs to chunk
            document_id: Optional ID of the source document
            metadata: Optional metadata for the chunks

        Returns:
            chunks: List of Chunk objects
        """
        chunks = []
        current_chunk_text = ""
        current_token_count = 0

        for paragraph in paragraphs:
            paragraph_tokens = self._estimate_tokens(paragraph)

            # Case 1: adding this paragraph would overflow the maximum chunk
            # size, and the current chunk is already big enough to stand alone.
            if (current_token_count + paragraph_tokens > self.max_chunk_size and
                    current_token_count >= self.min_chunk_size):
                chunk_id = str(uuid.uuid4())
                chunk = Chunk(
                    content=current_chunk_text.strip(),
                    chunk_id=chunk_id,
                    document_id=document_id,
                    metadata=metadata,
                )
                chunks.append(chunk)

                # Seed the next chunk with the tail of the previous one so
                # boundary context is shared between consecutive chunks.
                if self.chunk_overlap > 0 and current_chunk_text:
                    words = current_chunk_text.split()
                    overlap_text = " ".join(words[-min(self.chunk_overlap, len(words)):])
                    current_chunk_text = overlap_text + " " + paragraph
                    current_token_count = self._estimate_tokens(current_chunk_text)
                else:
                    current_chunk_text = paragraph
                    current_token_count = paragraph_tokens
|
            # Case 2: the paragraph alone exceeds the maximum chunk size, so
            # flush the current chunk and split the paragraph by sentences.
            elif paragraph_tokens > self.max_chunk_size:
                if current_chunk_text:
                    chunk_id = str(uuid.uuid4())
                    chunk = Chunk(
                        content=current_chunk_text.strip(),
                        chunk_id=chunk_id,
                        document_id=document_id,
                        metadata=metadata,
                    )
                    chunks.append(chunk)
                    current_chunk_text = ""
                    current_token_count = 0

                sentences = split_into_sentences(paragraph)
                sentence_chunk = ""
                sentence_token_count = 0

                for sentence in sentences:
                    sentence_tokens = self._estimate_tokens(sentence)

                    # Same overflow rule as Case 1, applied at sentence level.
                    if (sentence_token_count + sentence_tokens > self.max_chunk_size and
                            sentence_token_count >= self.min_chunk_size):
                        chunk_id = str(uuid.uuid4())
                        chunk = Chunk(
                            content=sentence_chunk.strip(),
                            chunk_id=chunk_id,
                            document_id=document_id,
                            metadata=metadata,
                        )
                        chunks.append(chunk)

                        # Carry overlap into the next sentence-level chunk.
                        if self.chunk_overlap > 0 and sentence_chunk:
                            words = sentence_chunk.split()
                            overlap_text = " ".join(words[-min(self.chunk_overlap, len(words)):])
                            sentence_chunk = overlap_text + " " + sentence
                            sentence_token_count = self._estimate_tokens(sentence_chunk)
                        else:
                            sentence_chunk = sentence
                            sentence_token_count = sentence_tokens
                    else:
                        if sentence_chunk:
                            sentence_chunk += " " + sentence
                        else:
                            sentence_chunk = sentence
                        sentence_token_count += sentence_tokens

                # Flush whatever remains of the oversized paragraph.
                if sentence_chunk:
                    chunk_id = str(uuid.uuid4())
                    chunk = Chunk(
                        content=sentence_chunk.strip(),
                        chunk_id=chunk_id,
                        document_id=document_id,
                        metadata=metadata,
                    )
                    chunks.append(chunk)
|
            # Case 3: the paragraph fits, so accumulate it.
            else:
                if current_chunk_text:
                    current_chunk_text += " " + paragraph
                else:
                    current_chunk_text = paragraph
                current_token_count += paragraph_tokens

                # Finalize the chunk once it reaches the target size.
                if current_token_count >= self.chunk_size:
                    chunk_id = str(uuid.uuid4())
                    chunk = Chunk(
                        content=current_chunk_text.strip(),
                        chunk_id=chunk_id,
                        document_id=document_id,
                        metadata=metadata,
                    )
                    chunks.append(chunk)

                    # Keep only the overlap tail for the next chunk.
                    if self.chunk_overlap > 0:
                        words = current_chunk_text.split()
                        current_chunk_text = " ".join(words[-min(self.chunk_overlap, len(words)):])
                        current_token_count = self._estimate_tokens(current_chunk_text)
                    else:
                        current_chunk_text = ""
                        current_token_count = 0

        # Flush the final chunk. Emit it even when it is below min_chunk_size
        # if it is the only chunk, so short documents are not silently dropped;
        # otherwise the min-size check discards trailing overlap-only text.
        if current_chunk_text and (current_token_count >= self.min_chunk_size or not chunks):
            chunk_id = str(uuid.uuid4())
            chunk = Chunk(
                content=current_chunk_text.strip(),
                chunk_id=chunk_id,
                document_id=document_id,
                metadata=metadata,
            )
            chunks.append(chunk)

        return chunks
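    # Worked example (hypothetical sizes): with chunk_size=512,
    # max_chunk_size=1024, min_chunk_size=100, paragraphs of 400 and 200
    # tokens accumulate to 600 (>= 512), so a chunk is emitted after the
    # second paragraph; a single 1500-token paragraph instead takes the
    # sentence-splitting path in Case 2.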
|
    def chunk(
        self,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
        document_id: Optional[str] = None,
    ) -> List[Chunk]:
        """Split content into semantic chunks.

        Args:
            content: Content to be chunked
            metadata: Optional metadata to associate with chunks
            document_id: Optional document ID to associate with chunks

        Returns:
            chunks: List of Chunk objects
        """
        if not content.strip():
            return []

        # Segment by paragraph boundaries unless configured otherwise.
        if self.respect_paragraphs:
            paragraphs = self._identify_paragraphs(content)
        else:
            paragraphs = [content]

        chunks = self._create_semantic_chunks(paragraphs, document_id, metadata)

        logger.info("Created %d chunks from content", len(chunks))
        return chunks
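

# Minimal usage sketch (illustrative, not part of the library). Assumes Chunk
# exposes .chunk_id and .content attributes, matching how they are constructed
# above; sizes are deliberately tiny so the short sample yields several chunks.
if __name__ == "__main__":
    sample = (
        "Semantic chunking keeps related sentences together. It avoids "
        "splitting a thought across chunk boundaries.\n\n"
        "A second paragraph stays intact when respect_paragraphs is True."
    )
    chunker = SemanticChunker(
        chunk_size=12, chunk_overlap=3, min_chunk_size=4, max_chunk_size=24
    )
    for c in chunker.chunk(sample, document_id="demo-doc"):
        print(c.chunk_id, "->", c.content)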
|
|