"""
Generic Pre-Processing Pipeline (GPP) for Document Intelligence

This module handles:

1. Parsing PDFs via the MinerU Python API (OCR/text modes)
2. Extracting markdown, images, and content_list JSON
3. Chunking multimodal content (text, tables, images), ensuring tables and images each land in a single chunk
4. Parsing markdown tables into JSON 2D structures for dense tables
5. Narrating tables and images via an LLM
6. Semantic enhancements (deduplication, coreference resolution, metadata summarization)
7. Embedding computation for in-memory use

Each step is modular to support swapping components (e.g. different parsers or stores).
"""

import json
import logging
import os
import re
from typing import Any, Dict, List, Optional

import numpy as np
from mineru.config.enums import SupportedPdfParseMethod
from mineru.data.data_reader_writer import FileBasedDataReader, FileBasedDataWriter
from mineru.data.dataset import PymuDocDataset
from mineru.model.doc_analyze_by_custom_model import doc_analyze
from rank_bm25 import BM25Okapi
from sentence_transformers import SentenceTransformer

from src.utils import LLMClient

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


def parse_markdown_table(md: str) -> Optional[Dict[str, Any]]:
    """
    Parses a markdown table into a JSON-like dict:
        { 'headers': [...], 'rows': [[...], ...] }
    Assumes a single header row followed by a separator line; returns None
    if the text does not look like a markdown table.
    """
    lines = [l for l in md.strip().splitlines() if l.strip().startswith('|')]
    if len(lines) < 2:
        return None
    header_line = lines[0]
    sep_line = lines[1]

    # The second line must be a separator row such as |---|:---:|---|.
    if not re.match(r"^\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)+\|?", sep_line):
        return None

    def split_row(line: str) -> List[str]:
        return [cell.strip() for cell in line.strip().strip('|').split('|')]

    headers = split_row(header_line)
    rows = [split_row(r) for r in lines[2:]]
    return {'headers': headers, 'rows': rows}
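
# For illustration, a minimal two-column table parses as follows (hypothetical
# input, shown only to document the output shape):
#
#   parse_markdown_table("| a | b |\n| --- | --- |\n| 1 | 2 |")
#   -> {'headers': ['a', 'b'], 'rows': [['1', '2']]}

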
class GPPConfig:
    """
    Configuration for the GPP pipeline.
    """
    CHUNK_TOKEN_SIZE = 256          # approximate tokens (whitespace-split words) per text chunk
    DEDUP_SIM_THRESHOLD = 0.9       # cosine similarity above which two chunks count as duplicates
    EXPANSION_SIM_THRESHOLD = 0.85  # similarity threshold reserved for context expansion
    COREF_CONTEXT_SIZE = 3          # number of preceding chunks used as coreference context

    TEXT_EMBED_MODEL = 'sentence-transformers/all-MiniLM-L6-v2'
    META_EMBED_MODEL = 'sentence-transformers/all-MiniLM-L6-v2'


class GPP:
    def __init__(self, config: GPPConfig):
        self.config = config

        self.text_embedder = SentenceTransformer(config.TEXT_EMBED_MODEL)
        self.meta_embedder = SentenceTransformer(config.META_EMBED_MODEL)
        self.bm25 = None

    def parse_pdf(self, pdf_path: str, output_dir: str) -> Dict[str, Any]:
        """
        Uses the MinerU API to parse a PDF in OCR or text mode, then dumps
        markdown, images, a layout PDF, and the content_list JSON.
        Returns the parsed blocks plus file paths for UI traceability.
        """
        name = os.path.splitext(os.path.basename(pdf_path))[0]
        img_dir = os.path.join(output_dir, 'images')
        os.makedirs(output_dir, exist_ok=True)
        os.makedirs(img_dir, exist_ok=True)

        writer_imgs = FileBasedDataWriter(img_dir)
        writer_md = FileBasedDataWriter(output_dir)
        reader = FileBasedDataReader("")
        pdf_bytes = reader.read(pdf_path)
        ds = PymuDocDataset(pdf_bytes)
        if ds.classify() == SupportedPdfParseMethod.OCR:
            infer = ds.apply(doc_analyze, ocr=True)
            pipe = infer.pipe_ocr_mode(writer_imgs)
        else:
            infer = ds.apply(doc_analyze, ocr=False)
            pipe = infer.pipe_txt_mode(writer_imgs)

        pipe.draw_layout(os.path.join(output_dir, f"{name}_layout.pdf"))

        pipe.dump_md(writer_md, f"{name}.md", os.path.basename(img_dir))
        pipe.dump_content_list(writer_md, f"{name}_content_list.json", os.path.basename(img_dir))

        content_list_path = os.path.join(output_dir, f"{name}_content_list.json")
        with open(content_list_path, 'r', encoding='utf-8') as f:
            blocks = json.load(f)

        # content_list JSON is a list of blocks, so wrap it in a dict rather
        # than calling .update() on it (which would raise AttributeError).
        return {
            'blocks': blocks,
            'md_path': os.path.join(output_dir, f"{name}.md"),
            'images_dir': img_dir,
            'layout_pdf': os.path.join(output_dir, f"{name}_layout.pdf"),
        }
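
    # NOTE: downstream steps assume each block is a dict with at least a
    # 'type' field and, for text/table blocks, a 'text' field holding raw
    # markdown. This mirrors MinerU's content_list output; if your MinerU
    # version emits a different schema, adapt chunk_blocks accordingly.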

    def chunk_blocks(self, blocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Creates chunks of ~CHUNK_TOKEN_SIZE tokens, but ensures any table or
        image block becomes its own chunk (unsplittable), flushing the current
        text chunk first as needed.
        """
        chunks, current, token_count = [], {'text': '', 'type': None, 'blocks': []}, 0
        for blk in blocks:
            btype = blk.get('type')
            text = blk.get('text', '')
            if btype in ('table', 'img_path'):
                # Flush any accumulated text chunk before isolating this block.
                if current['blocks']:
                    chunks.append(current)
                    current = {'text': '', 'type': None, 'blocks': []}
                    token_count = 0

                tbl_chunk = {'text': text, 'type': btype, 'blocks': [blk]}

                # Dense tables additionally get a parsed 2D structure.
                if btype == 'table':
                    tbl_chunk['table_structure'] = parse_markdown_table(text)
                chunks.append(tbl_chunk)
                continue

            # Approximate token count by whitespace-split words.
            count = len(text.split())
            if token_count + count > self.config.CHUNK_TOKEN_SIZE and current['blocks']:
                chunks.append(current)
                current = {'text': '', 'type': None, 'blocks': []}
                token_count = 0
            current['text'] += text + '\n'
            current['type'] = current['type'] or btype
            current['blocks'].append(blk)
            token_count += count

        if current['blocks']:
            chunks.append(current)
        logger.info(f"Chunked into {len(chunks)} pieces (with tables/images isolated).")
        return chunks
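
    # Example: three short text blocks followed by a table block produce two
    # chunks: one merged text chunk and one isolated table chunk that carries
    # its parsed 'table_structure'.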

    def narrate_multimodal(self, chunks: List[Dict[str, Any]]) -> None:
        """
        For table/image chunks, generate an LLM narration; text chunks use
        their own text as narration. Preserves table_structure on the chunk.
        """
        for c in chunks:
            if c['type'] in ('table', 'img_path'):
                prompt = f"Describe this {c['type']} concisely:\n{c['text']}"
                c['narration'] = LLMClient.generate(prompt)
            else:
                c['narration'] = c['text']

    def deduplicate(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Greedy near-duplicate removal: keep a chunk only if its narration's
        cosine similarity to every kept chunk stays below the threshold.
        """
        try:
            # Normalized numpy embeddings make cosine similarity a plain dot
            # product and avoid mixing torch tensors with numpy operations.
            embs = self.text_embedder.encode(
                [c.get('narration', '') for c in chunks],
                convert_to_numpy=True,
                normalize_embeddings=True,
            )
            keep = []
            for i, emb in enumerate(embs):
                if not any(float(emb @ embs[j]) > self.config.DEDUP_SIM_THRESHOLD for j in keep):
                    keep.append(i)
            deduped = [chunks[i] for i in keep]
            logger.info(f"Deduplicated: {len(chunks)}→{len(deduped)}")
            return deduped
        except Exception as e:
            logger.error(f"Deduplication failed: {e}")
            return chunks

    def coref_resolution(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Rewrites pronouns in each chunk's narration using the preceding
        COREF_CONTEXT_SIZE chunks as context.
        """
        for idx, c in enumerate(chunks):
            start = max(0, idx - self.config.COREF_CONTEXT_SIZE)
            ctx = "\n".join(chunks[i].get('narration', '') for i in range(start, idx))
            if not ctx:
                continue  # the first chunk has no preceding context to resolve against
            prompt = f"Context:\n{ctx}\nRewrite pronouns in:\n{c.get('narration', '')}"
            try:
                c['narration'] = LLMClient.generate(prompt)
            except Exception as e:
                logger.error(f"Coref resolution failed for chunk {idx}: {e}")

    def metadata_summarization(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Groups chunks by section and attaches an LLM-generated section summary
        to each chunk's metadata.
        """
        sections: Dict[str, List[Dict[str, Any]]] = {}
        for c in chunks:
            sec = c.get('section', 'default')
            sections.setdefault(sec, []).append(c)
        for sec, items in sections.items():
            blob = "\n".join(i.get('narration', '') for i in items)
            try:
                summ = LLMClient.generate(f"Summarize this section:\n{blob}")
                for i in items:
                    i.setdefault('metadata', {})['section_summary'] = summ
            except Exception as e:
                logger.error(f"Metadata summarization failed for section {sec}: {e}")

    def build_bm25(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Builds a BM25 index over whitespace-tokenized narrations for sparse retrieval.
        """
        tokenized = [c['narration'].split() for c in chunks]
        self.bm25 = BM25Okapi(tokenized)
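
    def query_sparse(self, query: str, chunks: List[Dict[str, Any]], top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Minimal sketch of querying the BM25 index built above (this helper is
        illustrative and not called by run()). Assumes `chunks` is the same
        list, in the same order, that was passed to build_bm25.
        """
        if self.bm25 is None:
            return []
        scores = self.bm25.get_scores(query.split())  # one BM25 score per chunk
        ranked = np.argsort(scores)[::-1][:top_k]     # indices of the best-scoring chunks
        return [chunks[i] for i in ranked]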
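
    def embed_chunks(self, chunks: List[Dict[str, Any]]) -> None:
        """
        Sketch of step 7 from the module docstring: compute in-memory
        embeddings per chunk. Stores a text embedding of the narration and a
        metadata embedding of the section summary (when present) on the chunk
        itself. Not wired into run() by default; call it after build_bm25 if
        dense retrieval is needed.
        """
        texts = [c.get('narration', '') for c in chunks]
        metas = [c.get('metadata', {}).get('section_summary', '') for c in chunks]
        text_embs = self.text_embedder.encode(texts, normalize_embeddings=True)
        meta_embs = self.meta_embedder.encode(metas, normalize_embeddings=True)
        for c, t_emb, m_emb in zip(chunks, text_embs, meta_embs):
            c['text_embedding'] = t_emb
            c['meta_embedding'] = m_emb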

    def run(self, pdf_path: str, output_dir: str) -> Dict[str, Any]:
        """
        Executes the full GPP: parse -> chunk -> narrate -> enhance -> index.
        Returns the parse output dict augmented with `chunks` for downstream processes.
        """
        parsed = self.parse_pdf(pdf_path, output_dir)
        blocks = parsed.get('blocks', [])
        chunks = self.chunk_blocks(blocks)
        self.narrate_multimodal(chunks)
        chunks = self.deduplicate(chunks)
        self.coref_resolution(chunks)
        self.metadata_summarization(chunks)
        self.build_bm25(chunks)

        parsed['chunks'] = chunks
        logger.info("GPP pipeline complete.")
        return parsed
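

if __name__ == '__main__':
    # Minimal usage sketch with hypothetical paths; adjust for real documents.
    gpp = GPP(GPPConfig())
    result = gpp.run('sample.pdf', 'output/sample')
    print(f"Produced {len(result['chunks'])} chunks; markdown at {result['md_path']}")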