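"""Document chunking and embedding pipeline.

Loads scraped JSON documents from the raw-data directory, splits them into
chunks (per FAQ entry, per heading, or by fixed-size paragraph packing),
writes the chunks to chunks.json, and stores sentence-transformer embeddings
in embeddings.pkl for downstream retrieval.
"""
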
import os
import json
import re
from typing import List, Dict, Any, Optional
import pickle
from tqdm import tqdm
from sentence_transformers import SentenceTransformer


class DocumentChunker:
    def __init__(self, input_dir: str = "data/raw",
                 output_dir: str = "data/processed",
                 embedding_dir: str = "data/embeddings",
                 model_name: str = "BAAI/bge-small-en-v1.5"):
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.embedding_dir = embedding_dir

        # Create output directories
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(embedding_dir, exist_ok=True)

        # Load embedding model
        self.model = SentenceTransformer(model_name)
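    # Example (illustrative values): point the chunker at a different corpus or
    # swap in another sentence-transformers model, e.g.
    #     DocumentChunker(input_dir="data/raw_site", model_name="BAAI/bge-base-en-v1.5")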

    def load_documents(self) -> List[Dict[str, Any]]:
        """Load all documents from the input directory."""
        documents = []

        for filename in os.listdir(self.input_dir):
            if filename.endswith('.json'):
                filepath = os.path.join(self.input_dir, filename)
                with open(filepath, 'r') as f:
                    document = json.load(f)
                    documents.append(document)

        return documents
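    # Illustrative shape of a raw document (inferred from the fields accessed in
    # this module; the actual scraper output may carry additional keys):
    #
    #   {
    #     "title": "...", "content": "...", "url": "https://...",
    #     "scraped_at": "...", "categories": ["..."],
    #     "document_type": "webpage",
    #     "headings": [{"text": "Overview", "level": 2}, ...],
    #     "faqs": [{"question": "...", "answer": "..."}, ...]
    #   }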

    def chunk_by_headings(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Split document into chunks based on headings."""
        chunks = []

        # If no headings, just create a single chunk
        if not document.get('headings'):
            chunk = {
                'title': document['title'],
                'content': document['content'],
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)
            return chunks

        # Process document based on headings
        headings = sorted(document['headings'], key=lambda h: h.get('level', 6))
        content = document['content']

        # Use headings to split content
        current_title = document['title']
        current_content = ""
        content_lines = content.split('\n')
        line_index = 0

        for heading in headings:
            heading_text = heading['text']

            # Find the heading in the content
            heading_found = False
            for i in range(line_index, len(content_lines)):
                if heading_text in content_lines[i]:
                    # Save the previous chunk
                    if current_content.strip():
                        chunk = {
                            'title': current_title,
                            'content': current_content.strip(),
                            'url': document['url'],
                            'categories': document.get('categories', []),
                            'scraped_at': document['scraped_at'],
                            'document_type': document.get('document_type', 'webpage')
                        }
                        chunks.append(chunk)

                    # Start new chunk
                    current_title = heading_text
                    current_content = ""
                    line_index = i + 1
                    heading_found = True
                    break

            if not heading_found:
                current_content += heading_text + "\n"

            # Add content until the next heading
            if line_index < len(content_lines):
                for i in range(line_index, len(content_lines)):
                    # Check if line contains any of the upcoming headings
                    if any(h['text'] in content_lines[i] for h in headings if h['text'] != heading_text):
                        break
                    current_content += content_lines[i] + "\n"
                    line_index = i + 1

        # Add the last chunk
        if current_content.strip():
            chunk = {
                'title': current_title,
                'content': current_content.strip(),
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)

        return chunks
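    # Note: headings are sorted by level (not by document position) before being
    # matched against the flattened text. Illustrative outcome: a page with
    # headings "Overview" and "Pricing" yields one chunk per heading, each chunk
    # titled by its heading text and carrying the content up to the next heading.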

    def chunk_faqs(self, document: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Extract FAQs as individual chunks."""
        chunks = []

        if not document.get('faqs'):
            return chunks

        for faq in document['faqs']:
            chunk = {
                'title': faq['question'],
                'content': faq['answer'],
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': 'faq',
                'question': faq['question']
            }
            chunks.append(chunk)

        return chunks
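    # Each Q&A pair becomes its own chunk, e.g. (illustrative):
    #   {"question": "How do I reset my password?", "answer": "..."}
    #   -> one chunk titled "How do I reset my password?" with document_type "faq".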

    def chunk_semantically(self, document: Dict[str, Any],
                           max_chunk_size: int = 1000,
                           overlap: int = 100) -> List[Dict[str, Any]]:
        """Split document into fixed-size chunks by packing paragraphs (and, for
        oversized paragraphs, sentences) up to max_chunk_size characters.

        Note: the `overlap` parameter is accepted for future use but is not
        currently applied when building chunks.
        """
        chunks = []
        content = document['content']

        # Skip empty content
        if not content.strip():
            return chunks

        # Split content by paragraphs
        paragraphs = re.split(r'\n\s*\n', content)

        current_chunk = ""
        current_length = 0

        for para in paragraphs:
            para = para.strip()
            if not para:
                continue

            para_length = len(para)

            # If paragraph alone exceeds max size, split by sentences
            if para_length > max_chunk_size:
                sentences = re.split(r'(?<=[.!?])\s+', para)
                for sentence in sentences:
                    sentence = sentence.strip()
                    sentence_length = len(sentence)

                    if current_length + sentence_length <= max_chunk_size:
                        current_chunk += sentence + " "
                        current_length += sentence_length + 1
                    else:
                        # Save current chunk
                        if current_chunk:
                            chunk = {
                                'title': document['title'],
                                'content': current_chunk.strip(),
                                'url': document['url'],
                                'categories': document.get('categories', []),
                                'scraped_at': document['scraped_at'],
                                'document_type': document.get('document_type', 'webpage')
                            }
                            chunks.append(chunk)

                        # Start new chunk
                        current_chunk = sentence + " "
                        current_length = sentence_length + 1

            # Paragraph fits within limit
            elif current_length + para_length <= max_chunk_size:
                current_chunk += para + "\n\n"
                current_length += para_length + 2

            # Paragraph doesn't fit, create a new chunk
            else:
                # Save current chunk
                if current_chunk:
                    chunk = {
                        'title': document['title'],
                        'content': current_chunk.strip(),
                        'url': document['url'],
                        'categories': document.get('categories', []),
                        'scraped_at': document['scraped_at'],
                        'document_type': document.get('document_type', 'webpage')
                    }
                    chunks.append(chunk)

                # Start new chunk
                current_chunk = para + "\n\n"
                current_length = para_length + 2

        # Add the last chunk
        if current_chunk:
            chunk = {
                'title': document['title'],
                'content': current_chunk.strip(),
                'url': document['url'],
                'categories': document.get('categories', []),
                'scraped_at': document['scraped_at'],
                'document_type': document.get('document_type', 'webpage')
            }
            chunks.append(chunk)

        return chunks
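    # Rough sizing (illustrative): with the default max_chunk_size of 1000
    # characters, a 3,500-character page packs into roughly four chunks.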

    def create_chunks(self) -> List[Dict[str, Any]]:
        """Process all documents and create chunks."""
        all_chunks = []

        # Load documents
        documents = self.load_documents()
        print(f"Loaded {len(documents)} documents")

        # Process each document
        for document in tqdm(documents, desc="Chunking documents"):
            # FAQ chunks
            faq_chunks = self.chunk_faqs(document)
            all_chunks.extend(faq_chunks)

            # Heading-based chunks
            heading_chunks = self.chunk_by_headings(document)
            all_chunks.extend(heading_chunks)

            # Semantic chunks as fallback
            if not heading_chunks:
                semantic_chunks = self.chunk_semantically(document)
                all_chunks.extend(semantic_chunks)

        # Save chunks to output directory
        with open(os.path.join(self.output_dir, 'chunks.json'), 'w') as f:
            json.dump(all_chunks, f, indent=2)

        print(f"Created {len(all_chunks)} chunks")
        return all_chunks
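    # chunks.json is a flat list of chunk records; a single document can
    # contribute both FAQ chunks and heading-based chunks. Illustrative record:
    #   {"title": "...", "content": "...", "url": "...", "categories": [...],
    #    "scraped_at": "...", "document_type": "webpage"}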

    def create_embeddings(self, chunks: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        """Create embeddings for all chunks."""
        if chunks is None:
            # Load chunks if not provided
            chunks_path = os.path.join(self.output_dir, 'chunks.json')
            if os.path.exists(chunks_path):
                with open(chunks_path, 'r') as f:
                    chunks = json.load(f)
            else:
                chunks = self.create_chunks()

        # Prepare texts for embedding: prepend the title to the content (for FAQ
        # chunks the title is the question, so this combines question and answer).
        texts = [f"{chunk['title']} {chunk['content']}" for chunk in chunks]

        # Create embeddings
        print("Creating embeddings...")
        embeddings = self.model.encode(texts, show_progress_bar=True)

        # Create mapping of chunk ID to embedding
        embedding_map = {}
        for i, chunk in enumerate(chunks):
            chunk_id = f"chunk_{i}"
            embedding_map[chunk_id] = {
                'embedding': embeddings[i],
                'chunk': chunk
            }

        # Save embeddings
        with open(os.path.join(self.embedding_dir, 'embeddings.pkl'), 'wb') as f:
            pickle.dump(embedding_map, f)

        print(f"Created embeddings for {len(chunks)} chunks")
        return embedding_map
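

# A minimal retrieval sketch (not part of the original pipeline): it shows how
# the embedding_map produced by DocumentChunker.create_embeddings can be queried
# with cosine similarity. The function name, signature, and top_k default are
# illustrative assumptions.
def search_chunks(embedding_map: Dict[str, Any],
                  query: str,
                  model: SentenceTransformer,
                  top_k: int = 3) -> List[Dict[str, Any]]:
    """Rank chunks by cosine similarity between the query and stored embeddings."""
    import numpy as np

    # Embed the query with the same model used for the chunks
    query_vec = model.encode([query])[0]

    scored = []
    for chunk_id, entry in embedding_map.items():
        vec = entry['embedding']
        # Cosine similarity between the query vector and the chunk vector
        score = float(np.dot(query_vec, vec) /
                      (np.linalg.norm(query_vec) * np.linalg.norm(vec) + 1e-12))
        scored.append({'chunk_id': chunk_id, 'score': score, 'chunk': entry['chunk']})

    scored.sort(key=lambda item: item['score'], reverse=True)
    return scored[:top_k]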


# Example usage
if __name__ == "__main__":
    chunker = DocumentChunker()
    chunks = chunker.create_chunks()
    embedding_map = chunker.create_embeddings(chunks)
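    # Optional sanity check using the search_chunks sketch above; the query
    # string here is purely illustrative.
    for hit in search_chunks(embedding_map, "How do I get started?", chunker.model):
        print(f"{hit['score']:.3f}  {hit['chunk']['title']}")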