Spaces:
Running
Running
File size: 5,318 Bytes
59ff001 3658a99 59ff001 6a0411c 9f080c3 70fe98e 3dc75f1 8be5494 59ff001 9f080c3 bd2cd53 8a21578 6065374 8a21578 6065374 8a21578 6065374 8a21578 6065374 59ff001 5827499 bd2cd53 9f080c3 bd2cd53 6065374 6a0411c 59ff001 9f080c3 59ff001 6a0411c 59ff001 c7e3ff4 59ff001 2a16ca6 59ff001 9f080c3 2a16ca6 9f080c3 59ff001 9f080c3 2a16ca6 9f080c3 e9af7f8 9f080c3 8a21578 6065374 822eba7 6a0411c 6065374 59ff001 6065374 59ff001 6065374 59ff001 6a0411c d45f3e7 6a0411c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
import gradio as gr
import torch
import base64
import fitz # PyMuPDF
import tempfile
from io import BytesIO
from PIL import Image
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
from olmocr.data.renderpdf import render_pdf_to_base64png
from olmocr.prompts.anchor import get_anchor_text
import re
import html
import json
model = Qwen2VLForConditionalGeneration.from_pretrained(
"allenai/olmOCR-7B-0225-preview", torch_dtype=torch.bfloat16
).eval()
processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-7B-Instruct")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
def clean_page_headers(text):
lines = text.split("\n")
cleaned = []
for line in lines:
if not re.match(r'^(\s*Page \d+|\s*\d{1,2}\s*/\s*\d{1,2}|^[A-Z][A-Za-z\s]{0,20}$)', line.strip()):
cleaned.append(line)
return "\n".join(cleaned)
def replace_headers_in_text(text, page_headers):
lines = text.split("\n")
for level, header in page_headers:
prefix = "#" * min(level, 6)
pattern = re.compile(re.escape(header.strip()), re.IGNORECASE)
for idx, line in enumerate(lines):
if pattern.fullmatch(line.strip()):
lines[idx] = f"{prefix} {header.strip()}"
break
else:
lines.insert(0, f"{prefix} {header.strip()}")
return "\n".join(lines)
def process_pdf_to_markdown(pdf_file, title, author):
pdf_path = pdf_file.name
doc = fitz.open(pdf_path)
num_pages = len(doc)
toc_entries = doc.get_toc()
toc_by_page = {}
for level, header, page in toc_entries:
toc_by_page.setdefault(page, []).append((level, header))
all_text = f"# {title}\n\n**Author(s):** {author}\n\n"
for i in range(num_pages):
page_num = i + 1
print(f"Processing page {page_num}...")
try:
image_base64 = render_pdf_to_base64png(pdf_path, page_num, target_longest_image_dim=1024)
anchor_text = get_anchor_text(pdf_path, page_num, pdf_engine="pdfreport", target_length=4000)
prompt = (
"Below is the image of one page of a document, as well as some raw textual content that was previously "
"extracted for it. Just return the plain text representation of this document as if you were reading it naturally.\n"
"Do not hallucinate.\n"
"RAW_TEXT_START\n"
f"{anchor_text}\n"
"RAW_TEXT_END"
)
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
],
}
]
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image = Image.open(BytesIO(base64.b64decode(image_base64)))
inputs = processor(
text=[text],
images=[image],
padding=True,
return_tensors="pt",
)
inputs = {k: v.to(device) for k, v in inputs.items()}
output = model.generate(
**inputs,
temperature=0.8,
max_new_tokens=5096,
num_return_sequences=1,
do_sample=True,
)
prompt_len = inputs["input_ids"].shape[1]
new_tokens = output[:, prompt_len:].detach().cpu()
decoded = "[No output generated]"
if new_tokens.shape[1] > 0:
decoded_list = processor.tokenizer.batch_decode(new_tokens, skip_special_tokens=True)
raw_output = decoded_list[0].strip() if decoded_list else "[No output generated]"
try:
parsed = json.loads(raw_output)
decoded = parsed.get("natural_text", raw_output)
except json.JSONDecodeError:
decoded = raw_output
except Exception as e:
decoded = f"[Error on page {page_num}: {e}]"
print(f"Decoded content for page {page_num}: {decoded}")
cleaned_text = clean_page_headers(decoded)
if page_num in toc_by_page:
cleaned_text = replace_headers_in_text(cleaned_text, toc_by_page[page_num])
all_text += cleaned_text + "\n\n"
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt", dir="/tmp", mode="w", encoding="utf-8") as tmp:
tmp.write(all_text)
return tmp.name
iface = gr.Interface(
fn=process_pdf_to_markdown,
inputs=[
gr.File(label="Upload PDF", file_types=[".pdf"]),
gr.Textbox(label="Markdown Title"),
gr.Textbox(label="Author(s)")
],
outputs=gr.File(label="Download Markdown .txt"),
title="PDF to Markdown Converter (for Calibre)",
description="Extracts text with structure and outputs it as Markdown in a .txt file compatible with Calibre.",
allow_flagging="never"
)
if __name__ == "__main__":
iface.launch(
server_name="0.0.0.0",
server_port=7860,
share=True,
debug=True,
allowed_paths=["/tmp"]
)
|