Spaces:
Running
Running
import gradio as gr | |
import pandas as pd | |
import io | |
import zipfile | |
from datetime import datetime | |
import traceback | |
import tempfile | |
import os | |
# ติดตั้ง dependencies ที่จำเป็น | |
try: | |
from PyPDF2 import PdfReader, PdfWriter | |
from reportlab.pdfgen import canvas | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfbase import pdfmetrics | |
from reportlab.pdfbase.ttfonts import TTFont | |
except ImportError as e: | |
print(f"กำลังติดตั้ง dependencies: {e}") | |
import subprocess | |
import sys | |
subprocess.check_call([sys.executable, "-m", "pip", "install", "PyPDF2", "reportlab", "pandas"]) | |
from PyPDF2 import PdfReader, PdfWriter | |
from reportlab.pdfgen import canvas | |
from reportlab.lib.pagesizes import letter | |
def analyze_pdf_fields(pdf_path): | |
"""วิเคราะห์ฟิลด์ใน PDF""" | |
try: | |
reader = PdfReader(pdf_path) | |
all_fields = {} | |
# ตรวจสอบจาก AcroForm | |
if reader.trailer.get("/Root") and reader.trailer["/Root"].get("/AcroForm"): | |
acro_form = reader.trailer["/Root"]["/AcroForm"] | |
if "/Fields" in acro_form: | |
fields = acro_form["/Fields"] | |
for field in fields: | |
field_obj = field.get_object() | |
if "/T" in field_obj: | |
field_name = str(field_obj["/T"]).strip("()") | |
field_type = str(field_obj.get("/FT", "Unknown")) | |
all_fields[field_name] = { | |
'type': field_type, | |
'method': 'AcroForm' | |
} | |
# ตรวจสอบจาก Annotations | |
for page_num, page in enumerate(reader.pages): | |
if "/Annots" in page: | |
try: | |
annotations = page["/Annots"] | |
for annotation in annotations: | |
annot_obj = annotation.get_object() | |
if annot_obj.get("/Subtype") == "/Widget": | |
if "/T" in annot_obj: | |
field_name = str(annot_obj["/T"]).strip("()") | |
field_type = str(annot_obj.get("/FT", "Widget")) | |
all_fields[field_name] = { | |
'type': field_type, | |
'page': page_num + 1, | |
'method': 'Annotation' | |
} | |
except Exception: | |
continue | |
return all_fields | |
except Exception as e: | |
return {"error": str(e)} | |
def fill_pdf_form(pdf_path, field_data): | |
"""เติมข้อมูลในฟอร์ม PDF""" | |
try: | |
reader = PdfReader(pdf_path) | |
writer = PdfWriter() | |
# คัดลอกหน้าทั้งหมด | |
for page in reader.pages: | |
writer.add_page(page) | |
# เติมข้อมูลในฟอร์ม | |
if hasattr(writer, 'update_page_form_field_values'): | |
for page_num, page in enumerate(writer.pages): | |
try: | |
writer.update_page_form_field_values(page, field_data) | |
except Exception: | |
pass | |
# ลองวิธีอื่น | |
elif "/AcroForm" in reader.trailer.get("/Root", {}): | |
try: | |
acro_form = reader.trailer["/Root"]["/AcroForm"] | |
if "/Fields" in acro_form: | |
fields = acro_form["/Fields"] | |
for field in fields: | |
field_obj = field.get_object() | |
if "/T" in field_obj: | |
field_name = str(field_obj["/T"]).strip("()") | |
if field_name in field_data: | |
try: | |
field_obj.update({"/V": field_data[field_name]}) | |
except Exception: | |
pass | |
except Exception: | |
pass | |
return writer | |
except Exception as e: | |
raise Exception(f"ไม่สามารถเติมฟอร์มได้: {str(e)}") | |
def create_simple_pdf(data_row, filename): | |
"""สร้าง PDF ใหม่แบบง่าย""" | |
buffer = io.BytesIO() | |
p = canvas.Canvas(buffer, pagesize=letter) | |
width, height = letter | |
# ตั้งค่า font | |
p.setFont("Helvetica", 12) | |
# หัวเรื่อง | |
p.setFont("Helvetica-Bold", 16) | |
title = f"Document: {filename.replace('.pdf', '')}" | |
p.drawString(50, height - 50, title) | |
p.line(50, height - 60, 550, height - 60) | |
# เนื้อหา | |
y_position = height - 100 | |
p.setFont("Helvetica", 12) | |
for column, value in data_row.items(): | |
if pd.notna(value) and str(value).strip(): | |
clean_column = str(column).strip() | |
clean_value = str(value).strip() | |
if len(clean_value) > 80: | |
clean_value = clean_value[:77] + "..." | |
text = f"{clean_column}: {clean_value}" | |
try: | |
p.drawString(50, y_position, text) | |
except: | |
safe_text = text.encode('ascii', errors='ignore').decode('ascii') | |
p.drawString(50, y_position, safe_text) | |
y_position -= 25 | |
if y_position < 50: | |
p.showPage() | |
p.setFont("Helvetica", 12) | |
y_position = height - 50 | |
# เวลาที่สร้าง | |
p.setFont("Helvetica", 8) | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
p.drawString(50, 30, f"Created: {timestamp}") | |
p.save() | |
buffer.seek(0) | |
return buffer.getvalue() | |
def process_single_row(pdf_path, row_data, filename, use_form=True): | |
"""ประมวลผลแถวเดียว""" | |
try: | |
# เตรียมข้อมูลฟิลด์ | |
field_data = {} | |
for column, value in row_data.items(): | |
if pd.notna(value) and str(value).strip(): | |
clean_value = str(value).strip() | |
clean_column = str(column).strip() | |
# ลองหลายรูปแบบของชื่อฟิลด์ | |
field_variations = [ | |
clean_column, | |
clean_column.lower(), | |
clean_column.upper(), | |
clean_column.replace('_', ' '), | |
clean_column.replace(' ', '_'), | |
clean_column.replace('-', '_'), | |
clean_column.replace('_', '') | |
] | |
for variation in field_variations: | |
field_data[variation] = clean_value | |
if use_form: | |
try: | |
# ลองเติมฟอร์ม | |
writer = fill_pdf_form(pdf_path, field_data) | |
output_buffer = io.BytesIO() | |
writer.write(output_buffer) | |
output_buffer.seek(0) | |
return output_buffer.getvalue(), "form_filled" | |
except Exception as e: | |
# ถ้าไม่ได้ ให้สร้างใหม่ | |
pdf_content = create_simple_pdf(row_data, filename) | |
return pdf_content, f"new_pdf_created: {str(e)}" | |
else: | |
# สร้าง PDF ใหม่ | |
pdf_content = create_simple_pdf(row_data, filename) | |
return pdf_content, "new_pdf_created" | |
except Exception as e: | |
return None, f"error: {str(e)}" | |
def read_csv_safe(csv_file): | |
"""อ่าน CSV อย่างปลอดภัย""" | |
encodings = ['utf-8', 'utf-8-sig', 'cp874', 'tis-620', 'iso-8859-1', 'cp1252'] | |
separators = [',', ';', '\t', '|'] | |
for encoding in encodings: | |
for sep in separators: | |
try: | |
df = pd.read_csv(csv_file, encoding=encoding, sep=sep, engine='python') | |
if len(df.columns) > 1 and len(df) > 0: | |
return df, None | |
except Exception: | |
continue | |
try: | |
df = pd.read_csv(csv_file) | |
return df, None | |
except Exception as e: | |
return None, str(e) | |
def process_pdf_csv(pdf_file, csv_file, filename_column, file_prefix, use_form_fields, progress=gr.Progress()): | |
"""ฟังก์ชันหลักสำหรับประมวลผล PDF และ CSV""" | |
if pdf_file is None or csv_file is None: | |
return None, "❌ กรุณาอัพโหลดไฟล์ PDF และ CSV" | |
try: | |
# อ่าน CSV | |
df, csv_error = read_csv_safe(csv_file) | |
if df is None: | |
return None, f"❌ ไม่สามารถอ่าน CSV ได้: {csv_error}" | |
# วิเคราะห์ PDF | |
pdf_fields = analyze_pdf_fields(pdf_file) | |
has_form_fields = bool(pdf_fields and "error" not in pdf_fields and pdf_fields) | |
# เก็บ PDF ที่สร้าง | |
generated_pdfs = {} | |
success_count = 0 | |
error_count = 0 | |
processing_log = [] | |
# ประมวลผลแต่ละแถว | |
for index, (_, row) in enumerate(df.iterrows()): | |
progress((index + 1) / len(df), f"ประมวลผล {index + 1}/{len(df)}") | |
try: | |
# สร้างชื่อไฟล์ | |
if filename_column and filename_column in df.columns and pd.notna(row[filename_column]): | |
safe_name = str(row[filename_column]).strip() | |
safe_name = "".join(c for c in safe_name if c.isalnum() or c in (' ', '-', '_')).strip() | |
filename = f"{file_prefix}_{safe_name}.pdf" | |
else: | |
filename = f"{file_prefix}_{index + 1:03d}.pdf" | |
filename = filename.replace(' ', ' ').replace(' ', '_') | |
if not filename.endswith('.pdf'): | |
filename += '.pdf' | |
# ประมวลผล | |
pdf_content, status = process_single_row( | |
pdf_file, | |
row, | |
filename, | |
use_form_fields and has_form_fields | |
) | |
if pdf_content is not None: | |
generated_pdfs[filename] = pdf_content | |
success_count += 1 | |
processing_log.append(f"✅ {filename}: {status}") | |
else: | |
error_count += 1 | |
processing_log.append(f"❌ {filename}: {status}") | |
except Exception as e: | |
error_count += 1 | |
processing_log.append(f"💥 แถว {index + 1}: {str(e)}") | |
# สร้าง ZIP | |
if generated_pdfs: | |
zip_buffer = io.BytesIO() | |
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: | |
for filename, pdf_content in generated_pdfs.items(): | |
zip_file.writestr(filename, pdf_content) | |
zip_buffer.seek(0) | |
# สร้างชื่อไฟล์ ZIP | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
zip_filename = f"generated_pdfs_{timestamp}.zip" | |
# บันทึกไฟล์ชั่วคราว | |
temp_zip_path = os.path.join(tempfile.gettempdir(), zip_filename) | |
with open(temp_zip_path, 'wb') as f: | |
f.write(zip_buffer.getvalue()) | |
result_message = f"✅ สร้าง PDF สำเร็จ {success_count} ไฟล์!" | |
if error_count > 0: | |
result_message += f"\n⚠️ มีข้อผิดพลาด {error_count} ไฟล์" | |
result_message += f"\n\n📋 รายละเอียด:\n" + "\n".join(processing_log[:10]) | |
if len(processing_log) > 10: | |
result_message += f"\n... และอีก {len(processing_log) - 10} รายการ" | |
return temp_zip_path, result_message | |
else: | |
return None, "❌ ไม่สามารถสร้าง PDF ได้เลย" | |
except Exception as e: | |
return None, f"❌ เกิดข้อผิดพลาด: {str(e)}\n{traceback.format_exc()}" | |
def analyze_pdf_info(pdf_file): | |
"""วิเคราะห์ข้อมูล PDF""" | |
if pdf_file is None: | |
return "ไม่มีไฟล์ PDF" | |
try: | |
reader = PdfReader(pdf_file) | |
info = f"📄 **ข้อมูล PDF:**\n" | |
info += f"- จำนวนหน้า: {len(reader.pages)}\n" | |
# ตรวจสอบฟิลด์ | |
pdf_fields = analyze_pdf_fields(pdf_file) | |
if pdf_fields and "error" not in pdf_fields and pdf_fields: | |
info += f"- จำนวน Form Fields: {len(pdf_fields)}\n" | |
info += f"\n🏷️ **รายชื่อ Fields:**\n" | |
for name, details in list(pdf_fields.items())[:10]: # แสดงแค่ 10 ตัวแรก | |
info += f" - {name} ({details.get('type', 'Unknown')})\n" | |
if len(pdf_fields) > 10: | |
info += f" - ... และอีก {len(pdf_fields) - 10} fields\n" | |
else: | |
info += "- Form Fields: ไม่พบหรือไม่สามารถอ่านได้\n" | |
info += "- หมายเหตุ: จะสร้าง PDF ใหม่แทน\n" | |
return info | |
except Exception as e: | |
return f"❌ ไม่สามารถวิเคราะห์ PDF ได้: {str(e)}" | |
def analyze_csv_info(csv_file): | |
"""วิเคราะห์ข้อมูล CSV""" | |
if csv_file is None: | |
return "ไม่มีไฟล์ CSV" | |
try: | |
df, error = read_csv_safe(csv_file) | |
if df is None: | |
return f"❌ ไม่สามารถอ่าน CSV ได้: {error}" | |
info = f"📋 **ข้อมูล CSV:**\n" | |
info += f"- จำนวนแถว: {len(df)}\n" | |
info += f"- จำนวนคอลัมน์: {len(df.columns)}\n" | |
info += f"\n📝 **รายชื่อคอลัมน์:**\n" | |
for col in df.columns[:15]: # แสดงแค่ 15 คอลัมน์แรก | |
info += f" - {col}\n" | |
if len(df.columns) > 15: | |
info += f" - ... และอีก {len(df.columns) - 15} คอลัมน์\n" | |
# ตรวจสอบข้อมูลที่ขาด | |
missing_data = df.isnull().sum() | |
if missing_data.any(): | |
missing_cols = missing_data[missing_data > 0] | |
if len(missing_cols) > 0: | |
info += f"\n⚠️ **ข้อมูลที่ขาดหาย:**\n" | |
for col, count in missing_cols.head(5).items(): | |
info += f" - {col}: {count} แถว\n" | |
return info | |
except Exception as e: | |
return f"❌ ไม่สามารถวิเคราะห์ CSV ได้: {str(e)}" | |
# สร้าง Gradio Interface | |
def create_interface(): | |
with gr.Blocks(title="PDF Form Filler", theme=gr.themes.Soft()) as app: | |
gr.Markdown(""" | |
# 📄 เครื่องมือเติมข้อมูล PDF จาก CSV | |
**เครื่องมือนี้สามารถ:** | |
- เติมข้อมูลลงในฟอร์ม PDF ที่มี form fields | |
- สร้าง PDF ใหม่หากไม่มี form fields หรือเติมไม่ได้ | |
- รองรับ CSV หลาย encoding (UTF-8, TIS-620, CP874, etc.) | |
- ส่งออกเป็นไฟล์ ZIP | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("## 📁 อัพโหลดไฟล์") | |
pdf_file = gr.File( | |
label="PDF Template", | |
file_types=[".pdf"], | |
type="filepath" | |
) | |
csv_file = gr.File( | |
label="CSV Data", | |
file_types=[".csv"], | |
type="filepath" | |
) | |
gr.Markdown("## ⚙️ ตั้งค่า") | |
filename_column = gr.Textbox( | |
label="คอลัมน์สำหรับชื่อไฟล์ (ถ้ามี)", | |
placeholder="เช่น name, id, etc.", | |
value="" | |
) | |
file_prefix = gr.Textbox( | |
label="คำนำหน้าชื่อไฟล์", | |
value="document" | |
) | |
use_form_fields = gr.Checkbox( | |
label="ใช้ Form Fields (ถ้าพบ)", | |
value=True | |
) | |
process_btn = gr.Button( | |
"🚀 สร้าง PDF ทั้งหมด", | |
variant="primary", | |
size="lg" | |
) | |
with gr.Column(scale=2): | |
gr.Markdown("## 📊 ข้อมูลไฟล์") | |
pdf_info = gr.Markdown("ยังไม่มีไฟล์ PDF") | |
csv_info = gr.Markdown("ยังไม่มีไฟล์ CSV") | |
gr.Markdown("## 📥 ผลลัพธ์") | |
result_file = gr.File( | |
label="ไฟล์ ZIP ที่สร้าง", | |
visible=False | |
) | |
result_message = gr.Markdown("") | |
# Event handlers | |
pdf_file.change( | |
fn=analyze_pdf_info, | |
inputs=[pdf_file], | |
outputs=[pdf_info] | |
) | |
csv_file.change( | |
fn=analyze_csv_info, | |
inputs=[csv_file], | |
outputs=[csv_info] | |
) | |
process_btn.click( | |
fn=process_pdf_csv, | |
inputs=[ | |
pdf_file, | |
csv_file, | |
filename_column, | |
file_prefix, | |
use_form_fields | |
], | |
outputs=[result_file, result_message] | |
).then( | |
fn=lambda x: gr.update(visible=x is not None), | |
inputs=[result_file], | |
outputs=[result_file] | |
) | |
return app | |
# รันแอป | |
if __name__ == "__main__": | |
app = create_interface() | |
app.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=True, # สร้าง public URL | |
debug=True | |
) |