import gradio as gr import torch import base64 import fitz # PyMuPDF import tempfile from io import BytesIO from PIL import Image from transformers import AutoProcessor, Qwen2VLForConditionalGeneration from olmocr.data.renderpdf import render_pdf_to_base64png from olmocr.prompts.anchor import get_anchor_text import re import html import json model = Qwen2VLForConditionalGeneration.from_pretrained( "allenai/olmOCR-7B-0225-preview", torch_dtype=torch.bfloat16 ).eval() processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-7B-Instruct") device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model.to(device) def clean_page_headers(text): lines = text.split("\n") cleaned = [] for line in lines: if not re.match(r'^(\s*Page \d+|\s*\d{1,2}\s*/\s*\d{1,2}|^[A-Z][A-Za-z\s]{0,20}$)', line.strip()): cleaned.append(line) return "\n".join(cleaned) def replace_headers_in_text(text, page_headers): lines = text.split("\n") for level, header in page_headers: prefix = "#" * min(level, 6) pattern = re.compile(re.escape(header.strip()), re.IGNORECASE) for idx, line in enumerate(lines): if pattern.fullmatch(line.strip()): lines[idx] = f"{prefix} {header.strip()}" break else: lines.insert(0, f"{prefix} {header.strip()}") return "\n".join(lines) def process_pdf_to_markdown(pdf_file, title, author): pdf_path = pdf_file.name doc = fitz.open(pdf_path) num_pages = len(doc) toc_entries = doc.get_toc() toc_by_page = {} for level, header, page in toc_entries: toc_by_page.setdefault(page, []).append((level, header)) all_text = f"# {title}\n\n**Author(s):** {author}\n\n" for i in range(num_pages): page_num = i + 1 print(f"Processing page {page_num}...") try: image_base64 = render_pdf_to_base64png(pdf_path, page_num, target_longest_image_dim=1024) anchor_text = get_anchor_text(pdf_path, page_num, pdf_engine="pdfreport", target_length=4000) prompt = ( "Below is the image of one page of a document, as well as some raw textual content that was previously " "extracted for it. Just return the plain text representation of this document as if you were reading it naturally.\n" "Do not hallucinate.\n" "RAW_TEXT_START\n" f"{anchor_text}\n" "RAW_TEXT_END" ) messages = [ { "role": "user", "content": [ {"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}}, ], } ] text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) image = Image.open(BytesIO(base64.b64decode(image_base64))) inputs = processor( text=[text], images=[image], padding=True, return_tensors="pt", ) inputs = {k: v.to(device) for k, v in inputs.items()} output = model.generate( **inputs, temperature=0.8, max_new_tokens=5096, num_return_sequences=1, do_sample=True, ) prompt_len = inputs["input_ids"].shape[1] new_tokens = output[:, prompt_len:].detach().cpu() decoded = "[No output generated]" if new_tokens.shape[1] > 0: decoded_list = processor.tokenizer.batch_decode(new_tokens, skip_special_tokens=True) raw_output = decoded_list[0].strip() if decoded_list else "[No output generated]" try: parsed = json.loads(raw_output) decoded = parsed.get("natural_text", raw_output) except json.JSONDecodeError: decoded = raw_output except Exception as e: decoded = f"[Error on page {page_num}: {e}]" print(f"Decoded content for page {page_num}: {decoded}") cleaned_text = clean_page_headers(decoded) if page_num in toc_by_page: cleaned_text = replace_headers_in_text(cleaned_text, toc_by_page[page_num]) all_text += cleaned_text + "\n\n" with tempfile.NamedTemporaryFile(delete=False, suffix=".txt", dir="/tmp", mode="w", encoding="utf-8") as tmp: tmp.write(all_text) return tmp.name iface = gr.Interface( fn=process_pdf_to_markdown, inputs=[ gr.File(label="Upload PDF", file_types=[".pdf"]), gr.Textbox(label="Markdown Title"), gr.Textbox(label="Author(s)") ], outputs=gr.File(label="Download Markdown .txt"), title="PDF to Markdown Converter (for Calibre)", description="Extracts text with structure and outputs it as Markdown in a .txt file compatible with Calibre.", allow_flagging="never" ) if __name__ == "__main__": iface.launch( server_name="0.0.0.0", server_port=7860, share=True, debug=True, allowed_paths=["/tmp"] )