Bobholamovic
[Feat] Big update
455679a
raw
history blame
11.4 kB
import atexit
import base64
import io
import json
import os
import tempfile
import uuid
import zipfile
from pathlib import Path
import gradio as gr
import requests
from PIL import Image
# --- Remote service settings ------------------------------------------------
# Endpoint that receives the base64-encoded document for layout parsing.
API_URL = "https://cf38vaydqdl2l4p2.aistudio-hub.baidu.com/layout-parsing"
# Access token taken from the environment; None when API_TOKEN is not set.
TOKEN = os.environ.get("API_TOKEN")

# --- Static assets -----------------------------------------------------------
# The logo lives next to this file and is inlined as a data URI so it can be
# dropped straight into an <img> tag.
LOGO_PATH = Path(__file__).parent.joinpath("pp-structurev3.png")
with open(LOGO_PATH, "rb") as image_file:
    LOGO_BASE64 = "data:image/png;base64," + base64.b64encode(
        image_file.read()
    ).decode("utf-8")

# --- Scratch space -----------------------------------------------------------
# Temporary directory for generated files; removed when the interpreter exits.
TEMP_DIR = tempfile.TemporaryDirectory()
atexit.register(TEMP_DIR.cleanup)
# Custom stylesheet handed to ``gr.Blocks(css=CSS)``. It defines a small colour
# palette as CSS variables, constrains the page to ``--content-width``, and
# styles the upload card, logo, navigation links, buttons and the loading
# spinner (``.loader`` / ``.loader-container``).
# NOTE(review): ``--text-color`` (#F3F4F7) is almost the same shade as
# ``--sand-color`` (#FAF9F6), yet ``body`` uses it as the foreground ``color``
# on that background -- confirm this is intended.
CSS = """
:root {
--sand-color: #FAF9F6;
--white: #ffffff;
--shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
--text-color: #F3F4F7;
--black:#000000;
--link-hover: #2b6cb0;
--content-width: 1200px;
}
body {
display: flex;
justify-content: center;
background-color: var(--sand-color);
color: var(--text-color);
font-family: Arial, sans-serif;
}
.gradio-container {
max-width: var(--content-width) !important;
width: 100% !important;
margin: 20px auto;
padding: 20px;
background-color: var(--white);
}
#component-0,
#tabs,
#settings {
background-color: var(--white) !important;
padding: 15px;
}
.upload-section {
width: 100%;
margin: 0 auto 30px;
padding: 20px;
background-color: var(--sand-color) !important;
border-radius: 8px;
box-shadow: var(--shadow);
}
.center-content {
display: flex;
flex-direction: column;
align-items: center;
text-align: center;
margin-bottom: 20px;
}
.header {
margin-bottom: 30px;
width: 100%;
}
.logo-container {
width: 100%;
margin-bottom: 20px;
}
.logo-img {
width: 100%;
max-width: var(--content-width);
margin: 0 auto;
display: block;
}
.nav-bar {
display: flex;
justify-content: center;
background-color: var(--white);
padding: 15px 0;
box-shadow: var(--shadow);
margin-bottom: 20px;
}
.nav-links {
display: flex;
gap: 30px;
width: 100%;
justify-content: center;
}
.nav-link {
color: var(--black);
text-decoration: none;
font-weight: bold;
font-size: 24px;
transition: color 0.2s;
}
.nav-link:hover {
color: var(--link-hover);
text-decoration: none;
}
button {
background-color: var(--text-color) !important;
color: var(--black) !important;
border: none !important;
border-radius: 4px;
padding: 8px 16px;
}
button:hover {
opacity: 0.8 !important;
}
.file-download {
margin-top: 15px !important;
}
.loader {
border: 5px solid #f3f3f3;
border-top: 5px solid #3498db;
border-radius: 50%;
width: 50px;
height: 50px;
animation: spin 1s linear infinite;
margin: 20px auto;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.loader-container {
text-align: center;
margin: 20px 0;
}
"""
# Number of per-page result slots (image + markdown) the UI pre-creates; the
# same value is quoted in the on-page notice about how many pages are handled.
MAX_NUM_PAGES = 10
def url_to_bytes(url, *, timeout=10):
    """Fetch *url* with an HTTP GET and return the raw response body.

    Args:
        url: Address to download.
        timeout: Keyword-only value forwarded to ``requests.get`` as its
            ``timeout`` argument.

    Returns:
        The ``content`` attribute of the response.

    Raises:
        requests.exceptions.RequestException: If the request fails or
            ``raise_for_status`` rejects the response status.
    """
    response = requests.get(url=url, timeout=timeout)
    # Surface HTTP error statuses as exceptions instead of returning the
    # error page body to the caller.
    response.raise_for_status()
    payload = response.content
    return payload
