Bobholamovic
Fix port
e5663e2
raw
history blame
15.8 kB
import atexit
import base64
import io
import json
import os
import re
import tempfile
import uuid
import zipfile
from pathlib import Path
import gradio as gr
import requests
from PIL import Image
# Remote layout-parsing endpoint and the token sent in the Authorization header.
API_URL = "https://cf38vaydqdl2l4p2.aistudio-hub.baidu.com/layout-parsing"
TOKEN = os.environ.get("API_TOKEN", "")

# The logo is read once at import time and kept as a base64 data URI so it can
# be embedded directly in the header HTML.
LOGO_PATH = Path(__file__).parent / "pp-structurev3.png"
with open(LOGO_PATH, "rb") as image_file:
    LOGO_BASE64 = "data:image/png;base64," + base64.b64encode(
        image_file.read()
    ).decode("utf-8")

# Scratch directory that receives the generated ZIP exports; it is cleaned up
# when the interpreter exits.
TEMP_DIR = tempfile.TemporaryDirectory()
atexit.register(TEMP_DIR.cleanup)
# Stylesheet handed to gr.Blocks(css=...). It declares the color/width
# variables, styles the page container, upload section, logo, navigation bar
# and buttons, and defines the spinning ".loader" animation shown while a
# document is being processed. The class names are attached to components via
# elem_classes or used in the inline HTML snippets of the layout.
CSS = """
:root {
--sand-color: #FAF9F6;
--white: #ffffff;
--shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
--text-color: #F3F4F7;
--black:#000000;
--link-hover: #2b6cb0;
--content-width: 1200px;
}
body {
display: flex;
justify-content: center;
background-color: var(--sand-color);
color: var(--text-color);
font-family: Arial, sans-serif;
}
.gradio-container {
max-width: var(--content-width) !important;
width: 100% !important;
margin: 20px auto;
padding: 20px;
background-color: var(--white);
}
#component-0,
#tabs,
#settings {
background-color: var(--white) !important;
padding: 15px;
}
.upload-section {
width: 100%;
margin: 0 auto 30px;
padding: 20px;
background-color: var(--sand-color) !important;
border-radius: 8px;
box-shadow: var(--shadow);
}
.center-content {
display: flex;
flex-direction: column;
align-items: center;
text-align: center;
margin-bottom: 20px;
}
.header {
margin-bottom: 30px;
width: 100%;
}
.logo-container {
width: 100%;
margin-bottom: 20px;
}
.logo-img {
width: 100%;
max-width: var(--content-width);
margin: 0 auto;
display: block;
}
.nav-bar {
display: flex;
justify-content: center;
background-color: var(--white);
padding: 15px 0;
box-shadow: var(--shadow);
margin-bottom: 20px;
}
.nav-links {
display: flex;
gap: 30px;
width: 100%;
justify-content: center;
}
.nav-link {
color: var(--black);
text-decoration: none;
font-weight: bold;
font-size: 24px;
transition: color 0.2s;
}
.nav-link:hover {
color: var(--link-hover);
text-decoration: none;
}
button {
background-color: var(--text-color) !important;
color: var(--black) !important;
border: none !important;
border-radius: 4px;
padding: 8px 16px;
}
button:hover {
opacity: 0.8 !important;
}
.file-download {
margin-top: 15px !important;
}
.loader {
border: 5px solid #f3f3f3;
border-top: 5px solid #3498db;
border-radius: 50%;
width: 50px;
height: 50px;
animation: spin 1s linear infinite;
margin: 20px auto;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.loader-container {
text-align: center;
margin: 20px 0;
}
"""
# Number of per-page image and markdown widgets pre-built in the UI; the
# display handler asserts that a result never holds more pages than this.
MAX_NUM_PAGES = 10
def url_to_bytes(url, *, timeout=10):
    """Download ``url`` and return the response body as raw bytes.

    Args:
        url: Address to fetch with ``requests.get``.
        timeout: Timeout forwarded to ``requests.get`` (keyword-only).

    Returns:
        The ``content`` of the response.

    Raises:
        Whatever ``requests.get`` raises, plus the error raised by
        ``raise_for_status()`` when the server reports a failure status.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on error statuses instead of returning an error page body.
    response.raise_for_status()
    return response.content
def bytes_to_image(image_bytes):
    """Open encoded image data held in memory and return the PIL image."""
    buffer = io.BytesIO(image_bytes)
    return Image.open(buffer)
def embed_images_into_markdown_text(markdown_text, markdown_images):
    """Point ``<img>`` tags in the markdown at their mapped URLs.

    Args:
        markdown_text: Markdown/HTML text containing ``<img src="...">`` tags.
        markdown_images: Mapping from the path used in the text to the URL
            that should replace it.

    Returns:
        The text with every ``<img src="<path>"`` prefix rewritten to
        ``<img src="<url>"``; other references to the path are left alone.
    """
    rewritten = markdown_text
    for local_path, remote_url in markdown_images.items():
        # HACK: plain textual substitution on the opening part of the tag.
        old_prefix = f'<img src="{local_path}"'
        new_prefix = f'<img src="{remote_url}"'
        rewritten = rewritten.replace(old_prefix, new_prefix)
    return rewritten
# HACK: Adapted from PaddleX 3.0.0 code
def concatenate_markdown_pages(markdown_list):
    """Join per-page markdown into a single string.

    Each item of ``markdown_list`` is a mapping with the keys ``"text"``,
    ``"isStart"`` (the page's first element starts a paragraph) and
    ``"isEnd"`` (the page's last element ends a paragraph).

    A page is normally appended after a blank line (``"\\n\\n"``), including
    the very first page. When the previous page left its paragraph open and
    the current page does not start a new one, the text is glued on instead:
    with a single space, or with nothing if either side of the seam is a
    Chinese character.

    Returns:
        The concatenated markdown text (empty string for an empty list).
    """
    cjk_char = re.compile(r"[\u4e00-\u9fff]")
    combined = ""
    # Nothing precedes the first page, so treat the previous paragraph as closed.
    prev_paragraph_closed = True

    for page in markdown_list:
        opens_new_paragraph: bool = page["isStart"]
        closes_paragraph: bool = page["isEnd"]
        page_text = page["text"]

        if opens_new_paragraph or prev_paragraph_closed:
            separator = "\n\n"
        else:
            # The paragraph continues across the page break: inspect both
            # sides of the seam to decide whether a space is needed.
            tail = combined[-1:]
            tail_is_cjk = cjk_char.match(tail) if tail else False
            head_is_cjk = cjk_char.match(page_text) if page_text else False
            separator = "" if (tail_is_cjk or head_is_cjk) else " "

        combined += separator + page_text
        prev_paragraph_closed = closes_paragraph

    return combined
def process_file(
    file_path,
    use_formula_recognition,
    use_chart_recognition,
    use_doc_orientation_classify,
    use_doc_unwarping,
    use_textline_orientation,
):
    """Send an uploaded document to the layout-parsing API and gather its outputs.

    Args:
        file_path: Path of the uploaded file. A ``.pdf`` suffix (in any letter
            case) selects PDF handling; every other file is sent as an image.
        use_formula_recognition: Forwarded as ``useFormulaRecognition``.
        use_chart_recognition: Forwarded as ``useChartRecognition``.
        use_doc_orientation_classify: Forwarded as ``useDocOrientationClassify``.
        use_doc_unwarping: Forwarded as ``useDocUnwarping``.
        use_textline_orientation: Forwarded as ``useTextlineOrientation``.

    Returns:
        A dict with the keys ``original_file``, ``file_type``,
        ``layout_ordering_images``, ``markdown_texts``, ``markdown_images``,
        ``markdown_content_list``, ``concatenated_markdown_content``,
        ``input_images`` and ``api_response``. Image entries hold the raw
        bytes downloaded from the URLs found in the API response; the
        per-page lists follow the order of ``layoutParsingResults``.

    Raises:
        gr.Error: If no file was supplied, the API request or one of the
            image downloads fails, or the response cannot be processed.
    """
    try:
        if not file_path:
            raise ValueError("Please upload a file first")

        # Compare the extension case-insensitively so that e.g. "scan.PDF"
        # is not mistakenly submitted as an image.
        if Path(file_path).suffix.lower() == ".pdf":
            file_type = "pdf"
        else:
            file_type = "image"

        # Read file content
        with open(file_path, "rb") as f:
            file_bytes = f.read()

        # Call API for processing; the file travels base64-encoded in the JSON body.
        file_data = base64.b64encode(file_bytes).decode("ascii")
        headers = {
            "Authorization": f"token {TOKEN}",
            "Content-Type": "application/json",
        }
        response = requests.post(
            API_URL,
            json={
                "file": file_data,
                # The request uses 0 for PDF input and 1 for image input.
                "fileType": 0 if file_type == "pdf" else 1,
                "useFormulaRecognition": use_formula_recognition,
                "useChartRecognition": use_chart_recognition,
                "useDocOrientationClassify": use_doc_orientation_classify,
                "useDocUnwarping": use_doc_unwarping,
                "useTextlineOrientation": use_textline_orientation,
            },
            headers=headers,
            timeout=1000,
        )
        response.raise_for_status()

        # Parse API response
        result = response.json()
        layout_results = result.get("result", {}).get("layoutParsingResults", [])

        layout_ordering_images = []
        markdown_texts = []
        markdown_images = []
        markdown_content_list = []
        input_images = []
        for res in layout_results:
            layout_ordering_images.append(
                url_to_bytes(res["outputImages"]["layout_order_res"])
            )

            markdown = res["markdown"]
            markdown_text = markdown["text"]
            markdown_texts.append(markdown_text)

            # Download every image referenced by this page so it can be
            # written into the export archive later.
            img_path_to_url = markdown["images"]
            img_path_to_bytes = {
                path: url_to_bytes(url) for path, url in img_path_to_url.items()
            }
            markdown_images.append(img_path_to_bytes)

            input_images.append(url_to_bytes(res["inputImage"]))

            # For on-screen display the image tags point at the remote URLs.
            markdown_content_list.append(
                embed_images_into_markdown_text(markdown_text, img_path_to_url)
            )

        concatenated_markdown_content = concatenate_markdown_pages(
            [res["markdown"] for res in layout_results]
        )

        return {
            "original_file": file_path,
            "file_type": file_type,
            "layout_ordering_images": layout_ordering_images,
            "markdown_texts": markdown_texts,
            "markdown_images": markdown_images,
            "markdown_content_list": markdown_content_list,
            "concatenated_markdown_content": concatenated_markdown_content,
            "input_images": input_images,
            "api_response": result,
        }
    except requests.exceptions.RequestException as e:
        # Keep the original exception attached for server-side tracebacks.
        raise gr.Error(f"API request failed: {str(e)}") from e
    except Exception as e:
        raise gr.Error(f"Error processing file: {str(e)}") from e
def export_full_results(results):
    """Bundle all artifacts of an analysis run into a ZIP archive.

    The archive is written into the module's temporary directory and contains,
    in this order: the layout-ordering images, the per-page markdown files
    together with their images, the raw API response as JSON, and the input
    page images.

    Args:
        results: The dict produced by ``process_file``.

    Returns:
        The path of the created ZIP file as a string.

    Raises:
        gr.Error: If ``results`` is empty or writing the archive fails.
    """
    try:
        if not results:
            raise ValueError("No results to export")

        # A random suffix keeps repeated exports of the same file apart.
        source_stem = Path(results["original_file"]).stem
        archive_path = Path(TEMP_DIR.name) / (source_stem + f"_{uuid.uuid4().hex}.zip")

        with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
            for page_no, image_data in enumerate(
                results["layout_ordering_images"], start=1
            ):
                archive.writestr(
                    f"layout_ordering_images/page_{page_no}.jpg", image_data
                )

            pages = zip(results["markdown_texts"], results["markdown_images"])
            for page_no, (page_text, page_images) in enumerate(pages, start=1):
                archive.writestr(f"markdown/page_{page_no}.md", page_text)
                # Images keep the relative path the markdown refers to.
                for rel_path, image_data in page_images.items():
                    archive.writestr(f"markdown/{rel_path}", image_data)

            # Add API response
            serialized_response = json.dumps(
                results.get("api_response", {}), indent=2, ensure_ascii=False
            )
            archive.writestr("api_response.json", serialized_response)

            for page_no, image_data in enumerate(results["input_images"], start=1):
                archive.writestr(f"input_images/page_{page_no}.jpg", image_data)

        return str(archive_path)
    except Exception as exc:
        raise gr.Error(f"Error creating ZIP file: {str(exc)}")
# NOTE(review): the container nesting below was reconstructed from flattened
# source; confirm it against the rendered layout.
with gr.Blocks(css=CSS, title="Document Analysis System") as demo:
    # Carries the dict returned by process_file from the analysis step to the
    # display and export handlers.
    results_state = gr.State()

    # Banner with the embedded logo
    with gr.Column(elem_classes=["logo-container"]):
        gr.HTML(f'<img src="{LOGO_BASE64}" class="logo-img">')

    # Project links
    with gr.Row(elem_classes=["nav-bar"]):
        gr.HTML(
            """
<div class="nav-links">
<a href="https://github.com/PaddlePaddle/PaddleOCR" class="nav-link" target="_blank">GitHub</a>
<a href="https://paddleocr.ai" class="nav-link" target="_blank">paddleocr.ai</a>
</div>
"""
        )

    # File picker, recognition switches and the trigger button
    with gr.Column(elem_classes=["upload-section"]):
        file_input = gr.File(
            label="Upload Document",
            file_types=[".pdf", ".jpg", ".jpeg", ".png"],
            type="filepath",
        )
        with gr.Row():
            use_formula_recognition_cb = gr.Checkbox(
                label="Use formula recognition", value=True
            )
            use_chart_recognition_cb = gr.Checkbox(
                label="Use chart recognition", value=False
            )
        with gr.Row():
            use_doc_orientation_classify_cb = gr.Checkbox(
                label="Use document image orientation classification", value=False
            )
            use_doc_unwarping_cb = gr.Checkbox(
                label="Use text image unwarping", value=False
            )
        with gr.Row():
            use_textline_orientation_cb = gr.Checkbox(
                label="Use text line orientation classification", value=False
            )
            concatenate_pages_cb = gr.Checkbox(label="Concatenate pages", value=True)
        process_btn = gr.Button("Analyze Document", variant="primary")
        gr.Markdown(
            f"""
1. Only the first {MAX_NUM_PAGES} pages will be processed.
2. Some formulas might not display correctly because of renderer limitations.
"""
        )
        # Hidden by default; shown only while a request is in flight.
        loading_spinner = gr.Column(visible=False, elem_classes=["loader-container"])
        with loading_spinner:
            gr.HTML(
                """
<div class="loader"></div>
<p>Processing, please wait...</p>
"""
            )

    # Results: one hidden image slot and one hidden markdown slot per possible
    # page; the display handler reveals as many as the result needs.
    with gr.Column():
        gr.Markdown("### Results")
        with gr.Row():
            with gr.Column():
                layout_ordering_images = [
                    gr.Image(
                        label=f"Layout Ordering Image {page_idx}",
                        show_label=True,
                        visible=False,
                    )
                    for page_idx in range(MAX_NUM_PAGES)
                ]
            with gr.Column():
                markdown_display_list = [
                    gr.Markdown(
                        visible=False,
                        container=True,
                        show_copy_button=True,
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                        ],
                    )
                    for _ in range(MAX_NUM_PAGES)
                ]

    # Export controls
    with gr.Column(elem_classes=["download-section"]):
        gr.Markdown("### Result Export")
        download_all_btn = gr.Button("Download Full Results (ZIP)", variant="primary")
        download_file = gr.File(visible=False, label="Download File")

    # Event handlers

    def toggle_spinner():
        """Make the loading indicator visible."""
        return gr.Column(visible=True)

    def hide_spinner():
        """Hide the loading indicator again."""
        return gr.Column(visible=False)

    def update_display(results, concatenate_pages):
        """Build the updates for all image and markdown result slots.

        Returns ``gr.skip()`` when there are no results. Otherwise returns
        ``MAX_NUM_PAGES`` image updates followed by ``MAX_NUM_PAGES`` markdown
        updates: slots that receive content become visible, the rest are
        hidden. With ``concatenate_pages`` set, only the first markdown slot
        is used and shows the concatenated document.
        """
        if not results:
            return gr.skip()

        page_images = results["layout_ordering_images"]
        assert len(page_images) <= MAX_NUM_PAGES, len(page_images)
        image_updates = [
            gr.Image(value=bytes_to_image(raw), visible=True) for raw in page_images
        ]
        image_updates.extend(
            gr.Image(visible=False) for _ in range(len(page_images), MAX_NUM_PAGES)
        )

        if concatenate_pages:
            shown_texts = [results["concatenated_markdown_content"]]
        else:
            shown_texts = results["markdown_content_list"]
            assert len(shown_texts) <= MAX_NUM_PAGES, len(shown_texts)
        markdown_updates = [
            gr.Markdown(value=text, visible=True) for text in shown_texts
        ]
        markdown_updates.extend(
            gr.Markdown(visible=False) for _ in range(len(shown_texts), MAX_NUM_PAGES)
        )

        return image_updates + markdown_updates

    # Analyze: show spinner -> call the API -> hide spinner -> render results.
    spinner_shown = process_btn.click(toggle_spinner, outputs=[loading_spinner])
    analysis_done = spinner_shown.then(
        process_file,
        inputs=[
            file_input,
            use_formula_recognition_cb,
            use_chart_recognition_cb,
            use_doc_orientation_classify_cb,
            use_doc_unwarping_cb,
            use_textline_orientation_cb,
        ],
        outputs=[results_state],
    )
    spinner_hidden = analysis_done.then(hide_spinner, outputs=[loading_spinner])
    spinner_hidden.then(
        update_display,
        inputs=[results_state, concatenate_pages_cb],
        outputs=layout_ordering_images + markdown_display_list,
    )

    # Export: build the ZIP, then reveal the download widget once that succeeded.
    export_done = download_all_btn.click(
        export_full_results, inputs=[results_state], outputs=[download_file]
    )
    export_done.success(lambda: gr.File(visible=True), outputs=[download_file])
if __name__ == "__main__":
    # Serve on all network interfaces at port 7860, reusing the logo as favicon.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "favicon_path": LOGO_PATH,
    }
    demo.launch(**launch_options)