95win / pdfk.py
protae5544's picture
Create pdfk.py
fd34976 verified
import gradio as gr
import pandas as pd
import io
import zipfile
from datetime import datetime
import traceback
import tempfile
import os
# ติดตั้ง dependencies ที่จำเป็น
try:
from PyPDF2 import PdfReader, PdfWriter
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
except ImportError as e:
print(f"กำลังติดตั้ง dependencies: {e}")
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "PyPDF2", "reportlab", "pandas"])
from PyPDF2 import PdfReader, PdfWriter
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter
def analyze_pdf_fields(pdf_path):
"""วิเคราะห์ฟิลด์ใน PDF"""
try:
reader = PdfReader(pdf_path)
all_fields = {}
# ตรวจสอบจาก AcroForm
if reader.trailer.get("/Root") and reader.trailer["/Root"].get("/AcroForm"):
acro_form = reader.trailer["/Root"]["/AcroForm"]
if "/Fields" in acro_form:
fields = acro_form["/Fields"]
for field in fields:
field_obj = field.get_object()
if "/T" in field_obj:
field_name = str(field_obj["/T"]).strip("()")
field_type = str(field_obj.get("/FT", "Unknown"))
all_fields[field_name] = {
'type': field_type,
'method': 'AcroForm'
}
# ตรวจสอบจาก Annotations
for page_num, page in enumerate(reader.pages):
if "/Annots" in page:
try:
annotations = page["/Annots"]
for annotation in annotations:
annot_obj = annotation.get_object()
if annot_obj.get("/Subtype") == "/Widget":
if "/T" in annot_obj:
field_name = str(annot_obj["/T"]).strip("()")
field_type = str(annot_obj.get("/FT", "Widget"))
all_fields[field_name] = {
'type': field_type,
'page': page_num + 1,
'method': 'Annotation'
}
except Exception:
continue
return all_fields
except Exception as e:
return {"error": str(e)}
def fill_pdf_form(pdf_path, field_data):
"""เติมข้อมูลในฟอร์ม PDF"""
try:
reader = PdfReader(pdf_path)
writer = PdfWriter()
# คัดลอกหน้าทั้งหมด
for page in reader.pages:
writer.add_page(page)
# เติมข้อมูลในฟอร์ม
if hasattr(writer, 'update_page_form_field_values'):
for page_num, page in enumerate(writer.pages):
try:
writer.update_page_form_field_values(page, field_data)
except Exception:
pass
# ลองวิธีอื่น
elif "/AcroForm" in reader.trailer.get("/Root", {}):
try:
acro_form = reader.trailer["/Root"]["/AcroForm"]
if "/Fields" in acro_form:
fields = acro_form["/Fields"]
for field in fields:
field_obj = field.get_object()
if "/T" in field_obj:
field_name = str(field_obj["/T"]).strip("()")
if field_name in field_data:
try:
field_obj.update({"/V": field_data[field_name]})
except Exception:
pass
except Exception:
pass
return writer
except Exception as e:
raise Exception(f"ไม่สามารถเติมฟอร์มได้: {str(e)}")
def create_simple_pdf(data_row, filename):
"""สร้าง PDF ใหม่แบบง่าย"""
buffer = io.BytesIO()
p = canvas.Canvas(buffer, pagesize=letter)
width, height = letter
# ตั้งค่า font
p.setFont("Helvetica", 12)
# หัวเรื่อง
p.setFont("Helvetica-Bold", 16)
title = f"Document: {filename.replace('.pdf', '')}"
p.drawString(50, height - 50, title)
p.line(50, height - 60, 550, height - 60)
# เนื้อหา
y_position = height - 100
p.setFont("Helvetica", 12)
for column, value in data_row.items():
if pd.notna(value) and str(value).strip():
clean_column = str(column).strip()
clean_value = str(value).strip()
if len(clean_value) > 80:
clean_value = clean_value[:77] + "..."
text = f"{clean_column}: {clean_value}"
try:
p.drawString(50, y_position, text)
except:
safe_text = text.encode('ascii', errors='ignore').decode('ascii')
p.drawString(50, y_position, safe_text)
y_position -= 25
if y_position < 50:
p.showPage()
p.setFont("Helvetica", 12)
y_position = height - 50
# เวลาที่สร้าง
p.setFont("Helvetica", 8)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
p.drawString(50, 30, f"Created: {timestamp}")
p.save()
buffer.seek(0)
return buffer.getvalue()
def process_single_row(pdf_path, row_data, filename, use_form=True):
"""ประมวลผลแถวเดียว"""
try:
# เตรียมข้อมูลฟิลด์
field_data = {}
for column, value in row_data.items():
if pd.notna(value) and str(value).strip():
clean_value = str(value).strip()
clean_column = str(column).strip()
# ลองหลายรูปแบบของชื่อฟิลด์
field_variations = [
clean_column,
clean_column.lower(),
clean_column.upper(),
clean_column.replace('_', ' '),
clean_column.replace(' ', '_'),
clean_column.replace('-', '_'),
clean_column.replace('_', '')
]
for variation in field_variations:
field_data[variation] = clean_value
if use_form:
try:
# ลองเติมฟอร์ม
writer = fill_pdf_form(pdf_path, field_data)
output_buffer = io.BytesIO()
writer.write(output_buffer)
output_buffer.seek(0)
return output_buffer.getvalue(), "form_filled"
except Exception as e:
# ถ้าไม่ได้ ให้สร้างใหม่
pdf_content = create_simple_pdf(row_data, filename)
return pdf_content, f"new_pdf_created: {str(e)}"
else:
# สร้าง PDF ใหม่
pdf_content = create_simple_pdf(row_data, filename)
return pdf_content, "new_pdf_created"
except Exception as e:
return None, f"error: {str(e)}"
def read_csv_safe(csv_file):
"""อ่าน CSV อย่างปลอดภัย"""
encodings = ['utf-8', 'utf-8-sig', 'cp874', 'tis-620', 'iso-8859-1', 'cp1252']
separators = [',', ';', '\t', '|']
for encoding in encodings:
for sep in separators:
try:
df = pd.read_csv(csv_file, encoding=encoding, sep=sep, engine='python')
if len(df.columns) > 1 and len(df) > 0:
return df, None
except Exception:
continue
try:
df = pd.read_csv(csv_file)
return df, None
except Exception as e:
return None, str(e)
def process_pdf_csv(pdf_file, csv_file, filename_column, file_prefix, use_form_fields, progress=gr.Progress()):
"""ฟังก์ชันหลักสำหรับประมวลผล PDF และ CSV"""
if pdf_file is None or csv_file is None:
return None, "❌ กรุณาอัพโหลดไฟล์ PDF และ CSV"
try:
# อ่าน CSV
df, csv_error = read_csv_safe(csv_file)
if df is None:
return None, f"❌ ไม่สามารถอ่าน CSV ได้: {csv_error}"
# วิเคราะห์ PDF
pdf_fields = analyze_pdf_fields(pdf_file)
has_form_fields = bool(pdf_fields and "error" not in pdf_fields and pdf_fields)
# เก็บ PDF ที่สร้าง
generated_pdfs = {}
success_count = 0
error_count = 0
processing_log = []
# ประมวลผลแต่ละแถว
for index, (_, row) in enumerate(df.iterrows()):
progress((index + 1) / len(df), f"ประมวลผล {index + 1}/{len(df)}")
try:
# สร้างชื่อไฟล์
if filename_column and filename_column in df.columns and pd.notna(row[filename_column]):
safe_name = str(row[filename_column]).strip()
safe_name = "".join(c for c in safe_name if c.isalnum() or c in (' ', '-', '_')).strip()
filename = f"{file_prefix}_{safe_name}.pdf"
else:
filename = f"{file_prefix}_{index + 1:03d}.pdf"
filename = filename.replace(' ', ' ').replace(' ', '_')
if not filename.endswith('.pdf'):
filename += '.pdf'
# ประมวลผล
pdf_content, status = process_single_row(
pdf_file,
row,
filename,
use_form_fields and has_form_fields
)
if pdf_content is not None:
generated_pdfs[filename] = pdf_content
success_count += 1
processing_log.append(f"✅ {filename}: {status}")
else:
error_count += 1
processing_log.append(f"❌ {filename}: {status}")
except Exception as e:
error_count += 1
processing_log.append(f"💥 แถว {index + 1}: {str(e)}")
# สร้าง ZIP
if generated_pdfs:
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for filename, pdf_content in generated_pdfs.items():
zip_file.writestr(filename, pdf_content)
zip_buffer.seek(0)
# สร้างชื่อไฟล์ ZIP
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
zip_filename = f"generated_pdfs_{timestamp}.zip"
# บันทึกไฟล์ชั่วคราว
temp_zip_path = os.path.join(tempfile.gettempdir(), zip_filename)
with open(temp_zip_path, 'wb') as f:
f.write(zip_buffer.getvalue())
result_message = f"✅ สร้าง PDF สำเร็จ {success_count} ไฟล์!"
if error_count > 0:
result_message += f"\n⚠️ มีข้อผิดพลาด {error_count} ไฟล์"
result_message += f"\n\n📋 รายละเอียด:\n" + "\n".join(processing_log[:10])
if len(processing_log) > 10:
result_message += f"\n... และอีก {len(processing_log) - 10} รายการ"
return temp_zip_path, result_message
else:
return None, "❌ ไม่สามารถสร้าง PDF ได้เลย"
except Exception as e:
return None, f"❌ เกิดข้อผิดพลาด: {str(e)}\n{traceback.format_exc()}"
def analyze_pdf_info(pdf_file):
"""วิเคราะห์ข้อมูล PDF"""
if pdf_file is None:
return "ไม่มีไฟล์ PDF"
try:
reader = PdfReader(pdf_file)
info = f"📄 **ข้อมูล PDF:**\n"
info += f"- จำนวนหน้า: {len(reader.pages)}\n"
# ตรวจสอบฟิลด์
pdf_fields = analyze_pdf_fields(pdf_file)
if pdf_fields and "error" not in pdf_fields and pdf_fields:
info += f"- จำนวน Form Fields: {len(pdf_fields)}\n"
info += f"\n🏷️ **รายชื่อ Fields:**\n"
for name, details in list(pdf_fields.items())[:10]: # แสดงแค่ 10 ตัวแรก
info += f" - {name} ({details.get('type', 'Unknown')})\n"
if len(pdf_fields) > 10:
info += f" - ... และอีก {len(pdf_fields) - 10} fields\n"
else:
info += "- Form Fields: ไม่พบหรือไม่สามารถอ่านได้\n"
info += "- หมายเหตุ: จะสร้าง PDF ใหม่แทน\n"
return info
except Exception as e:
return f"❌ ไม่สามารถวิเคราะห์ PDF ได้: {str(e)}"
def analyze_csv_info(csv_file):
"""วิเคราะห์ข้อมูล CSV"""
if csv_file is None:
return "ไม่มีไฟล์ CSV"
try:
df, error = read_csv_safe(csv_file)
if df is None:
return f"❌ ไม่สามารถอ่าน CSV ได้: {error}"
info = f"📋 **ข้อมูล CSV:**\n"
info += f"- จำนวนแถว: {len(df)}\n"
info += f"- จำนวนคอลัมน์: {len(df.columns)}\n"
info += f"\n📝 **รายชื่อคอลัมน์:**\n"
for col in df.columns[:15]: # แสดงแค่ 15 คอลัมน์แรก
info += f" - {col}\n"
if len(df.columns) > 15:
info += f" - ... และอีก {len(df.columns) - 15} คอลัมน์\n"
# ตรวจสอบข้อมูลที่ขาด
missing_data = df.isnull().sum()
if missing_data.any():
missing_cols = missing_data[missing_data > 0]
if len(missing_cols) > 0:
info += f"\n⚠️ **ข้อมูลที่ขาดหาย:**\n"
for col, count in missing_cols.head(5).items():
info += f" - {col}: {count} แถว\n"
return info
except Exception as e:
return f"❌ ไม่สามารถวิเคราะห์ CSV ได้: {str(e)}"
# สร้าง Gradio Interface
def create_interface():
with gr.Blocks(title="PDF Form Filler", theme=gr.themes.Soft()) as app:
gr.Markdown("""
# 📄 เครื่องมือเติมข้อมูล PDF จาก CSV
**เครื่องมือนี้สามารถ:**
- เติมข้อมูลลงในฟอร์ม PDF ที่มี form fields
- สร้าง PDF ใหม่หากไม่มี form fields หรือเติมไม่ได้
- รองรับ CSV หลาย encoding (UTF-8, TIS-620, CP874, etc.)
- ส่งออกเป็นไฟล์ ZIP
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("## 📁 อัพโหลดไฟล์")
pdf_file = gr.File(
label="PDF Template",
file_types=[".pdf"],
type="filepath"
)
csv_file = gr.File(
label="CSV Data",
file_types=[".csv"],
type="filepath"
)
gr.Markdown("## ⚙️ ตั้งค่า")
filename_column = gr.Textbox(
label="คอลัมน์สำหรับชื่อไฟล์ (ถ้ามี)",
placeholder="เช่น name, id, etc.",
value=""
)
file_prefix = gr.Textbox(
label="คำนำหน้าชื่อไฟล์",
value="document"
)
use_form_fields = gr.Checkbox(
label="ใช้ Form Fields (ถ้าพบ)",
value=True
)
process_btn = gr.Button(
"🚀 สร้าง PDF ทั้งหมด",
variant="primary",
size="lg"
)
with gr.Column(scale=2):
gr.Markdown("## 📊 ข้อมูลไฟล์")
pdf_info = gr.Markdown("ยังไม่มีไฟล์ PDF")
csv_info = gr.Markdown("ยังไม่มีไฟล์ CSV")
gr.Markdown("## 📥 ผลลัพธ์")
result_file = gr.File(
label="ไฟล์ ZIP ที่สร้าง",
visible=False
)
result_message = gr.Markdown("")
# Event handlers
pdf_file.change(
fn=analyze_pdf_info,
inputs=[pdf_file],
outputs=[pdf_info]
)
csv_file.change(
fn=analyze_csv_info,
inputs=[csv_file],
outputs=[csv_info]
)
process_btn.click(
fn=process_pdf_csv,
inputs=[
pdf_file,
csv_file,
filename_column,
file_prefix,
use_form_fields
],
outputs=[result_file, result_message]
).then(
fn=lambda x: gr.update(visible=x is not None),
inputs=[result_file],
outputs=[result_file]
)
return app
# รันแอป
if __name__ == "__main__":
app = create_interface()
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=True, # สร้าง public URL
debug=True
)