| """ | |
| Generic Pre-Processing Pipeline (GPP) for Document Intelligence | |
| This module handles: | |
| 1. Parsing PDFs via MinerU Python API (OCR/text modes) | |
| 2. Extracting markdown, images, and content_list JSON | |
| 3. Chunking multimodal content (text, tables, images), ensuring tables/images are in single chunks | |
| 4. Parsing markdown tables into JSON 2D structures for dense tables | |
| 5. Narration of tables/images via LLM | |
| 6. Semantic enhancements (deduplication, coreference, metadata summarization) | |
| 7. Embedding computation for in-memory use | |
| Each step is modular to support swapping components (e.g. different parsers or stores). | |
| """ | |
import os
import json
import logging
import re
from typing import List, Dict, Any, Optional

# NOTE: these module paths follow the MinerU Python API demo; on MinerU 1.x
# the top-level package is `magic_pdf` (e.g.
# `from magic_pdf.data.dataset import PymuDocDataset`), so adjust the package
# name to match the installed version.
from mineru.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
from mineru.data.dataset import PymuDocDataset
from mineru.model.doc_analyze_by_custom_model import doc_analyze
from mineru.config.enums import SupportedPdfParseMethod
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
import numpy as np

# LLM client abstraction
from src.utils import LLMClient

# Configure logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

def parse_markdown_table(md: str) -> Optional[Dict[str, Any]]:
    """
    Parses a markdown table into a JSON-like dict:
        {'headers': [...], 'rows': [[...], ...]}
    Returns None if the input is not a well-formed markdown table.
    Multi-level headers are not merged; every row comes back as a flat list.
    """
    lines = [l for l in md.strip().splitlines() if l.strip().startswith('|')]
    if len(lines) < 2:
        return None
    header_line = lines[0]
    sep_line = lines[1]
    # Validate the header/body separator line, e.g. "| --- | :---: |"
    if not re.match(r"^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)+\|?", sep_line):
        return None

    def split_row(line):
        return [cell.strip() for cell in line.strip().strip('|').split('|')]

    headers = split_row(header_line)
    rows = [split_row(r) for r in lines[2:]]
    return {'headers': headers, 'rows': rows}
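
# Behavior sketch for parse_markdown_table on a simple two-column table:
#     parse_markdown_table("| a | b |\n| --- | --- |\n| 1 | 2 |")
#     -> {'headers': ['a', 'b'], 'rows': [['1', '2']]}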


class GPPConfig:
    """
    Configuration for the GPP pipeline.
    """
    CHUNK_TOKEN_SIZE = 256          # target chunk size, counted as whitespace-split tokens
    DEDUP_SIM_THRESHOLD = 0.9       # cosine similarity above which two chunks count as duplicates
    EXPANSION_SIM_THRESHOLD = 0.85  # reserved for downstream expansion; unused in this module
    COREF_CONTEXT_SIZE = 3          # number of preceding chunks used as coreference context
    # Embedding models
    TEXT_EMBED_MODEL = 'sentence-transformers/all-MiniLM-L6-v2'
    META_EMBED_MODEL = 'sentence-transformers/all-MiniLM-L6-v2'
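
# Settings can be tuned per deployment by subclassing the config; the values
# below are purely illustrative, not recommended defaults.
# class LargeChunkConfig(GPPConfig):
#     CHUNK_TOKEN_SIZE = 512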


class GPP:
    def __init__(self, config: GPPConfig):
        self.config = config
        # Embedding models
        self.text_embedder = SentenceTransformer(config.TEXT_EMBED_MODEL)
        self.meta_embedder = SentenceTransformer(config.META_EMBED_MODEL)
        self.bm25 = None

    def parse_pdf(self, pdf_path: str, output_dir: str) -> Dict[str, Any]:
        """
        Uses the MinerU API to parse a PDF in OCR/text mode and dumps
        markdown, images, a layout PDF, and the content_list JSON.
        Returns the parsed blocks plus file paths for UI traceability.
        """
        name = os.path.splitext(os.path.basename(pdf_path))[0]
        img_dir = os.path.join(output_dir, 'images')
        os.makedirs(img_dir, exist_ok=True)
        os.makedirs(output_dir, exist_ok=True)
        writer_imgs = FileBasedDataWriter(img_dir)
        writer_md = FileBasedDataWriter(output_dir)
        reader = FileBasedDataReader("")
        pdf_bytes = reader.read(pdf_path)
        ds = PymuDocDataset(pdf_bytes)
        # Choose OCR vs. text extraction based on MinerU's own classification
        if ds.classify() == SupportedPdfParseMethod.OCR:
            infer = ds.apply(doc_analyze, ocr=True)
            pipe = infer.pipe_ocr_mode(writer_imgs)
        else:
            infer = ds.apply(doc_analyze, ocr=False)
            pipe = infer.pipe_txt_mode(writer_imgs)
        # Visual layout overlay for debugging/traceability
        pipe.draw_layout(os.path.join(output_dir, f"{name}_layout.pdf"))
        # Dump markdown & JSON
        pipe.dump_md(writer_md, f"{name}.md", os.path.basename(img_dir))
        pipe.dump_content_list(writer_md, f"{name}_content_list.json", os.path.basename(img_dir))
        content_list_path = os.path.join(output_dir, f"{name}_content_list.json")
        # content_list.json is a JSON *list* of blocks, so wrap it in a dict
        # instead of calling .update() on it directly
        with open(content_list_path, 'r', encoding='utf-8') as f:
            blocks = json.load(f)
        return {
            'blocks': blocks,
            # UI traceability paths
            'md_path': os.path.join(output_dir, f"{name}.md"),
            'images_dir': img_dir,
            'layout_pdf': os.path.join(output_dir, f"{name}_layout.pdf"),
        }
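
    # Shape of the returned dict (block fields follow the MinerU content_list
    # format and may vary by version; paths here are hypothetical):
    #     {'blocks': [{'type': 'text', 'text': '...', 'page_idx': 0}, ...],
    #      'md_path': 'out/report.md',
    #      'images_dir': 'out/images',
    #      'layout_pdf': 'out/report_layout.pdf'}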

    def chunk_blocks(self, blocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Creates chunks of ~CHUNK_TOKEN_SIZE tokens, but ensures any table/image
        block becomes its own chunk (unsplittable), flushing the current text
        chunk as needed.
        """
        chunks: List[Dict[str, Any]] = []
        current = {'text': '', 'type': None, 'blocks': []}
        token_count = 0
        for blk in blocks:
            btype = blk.get('type')
            # content_list blocks carry their content under different keys
            # ('text', 'table_body', 'img_caption'); fall through until
            # something non-empty is found
            text = (blk.get('text')
                    or blk.get('table_body')
                    or ' '.join(blk.get('img_caption') or [])
                    or '')
            if btype in ('table', 'image'):  # content_list marks images with type 'image'
                # Flush the text chunk accumulated so far
                if current['blocks']:
                    chunks.append(current)
                    current = {'text': '', 'type': None, 'blocks': []}
                    token_count = 0
                # Create an isolated chunk for the table/image
                tbl_chunk = {'text': text, 'type': btype, 'blocks': [blk]}
                # Parse a markdown table into a JSON structure if applicable
                if btype == 'table':
                    tbl_chunk['table_structure'] = parse_markdown_table(text)
                chunks.append(tbl_chunk)
                continue
            # Standard text accumulation (tokens approximated by whitespace split)
            count = len(text.split())
            if token_count + count > self.config.CHUNK_TOKEN_SIZE and current['blocks']:
                chunks.append(current)
                current = {'text': '', 'type': None, 'blocks': []}
                token_count = 0
            current['text'] += text + '\n'
            current['type'] = current['type'] or btype
            current['blocks'].append(blk)
            token_count += count
        # Flush the remainder
        if current['blocks']:
            chunks.append(current)
        logger.info(f"Chunked into {len(chunks)} pieces (with tables/images isolated).")
        return chunks
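
    # Input/output sketch (block fields assumed from the MinerU content_list):
    #     blocks = [{'type': 'text', 'text': 'Intro paragraph.'},
    #               {'type': 'table', 'text': '| a | b |\n| --- | --- |\n| 1 | 2 |'}]
    # chunk_blocks(blocks) yields one text chunk plus one isolated table chunk
    # carrying its parsed 'table_structure'.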

    def narrate_multimodal(self, chunks: List[Dict[str, Any]]) -> None:
        """
        For table/image chunks, generate an LLM narration; text chunks reuse
        their own text. Any parsed table_structure is preserved on the chunk.
        """
        for c in chunks:
            if c['type'] in ('table', 'image'):
                prompt = f"Describe this {c['type']} concisely:\n{c['text']}"
                c['narration'] = LLMClient.generate(prompt)
            else:
                c['narration'] = c['text']

    def deduplicate(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Greedy near-duplicate removal: keep a chunk only if its narration's
        cosine similarity to every already-kept chunk stays below the threshold.
        """
        try:
            # encode() returns a numpy array by default, keeping the cosine
            # computation below purely in numpy
            embs = self.text_embedder.encode([c.get('narration', '') for c in chunks])
            keep: List[int] = []
            for i, emb in enumerate(embs):
                if not any(
                    float(emb @ embs[j]) / (np.linalg.norm(emb) * np.linalg.norm(embs[j]) + 1e-8)
                    > self.config.DEDUP_SIM_THRESHOLD
                    for j in keep
                ):
                    keep.append(i)
            deduped = [chunks[i] for i in keep]
            logger.info(f"Deduplicated: {len(chunks)}→{len(deduped)}")
            return deduped
        except Exception as e:
            logger.error(f"Deduplication failed: {e}")
            return chunks
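
    # Intuition for DEDUP_SIM_THRESHOLD: close paraphrases of the same sentence
    # embed nearly parallel (cosine near 1.0) and collapse to one chunk, while
    # unrelated sentences score far lower and are all kept. The 0.9 default is
    # a judgment call, not a measured optimum.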

    def coref_resolution(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Rewrites pronouns in each chunk's narration using the preceding
        COREF_CONTEXT_SIZE chunks as context, so chunks can stand alone.
        """
        for idx, c in enumerate(chunks):
            start = max(0, idx - self.config.COREF_CONTEXT_SIZE)
            ctx = "\n".join(chunks[i].get('narration', '') for i in range(start, idx))
            prompt = f"Context:\n{ctx}\nRewrite pronouns in:\n{c.get('narration', '')}"
            try:
                c['narration'] = LLMClient.generate(prompt)
            except Exception as e:
                logger.error(f"Coref resolution failed for chunk {idx}: {e}")

    def metadata_summarization(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Groups chunks by their 'section' key and attaches an LLM summary of
        each section to every member chunk's metadata.
        """
        sections: Dict[str, List[Dict[str, Any]]] = {}
        for c in chunks:
            sec = c.get('section', 'default')
            sections.setdefault(sec, []).append(c)
        for sec, items in sections.items():
            blob = "\n".join(i.get('narration', '') for i in items)
            try:
                summ = LLMClient.generate(f"Summarize this section:\n{blob}")
                for i in items:
                    i.setdefault('metadata', {})['section_summary'] = summ
            except Exception as e:
                logger.error(f"Metadata summarization failed for section {sec}: {e}")

    def build_bm25(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Build a BM25 index over whitespace-tokenized narrations for sparse retrieval.
        """
        tokenized = [c.get('narration', '').split() for c in chunks]
        self.bm25 = BM25Okapi(tokenized)
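
    # Query sketch (BM25Okapi.get_scores is rank_bm25's standard API):
    #     scores = gpp.bm25.get_scores("quarterly revenue".split())
    #     best_chunk = chunks[int(np.argmax(scores))]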

    # def compute_and_store(self, chunks: List[Dict[str, Any]]) -> None:
    #     try:
    #         txts = [c.get('narration', '') for c in chunks]
    #         metas = [c.get('metadata', {}).get('section_summary', '') for c in chunks]
    #         txt_embs = self.text_embedder.encode(txts)
    #         meta_embs = self.meta_embedder.encode(metas)
    #         # No Redis storage, just keep for in-memory use or return as needed
    #         logger.info("Computed embeddings for chunks.")
    #     except Exception as e:
    #         logger.error(f"Failed to compute embeddings: {e}")

    def run(self, pdf_path: str, output_dir: str) -> Dict[str, Any]:
        """
        Executes the full GPP: parse -> chunk -> narrate -> enhance -> index.
        Returns the parse output dict augmented with `chunks` for downstream processes.
        """
        parsed = self.parse_pdf(pdf_path, output_dir)
        blocks = parsed.get('blocks', [])
        chunks = self.chunk_blocks(blocks)
        self.narrate_multimodal(chunks)
        chunks = self.deduplicate(chunks)
        self.coref_resolution(chunks)
        self.metadata_summarization(chunks)
        self.build_bm25(chunks)
        # self.compute_and_store(chunks)
        parsed['chunks'] = chunks
        logger.info("GPP pipeline complete.")
        return parsed
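

# Minimal end-to-end sketch; 'sample.pdf' and the output directory are
# hypothetical, and MinerU models plus an LLMClient backend must be configured.
if __name__ == '__main__':
    gpp = GPP(GPPConfig())
    result = gpp.run('sample.pdf', 'output/sample')
    print(f"Produced {len(result['chunks'])} chunks; markdown at {result['md_path']}")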