# docling_rag/utils/ingestion.py
import json
import logging
import time
from pathlib import Path

import chromadb
import yaml
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import (
    DocumentConverter,
    PdfFormatOption,
    WordFormatOption,
)
from docling.pipeline.simple_pipeline import SimplePipeline
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.chunking.hierarchical_chunker import HierarchicalChunker
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings

_log = logging.getLogger(__name__)


class DocumentProcessor:
    def __init__(self):
        """Initialize the document processor: converter, embedding model, and vector store."""
        self.setup_document_converter()
        # FastEmbed provides local, CPU-friendly embeddings via LangChain.
        self.embed_model = FastEmbedEmbeddings()
        # Persist the Chroma database on disk so the index survives restarts.
        self.client = chromadb.PersistentClient(path="chroma_db")
    def setup_document_converter(self):
        """Configure a DocumentConverter that accepts multiple input formats."""
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = False  # skip OCR; assume digital-born PDFs
        pipeline_options.do_table_structure = True  # recover table structure

        self.converter = DocumentConverter(
            allowed_formats=[
                InputFormat.PDF,
                InputFormat.IMAGE,
                InputFormat.DOCX,
                InputFormat.HTML,
                InputFormat.PPTX,
                InputFormat.TXT,
                InputFormat.CSV,
                InputFormat.ASCIIDOC,
                InputFormat.MD,
            ],
            format_options={
                # Pass pipeline_options explicitly; without this the PDF options
                # configured above are never applied.
                InputFormat.PDF: PdfFormatOption(
                    pipeline_cls=StandardPdfPipeline,
                    pipeline_options=pipeline_options,
                    backend=PyPdfiumDocumentBackend,
                ),
                InputFormat.DOCX: WordFormatOption(
                    pipeline_cls=SimplePipeline,
                ),
            },
        )
    def process_document(self, file_path: str):
        """Convert a document, chunk it, and index the chunks in ChromaDB."""
        print(f"πŸ“„ Processing document: {file_path}")
        start_time = time.time()

        try:
            conv_result = self.converter.convert(file_path)
            doc = conv_result.document
        except Exception as e:
            print(f"❌ Conversion failed: {e}")
            return None

        # Save the parsed document as Markdown, JSON, and YAML for inspection.
        output_dir = Path("parsed-doc")
        output_dir.mkdir(parents=True, exist_ok=True)
        doc_filename = Path(file_path).stem
        with (output_dir / f"{doc_filename}.md").open("w", encoding="utf-8") as fp:
            fp.write(doc.export_to_markdown())
        with (output_dir / f"{doc_filename}.json").open("w", encoding="utf-8") as fp:
            json.dump(doc.export_to_dict(), fp)
        with (output_dir / f"{doc_filename}.yaml").open("w", encoding="utf-8") as fp:
            yaml.safe_dump(doc.export_to_dict(), fp)

        # Split the document along its structural hierarchy (sections, lists, tables).
        chunker = HierarchicalChunker()
        chunks = list(chunker.chunk(doc))

        processed_chunks = []
        for chunk in chunks:
            processed_chunks.append({
                "text": chunk.text.strip(),
                # Collects the text of every doc item in the chunk, used as heading context.
                "headings": [item.text for item in chunk.doc_items if hasattr(item, "text")],
                # Stringify the label so Chroma accepts it even if it is an enum.
                "content_type": str(chunk.doc_items[0].label) if chunk.doc_items else "Unknown",
            })

        print("βœ… Chunking completed. Creating vector database...")
        collection = self.client.get_or_create_collection(name="document_chunks")

        documents, metadata_list, ids = [], [], []
        for idx, chunk in enumerate(processed_chunks):
            text = chunk["text"]
            if not text:
                continue  # skip empty chunks
            documents.append(text)
            metadata_list.append({
                # Chroma metadata values must be str/int/float/bool, so serialize
                # the headings list and never store None.
                "headings": json.dumps(chunk["headings"]),
                "content_type": chunk["content_type"],
            })
            ids.append(str(idx))

        if documents:
            # Embed all chunk texts in one batch call rather than one call per chunk.
            embeddings = self.embed_model.embed_documents(documents)
            collection.add(
                ids=ids,
                embeddings=embeddings,
                documents=documents,
                metadatas=metadata_list,
            )
            print(f"βœ… Successfully added {len(documents)} chunks to the database.")

        print(f"βœ… Document processing completed in {time.time() - start_time:.2f} seconds")
        return collection
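

# Minimal usage sketch, not part of the original module: "sample.pdf" is a
# hypothetical file path, and any format listed in allowed_formats would work.
# Retrieval uses embed_query + collection.query, mirroring how the chunks were
# embedded with embed_documents during indexing.
if __name__ == "__main__":
    processor = DocumentProcessor()
    collection = processor.process_document("sample.pdf")  # hypothetical input file
    if collection is not None:
        # Embed the query with the same FastEmbed model used for indexing.
        query_embedding = processor.embed_model.embed_query("What is this document about?")
        results = collection.query(query_embeddings=[query_embedding], n_results=3)
        for doc_text, meta in zip(results["documents"][0], results["metadatas"][0]):
            print(meta["content_type"], "->", doc_text[:80])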