def bytes_to_image(image_bytes):
    """Open encoded image data held in memory as a Pillow image.

    Args:
        image_bytes: The encoded image file contents.

    Returns:
        The object returned by ``Image.open`` for an in-memory buffer
        wrapping *image_bytes*.
    """
    buffer = io.BytesIO(image_bytes)
    return Image.open(buffer)
def embed_images_into_markdown_text(markdown_text, markdown_images):
    """Point the ``<img>`` tags of a markdown document at new sources.

    Args:
        markdown_text: Markdown/HTML text containing ``<img src="...">`` tags.
        markdown_images: Mapping from the ``src`` value currently used in the
            text to the value that should replace it.

    Returns:
        The text with every ``<img src="<key>"`` occurrence rewritten to
        ``<img src="<value>"``. Anything that is not written exactly in that
        form (for example ``![alt](path)`` markdown syntax) is left untouched.
    """
    rewritten = markdown_text
    for local_path, remote_url in markdown_images.items():
        # HACK: plain substring replacement on the opening of the tag rather
        # than real HTML parsing.
        old_tag_start = f'<img src="{local_path}"'
        new_tag_start = f'<img src="{remote_url}"'
        rewritten = rewritten.replace(old_tag_start, new_tag_start)
    return rewritten
def process_file(file_path):
    """Send an uploaded document to the layout-parsing API and gather results.

    The file is base64-encoded and posted to ``API_URL``; every image the
    response refers to is then downloaded so later steps can work from bytes.

    Args:
        file_path: Path of the uploaded file. A ``.pdf`` extension (matched
            case-insensitively) is sent as a PDF, anything else as an image.

    Returns:
        A dict with the keys ``original_file``, ``file_type`` (``"pdf"`` or
        ``"image"``), ``layout_ordering_images``, ``markdown_texts``,
        ``markdown_images``, ``markdown_content_list``, ``input_images`` (all
        lists with one entry per parsed page) and ``api_response`` (the
        decoded JSON body).

    Raises:
        gr.Error: If no file was given, the HTTP exchange failed, or the
            response could not be processed. The original exception is kept
            as ``__cause__``.
    """
    try:
        if not file_path:
            raise ValueError("Please upload a file first")

        # Compare the extension case-insensitively so that e.g. "SCAN.PDF"
        # is not mistaken for an image.
        is_pdf = Path(file_path).suffix.lower() == ".pdf"
        file_type = "pdf" if is_pdf else "image"

        # Read file content
        with open(file_path, "rb") as f:
            file_bytes = f.read()

        # Call API for processing
        file_data = base64.b64encode(file_bytes).decode("ascii")
        headers = {
            "Authorization": f"token {TOKEN}",
            "Content-Type": "application/json",
        }
        response = requests.post(
            API_URL,
            # The request uses fileType 0 for PDFs and 1 for images.
            json={"file": file_data, "fileType": 0 if is_pdf else 1},
            headers=headers,
            timeout=1000,
        )
        response.raise_for_status()

        # Parse API response
        result = response.json()
        layout_results = result.get("result", {}).get("layoutParsingResults", [])

        layout_ordering_images = []
        markdown_texts = []
        markdown_images = []
        markdown_content_list = []
        input_images = []
        for res in layout_results:
            layout_ordering_images.append(
                url_to_bytes(res["outputImages"]["layout_order_res"])
            )

            markdown = res["markdown"]
            markdown_text = markdown["text"]
            markdown_texts.append(markdown_text)

            # Download every image referenced by the page's markdown so the
            # bytes can be exported later.
            img_path_to_url = markdown["images"]
            img_path_to_bytes = {
                path: url_to_bytes(url) for path, url in img_path_to_url.items()
            }
            markdown_images.append(img_path_to_bytes)

            input_images.append(url_to_bytes(res["inputImage"]))

            # The on-screen markdown references the remote URLs directly,
            # while the raw text above keeps the original relative paths.
            markdown_content_list.append(
                embed_images_into_markdown_text(markdown_text, img_path_to_url)
            )

        return {
            "original_file": file_path,
            "file_type": file_type,
            "layout_ordering_images": layout_ordering_images,
            "markdown_texts": markdown_texts,
            "markdown_images": markdown_images,
            "markdown_content_list": markdown_content_list,
            "input_images": input_images,
            "api_response": result,
        }
    except requests.exceptions.RequestException as e:
        raise gr.Error(f"API request failed: {str(e)}") from e
    except Exception as e:
        raise gr.Error(f"Error processing file: {str(e)}") from e
