import gradio as gr import pandas as pd import io import zipfile from datetime import datetime import traceback import tempfile import os # ติดตั้ง dependencies ที่จำเป็น try: from PyPDF2 import PdfReader, PdfWriter from reportlab.pdfgen import canvas from reportlab.lib.pagesizes import letter from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont except ImportError as e: print(f"กำลังติดตั้ง dependencies: {e}") import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "PyPDF2", "reportlab", "pandas"]) from PyPDF2 import PdfReader, PdfWriter from reportlab.pdfgen import canvas from reportlab.lib.pagesizes import letter def analyze_pdf_fields(pdf_path): """วิเคราะห์ฟิลด์ใน PDF""" try: reader = PdfReader(pdf_path) all_fields = {} # ตรวจสอบจาก AcroForm if reader.trailer.get("/Root") and reader.trailer["/Root"].get("/AcroForm"): acro_form = reader.trailer["/Root"]["/AcroForm"] if "/Fields" in acro_form: fields = acro_form["/Fields"] for field in fields: field_obj = field.get_object() if "/T" in field_obj: field_name = str(field_obj["/T"]).strip("()") field_type = str(field_obj.get("/FT", "Unknown")) all_fields[field_name] = { 'type': field_type, 'method': 'AcroForm' } # ตรวจสอบจาก Annotations for page_num, page in enumerate(reader.pages): if "/Annots" in page: try: annotations = page["/Annots"] for annotation in annotations: annot_obj = annotation.get_object() if annot_obj.get("/Subtype") == "/Widget": if "/T" in annot_obj: field_name = str(annot_obj["/T"]).strip("()") field_type = str(annot_obj.get("/FT", "Widget")) all_fields[field_name] = { 'type': field_type, 'page': page_num + 1, 'method': 'Annotation' } except Exception: continue return all_fields except Exception as e: return {"error": str(e)} def fill_pdf_form(pdf_path, field_data): """เติมข้อมูลในฟอร์ม PDF""" try: reader = PdfReader(pdf_path) writer = PdfWriter() # คัดลอกหน้าทั้งหมด for page in reader.pages: writer.add_page(page) # เติมข้อมูลในฟอร์ม if hasattr(writer, 'update_page_form_field_values'): for page_num, page in enumerate(writer.pages): try: writer.update_page_form_field_values(page, field_data) except Exception: pass # ลองวิธีอื่น elif "/AcroForm" in reader.trailer.get("/Root", {}): try: acro_form = reader.trailer["/Root"]["/AcroForm"] if "/Fields" in acro_form: fields = acro_form["/Fields"] for field in fields: field_obj = field.get_object() if "/T" in field_obj: field_name = str(field_obj["/T"]).strip("()") if field_name in field_data: try: field_obj.update({"/V": field_data[field_name]}) except Exception: pass except Exception: pass return writer except Exception as e: raise Exception(f"ไม่สามารถเติมฟอร์มได้: {str(e)}") def create_simple_pdf(data_row, filename): """สร้าง PDF ใหม่แบบง่าย""" buffer = io.BytesIO() p = canvas.Canvas(buffer, pagesize=letter) width, height = letter # ตั้งค่า font p.setFont("Helvetica", 12) # หัวเรื่อง p.setFont("Helvetica-Bold", 16) title = f"Document: {filename.replace('.pdf', '')}" p.drawString(50, height - 50, title) p.line(50, height - 60, 550, height - 60) # เนื้อหา y_position = height - 100 p.setFont("Helvetica", 12) for column, value in data_row.items(): if pd.notna(value) and str(value).strip(): clean_column = str(column).strip() clean_value = str(value).strip() if len(clean_value) > 80: clean_value = clean_value[:77] + "..." text = f"{clean_column}: {clean_value}" try: p.drawString(50, y_position, text) except: safe_text = text.encode('ascii', errors='ignore').decode('ascii') p.drawString(50, y_position, safe_text) y_position -= 25 if y_position < 50: p.showPage() p.setFont("Helvetica", 12) y_position = height - 50 # เวลาที่สร้าง p.setFont("Helvetica", 8) timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") p.drawString(50, 30, f"Created: {timestamp}") p.save() buffer.seek(0) return buffer.getvalue() def process_single_row(pdf_path, row_data, filename, use_form=True): """ประมวลผลแถวเดียว""" try: # เตรียมข้อมูลฟิลด์ field_data = {} for column, value in row_data.items(): if pd.notna(value) and str(value).strip(): clean_value = str(value).strip() clean_column = str(column).strip() # ลองหลายรูปแบบของชื่อฟิลด์ field_variations = [ clean_column, clean_column.lower(), clean_column.upper(), clean_column.replace('_', ' '), clean_column.replace(' ', '_'), clean_column.replace('-', '_'), clean_column.replace('_', '') ] for variation in field_variations: field_data[variation] = clean_value if use_form: try: # ลองเติมฟอร์ม writer = fill_pdf_form(pdf_path, field_data) output_buffer = io.BytesIO() writer.write(output_buffer) output_buffer.seek(0) return output_buffer.getvalue(), "form_filled" except Exception as e: # ถ้าไม่ได้ ให้สร้างใหม่ pdf_content = create_simple_pdf(row_data, filename) return pdf_content, f"new_pdf_created: {str(e)}" else: # สร้าง PDF ใหม่ pdf_content = create_simple_pdf(row_data, filename) return pdf_content, "new_pdf_created" except Exception as e: return None, f"error: {str(e)}" def read_csv_safe(csv_file): """อ่าน CSV อย่างปลอดภัย""" encodings = ['utf-8', 'utf-8-sig', 'cp874', 'tis-620', 'iso-8859-1', 'cp1252'] separators = [',', ';', '\t', '|'] for encoding in encodings: for sep in separators: try: df = pd.read_csv(csv_file, encoding=encoding, sep=sep, engine='python') if len(df.columns) > 1 and len(df) > 0: return df, None except Exception: continue try: df = pd.read_csv(csv_file) return df, None except Exception as e: return None, str(e) def process_pdf_csv(pdf_file, csv_file, filename_column, file_prefix, use_form_fields, progress=gr.Progress()): """ฟังก์ชันหลักสำหรับประมวลผล PDF และ CSV""" if pdf_file is None or csv_file is None: return None, "❌ กรุณาอัพโหลดไฟล์ PDF และ CSV" try: # อ่าน CSV df, csv_error = read_csv_safe(csv_file) if df is None: return None, f"❌ ไม่สามารถอ่าน CSV ได้: {csv_error}" # วิเคราะห์ PDF pdf_fields = analyze_pdf_fields(pdf_file) has_form_fields = bool(pdf_fields and "error" not in pdf_fields and pdf_fields) # เก็บ PDF ที่สร้าง generated_pdfs = {} success_count = 0 error_count = 0 processing_log = [] # ประมวลผลแต่ละแถว for index, (_, row) in enumerate(df.iterrows()): progress((index + 1) / len(df), f"ประมวลผล {index + 1}/{len(df)}") try: # สร้างชื่อไฟล์ if filename_column and filename_column in df.columns and pd.notna(row[filename_column]): safe_name = str(row[filename_column]).strip() safe_name = "".join(c for c in safe_name if c.isalnum() or c in (' ', '-', '_')).strip() filename = f"{file_prefix}_{safe_name}.pdf" else: filename = f"{file_prefix}_{index + 1:03d}.pdf" filename = filename.replace(' ', ' ').replace(' ', '_') if not filename.endswith('.pdf'): filename += '.pdf' # ประมวลผล pdf_content, status = process_single_row( pdf_file, row, filename, use_form_fields and has_form_fields ) if pdf_content is not None: generated_pdfs[filename] = pdf_content success_count += 1 processing_log.append(f"✅ {filename}: {status}") else: error_count += 1 processing_log.append(f"❌ {filename}: {status}") except Exception as e: error_count += 1 processing_log.append(f"💥 แถว {index + 1}: {str(e)}") # สร้าง ZIP if generated_pdfs: zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: for filename, pdf_content in generated_pdfs.items(): zip_file.writestr(filename, pdf_content) zip_buffer.seek(0) # สร้างชื่อไฟล์ ZIP timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") zip_filename = f"generated_pdfs_{timestamp}.zip" # บันทึกไฟล์ชั่วคราว temp_zip_path = os.path.join(tempfile.gettempdir(), zip_filename) with open(temp_zip_path, 'wb') as f: f.write(zip_buffer.getvalue()) result_message = f"✅ สร้าง PDF สำเร็จ {success_count} ไฟล์!" if error_count > 0: result_message += f"\n⚠️ มีข้อผิดพลาด {error_count} ไฟล์" result_message += f"\n\n📋 รายละเอียด:\n" + "\n".join(processing_log[:10]) if len(processing_log) > 10: result_message += f"\n... และอีก {len(processing_log) - 10} รายการ" return temp_zip_path, result_message else: return None, "❌ ไม่สามารถสร้าง PDF ได้เลย" except Exception as e: return None, f"❌ เกิดข้อผิดพลาด: {str(e)}\n{traceback.format_exc()}" def analyze_pdf_info(pdf_file): """วิเคราะห์ข้อมูล PDF""" if pdf_file is None: return "ไม่มีไฟล์ PDF" try: reader = PdfReader(pdf_file) info = f"📄 **ข้อมูล PDF:**\n" info += f"- จำนวนหน้า: {len(reader.pages)}\n" # ตรวจสอบฟิลด์ pdf_fields = analyze_pdf_fields(pdf_file) if pdf_fields and "error" not in pdf_fields and pdf_fields: info += f"- จำนวน Form Fields: {len(pdf_fields)}\n" info += f"\n🏷️ **รายชื่อ Fields:**\n" for name, details in list(pdf_fields.items())[:10]: # แสดงแค่ 10 ตัวแรก info += f" - {name} ({details.get('type', 'Unknown')})\n" if len(pdf_fields) > 10: info += f" - ... และอีก {len(pdf_fields) - 10} fields\n" else: info += "- Form Fields: ไม่พบหรือไม่สามารถอ่านได้\n" info += "- หมายเหตุ: จะสร้าง PDF ใหม่แทน\n" return info except Exception as e: return f"❌ ไม่สามารถวิเคราะห์ PDF ได้: {str(e)}" def analyze_csv_info(csv_file): """วิเคราะห์ข้อมูล CSV""" if csv_file is None: return "ไม่มีไฟล์ CSV" try: df, error = read_csv_safe(csv_file) if df is None: return f"❌ ไม่สามารถอ่าน CSV ได้: {error}" info = f"📋 **ข้อมูล CSV:**\n" info += f"- จำนวนแถว: {len(df)}\n" info += f"- จำนวนคอลัมน์: {len(df.columns)}\n" info += f"\n📝 **รายชื่อคอลัมน์:**\n" for col in df.columns[:15]: # แสดงแค่ 15 คอลัมน์แรก info += f" - {col}\n" if len(df.columns) > 15: info += f" - ... และอีก {len(df.columns) - 15} คอลัมน์\n" # ตรวจสอบข้อมูลที่ขาด missing_data = df.isnull().sum() if missing_data.any(): missing_cols = missing_data[missing_data > 0] if len(missing_cols) > 0: info += f"\n⚠️ **ข้อมูลที่ขาดหาย:**\n" for col, count in missing_cols.head(5).items(): info += f" - {col}: {count} แถว\n" return info except Exception as e: return f"❌ ไม่สามารถวิเคราะห์ CSV ได้: {str(e)}" # สร้าง Gradio Interface def create_interface(): with gr.Blocks(title="PDF Form Filler", theme=gr.themes.Soft()) as app: gr.Markdown(""" # 📄 เครื่องมือเติมข้อมูล PDF จาก CSV **เครื่องมือนี้สามารถ:** - เติมข้อมูลลงในฟอร์ม PDF ที่มี form fields - สร้าง PDF ใหม่หากไม่มี form fields หรือเติมไม่ได้ - รองรับ CSV หลาย encoding (UTF-8, TIS-620, CP874, etc.) - ส่งออกเป็นไฟล์ ZIP """) with gr.Row(): with gr.Column(scale=1): gr.Markdown("## 📁 อัพโหลดไฟล์") pdf_file = gr.File( label="PDF Template", file_types=[".pdf"], type="filepath" ) csv_file = gr.File( label="CSV Data", file_types=[".csv"], type="filepath" ) gr.Markdown("## ⚙️ ตั้งค่า") filename_column = gr.Textbox( label="คอลัมน์สำหรับชื่อไฟล์ (ถ้ามี)", placeholder="เช่น name, id, etc.", value="" ) file_prefix = gr.Textbox( label="คำนำหน้าชื่อไฟล์", value="document" ) use_form_fields = gr.Checkbox( label="ใช้ Form Fields (ถ้าพบ)", value=True ) process_btn = gr.Button( "🚀 สร้าง PDF ทั้งหมด", variant="primary", size="lg" ) with gr.Column(scale=2): gr.Markdown("## 📊 ข้อมูลไฟล์") pdf_info = gr.Markdown("ยังไม่มีไฟล์ PDF") csv_info = gr.Markdown("ยังไม่มีไฟล์ CSV") gr.Markdown("## 📥 ผลลัพธ์") result_file = gr.File( label="ไฟล์ ZIP ที่สร้าง", visible=False ) result_message = gr.Markdown("") # Event handlers pdf_file.change( fn=analyze_pdf_info, inputs=[pdf_file], outputs=[pdf_info] ) csv_file.change( fn=analyze_csv_info, inputs=[csv_file], outputs=[csv_info] ) process_btn.click( fn=process_pdf_csv, inputs=[ pdf_file, csv_file, filename_column, file_prefix, use_form_fields ], outputs=[result_file, result_message] ).then( fn=lambda x: gr.update(visible=x is not None), inputs=[result_file], outputs=[result_file] ) return app # รันแอป if __name__ == "__main__": app = create_interface() app.launch( server_name="0.0.0.0", server_port=7860, share=True, # สร้าง public URL debug=True )