|
import atexit |
|
import base64 |
|
import io |
|
import json |
|
import os |
|
import re |
|
import tempfile |
|
import uuid |
|
import zipfile |
|
from pathlib import Path |
|
|
|
import gradio as gr |
|
import requests |
|
from PIL import Image |
|
|
|
|
|
# Endpoint of the PP-StructureV3 layout-parsing service hosted on AI Studio.
API_URL = "https://cf38vaydqdl2l4p2.aistudio-hub.baidu.com/layout-parsing"
# API access token; read from the environment so it is never hard-coded.
TOKEN = os.getenv("API_TOKEN", "")

# Inline the logo as a base64 data URI so the HTML header needs no static
# file route from the web server.
LOGO_PATH = Path(__file__).parent / "pp-structurev3.png"
with open(LOGO_PATH, "rb") as image_file:
    LOGO_BASE64 = (
        f"data:image/png;base64,{base64.b64encode(image_file.read()).decode('utf-8')}"
    )

# Scratch directory for generated ZIP exports; removed at interpreter exit.
TEMP_DIR = tempfile.TemporaryDirectory()
atexit.register(TEMP_DIR.cleanup)
|
|
|
|
|
# Page-level CSS injected into the Gradio app: overall layout width/colors,
# the nav bar, button styling, and the animated loading spinner.
CSS = """
:root {
    --sand-color: #FAF9F6;
    --white: #ffffff;
    --shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    --text-color: #F3F4F7;
    --black:#000000;
    --link-hover: #2b6cb0;
    --content-width: 1200px;
}

body {
    display: flex;
    justify-content: center;
    background-color: var(--sand-color);
    color: var(--text-color);
    font-family: Arial, sans-serif;
}

.gradio-container {
    max-width: var(--content-width) !important;
    width: 100% !important;
    margin: 20px auto;
    padding: 20px;
    background-color: var(--white);
}

#component-0,
#tabs,
#settings {
    background-color: var(--white) !important;
    padding: 15px;
}

.upload-section {
    width: 100%;
    margin: 0 auto 30px;
    padding: 20px;
    background-color: var(--sand-color) !important;
    border-radius: 8px;
    box-shadow: var(--shadow);
}

.center-content {
    display: flex;
    flex-direction: column;
    align-items: center;
    text-align: center;
    margin-bottom: 20px;
}

.header {
    margin-bottom: 30px;
    width: 100%;
}

.logo-container {
    width: 100%;
    margin-bottom: 20px;
}

.logo-img {
    width: 100%;
    max-width: var(--content-width);
    margin: 0 auto;
    display: block;
}

.nav-bar {
    display: flex;
    justify-content: center;
    background-color: var(--white);
    padding: 15px 0;
    box-shadow: var(--shadow);
    margin-bottom: 20px;
}

.nav-links {
    display: flex;
    gap: 30px;
    width: 100%;
    justify-content: center;
}

.nav-link {
    color: var(--black);
    text-decoration: none;
    font-weight: bold;
    font-size: 24px;
    transition: color 0.2s;
}

.nav-link:hover {
    color: var(--link-hover);
    text-decoration: none;
}

button {
    background-color: var(--text-color) !important;
    color: var(--black) !important;
    border: none !important;
    border-radius: 4px;
    padding: 8px 16px;
}
button:hover {
    opacity: 0.8 !important;
}

.file-download {
    margin-top: 15px !important;
}
.loader {
    border: 5px solid #f3f3f3;
    border-top: 5px solid #3498db;
    border-radius: 50%;
    width: 50px;
    height: 50px;
    animation: spin 1s linear infinite;
    margin: 20px auto;
}

@keyframes spin {
    0% { transform: rotate(0deg); }
    100% { transform: rotate(360deg); }
}

.loader-container {
    text-align: center;
    margin: 20px 0;
}
"""

# Upper bound on pages shown in the UI; the result columns pre-allocate this
# many (initially hidden) image/markdown components.
MAX_NUM_PAGES = 10
|
|
|
|
|
def url_to_bytes(url, *, timeout=10):
    """Fetch *url* over HTTP and return the response body as raw bytes.

    Raises ``requests.HTTPError`` for a non-2xx status and the usual
    ``requests`` exceptions on connection/timeout failures.
    """
    with requests.get(url, timeout=timeout) as response:
        response.raise_for_status()
        return response.content
|
|
|
|
|
def bytes_to_image(image_bytes):
    """Decode encoded image bytes (e.g. PNG/JPEG) into a PIL ``Image``."""
    buffer = io.BytesIO(image_bytes)
    return Image.open(buffer)
