protae5544's picture
Update app.py
71d1a5b verified
raw
history blame
18 kB
import gradio as gr
import sqlite3
import json
import platform
from huggingface_hub import hf_hub_download, upload_file
import os
from pathlib import Path
from reportlab.lib.pagesizes import A4
from reportlab.lib.units import cm
from reportlab.pdfgen import canvas
from reportlab.pdfbase import pdfmetrics
from reportlab.pdfbase.ttfonts import TTFont
import qrcode
import zipfile
import io
from dotenv import load_dotenv
import time
import shutil
# อ่าน .env
load_dotenv()
REPO_ID = os.getenv("REPO_ID", "protae5544/WorkerManagement")
ORIGINAL_REPO_ID = os.getenv("ORIGINAL_REPO_ID", None)
# ตั้งค่าฟอนต์
try:
if platform.system() == "Linux":
font_path = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
if os.path.exists(font_path):
pdfmetrics.registerFont(TTFont('THSarabunBold', font_path))
else:
# ใช้ฟอนต์ default
print("ใช้ฟอนต์มาตรฐานของ ReportLab เนื่องจากไม่พบ DejaVuSans-Bold")
else:
# ใช้ฟอนต์สำหรับ Windows/Mac
pdfmetrics.registerFont(TTFont('THSarabunBold', 'THSarabunNew-Bold.ttf'))
except Exception as e:
print(f"ใช้ฟอนต์ default เนื่องจากเกิดข้อผิดพลาด: {e}")
# ฟังก์ชันแปลงวันที่เป็นภาษาไทย
def thai_date_time(timestamp):
months = {
1: "มกราคม", 2: "กุมภาพันธ์", 3: "มีนาคม", 4: "เมษายน",
5: "พฤษภาคม", 6: "มิถุนายน", 7: "กรกฎาคม", 8: "สิงหาคม",
9: "กันยายน", 10: "ตุลาคม", 11: "พฤศจิกายน", 12: "ธันวาคม"
}
dt = time.localtime(timestamp)
day = dt.tm_mday
month = months[dt.tm_mon]
year = dt.tm_year + 543
hour = dt.tm_hour
minute = dt.tm_min
period = "น"
return f"{day:02d} {month} {year} {hour:02d}:{minute:02d} {period}"
# เริ่มต้นฐานข้อมูล
def init_db():
conn = sqlite3.connect("workers.db")
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS workers (
request_number TEXT PRIMARY KEY,
english_name TEXT,
foreign_reference_number TEXT,
id_number TEXT,
nationality TEXT,
receipt_number TEXT,
payment_number TEXT
)''')
c.execute('''CREATE TABLE IF NOT EXISTS attachments (
request_number TEXT,
file_url TEXT,
FOREIGN KEY(request_number) REFERENCES workers(request_number)
)''')
c.execute('''CREATE TABLE IF NOT EXISTS pdfs (
request_number TEXT,
pdf_url TEXT,
FOREIGN KEY(request_number) REFERENCES workers(request_number)
)''')
conn.commit()
conn.close()
# นำเข้า JSON
def import_json_to_db():
try:
json_path = hf_hub_download(repo_id=REPO_ID, filename="worker-database.json", repo_type="space")
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
conn = sqlite3.connect("workers.db")
c = conn.cursor()
for record in data:
c.execute('''INSERT OR IGNORE INTO workers (
request_number, english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number
) VALUES (?, ?, ?, ?, ?, ?, ?)''', (
record['เลขคำขอ'],
record['ชื่อภาษาอังกฤษ'],
record['หมายเลขอ้างอิงคนต่างด้าว'],
record['หมายเลขประจำตัว'],
record['สัญชาติ'],
record['เลขที่บนขวาใบเสร็จ'],
record['หมายเลขชำระเงิน']
))
conn.commit()
conn.close()
except Exception as e:
print(f"ข้อผิดพลาดในการนำเข้า JSON: {e}")
# สำรองฐานข้อมูล
def backup_db():
try:
upload_file(
path_or_fileobj="workers.db",
path_in_repo="workers.db",
repo_id=REPO_ID,
repo_type="space"
)
except Exception as e:
print(f"ข้อผิดพลาดในการสำรองฐานข้อมูล: {e}")
# ดึงเลขคำขอถัดไป
def get_next_request_number():
conn = sqlite3.connect("workers.db")
c = conn.cursor()
c.execute("SELECT request_number FROM workers")
request_numbers = c.fetchall()
conn.close()
if not request_numbers:
return "WP-68-366-150"
max_num = max([int(num[0].split('-')[-1]) for num in request_numbers])
return f"WP-68-366-{max_num + 1:03d}"
# สร้าง PDF ด้วยภาพพื้นหลัง
def create_receipt_pdf(request_number, english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number):
# สร้างโฟลเดอร์ pdfs ถ้ายังไม่มี
os.makedirs("pdfs", exist_ok=True)
filename = f"pdfs/receipt_{request_number}.pdf"
local_path = f"temp_{request_number}.pdf"
try:
c = canvas.Canvas(local_path, pagesize=A4)
# เพิ่มภาพพื้นหลัง
try:
if ORIGINAL_REPO_ID:
bg_path = hf_hub_download(repo_id=ORIGINAL_REPO_ID, filename="assets/bg.png", repo_type="space")
else:
bg_path = hf_hub_download(repo_id=REPO_ID, filename="assets/bg.png", repo_type="space")
c.drawImage(bg_path, 2.05*cm, A4[1] - (5.12*cm + 23.45*cm), width=17.62*cm, height=23.45*cm, preserveAspectRatio=True)
except Exception as e:
print(f"ไม่พบภาพพื้นหลัง: {e}")
# ข้อมูลใบเสร็จ
c.setFont("THSarabunBold", 18) # 52.284/2.83465 ≈ 18.45
c.drawString(2*cm, 28*cm, "กระทรวงแรงงาน")
c.setFont("THSarabun", 12) # 35.212/2.83465 ≈ 12.42
c.drawString(2*cm, 27.5*cm, "ใบเสร็จรับเงิน (ต้นฉบับ)")
c.drawString(2*cm, 26*cm, f"เลขที่: {receipt_number}")
c.drawString(2*cm, 25.5*cm, "ที่ทำการ: สำนักบริหารแรงงานต่างด้าว")
c.drawString(2*cm, 25*cm, "วันที่: 01 เมษายน 2568")
c.drawString(2*cm, 24.5*cm, f"เลขที่ใบชำระเงิน: {payment_number}")
c.drawString(2*cm, 24*cm, f"เลขรับคำขอที่: {request_number}")
c.drawString(2*cm, 23.5*cm, f"ชื่อผู้ชำระเงิน: {english_name}")
c.drawString(2*cm, 23*cm, f"สัญชาติ: {nationality}")
c.drawString(2*cm, 22.5*cm, f"เลขอ้างอิงคนต่างด้าว: {foreign_reference_number}")
c.drawString(2*cm, 22*cm, f"หมายเลขประจำตัวคนต่างด้าว: {id_number}")
c.drawString(2*cm, 21.5*cm, "ชื่อนายจ้าง / สถานประกอบการ: บริษัท บาน กง เอ็นจิเนียริ่ง จำกัด")
c.drawString(2*cm, 21*cm, "เลขประจำตัวนายจ้าง: 0415567000061")
c.setFont("THSarabunBold", 12)
c.drawString(2*cm, 20*cm, "รายการ")
c.setFont("THSarabun", 12)
c.drawString(2*cm, 19.5*cm, "1. ค่าธรรมเนียมในการยื่นคำขอ ฉบับละ 100 บาท: 100.00")
c.drawString(2*cm, 19*cm, "2. ค่าธรรมเนียม: 900.00")
c.drawString(2*cm, 18.5*cm, "รวมเป็นเงินทั้งสิ้น (บาท): (หนึ่งพันบาทถ้วน) 1,000.00")
c.drawString(2*cm, 18*cm, "ได้รับเงินไว้เป็นการถูกต้องแล้ว")
c.drawString(2*cm, 17.5*cm, "(ลงชื่อ) นางสาวอารีวรรณ โพธิ์นิ่มแดง (ผู้รับเงิน)")
c.drawString(2*cm, 17*cm, "ตำแหน่ง: นักวิชาการแรงงานชำนาญการ")
# QR Code
pdf_url = f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/{filename}"
qr = qrcode.make(pdf_url)
qr_img_path = f"temp_qr_{request_number}.png"
qr.save(qr_img_path)
c.drawImage(qr_img_path, 15*cm, 15*cm, width=3*cm, height=3*cm)
# เวลาพิมพ์
c.setFont("THSarabun", 7)
print_time = thai_date_time(time.time())
c.drawString(2*cm, 2*cm, f"พิมพ์เมื่อ: {print_time}")
c.showPage()
c.save()
# อัพโหลด PDF
try:
upload_file(
path_or_fileobj=local_path,
path_in_repo=filename,
repo_id=REPO_ID,
repo_type="space"
)
except Exception as e:
print(f"อัพโหลด PDF ไปยัง Space หลักไม่สำเร็จ: {e}")
# อัพโหลดไปยัง Space เดิม
if ORIGINAL_REPO_ID:
try:
upload_file(
path_or_fileobj=local_path,
path_in_repo=filename,
repo_id=ORIGINAL_REPO_ID,
repo_type="space"
)
except Exception as e:
print(f"อัพโหลด PDF ไปยัง Space เดิมไม่สำเร็จ: {e}")
# บันทึกข้อมูล PDF
conn = sqlite3.connect("workers.db")
c = conn.cursor()
c.execute("INSERT OR REPLACE INTO pdfs (request_number, pdf_url) VALUES (?, ?)",
(request_number, pdf_url))
conn.commit()
conn.close()
return filename
finally:
# ลบไฟล์ชั่วคราว
if os.path.exists(local_path):
os.remove(local_path)
if os.path.exists(qr_img_path):
os.remove(qr_img_path)
# ตรวจสอบข้อมูล
def validate_input(english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number):
if not id_number or not id_number.isdigit() or len(id_number) != 13:
return "หมายเลขประจำตัวต้องเป็นตัวเลข 13 หลัก"
if not receipt_number or not payment_number:
return "กรุณากรอกเลขที่ใบเสร็จและหมายเลขชำระเงิน"
return None
# เพิ่มข้อมูลพนักงาน
def add_worker(english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number, attachments):
error = validate_input(english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number)
if error:
return error
request_number = get_next_request_number()
# บันทึกข้อมูล
conn = sqlite3.connect("workers.db")
c = conn.cursor()
c.execute('''INSERT INTO workers (
request_number, english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number
) VALUES (?, ?, ?, ?, ?, ?, ?)''', (
request_number, english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number
))
# จัดการไฟล์แนบ
attachment_urls = []
os.makedirs("attachments", exist_ok=True)
for i, file in enumerate(attachments or []):
try:
filename = f"attachments/{request_number}_{i}_{os.path.basename(file.name)}"
repo_path = f"attachments/{os.path.basename(file.name)}"
# บันทึกไฟล์ชั่วคราว
temp_path = f"temp_{request_number}_{i}_{os.path.basename(file.name)}"
with open(temp_path, "wb") as f:
if hasattr(file, "read"):
f.write(file.read())
else:
with open(file.name, "rb") as src:
f.write(src.read())
# อัพโหลดไฟล์
upload_file(
path_or_fileobj=temp_path,
path_in_repo=filename,
repo_id=REPO_ID,
repo_type="space"
)
url = f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/{filename}"
c.execute("INSERT INTO attachments (request_number, file_url) VALUES (?, ?)", (request_number, url))
attachment_urls.append(url)
# ลบไฟล์ชั่วคราว
os.remove(temp_path)
except Exception as e:
print(f"อัพโหลดไฟล์แนบไม่สำเร็จ: {e}")
conn.commit()
conn.close()
# สร้าง PDF
pdf_path = create_receipt_pdf(request_number, english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number)
# สำรองฐานข้อมูล
backup_db()
return f"เพิ่มข้อมูลพนักงานสำเร็จ! เลขคำขอ: {request_number}\nPDF: https://huggingface.co/spaces/{REPO_ID}/resolve/main/{pdf_path}"
# ดาวน์โหลด PDF ทั้งหมด
def download_all_pdfs():
conn = sqlite3.connect("workers.db")
c = conn.cursor()
c.execute("SELECT request_number, pdf_url FROM pdfs")
pdfs = c.fetchall()
conn.close()
os.makedirs("downloads", exist_ok=True)
zip_path = "downloads/all_receipts.zip"
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for request_number, pdf_url in pdfs:
try:
# ดาวน์โหลด PDF จาก URL
filename = f"receipt_{request_number}.pdf"
local_path = f"downloads/{filename}"
# ดาวน์โหลดไฟล์
downloaded_path = hf_hub_download(
repo_id=REPO_ID,
filename=f"pdfs/receipt_{request_number}.pdf",
repo_type="space",
local_path=local_path
)
# เพิ่มไฟล์ลง ZIP
zip_file.write(downloaded_path, filename)
except Exception as e:
print(f"ไม่สามารถเพิ่ม PDF {request_number}: {e}")
# อัพโหลด ZIP
try:
upload_file(
path_or_fileobj=zip_path,
path_in_repo="all_receipts.zip",
repo_id=REPO_ID,
repo_type="space"
)
except Exception as e:
print(f"อัพโหลด ZIP ไม่สำเร็จ: {e}")
return f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/all_receipts.zip"
# เริ่มต้นฐานข้อมูลและนำเข้า JSON
init_db()
import_json_to_db()
# อินเทอร์เฟซ Gradio
with gr.Blocks(title="ระบบจัดการข้อมูลพนักงาน", theme=gr.themes.Soft()) as demo:
gr.Markdown("## เพิ่มข้อมูลพนักงานใหม่")
with gr.Row():
english_name = gr.Textbox(label="ชื่อภาษาอังกฤษ", placeholder="เช่น MR. AUNG KYAW")
foreign_reference_number = gr.Textbox(label="หมายเลขอ้างอิงคนต่างด้าว", placeholder="เช่น 2492102076212")
with gr.Row():
id_number = gr.Textbox(label="หมายเลขประจำตัว (13 หลัก)", placeholder="เช่น 6685490000472")
nationality = gr.Textbox(label="สัญชาติ", value="เมียนมา")
with gr.Row():
receipt_number = gr.Textbox(label="เลขที่บนขวาใบเสร็จ", placeholder="เช่น 2100680001130")
payment_number = gr.Textbox(label="หมายเลขชำระเงิน", placeholder="เช่น IV680210/002350")
attachments = gr.File(label="แนบไฟล์ (สูงสุด 5MB)", file_count="multiple", file_types=["image", ".pdf"], max_size=5*1024*1024)
submit = gr.Button("บันทึกข้อมูล", variant="primary")
output = gr.Textbox(label="ผลลัพธ์")
gr.Markdown("## ดาวน์โหลดเอกสาร")
download_all = gr.Button("ดาวน์โหลด PDF ทั้งหมด")
download_output = gr.Textbox(label="ลิงก์ดาวน์โหลด")
submit.click(
fn=add_worker,
inputs=[english_name, foreign_reference_number, id_number, nationality, receipt_number, payment_number, attachments],
outputs=output
)
download_all.click(
fn=download_all_pdfs,
outputs=download_output
)
# เริ่มต้นแอปพลิเคชัน
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=int(os.getenv("PORT", 7860))