def export_full_results(results):
    """Bundle everything produced by an analysis run into one ZIP archive.

    The archive is written into ``TEMP_DIR`` under the original file's stem
    plus a random hex suffix and contains, in this order:

    * ``layout_ordering_images/page_<n>.jpg``
    * ``markdown/page_<n>.md`` together with the images of that page under
      ``markdown/<image path>``
    * ``api_response.json``
    * ``input_images/page_<n>.jpg``

    Page numbers start at 1.

    Args:
        results: The dict returned by ``process_file``.

    Returns:
        The path of the created archive as a string.

    Raises:
        gr.Error: If *results* is empty or the archive cannot be written.
    """
    try:
        if not results:
            raise ValueError("No results to export")

        stem = Path(results["original_file"]).stem
        archive_name = stem + f"_{uuid.uuid4().hex}.zip"
        archive_path = Path(TEMP_DIR.name, archive_name)

        with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
            for page_no, data in enumerate(
                results["layout_ordering_images"], start=1
            ):
                archive.writestr(f"layout_ordering_images/page_{page_no}.jpg", data)

            per_page_markdown = zip(
                results["markdown_texts"], results["markdown_images"]
            )
            for page_no, (text, images) in enumerate(per_page_markdown, start=1):
                archive.writestr(f"markdown/page_{page_no}.md", text)
                for relative_path, data in images.items():
                    archive.writestr(f"markdown/{relative_path}", data)

            # Add API response
            serialized_response = json.dumps(
                results.get("api_response", {}), indent=2, ensure_ascii=False
            )
            archive.writestr("api_response.json", serialized_response)

            for page_no, data in enumerate(results["input_images"], start=1):
                archive.writestr(f"input_images/page_{page_no}.jpg", data)

        return str(archive_path)
    except Exception as e:
        raise gr.Error(f"Error creating ZIP file: {str(e)}")