|
|
|
|
|
def embed_images_into_markdown_text(markdown_text, markdown_images):
    """Point the markdown's ``<img>`` tags at remote URLs.

    *markdown_images* maps each local image path (as it appears in a
    ``src="..."`` attribute) to its hosted URL; every matching ``<img src=``
    prefix is rewritten in place. Returns the rewritten markdown string.
    """
    result = markdown_text
    for local_path, remote_url in markdown_images.items():
        # Only exact `<img src="<path>"` prefixes are rewritten, matching the
        # tag format the API emits.
        result = result.replace(
            f'<img src="{local_path}"', f'<img src="{remote_url}"'
        )
    return result
|
|
|
|
|
|
|
def concatenate_markdown_pages(markdown_list):
    """Merge per-page markdown dicts into one markdown document.

    Each item of *markdown_list* has keys ``"text"`` (page markdown),
    ``"isStart"`` (page begins a new paragraph) and ``"isEnd"`` (page ends
    its last paragraph). When a paragraph straddles a page break (previous
    page did not end one AND current page does not start one), the pages are
    joined inline — with a space unless either side of the seam is a CJK
    character, which takes no inter-word separator. Otherwise pages are
    separated by a blank line.

    Returns the concatenated markdown string ("" for an empty list).

    Fixes: the original always prepended "\\n\\n" to the very first page
    (the previous-page flag starts out True), leaving spurious leading blank
    lines in the output.
    """

    def _is_cjk(char):
        # CJK text is written without spaces between words.
        return bool(char) and re.match(r"[\u4e00-\u9fff]", char) is not None

    merged = ""
    previous_page_ended_paragraph = True

    for page in markdown_list:
        text = page["text"]
        page_starts_paragraph: bool = page["isStart"]
        page_ends_paragraph: bool = page["isEnd"]

        if not merged:
            # First (non-empty) chunk: no separator before it.
            merged = text
        elif not page_starts_paragraph and not previous_page_ended_paragraph:
            # Paragraph continues across the page break: join inline.
            last_char = merged[-1]
            first_char = text[:1]
            if _is_cjk(last_char) or _is_cjk(first_char):
                merged += text
            else:
                merged += " " + text
        else:
            merged += "\n\n" + text

        previous_page_ended_paragraph = page_ends_paragraph

    return merged
|
|
|
|
|
def process_file(
    file_path,
    use_formula_recognition,
    use_chart_recognition,
    use_doc_orientation_classify,
    use_doc_unwarping,
    use_textline_orientation,
):
    """Send the uploaded document to the layout-parsing API and gather results.

    The file is base64-encoded and POSTed to ``API_URL``; each returned page
    is downloaded (layout-ordering image, markdown images, input image) and
    the per-page markdown is assembled.

    Args:
        file_path: Path of the uploaded file (PDF or image), or falsy if none.
        use_*: Boolean pipeline toggles forwarded to the API verbatim.

    Returns:
        dict with keys: original_file, file_type, layout_ordering_images,
        markdown_texts, markdown_images, markdown_content_list,
        concatenated_markdown_content, input_images, api_response.

    Raises:
        gr.Error: on any request or processing failure (user-facing message).
    """
    try:
        if not file_path:
            raise ValueError("Please upload a file first")

        # Compare the suffix case-insensitively so ".PDF" uploads are not
        # mis-detected (and mis-sent) as images.
        if Path(file_path).suffix.lower() == ".pdf":
            file_type = "pdf"
        else:
            file_type = "image"

        with open(file_path, "rb") as f:
            file_bytes = f.read()

        file_data = base64.b64encode(file_bytes).decode("ascii")
        headers = {
            "Authorization": f"token {TOKEN}",
            "Content-Type": "application/json",
        }

        response = requests.post(
            API_URL,
            json={
                "file": file_data,
                # API convention: fileType 0 = PDF, 1 = image.
                "fileType": 0 if file_type == "pdf" else 1,
                "useFormulaRecognition": use_formula_recognition,
                "useChartRecognition": use_chart_recognition,
                "useDocOrientationClassify": use_doc_orientation_classify,
                "useDocUnwarping": use_doc_unwarping,
                "useTextlineOrientation": use_textline_orientation,
            },
            headers=headers,
            # Generous timeout: large PDFs can take a long time server-side.
            timeout=1000,
        )
        response.raise_for_status()

        result = response.json()
        layout_results = result.get("result", {}).get("layoutParsingResults", [])

        layout_ordering_images = []
        markdown_texts = []
        markdown_images = []
        markdown_content_list = []
        input_images = []
        for res in layout_results:
            layout_ordering_images.append(
                url_to_bytes(res["outputImages"]["layout_order_res"])
            )
            markdown = res["markdown"]
            markdown_text = markdown["text"]
            markdown_texts.append(markdown_text)
            img_path_to_url = markdown["images"]
            # Fetch every referenced markdown image so the ZIP export is
            # self-contained.
            img_path_to_bytes = {
                path: url_to_bytes(url) for path, url in img_path_to_url.items()
            }
            markdown_images.append(img_path_to_bytes)
            input_images.append(url_to_bytes(res["inputImage"]))
            markdown_content_list.append(
                embed_images_into_markdown_text(markdown_text, img_path_to_url)
            )

        concatenated_markdown_content = concatenate_markdown_pages(
            [res["markdown"] for res in layout_results]
        )

        return {
            "original_file": file_path,
            "file_type": file_type,
            "layout_ordering_images": layout_ordering_images,
            "markdown_texts": markdown_texts,
            "markdown_images": markdown_images,
            "markdown_content_list": markdown_content_list,
            "concatenated_markdown_content": concatenated_markdown_content,
            "input_images": input_images,
            "api_response": result,
        }

    except requests.exceptions.RequestException as e:
        # Chain the cause so the real traceback survives in server logs.
        raise gr.Error(f"API request failed: {str(e)}") from e
    except Exception as e:
        raise gr.Error(f"Error processing file: {str(e)}") from e
