import os
import json
import re
import pickle
from typing import List, Dict, Any, Optional

from tqdm import tqdm
from sentence_transformers import SentenceTransformer


class DocumentChunker:
    def __init__(self,
                 input_dir: str = "data/raw",
                 output_dir: str = "data/processed",
                 embedding_dir: str = "data/embeddings",
                 model_name: str = "BAAI/bge-small-en-v1.5"):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.embedding_dir = embedding_dir

        # Create output directories
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(embedding_dir, exist_ok=True)

        # Load embedding model
        self.model = SentenceTransformer(model_name)

    def load_documents(self) -> List[Dict[str, Any]]:
        """Load all documents from the input directory."""
        documents = []
        for filename in os.listdir(self.input_dir):
            if filename.endswith('.json'):
                filepath = os.path.join(self.input_dir, filename)
                with open(filepath, 'r') as f:
                    document = json.load(f)
                    documents.append(document)
        return documents

    def chunk_by_headings(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Split document into chunks based on headings."""
        chunks = []

        # If no headings, just create a single chunk
        if not document.get('headings'):
            chunk = {
                'title': document['title'],
                'content': document['content'],
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)
            return chunks

        # Process headings in document order (assumes the scraper preserves it);
        # the forward line scan below relies on this ordering.
        headings = document['headings']
        content = document['content']

        # Use headings to split content
        current_title = document['title']
        current_content = ""
        content_lines = content.split('\n')
        line_index = 0

        for heading in headings:
            heading_text = heading['text']

            # Find the heading in the content
            heading_found = False
            for i in range(line_index, len(content_lines)):
                if heading_text in content_lines[i]:
                    # Save the previous chunk
                    if current_content.strip():
                        chunk = {
                            'title': current_title,
                            'content': current_content.strip(),
                            'url': document['url'],
                            'categories': document.get('categories', []),
                            'scraped_at': document['scraped_at'],
                            'document_type': document.get('document_type', 'webpage')
                        }
                        chunks.append(chunk)

                    # Start new chunk
                    current_title = heading_text
                    current_content = ""
                    line_index = i + 1
                    heading_found = True
                    break

            if not heading_found:
                current_content += heading_text + "\n"

            # Add content until the next heading
            if line_index < len(content_lines):
                for i in range(line_index, len(content_lines)):
                    # Stop when the line contains one of the other headings
                    if any(h['text'] in content_lines[i] for h in headings if h['text'] != heading_text):
                        break
                    current_content += content_lines[i] + "\n"
                    line_index = i + 1

        # Add the last chunk
        if current_content.strip():
            chunk = {
                'title': current_title,
                'content': current_content.strip(),
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)

        return chunks

    def chunk_faqs(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract FAQs as individual chunks."""
        chunks = []

        if not document.get('faqs'):
            return chunks

        for faq in document['faqs']:
            chunk = {
                'title': faq['question'],
                'content': faq['answer'],
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': 'faq',
                'question': faq['question']
            }
            chunks.append(chunk)

        return chunks
    def chunk_semantically(self, document: Dict[str, Any],
                           max_chunk_size: int = 1000,
                           overlap: int = 100) -> List[Dict[str, Any]]:
        """Split document into size-bounded chunks along paragraph and sentence
        boundaries. Note: the ``overlap`` parameter is accepted but not
        currently applied."""
        chunks = []
        content = document['content']

        # Skip empty content
        if not content.strip():
            return chunks

        # Split content by paragraphs
        paragraphs = re.split(r'\n\s*\n', content)

        current_chunk = ""
        current_length = 0

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            para_length = len(para)

            # If paragraph alone exceeds max size, split by sentences
            if para_length > max_chunk_size:
                sentences = re.split(r'(?<=[.!?])\s+', para)
                for sentence in sentences:
                    sentence = sentence.strip()
                    sentence_length = len(sentence)

                    if current_length + sentence_length <= max_chunk_size:
                        current_chunk += sentence + " "
                        current_length += sentence_length + 1
                    else:
                        # Save current chunk
                        if current_chunk:
                            chunk = {
                                'title': document['title'],
                                'content': current_chunk.strip(),
                                'url': document['url'],
                                'categories': document.get('categories', []),
                                'scraped_at': document['scraped_at'],
                                'document_type': document.get('document_type', 'webpage')
                            }
                            chunks.append(chunk)

                        # Start new chunk
                        current_chunk = sentence + " "
                        current_length = sentence_length + 1

            # Paragraph fits within limit
            elif current_length + para_length <= max_chunk_size:
                current_chunk += para + "\n\n"
                current_length += para_length + 2

            # Paragraph doesn't fit, create a new chunk
            else:
                # Save current chunk
                if current_chunk:
                    chunk = {
                        'title': document['title'],
                        'content': current_chunk.strip(),
                        'url': document['url'],
                        'categories': document.get('categories', []),
                        'scraped_at': document['scraped_at'],
                        'document_type': document.get('document_type', 'webpage')
                    }
                    chunks.append(chunk)

                # Start new chunk
                current_chunk = para + "\n\n"
                current_length = para_length + 2

        # Add the last chunk
        if current_chunk:
            chunk = {
                'title': document['title'],
                'content': current_chunk.strip(),
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)

        return chunks

    def create_chunks(self) -> List[Dict[str, Any]]:
        """Process all documents and create chunks."""
        all_chunks = []

        # Load documents
        documents = self.load_documents()
        print(f"Loaded {len(documents)} documents")

        # Process each document
        for document in tqdm(documents, desc="Chunking documents"):
            # FAQ chunks
            faq_chunks = self.chunk_faqs(document)
            all_chunks.extend(faq_chunks)

            # Heading-based chunks
            heading_chunks = self.chunk_by_headings(document)
            all_chunks.extend(heading_chunks)

            # Semantic chunks as fallback
            if not heading_chunks:
                semantic_chunks = self.chunk_semantically(document)
                all_chunks.extend(semantic_chunks)

        # Save chunks to output directory
        with open(os.path.join(self.output_dir, 'chunks.json'), 'w') as f:
            json.dump(all_chunks, f, indent=2)

        print(f"Created {len(all_chunks)} chunks")
        return all_chunks

    def create_embeddings(self, chunks: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """Create embeddings for all chunks."""
        if chunks is None:
            # Load chunks if not provided
            chunks_path = os.path.join(self.output_dir, 'chunks.json')
            if os.path.exists(chunks_path):
                with open(chunks_path, 'r') as f:
                    chunks = json.load(f)
            else:
                chunks = self.create_chunks()

        # Prepare texts for embedding
        texts = []
        for chunk in chunks:
            # For FAQs, combine question and answer
            if chunk.get('document_type') == 'faq':
                text = f"{chunk['title']} {chunk['content']}"
            else:
                # For regular chunks, use title and content
f"{chunk['title']} {chunk['content']}" texts.append(text) # Create embeddings print("Creating embeddings...") embeddings = self.model.encode(texts, show_progress_bar=True) # Create mapping of chunk ID to embedding embedding_map = {} for i, chunk in enumerate(chunks): chunk_id = f"chunk_{i}" embedding_map[chunk_id] = { 'embedding': embeddings[i], 'chunk': chunk } # Save embeddings with open(os.path.join(self.embedding_dir, 'embeddings.pkl'), 'wb') as f: pickle.dump(embedding_map, f) print(f"Created embeddings for {len(chunks)} chunks") return embedding_map # Example usage if __name__ == "__main__": chunker = DocumentChunker() chunks = chunker.create_chunks() embedding_map = chunker.create_embeddings(chunks)