# Page layout and event wiring for the demo. ``demo`` is the Blocks app that
# the ``__main__`` guard launches.
with gr.Blocks(css=CSS, title="Document Analysis System") as demo:
    # Holds the dict returned by process_file between button clicks.
    results_state = gr.State()

    # Header with logo
    with gr.Column(elem_classes=["logo-container"]):
        gr.HTML(f'<img src="{LOGO_BASE64}" class="logo-img">')

    # Navigation bar
    with gr.Row(elem_classes=["nav-bar"]):
        gr.HTML(
            """
<div class="nav-links">
<a href="https://github.com/PaddlePaddle/PaddleOCR" class="nav-link" target="_blank">GitHub</a>
<a href="https://paddleocr.ai" class="nav-link" target="_blank">paddleocr.ai</a>
</div>
"""
        )

    # Upload section
    with gr.Column(elem_classes=["upload-section"]):
        file_input = gr.File(
            label="Upload Document",
            file_types=[".pdf", ".jpg", ".jpeg", ".png"],
            type="filepath",
        )
        process_btn = gr.Button("Analyze Document", variant="primary")
        gr.Markdown(
            f"*Please note that only the first {MAX_NUM_PAGES} pages will be processed.*"
        )

        # Hidden by default; shown only while a request is in flight.
        loading_spinner = gr.Column(visible=False, elem_classes=["loader-container"])
        with loading_spinner:
            gr.HTML(
                """
<div class="loader"></div>
<p>Processing, please wait...</p>
"""
            )

    # Results display section
    with gr.Column():
        gr.Markdown("### Results")
        # A fixed number of page slots is created up front and toggled
        # visible/hidden once results arrive.
        layout_ordering_images = []
        markdown_display_list = []
        for i in range(MAX_NUM_PAGES):
            with gr.Row():
                layout_ordering_images.append(
                    gr.Image(
                        label=f"Layout Ordering Image {i}",
                        show_label=True,
                        visible=False,
                    )
                )
                markdown_display_list.append(
                    gr.Markdown(
                        visible=False,
                        container=True,
                        show_copy_button=True,
                        latex_delimiters=[
                            {"left": "$$", "right": "$$", "display": True},
                            {"left": "$", "right": "$", "display": False},
                        ],
                    )
                )

    # Download section
    with gr.Column(elem_classes=["download-section"]):
        gr.Markdown("### Result Export")
        download_all_btn = gr.Button("Download Full Results (ZIP)", variant="primary")
        download_file = gr.File(visible=False, label="Download File")

    # Interaction logic
    def toggle_spinner():
        """Make the loading spinner visible."""
        return gr.update(visible=True)

    def hide_spinner():
        """Hide the loading spinner again."""
        return gr.update(visible=False)

    def update_display(results):
        """Fill the page slots from *results* and hide the unused ones.

        Args:
            results: The dict stored in ``results_state``, or ``None`` when
                no analysis has succeeded yet.

        Returns:
            Exactly ``2 * MAX_NUM_PAGES`` updates: first one per image slot,
            then one per markdown slot, matching the ``outputs`` list below.
        """
        # This step is chained with .then(), so it still runs after a failed
        # process_file call; the state may therefore be empty.
        if results:
            pages = list(
                zip(
                    results["layout_ordering_images"],
                    results["markdown_content_list"],
                )
            )[:MAX_NUM_PAGES]  # never return more updates than there are slots
        else:
            pages = []

        ret_img = [
            gr.update(value=bytes_to_image(img), visible=True) for img, _ in pages
        ]
        ret_cont = [gr.update(value=cont, visible=True) for _, cont in pages]

        for _ in range(len(pages), MAX_NUM_PAGES):
            ret_img.append(gr.update(visible=False))
            ret_cont.append(gr.update(visible=False))
        return ret_img + ret_cont

    # Show spinner -> call the API -> hide spinner -> render the pages.
    process_btn.click(toggle_spinner, outputs=[loading_spinner]).then(
        process_file, inputs=[file_input], outputs=[results_state]
    ).then(hide_spinner, outputs=[loading_spinner]).then(
        update_display,
        inputs=[results_state],
        outputs=layout_ordering_images + markdown_display_list,
    )

    # Reveal the download widget only once the archive was built successfully.
    download_all_btn.click(
        export_full_results, inputs=[results_state], outputs=[download_file]
    ).success(lambda: gr.update(visible=True), outputs=[download_file])
if __name__ == "__main__":
    # Listen on all interfaces at port 7860, request a share link, and use
    # the logo file as the browser favicon.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "favicon_path": LOGO_PATH,
    }
    demo.launch(**launch_options)