|
|
|
|
|
def export_full_results(results):
    """Bundle all analysis artifacts into one downloadable ZIP archive.

    Archive layout: ``layout_ordering_images/page_N.jpg``,
    ``markdown/page_N.md`` plus each markdown's referenced images,
    ``api_response.json`` (pretty-printed raw response), and
    ``input_images/page_N.jpg``.

    Args:
        results: The dict produced by ``process_file``, or falsy if the user
            has not run an analysis yet.

    Returns:
        str path of the created ZIP inside the app's temp directory.

    Raises:
        gr.Error: on missing results or any archiving failure.
    """
    try:
        if not results:
            raise ValueError("No results to export")

        # Unique name per export so concurrent users/downloads never collide.
        filename = Path(results["original_file"]).stem + f"_{uuid.uuid4().hex}.zip"
        zip_path = Path(TEMP_DIR.name, filename)

        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for i, img_bytes in enumerate(results["layout_ordering_images"], start=1):
                zipf.writestr(f"layout_ordering_images/page_{i}.jpg", img_bytes)

            pages = zip(results["markdown_texts"], results["markdown_images"])
            for i, (md_text, md_imgs) in enumerate(pages, start=1):
                zipf.writestr(f"markdown/page_{i}.md", md_text)
                for img_path, img_bytes in md_imgs.items():
                    # img_path is reproduced verbatim so the markdown's
                    # relative image links keep working after extraction.
                    zipf.writestr(f"markdown/{img_path}", img_bytes)

            api_response = results.get("api_response", {})
            zipf.writestr(
                "api_response.json",
                json.dumps(api_response, indent=2, ensure_ascii=False),
            )

            for i, img_bytes in enumerate(results["input_images"], start=1):
                zipf.writestr(f"input_images/page_{i}.jpg", img_bytes)

        return str(zip_path)

    except Exception as e:
        # Chain the cause so the real traceback survives in server logs.
        raise gr.Error(f"Error creating ZIP file: {str(e)}") from e
