import json
import time
import os
from pathlib import Path
from typing import Dict, Any, List

import chromadb
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import (
    DocumentConverter,
    PdfFormatOption,
    WordFormatOption,
)
from docling.pipeline.simple_pipeline import SimplePipeline
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling_core.types.doc import DoclingDocument
from docling_core.transforms.chunker import HierarchicalChunker
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings


class DocumentProcessor:
    def __init__(self):
        """Initialize the document processor with the Docling v2 APIs."""
        self.setup_document_converter()
        self.embed_model = FastEmbedEmbeddings()
        self.client = chromadb.PersistentClient(path="chroma_db")

    def setup_document_converter(self):
        """Configure the document converter to support multiple input formats."""
        pipeline_options = PdfPipelineOptions()
        pipeline_options.do_ocr = False
        pipeline_options.do_table_structure = True

        self.converter = DocumentConverter(
            allowed_formats=[
                InputFormat.PDF,
                InputFormat.IMAGE,
                InputFormat.DOCX,
                InputFormat.HTML,
                InputFormat.PPTX,
                InputFormat.TXT,  # Added text format
                InputFormat.CSV,  # Added CSV format
            ],
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_options=pipeline_options,
                    backend=PyPdfiumDocumentBackend,
                ),
                InputFormat.DOCX: WordFormatOption(
                    pipeline_cls=SimplePipeline,
                ),
            },
        )

    def process_document(self, file_path: str):
        """Process a document and create a searchable index with metadata."""
        print(f"📄 Processing document: {file_path}")
        start_time = time.time()
        file_ext = Path(file_path).suffix.lower()

        # Convert the source file with Docling; abort early on failure
        try:
            conv_result = self.converter.convert(file_path)
            doc: DoclingDocument = conv_result.document
        except Exception as e:
            print(f"❌ Conversion failed: {e}")
            return None

        # Save the parsed document as Markdown for inspection
        output_dir = Path("parsed-doc")
        output_dir.mkdir(parents=True, exist_ok=True)
        doc_filename = Path(file_path).stem
        md_filename = output_dir / f"{doc_filename}.md"
        doc.save_as_markdown(md_filename)

        # Split the document into hierarchy-aware chunks
        chunker = HierarchicalChunker()
        chunks = list(chunker.chunk(doc))

        processed_chunks = []
        for chunk in chunks:
            metadata = {
                "text": chunk.text.strip(),
                "headings": [
                    item.text for item in chunk.meta.doc_items if hasattr(item, "text")
                ],
                "content_type": chunk.meta.doc_items[0].label
                if chunk.meta.doc_items
                else "Unknown",
            }
            processed_chunks.append(metadata)

        print("✅ Chunking completed. Creating vector database...")
        collection = self.client.get_or_create_collection(name="document_chunks")

        # Embed each chunk and collect the records for ChromaDB
        documents, embeddings, metadata_list, ids = [], [], [], []
        for idx, chunk in enumerate(processed_chunks):
            text = chunk.get("text", "").strip()
            if not text:
                continue

            embedding = self.embed_model.embed_documents([text])[0]
            documents.append(text)
            embeddings.append(embedding)
            metadata_list.append({
                "headings": json.dumps(chunk.get("headings", [])),
                "content_type": chunk.get("content_type", "Unknown"),
            })
            ids.append(str(idx))

        if documents:
            collection.add(
                ids=ids,
                embeddings=embeddings,
                documents=documents,
                metadatas=metadata_list,
            )
            print(f"✅ Successfully added {len(documents)} chunks to the database.")

        print(f"✅ Document processing completed in {time.time() - start_time:.2f} seconds")
        return collection
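

# --- Usage sketch (illustrative, not part of the original class) ---
# A minimal driver assuming a local file named "sample.pdf" exists; the
# filename and query string below are placeholders. It embeds the query with
# the same FastEmbed model used at indexing time and retrieves the three most
# similar chunks from the ChromaDB collection.
if __name__ == "__main__":
    processor = DocumentProcessor()
    collection = processor.process_document("sample.pdf")

    if collection is not None:
        query = "What is this document about?"
        query_embedding = processor.embed_model.embed_query(query)
        results = collection.query(query_embeddings=[query_embedding], n_results=3)
        for text in results["documents"][0]:
            print(text[:200])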