|
|
|
|
|
# UI definition: component construction order below determines on-page layout.
with gr.Blocks(css=CSS, title="Document Analysis System") as demo:
    # Holds the dict returned by process_file between event callbacks.
    results_state = gr.State()

    # Banner logo, embedded as a base64 data URI (no static file route needed).
    with gr.Column(elem_classes=["logo-container"]):
        gr.HTML(f'<img src="{LOGO_BASE64}" class="logo-img">')

    # Navigation bar with external project links.
    with gr.Row(elem_classes=["nav-bar"]):
        gr.HTML(
            """
            <div class="nav-links">
                <a href="https://github.com/PaddlePaddle/PaddleOCR" class="nav-link" target="_blank">GitHub</a>
                <a href="https://paddleocr.ai" class="nav-link" target="_blank">paddleocr.ai</a>
            </div>
            """
        )

    # Upload area: file picker, pipeline toggles, and the analyze button.
    with gr.Column(elem_classes=["upload-section"]):
        file_input = gr.File(
            label="Upload Document",
            file_types=[".pdf", ".jpg", ".jpeg", ".png"],
            type="filepath",
        )
        with gr.Row():
            use_formula_recognition_cb = gr.Checkbox(
                value=True, label="Use formula recognition"
            )
            use_chart_recognition_cb = gr.Checkbox(
                value=False, label="Use chart recognition"
            )
        with gr.Row():
            use_doc_orientation_classify_cb = gr.Checkbox(
                value=False, label="Use document image orientation classification"
            )
            use_doc_unwarping_cb = gr.Checkbox(
                value=False, label="Use text image unwarping"
            )
        with gr.Row():
            use_textline_orientation_cb = gr.Checkbox(
                value=False, label="Use text line orientation classification"
            )
        concatenate_pages_cb = gr.Checkbox(value=True, label="Concatenate pages")
        process_btn = gr.Button("Analyze Document", variant="primary")
        gr.Markdown(
            f"""
            1. Only the first {MAX_NUM_PAGES} pages will be processed.
            2. Some formulas might not display correctly because of renderer limitations.
            """
        )

    # Spinner shown while an API request is in flight (toggled by callbacks).
    loading_spinner = gr.Column(visible=False, elem_classes=["loader-container"])
    with loading_spinner:
        gr.HTML(
            """
            <div class="loader"></div>
            <p>Processing, please wait...</p>
            """
        )

    # Results area: MAX_NUM_PAGES hidden slots per column, revealed per page
    # by update_display (Gradio needs components pre-declared).
    with gr.Column():
        gr.Markdown("### Results")
        with gr.Row():
            with gr.Column():
                layout_ordering_images = []
                for i in range(MAX_NUM_PAGES):
                    layout_ordering_images.append(
                        gr.Image(
                            label=f"Layout Ordering Image {i}",
                            show_label=True,
                            visible=False,
                        )
                    )
            with gr.Column():
                markdown_display_list = []
                for i in range(MAX_NUM_PAGES):
                    markdown_display_list.append(
                        gr.Markdown(
                            visible=False,
                            container=True,
                            show_copy_button=True,
                            # Render both $$...$$ (display) and $...$ (inline)
                            # LaTeX in the recognized markdown.
                            latex_delimiters=[
                                {"left": "$$", "right": "$$", "display": True},
                                {"left": "$", "right": "$", "display": False},
                            ],
                        )
                    )

    # Export area: button that builds and offers the ZIP download.
    with gr.Column(elem_classes=["download-section"]):
        gr.Markdown("### Result Export")
        download_all_btn = gr.Button("Download Full Results (ZIP)", variant="primary")
        download_file = gr.File(visible=False, label="Download File")

    def toggle_spinner():
        # Show the loading spinner while processing runs.
        return gr.Column(visible=True)

    def hide_spinner():
        # Hide the loading spinner once processing finishes.
        return gr.Column(visible=False)

    def update_display(results, concatenate_pages):
        # Map process_file's results onto the pre-declared component slots.
        # Returns one update per image slot followed by one per markdown slot
        # (order must match the `outputs=` list in the .then() wiring below).
        if not results:
            return gr.skip()

        assert len(results["layout_ordering_images"]) <= MAX_NUM_PAGES, len(
            results["layout_ordering_images"]
        )
        # Fill image slots with decoded pages; hide the unused remainder.
        ret_img = []
        for img in results["layout_ordering_images"]:
            ret_img.append(gr.Image(value=bytes_to_image(img), visible=True))
        for _ in range(len(results["layout_ordering_images"]), MAX_NUM_PAGES):
            ret_img.append(gr.Image(visible=False))

        if concatenate_pages:
            # Single merged document in the first slot; hide the rest.
            markdown_content = results["concatenated_markdown_content"]
            ret_cont = [gr.Markdown(value=markdown_content, visible=True)]
            for _ in range(1, MAX_NUM_PAGES):
                ret_cont.append(gr.Markdown(visible=False))
        else:
            assert len(results["markdown_content_list"]) <= MAX_NUM_PAGES, len(
                results["markdown_content_list"]
            )
            # One markdown slot per page; hide the unused remainder.
            ret_cont = []
            for cont in results["markdown_content_list"]:
                ret_cont.append(gr.Markdown(value=cont, visible=True))
            for _ in range(len(results["markdown_content_list"]), MAX_NUM_PAGES):
                ret_cont.append(gr.Markdown(visible=False))
        return ret_img + ret_cont

    # Event chain: show spinner -> call API -> hide spinner -> render results.
    process_btn.click(toggle_spinner, outputs=[loading_spinner]).then(
        process_file,
        inputs=[
            file_input,
            use_formula_recognition_cb,
            use_chart_recognition_cb,
            use_doc_orientation_classify_cb,
            use_doc_unwarping_cb,
            use_textline_orientation_cb,
        ],
        outputs=[results_state],
    ).then(hide_spinner, outputs=[loading_spinner]).then(
        update_display,
        inputs=[results_state, concatenate_pages_cb],
        outputs=layout_ordering_images + markdown_display_list,
    )

    # Build the ZIP, then reveal the download component only on success.
    download_all_btn.click(
        export_full_results, inputs=[results_state], outputs=[download_file]
    ).success(lambda: gr.File(visible=True), outputs=[download_file])
|
|
|
|
|
if __name__ == "__main__":
    # Bind on all interfaces so the app is reachable from outside a container.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        favicon_path=LOGO_PATH,
    )
|
|