Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
from fastapi import FastAPI, BackgroundTasks, UploadFile, File, Form, Request, Query | |
from fastapi.responses import HTMLResponse, JSONResponse, Response, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
import pathlib, os, uvicorn, base64, json, shutil, uuid, time, urllib.parse | |
from typing import Dict, List, Any, Optional | |
import asyncio | |
import logging | |
import threading | |
import concurrent.futures | |
from openai import OpenAI | |
import fitz # PyMuPDF | |
import tempfile | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfgen import canvas | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
from reportlab.lib.styles import getSampleStyleSheet | |
import io | |
import docx2txt | |
# ๋ก๊น ์ค์ | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
BASE = pathlib.Path(__file__).parent | |
app = FastAPI() | |
app.mount("/static", StaticFiles(directory=BASE), name="static") | |
# PDF ๋๋ ํ ๋ฆฌ ์ค์ | |
PDF_DIR = BASE / "pdf" | |
if not PDF_DIR.exists(): | |
PDF_DIR.mkdir(parents=True) | |
# ์๊ตฌ PDF ๋๋ ํ ๋ฆฌ ์ค์ (Hugging Face ์๊ตฌ ๋์คํฌ) | |
PERMANENT_PDF_DIR = pathlib.Path("/data/pdfs") if os.path.exists("/data") else BASE / "permanent_pdfs" | |
if not PERMANENT_PDF_DIR.exists(): | |
PERMANENT_PDF_DIR.mkdir(parents=True) | |
# ์บ์ ๋๋ ํ ๋ฆฌ ์ค์ | |
CACHE_DIR = BASE / "cache" | |
if not CACHE_DIR.exists(): | |
CACHE_DIR.mkdir(parents=True) | |
# PDF ๋ฉํ๋ฐ์ดํฐ ๋๋ ํ ๋ฆฌ ๋ฐ ํ์ผ ์ค์ | |
METADATA_DIR = pathlib.Path("/data/metadata") if os.path.exists("/data") else BASE / "metadata" | |
if not METADATA_DIR.exists(): | |
METADATA_DIR.mkdir(parents=True) | |
PDF_METADATA_FILE = METADATA_DIR / "pdf_metadata.json" | |
# ์๋ฒ ๋ฉ ์บ์ ๋๋ ํ ๋ฆฌ ์ค์ | |
EMBEDDING_DIR = pathlib.Path("/data/embeddings") if os.path.exists("/data") else BASE / "embeddings" | |
if not EMBEDDING_DIR.exists(): | |
EMBEDDING_DIR.mkdir(parents=True) | |
# ๊ด๋ฆฌ์ ๋น๋ฐ๋ฒํธ | |
ADMIN_PASSWORD = os.getenv("PASSWORD", "admin") # ํ๊ฒฝ ๋ณ์์์ ๊ฐ์ ธ์ค๊ธฐ, ๊ธฐ๋ณธ๊ฐ์ ํ ์คํธ์ฉ | |
# OpenAI API ํค ์ค์ | |
OPENAI_API_KEY = os.getenv("LLM_API", "") | |
# API ํค๊ฐ ์๊ฑฐ๋ ๋น์ด์์ ๋ ํ๋๊ทธ ์ค์ | |
HAS_VALID_API_KEY = bool(OPENAI_API_KEY and OPENAI_API_KEY.strip()) | |
if HAS_VALID_API_KEY: | |
try: | |
openai_client = OpenAI(api_key=OPENAI_API_KEY, timeout=30.0) | |
logger.info("OpenAI ํด๋ผ์ด์ธํธ ์ด๊ธฐํ ์ฑ๊ณต") | |
except Exception as e: | |
logger.error(f"OpenAI ํด๋ผ์ด์ธํธ ์ด๊ธฐํ ์คํจ: {e}") | |
HAS_VALID_API_KEY = False | |
else: | |
logger.warning("์ ํจํ OpenAI API ํค๊ฐ ์์ต๋๋ค. AI ๊ธฐ๋ฅ์ด ์ ํ๋ฉ๋๋ค.") | |
openai_client = None | |
# ์ ์ญ ์บ์ ๊ฐ์ฒด | |
pdf_cache: Dict[str, Dict[str, Any]] = {} | |
# ์บ์ฑ ๋ฝ | |
cache_locks = {} | |
# PDF ๋ฉํ๋ฐ์ดํฐ (ID to ๊ฒฝ๋ก ๋งคํ) | |
pdf_metadata: Dict[str, str] = {} | |
# PDF ์๋ฒ ๋ฉ ์บ์ | |
pdf_embeddings: Dict[str, Dict[str, Any]] = {} | |
# PDF ๋ฉํ๋ฐ์ดํฐ ๋ก๋ | |
def load_pdf_metadata(): | |
global pdf_metadata | |
if PDF_METADATA_FILE.exists(): | |
try: | |
with open(PDF_METADATA_FILE, "r") as f: | |
pdf_metadata = json.load(f) | |
logger.info(f"PDF ๋ฉํ๋ฐ์ดํฐ ๋ก๋ ์๋ฃ: {len(pdf_metadata)} ํญ๋ชฉ") | |
except Exception as e: | |
logger.error(f"๋ฉํ๋ฐ์ดํฐ ๋ก๋ ์ค๋ฅ: {e}") | |
pdf_metadata = {} | |
else: | |
pdf_metadata = {} | |
# PDF ๋ฉํ๋ฐ์ดํฐ ์ ์ฅ | |
def save_pdf_metadata(): | |
try: | |
with open(PDF_METADATA_FILE, "w") as f: | |
json.dump(pdf_metadata, f) | |
except Exception as e: | |
logger.error(f"๋ฉํ๋ฐ์ดํฐ ์ ์ฅ ์ค๋ฅ: {e}") | |
# PDF ID ์์ฑ (ํ์ผ๋ช + ํ์์คํฌํ ๊ธฐ๋ฐ) - ๋ ๋จ์ํ๊ณ ์์ ํ ๋ฐฉ์์ผ๋ก ๋ณ๊ฒฝ | |
def generate_pdf_id(filename: str) -> str: | |
# ํ์ผ๋ช ์์ ํ์ฅ์ ์ ๊ฑฐ | |
base_name = os.path.splitext(filename)[0] | |
# ์์ ํ ๋ฌธ์์ด๋ก ๋ณํ (URL ์ธ์ฝ๋ฉ ๋์ ์ง์ ๋ณํ) | |
import re | |
safe_name = re.sub(r'[^\w\-_]', '_', base_name.replace(" ", "_")) | |
# ํ์์คํฌํ ์ถ๊ฐ๋ก ๊ณ ์ ์ฑ ๋ณด์ฅ | |
timestamp = int(time.time()) | |
# ์งง์ ์์ ๋ฌธ์์ด ์ถ๊ฐ | |
random_suffix = uuid.uuid4().hex[:6] | |
return f"{safe_name}_{timestamp}_{random_suffix}" | |
# PDF ํ์ผ ๋ชฉ๋ก ๊ฐ์ ธ์ค๊ธฐ (๋ฉ์ธ ๋๋ ํ ๋ฆฌ์ฉ) | |
def get_pdf_files(): | |
pdf_files = [] | |
if PDF_DIR.exists(): | |
pdf_files = [f for f in PDF_DIR.glob("*.pdf")] | |
return pdf_files | |
# ์๊ตฌ ์ ์ฅ์์ PDF ํ์ผ ๋ชฉ๋ก ๊ฐ์ ธ์ค๊ธฐ | |
def get_permanent_pdf_files(): | |
pdf_files = [] | |
if PERMANENT_PDF_DIR.exists(): | |
pdf_files = [f for f in PERMANENT_PDF_DIR.glob("*.pdf")] | |
return pdf_files | |
# PDF ์ธ๋ค์ผ ์์ฑ ๋ฐ ํ๋ก์ ํธ ๋ฐ์ดํฐ ์ค๋น | |
def generate_pdf_projects(): | |
projects_data = [] | |
# ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์ ์๊ตฌ ์ ์ฅ์์ ํ์ผ๋ค ๊ฐ์ ธ์ค๊ธฐ | |
pdf_files = get_pdf_files() | |
permanent_pdf_files = get_permanent_pdf_files() | |
# ๋ชจ๋ ํ์ผ ํฉ์น๊ธฐ (ํ์ผ๋ช ๊ธฐ์ค์ผ๋ก ์ค๋ณต ์ ๊ฑฐ) | |
unique_files = {} | |
# ๋จผ์ ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์ ํ์ผ๋ค ์ถ๊ฐ | |
for file in pdf_files: | |
unique_files[file.name] = file | |
# ์๊ตฌ ์ ์ฅ์์ ํ์ผ๋ค ์ถ๊ฐ (๋์ผ ํ์ผ๋ช ์ด ์์ผ๋ฉด ์๊ตฌ ์ ์ฅ์ ํ์ผ ์ฐ์ ) | |
for file in permanent_pdf_files: | |
unique_files[file.name] = file | |
# ์ค๋ณต ์ ๊ฑฐ๋ ํ์ผ๋ค๋ก ํ๋ก์ ํธ ๋ฐ์ดํฐ ์์ฑ | |
for pdf_file in unique_files.values(): | |
# ํด๋น ํ์ผ์ PDF ID ์ฐพ๊ธฐ | |
pdf_id = None | |
for pid, path in pdf_metadata.items(): | |
if os.path.basename(path) == pdf_file.name: | |
pdf_id = pid | |
break | |
# ID๊ฐ ์์ผ๋ฉด ์๋ก ์์ฑํ๊ณ ๋ฉํ๋ฐ์ดํฐ์ ์ถ๊ฐ | |
if not pdf_id: | |
pdf_id = generate_pdf_id(pdf_file.name) | |
pdf_metadata[pdf_id] = str(pdf_file) | |
save_pdf_metadata() | |
projects_data.append({ | |
"path": str(pdf_file), | |
"name": pdf_file.stem, | |
"id": pdf_id, | |
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" | |
}) | |
return projects_data | |
# ์บ์ ํ์ผ ๊ฒฝ๋ก ์์ฑ | |
def get_cache_path(pdf_name: str): | |
return CACHE_DIR / f"{pdf_name}_cache.json" | |
# ์๋ฒ ๋ฉ ์บ์ ํ์ผ ๊ฒฝ๋ก ์์ฑ | |
def get_embedding_path(pdf_id: str): | |
return EMBEDDING_DIR / f"{pdf_id}_embedding.json" | |
# PDF ํ ์คํธ ์ถ์ถ ํจ์ | |
def extract_pdf_text(pdf_path: str) -> List[Dict[str, Any]]: | |
try: | |
doc = fitz.open(pdf_path) | |
chunks = [] | |
for page_num in range(len(doc)): | |
page = doc[page_num] | |
text = page.get_text() | |
# ํ์ด์ง ํ ์คํธ๊ฐ ์๋ ๊ฒฝ์ฐ๋ง ์ถ๊ฐ | |
if text.strip(): | |
chunks.append({ | |
"page": page_num + 1, | |
"text": text, | |
"chunk_id": f"page_{page_num + 1}" | |
}) | |
return chunks | |
except Exception as e: | |
logger.error(f"PDF ํ ์คํธ ์ถ์ถ ์ค๋ฅ: {e}") | |
return [] | |
# PDF ID๋ก ์๋ฒ ๋ฉ ์์ฑ ๋๋ ๊ฐ์ ธ์ค๊ธฐ | |
async def get_pdf_embedding(pdf_id: str) -> Dict[str, Any]: | |
try: | |
# ์๋ฒ ๋ฉ ์บ์ ํ์ธ | |
embedding_path = get_embedding_path(pdf_id) | |
if embedding_path.exists(): | |
try: | |
with open(embedding_path, "r", encoding="utf-8") as f: | |
return json.load(f) | |
except Exception as e: | |
logger.error(f"์๋ฒ ๋ฉ ์บ์ ๋ก๋ ์ค๋ฅ: {e}") | |
# PDF ๊ฒฝ๋ก ์ฐพ๊ธฐ | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
raise ValueError(f"PDF ID {pdf_id}์ ํด๋นํ๋ ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค") | |
# ํ ์คํธ ์ถ์ถ | |
chunks = extract_pdf_text(pdf_path) | |
if not chunks: | |
raise ValueError(f"PDF์์ ํ ์คํธ๋ฅผ ์ถ์ถํ ์ ์์ต๋๋ค: {pdf_path}") | |
# ์๋ฒ ๋ฉ ์ ์ฅ ๋ฐ ๋ฐํ | |
embedding_data = { | |
"pdf_id": pdf_id, | |
"pdf_path": pdf_path, | |
"chunks": chunks, | |
"created_at": time.time() | |
} | |
# ์๋ฒ ๋ฉ ์บ์ ์ ์ฅ | |
with open(embedding_path, "w", encoding="utf-8") as f: | |
json.dump(embedding_data, f, ensure_ascii=False) | |
return embedding_data | |
except Exception as e: | |
logger.error(f"PDF ์๋ฒ ๋ฉ ์์ฑ ์ค๋ฅ: {e}") | |
return {"error": str(e), "pdf_id": pdf_id} | |
# PDF ๋ด์ฉ ๊ธฐ๋ฐ ์ง์์๋ต | |
# PDF ๋ด์ฉ ๊ธฐ๋ฐ ์ง์์๋ต ํจ์ ๊ฐ์ | |
async def query_pdf(pdf_id: str, query: str) -> Dict[str, Any]: | |
try: | |
# API ํค๊ฐ ์๊ฑฐ๋ ์ ํจํ์ง ์์ ๊ฒฝ์ฐ | |
if not HAS_VALID_API_KEY or not openai_client: | |
return { | |
"error": "OpenAI API ํค๊ฐ ์ค์ ๋์ง ์์์ต๋๋ค.", | |
"answer": "์ฃ์กํฉ๋๋ค. ํ์ฌ AI ๊ธฐ๋ฅ์ด ๋นํ์ฑํ๋์ด ์์ด ์ง๋ฌธ์ ๋ต๋ณํ ์ ์์ต๋๋ค. ์์คํ ๊ด๋ฆฌ์์๊ฒ ๋ฌธ์ํ์ธ์." | |
} | |
# ์๋ฒ ๋ฉ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ | |
embedding_data = await get_pdf_embedding(pdf_id) | |
if "error" in embedding_data: | |
return {"error": embedding_data["error"]} | |
# ์ฒญํฌ ํ ์คํธ ๋ชจ์ผ๊ธฐ (์์๋ก ๊ฐ๋จํ๊ฒ ์ ์ฒด ํ ์คํธ ์ฌ์ฉ) | |
all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]]) | |
# ์ปจํ ์คํธ ํฌ๊ธฐ๋ฅผ ๊ณ ๋ คํ์ฌ ํ ์คํธ๊ฐ ๋๋ฌด ๊ธธ๋ฉด ์๋ถ๋ถ๋ง ์ฌ์ฉ | |
max_context_length = 60000 # ํ ํฐ ์๊ฐ ์๋ ๋ฌธ์ ์ ๊ธฐ์ค (๋๋ต์ ์ธ ์ ํ) | |
if len(all_text) > max_context_length: | |
all_text = all_text[:max_context_length] + "...(์ดํ ์๋ต)" | |
# ์์คํ ํ๋กฌํํธ ์ค๋น | |
system_prompt = """ | |
The default language is set to English. However, please respond in the language used in the user's prompt (e.g., English, Korean, Japanese, Chinese, etc.). | |
You are an assistant that answers questions based solely on the provided PDF context. Please use only the information from the provided PDF content to respond. If relevant information is not available in the context, honestly reply with, "The requested information could not be found in the provided PDF." | |
Please ensure your responses are clear and concise, citing relevant page numbers. Always respond politely and courteously. | |
""" | |
# gpt-4.1-mini ๋ชจ๋ธ ์ฌ์ฉ | |
try: | |
# ํ์์์ ๋ฐ ์ฌ์๋ ์ค์ ๊ฐ์ | |
for attempt in range(3): # ์ต๋ 3๋ฒ ์ฌ์๋ | |
try: | |
response = openai_client.chat.completions.create( | |
model="gpt-4.1-mini", | |
messages=[ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": f"The default language is set to English.๋ค์ PDF ๋ด์ฉ์ ์ฐธ๊ณ ํ์ฌ ์ง๋ฌธ์ ๋ต๋ณํด์ฃผ์ธ์.\n\nPDF ๋ด์ฉ:\n{all_text}\n\n์ง๋ฌธ: {query}"} | |
], | |
temperature=0.7, | |
max_tokens=2048, | |
timeout=30.0 # 30์ด ํ์์์ | |
) | |
answer = response.choices[0].message.content | |
return { | |
"answer": answer, | |
"pdf_id": pdf_id, | |
"query": query | |
} | |
except Exception as api_error: | |
logger.error(f"OpenAI API ํธ์ถ ์ค๋ฅ (์๋ {attempt+1}/3): {api_error}") | |
if attempt == 2: # ๋ง์ง๋ง ์๋์์๋ ์คํจ | |
raise api_error | |
await asyncio.sleep(1 * (attempt + 1)) # ์ฌ์๋ ๊ฐ ์ง์ฐ ์๊ฐ ์ฆ๊ฐ | |
# ์ฌ๊ธฐ๊น์ง ๋๋ฌํ์ง ์์์ผ ํจ | |
raise Exception("API ํธ์ถ ์ฌ์๋ ๋ชจ๋ ์คํจ") | |
except Exception as api_error: | |
logger.error(f"OpenAI API ํธ์ถ ์ต์ข ์ค๋ฅ: {api_error}") | |
# ์ค๋ฅ ์ ํ์ ๋ฐ๋ฅธ ๋ ๋ช ํํ ๋ฉ์์ง ์ ๊ณต | |
error_message = str(api_error) | |
if "Connection" in error_message: | |
return {"error": "OpenAI ์๋ฒ์ ์ฐ๊ฒฐํ ์ ์์ต๋๋ค. ์ธํฐ๋ท ์ฐ๊ฒฐ์ ํ์ธํ์ธ์."} | |
elif "Unauthorized" in error_message or "Authentication" in error_message: | |
return {"error": "API ํค๊ฐ ์ ํจํ์ง ์์ต๋๋ค."} | |
elif "Rate limit" in error_message: | |
return {"error": "API ํธ์ถ ํ๋๋ฅผ ์ด๊ณผํ์ต๋๋ค. ์ ์ ํ ๋ค์ ์๋ํ์ธ์."} | |
else: | |
return {"error": f"AI ์๋ต ์์ฑ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {error_message}"} | |
except Exception as e: | |
logger.error(f"์ง์์๋ต ์ฒ๋ฆฌ ์ค๋ฅ: {e}") | |
return {"error": str(e)} | |
# PDF ์์ฝ ์์ฑ | |
# PDF ์์ฝ ์์ฑ ํจ์ ๊ฐ์ | |
async def summarize_pdf(pdf_id: str) -> Dict[str, Any]: | |
try: | |
# API ํค๊ฐ ์๊ฑฐ๋ ์ ํจํ์ง ์์ ๊ฒฝ์ฐ | |
if not HAS_VALID_API_KEY or not openai_client: | |
return { | |
"error": "OpenAI API ํค๊ฐ ์ค์ ๋์ง ์์์ต๋๋ค. 'LLM_API' ํ๊ฒฝ ๋ณ์๋ฅผ ํ์ธํ์ธ์.", | |
"summary": "API ํค๊ฐ ์์ด ์์ฝ์ ์์ฑํ ์ ์์ต๋๋ค. ์์คํ ๊ด๋ฆฌ์์๊ฒ ๋ฌธ์ํ์ธ์." | |
} | |
# ์๋ฒ ๋ฉ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ | |
embedding_data = await get_pdf_embedding(pdf_id) | |
if "error" in embedding_data: | |
return {"error": embedding_data["error"], "summary": "PDF์์ ํ ์คํธ๋ฅผ ์ถ์ถํ ์ ์์ต๋๋ค."} | |
# ์ฒญํฌ ํ ์คํธ ๋ชจ์ผ๊ธฐ (์ ํ๋ ๊ธธ์ด) | |
all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]]) | |
# ์ปจํ ์คํธ ํฌ๊ธฐ๋ฅผ ๊ณ ๋ คํ์ฌ ํ ์คํธ๊ฐ ๋๋ฌด ๊ธธ๋ฉด ์๋ถ๋ถ๋ง ์ฌ์ฉ | |
max_context_length = 60000 # ํ ํฐ ์๊ฐ ์๋ ๋ฌธ์ ์ ๊ธฐ์ค (๋๋ต์ ์ธ ์ ํ) | |
if len(all_text) > max_context_length: | |
all_text = all_text[:max_context_length] + "...(์ดํ ์๋ต)" | |
# OpenAI API ํธ์ถ | |
try: | |
# ํ์์์ ๋ฐ ์ฌ์๋ ์ค์ ๊ฐ์ | |
for attempt in range(3): # ์ต๋ 3๋ฒ ์ฌ์๋ | |
try: | |
response = openai_client.chat.completions.create( | |
model="gpt-4.1-mini", | |
messages=[ | |
{"role": "system", "content": "The default language is set to English. ๋ค์ PDF ๋ด์ฉ์ ๊ฐ๊ฒฐํ๊ฒ ์์ฝํด์ฃผ์ธ์. ํต์ฌ ์ฃผ์ ์ ์ฃผ์ ํฌ์ธํธ๋ฅผ ํฌํจํ ์์ฝ์ 500์ ์ด๋ด๋ก ์์ฑํด์ฃผ์ธ์."}, | |
{"role": "user", "content": f"PDF ๋ด์ฉ:\n{all_text}"} | |
], | |
temperature=0.7, | |
max_tokens=1024, | |
timeout=30.0 # 30์ด ํ์์์ | |
) | |
summary = response.choices[0].message.content | |
return { | |
"summary": summary, | |
"pdf_id": pdf_id | |
} | |
except Exception as api_error: | |
logger.error(f"OpenAI API ํธ์ถ ์ค๋ฅ (์๋ {attempt+1}/3): {api_error}") | |
if attempt == 2: # ๋ง์ง๋ง ์๋์์๋ ์คํจ | |
raise api_error | |
await asyncio.sleep(1 * (attempt + 1)) # ์ฌ์๋ ๊ฐ ์ง์ฐ ์๊ฐ ์ฆ๊ฐ | |
# ์ฌ๊ธฐ๊น์ง ๋๋ฌํ์ง ์์์ผ ํจ | |
raise Exception("API ํธ์ถ ์ฌ์๋ ๋ชจ๋ ์คํจ") | |
except Exception as api_error: | |
logger.error(f"OpenAI API ํธ์ถ ์ต์ข ์ค๋ฅ: {api_error}") | |
# ์ค๋ฅ ์ ํ์ ๋ฐ๋ฅธ ๋ ๋ช ํํ ๋ฉ์์ง ์ ๊ณต | |
error_message = str(api_error) | |
if "Connection" in error_message: | |
return {"error": "OpenAI ์๋ฒ์ ์ฐ๊ฒฐํ ์ ์์ต๋๋ค. ์ธํฐ๋ท ์ฐ๊ฒฐ์ ํ์ธํ์ธ์.", "pdf_id": pdf_id} | |
elif "Unauthorized" in error_message or "Authentication" in error_message: | |
return {"error": "API ํค๊ฐ ์ ํจํ์ง ์์ต๋๋ค.", "pdf_id": pdf_id} | |
elif "Rate limit" in error_message: | |
return {"error": "API ํธ์ถ ํ๋๋ฅผ ์ด๊ณผํ์ต๋๋ค. ์ ์ ํ ๋ค์ ์๋ํ์ธ์.", "pdf_id": pdf_id} | |
else: | |
return {"error": f"AI ์์ฝ ์์ฑ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค: {error_message}", "pdf_id": pdf_id} | |
except Exception as e: | |
logger.error(f"PDF ์์ฝ ์์ฑ ์ค๋ฅ: {e}") | |
return { | |
"error": str(e), | |
"summary": "PDF ์์ฝ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค. PDF ํ์ด์ง ์๊ฐ ๋๋ฌด ๋ง๊ฑฐ๋ ํ์์ด ์ง์๋์ง ์์ ์ ์์ต๋๋ค." | |
} | |
# ์ต์ ํ๋ PDF ํ์ด์ง ์บ์ฑ ํจ์ | |
async def cache_pdf(pdf_path: str): | |
try: | |
import fitz # PyMuPDF | |
pdf_file = pathlib.Path(pdf_path) | |
pdf_name = pdf_file.stem | |
# ๋ฝ ์์ฑ - ๋์ผํ PDF์ ๋ํด ๋์ ์บ์ฑ ๋ฐฉ์ง | |
if pdf_name not in cache_locks: | |
cache_locks[pdf_name] = threading.Lock() | |
# ์ด๋ฏธ ์บ์ฑ ์ค์ด๊ฑฐ๋ ์บ์ฑ ์๋ฃ๋ PDF๋ ๊ฑด๋๋ฐ๊ธฐ | |
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]: | |
logger.info(f"PDF {pdf_name} ์ด๋ฏธ ์บ์ฑ ์๋ฃ ๋๋ ์งํ ์ค") | |
return | |
with cache_locks[pdf_name]: | |
# ์ด์ค ์ฒดํฌ - ๋ฝ ํ๋ ํ ๋ค์ ํ์ธ | |
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]: | |
return | |
# ์บ์ ์ํ ์ ๋ฐ์ดํธ | |
pdf_cache[pdf_name] = {"status": "processing", "progress": 0, "pages": []} | |
# ์บ์ ํ์ผ์ด ์ด๋ฏธ ์กด์ฌํ๋์ง ํ์ธ | |
cache_path = get_cache_path(pdf_name) | |
if cache_path.exists(): | |
try: | |
with open(cache_path, "r") as cache_file: | |
cached_data = json.load(cache_file) | |
if cached_data.get("status") == "completed" and cached_data.get("pages"): | |
pdf_cache[pdf_name] = cached_data | |
pdf_cache[pdf_name]["status"] = "completed" | |
logger.info(f"์บ์ ํ์ผ์์ {pdf_name} ๋ก๋ ์๋ฃ") | |
return | |
except Exception as e: | |
logger.error(f"์บ์ ํ์ผ ๋ก๋ ์คํจ: {e}") | |
# PDF ํ์ผ ์ด๊ธฐ | |
doc = fitz.open(pdf_path) | |
total_pages = doc.page_count | |
# ๋ฏธ๋ฆฌ ์ธ๋ค์ผ๋ง ๋จผ์ ์์ฑ (๋น ๋ฅธ UI ๋ก๋ฉ์ฉ) | |
if total_pages > 0: | |
# ์ฒซ ํ์ด์ง ์ธ๋ค์ผ ์์ฑ | |
page = doc[0] | |
pix_thumb = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2)) # ๋ ์์ ์ธ๋ค์ผ | |
thumb_data = pix_thumb.tobytes("png") | |
b64_thumb = base64.b64encode(thumb_data).decode('utf-8') | |
thumb_src = f"data:image/png;base64,{b64_thumb}" | |
# ์ธ๋ค์ผ ํ์ด์ง๋ง ๋จผ์ ์บ์ | |
pdf_cache[pdf_name]["pages"] = [{"thumb": thumb_src, "src": ""}] | |
pdf_cache[pdf_name]["progress"] = 1 | |
pdf_cache[pdf_name]["total_pages"] = total_pages | |
# ์ด๋ฏธ์ง ํด์๋ ๋ฐ ์์ถ ํ์ง ์ค์ (์ฑ๋ฅ ์ต์ ํ) | |
scale_factor = 1.0 # ๊ธฐ๋ณธ ํด์๋ (๋ฎ์ถ์๋ก ๋ก๋ฉ ๋น ๋ฆ) | |
jpeg_quality = 80 # JPEG ํ์ง (๋ฎ์ถ์๋ก ์ฉ๋ ์์์ง) | |
# ํ์ด์ง ์ฒ๋ฆฌ ์์ ์ ํจ์ (๋ณ๋ ฌ ์ฒ๋ฆฌ์ฉ) | |
def process_page(page_num): | |
try: | |
page = doc[page_num] | |
# ์ด๋ฏธ์ง๋ก ๋ณํ ์ ๋งคํธ๋ฆญ์ค ์ค์ผ์ผ๋ง ์ ์ฉ (์ฑ๋ฅ ์ต์ ํ) | |
pix = page.get_pixmap(matrix=fitz.Matrix(scale_factor, scale_factor)) | |
# JPEG ํ์์ผ๋ก ์ธ์ฝ๋ฉ (PNG๋ณด๋ค ํฌ๊ธฐ ์์) | |
img_data = pix.tobytes("jpeg", jpeg_quality) | |
b64_img = base64.b64encode(img_data).decode('utf-8') | |
img_src = f"data:image/jpeg;base64,{b64_img}" | |
# ์ธ๋ค์ผ (์ฒซ ํ์ด์ง๊ฐ ์๋๋ฉด ๋น ๋ฌธ์์ด) | |
thumb_src = "" if page_num > 0 else pdf_cache[pdf_name]["pages"][0]["thumb"] | |
return { | |
"page_num": page_num, | |
"src": img_src, | |
"thumb": thumb_src | |
} | |
except Exception as e: | |
logger.error(f"ํ์ด์ง {page_num} ์ฒ๋ฆฌ ์ค๋ฅ: {e}") | |
return { | |
"page_num": page_num, | |
"src": "", | |
"thumb": "", | |
"error": str(e) | |
} | |
# ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ก ๋ชจ๋ ํ์ด์ง ์ฒ๋ฆฌ | |
pages = [None] * total_pages | |
processed_count = 0 | |
# ํ์ด์ง ๋ฐฐ์น ์ฒ๋ฆฌ (๋ฉ๋ชจ๋ฆฌ ๊ด๋ฆฌ) | |
batch_size = 5 # ํ ๋ฒ์ ์ฒ๋ฆฌํ ํ์ด์ง ์ | |
for batch_start in range(0, total_pages, batch_size): | |
batch_end = min(batch_start + batch_size, total_pages) | |
current_batch = list(range(batch_start, batch_end)) | |
# ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ก ๋ฐฐ์น ํ์ด์ง ๋ ๋๋ง | |
with concurrent.futures.ThreadPoolExecutor(max_workers=min(5, batch_size)) as executor: | |
batch_results = list(executor.map(process_page, current_batch)) | |
# ๊ฒฐ๊ณผ ์ ์ฅ | |
for result in batch_results: | |
page_num = result["page_num"] | |
pages[page_num] = { | |
"src": result["src"], | |
"thumb": result["thumb"] | |
} | |
processed_count += 1 | |
progress = round(processed_count / total_pages * 100) | |
pdf_cache[pdf_name]["progress"] = progress | |
# ์ค๊ฐ ์ ์ฅ | |
pdf_cache[pdf_name]["pages"] = pages | |
try: | |
with open(cache_path, "w") as cache_file: | |
json.dump({ | |
"status": "processing", | |
"progress": pdf_cache[pdf_name]["progress"], | |
"pages": pdf_cache[pdf_name]["pages"], | |
"total_pages": total_pages | |
}, cache_file) | |
except Exception as e: | |
logger.error(f"์ค๊ฐ ์บ์ ์ ์ฅ ์คํจ: {e}") | |
# ์บ์ฑ ์๋ฃ | |
pdf_cache[pdf_name] = { | |
"status": "completed", | |
"progress": 100, | |
"pages": pages, | |
"total_pages": total_pages | |
} | |
# ์ต์ข ์บ์ ํ์ผ ์ ์ฅ | |
try: | |
with open(cache_path, "w") as cache_file: | |
json.dump(pdf_cache[pdf_name], cache_file) | |
logger.info(f"PDF {pdf_name} ์บ์ฑ ์๋ฃ, {total_pages}ํ์ด์ง") | |
except Exception as e: | |
logger.error(f"์ต์ข ์บ์ ์ ์ฅ ์คํจ: {e}") | |
except Exception as e: | |
import traceback | |
logger.error(f"PDF ์บ์ฑ ์ค๋ฅ: {str(e)}\n{traceback.format_exc()}") | |
if pdf_name in pdf_cache: | |
pdf_cache[pdf_name]["status"] = "error" | |
pdf_cache[pdf_name]["error"] = str(e) | |
# PDF ID๋ก PDF ๊ฒฝ๋ก ์ฐพ๊ธฐ (๊ฐ์ ๋ ๊ฒ์ ๋ก์ง) | |
def get_pdf_path_by_id(pdf_id: str) -> str: | |
logger.info(f"PDF ID๋ก ํ์ผ ์กฐํ: {pdf_id}") | |
# 1. ๋ฉํ๋ฐ์ดํฐ์์ ์ง์ ID๋ก ๊ฒ์ | |
if pdf_id in pdf_metadata: | |
path = pdf_metadata[pdf_id] | |
# ํ์ผ ์กด์ฌ ํ์ธ | |
if os.path.exists(path): | |
return path | |
# ํ์ผ์ด ์ด๋ํ์ ์ ์์ผ๋ฏ๋ก ํ์ผ๋ช ์ผ๋ก ๊ฒ์ | |
filename = os.path.basename(path) | |
# ์๊ตฌ ์ ์ฅ์์์ ๊ฒ์ | |
perm_path = PERMANENT_PDF_DIR / filename | |
if perm_path.exists(): | |
# ๋ฉํ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ | |
pdf_metadata[pdf_id] = str(perm_path) | |
save_pdf_metadata() | |
return str(perm_path) | |
# ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์์ ๊ฒ์ | |
main_path = PDF_DIR / filename | |
if main_path.exists(): | |
# ๋ฉํ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ | |
pdf_metadata[pdf_id] = str(main_path) | |
save_pdf_metadata() | |
return str(main_path) | |
# 2. ํ์ผ๋ช ๋ถ๋ถ๋ง ์ถ์ถํ์ฌ ๋ชจ๋ PDF ํ์ผ ๊ฒ์ | |
try: | |
# ID ํ์: filename_timestamp_random | |
# ํ์ผ๋ช ๋ถ๋ถ๋ง ์ถ์ถ | |
name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id | |
# ๋ชจ๋ PDF ํ์ผ ๊ฒ์ | |
for file_path in get_pdf_files() + get_permanent_pdf_files(): | |
# ํ์ผ๋ช ์ด ID์ ์์ ๋ถ๋ถ๊ณผ ์ผ์นํ๋ฉด | |
file_basename = os.path.basename(file_path) | |
if file_basename.startswith(name_part) or file_path.stem.startswith(name_part): | |
# ID ๋งคํ ์ ๋ฐ์ดํธ | |
pdf_metadata[pdf_id] = str(file_path) | |
save_pdf_metadata() | |
return str(file_path) | |
except Exception as e: | |
logger.error(f"ํ์ผ๋ช ๊ฒ์ ์ค ์ค๋ฅ: {e}") | |
# 3. ๋ชจ๋ PDF ํ์ผ์ ๋ํด ๋ฉํ๋ฐ์ดํฐ ํ์ธ | |
for pid, path in pdf_metadata.items(): | |
if os.path.exists(path): | |
file_basename = os.path.basename(path) | |
# ์ ์ฌํ ํ์ผ๋ช ์ ๊ฐ์ง ๊ฒฝ์ฐ | |
if pdf_id in pid or pid in pdf_id: | |
pdf_metadata[pdf_id] = path | |
save_pdf_metadata() | |
return path | |
return None | |
# ์์ ์ ๋ชจ๋ PDF ํ์ผ ์บ์ฑ | |
async def init_cache_all_pdfs(): | |
logger.info("PDF ์บ์ฑ ์์ ์์") | |
# PDF ๋ฉํ๋ฐ์ดํฐ ๋ก๋ | |
load_pdf_metadata() | |
# ๋ฉ์ธ ๋ฐ ์๊ตฌ ๋๋ ํ ๋ฆฌ์์ PDF ํ์ผ ๋ชจ๋ ๊ฐ์ ธ์ค๊ธฐ | |
pdf_files = get_pdf_files() + get_permanent_pdf_files() | |
# ์ค๋ณต ์ ๊ฑฐ | |
unique_pdf_paths = set(str(p) for p in pdf_files) | |
pdf_files = [pathlib.Path(p) for p in unique_pdf_paths] | |
# ํ์ผ ๊ธฐ๋ฐ ๋ฉํ๋ฐ์ดํฐ ์ ๋ฐ์ดํธ | |
for pdf_file in pdf_files: | |
# ID๊ฐ ์๋ ํ์ผ์ ๋ํด ID ์์ฑ | |
found = False | |
for pid, path in pdf_metadata.items(): | |
if os.path.basename(path) == pdf_file.name: | |
found = True | |
# ๊ฒฝ๋ก ์ ๋ฐ์ดํธ ํ์ํ ๊ฒฝ์ฐ | |
if not os.path.exists(path): | |
pdf_metadata[pid] = str(pdf_file) | |
break | |
if not found: | |
pdf_id = generate_pdf_id(pdf_file.name) | |
pdf_metadata[pdf_id] = str(pdf_file) | |
# ๋ฉํ๋ฐ์ดํฐ ์ ์ฅ | |
save_pdf_metadata() | |
# ์ด๋ฏธ ์บ์๋ PDF ํ์ผ ๋ก๋ (๋น ๋ฅธ ์์์ ์ํด ๋จผ์ ์ํ) | |
for cache_file in CACHE_DIR.glob("*_cache.json"): | |
try: | |
pdf_name = cache_file.stem.replace("_cache", "") | |
with open(cache_file, "r") as f: | |
cached_data = json.load(f) | |
if cached_data.get("status") == "completed" and cached_data.get("pages"): | |
pdf_cache[pdf_name] = cached_data | |
pdf_cache[pdf_name]["status"] = "completed" | |
logger.info(f"๊ธฐ์กด ์บ์ ๋ก๋: {pdf_name}") | |
except Exception as e: | |
logger.error(f"์บ์ ํ์ผ ๋ก๋ ์ค๋ฅ: {str(e)}") | |
# ์บ์ฑ๋์ง ์์ PDF ํ์ผ ๋ณ๋ ฌ ์ฒ๋ฆฌ | |
await asyncio.gather(*[asyncio.create_task(cache_pdf(str(pdf_file))) | |
for pdf_file in pdf_files | |
if pdf_file.stem not in pdf_cache | |
or pdf_cache[pdf_file.stem].get("status") != "completed"]) | |
# ๋ฐฑ๊ทธ๋ผ์ด๋ ์์ ์์ ํจ์ | |
async def startup_event(): | |
# PDF ๋ฉํ๋ฐ์ดํฐ ๋ก๋ | |
load_pdf_metadata() | |
# ๋๋ฝ๋ PDF ํ์ผ์ ๋ํ ๋ฉํ๋ฐ์ดํฐ ์์ฑ | |
for pdf_file in get_pdf_files() + get_permanent_pdf_files(): | |
found = False | |
for pid, path in pdf_metadata.items(): | |
if os.path.basename(path) == pdf_file.name: | |
found = True | |
# ๊ฒฝ๋ก ์ ๋ฐ์ดํธ | |
if not os.path.exists(path): | |
pdf_metadata[pid] = str(pdf_file) | |
break | |
if not found: | |
# ์ ID ์์ฑ ๋ฐ ๋ฉํ๋ฐ์ดํฐ์ ์ถ๊ฐ | |
pdf_id = generate_pdf_id(pdf_file.name) | |
pdf_metadata[pdf_id] = str(pdf_file) | |
# ๋ณ๊ฒฝ์ฌํญ ์ ์ฅ | |
save_pdf_metadata() | |
# ๋ฐฑ๊ทธ๋ผ์ด๋ ํ์คํฌ๋ก ์บ์ฑ ์คํ | |
asyncio.create_task(init_cache_all_pdfs()) | |
# API ์๋ํฌ์ธํธ: PDF ํ๋ก์ ํธ ๋ชฉ๋ก | |
async def get_pdf_projects_api(): | |
return generate_pdf_projects() | |
# API ์๋ํฌ์ธํธ: ์๊ตฌ ์ ์ฅ๋ PDF ํ๋ก์ ํธ ๋ชฉ๋ก | |
async def get_permanent_pdf_projects_api(): | |
pdf_files = get_permanent_pdf_files() | |
projects_data = [] | |
for pdf_file in pdf_files: | |
# PDF ID ์ฐพ๊ธฐ | |
pdf_id = None | |
for pid, path in pdf_metadata.items(): | |
if os.path.basename(path) == pdf_file.name: | |
pdf_id = pid | |
break | |
# ID๊ฐ ์์ผ๋ฉด ์์ฑ | |
if not pdf_id: | |
pdf_id = generate_pdf_id(pdf_file.name) | |
pdf_metadata[pdf_id] = str(pdf_file) | |
save_pdf_metadata() | |
projects_data.append({ | |
"path": str(pdf_file), | |
"name": pdf_file.stem, | |
"id": pdf_id, | |
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" | |
}) | |
return projects_data | |
# API ์๋ํฌ์ธํธ: PDF ID๋ก ์ ๋ณด ๊ฐ์ ธ์ค๊ธฐ | |
async def get_pdf_info_by_id(pdf_id: str): | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if pdf_path: | |
pdf_file = pathlib.Path(pdf_path) | |
return { | |
"path": pdf_path, | |
"name": pdf_file.stem, | |
"id": pdf_id, | |
"exists": True, | |
"cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" | |
} | |
return {"exists": False, "error": "PDF๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค"} | |
# API ์๋ํฌ์ธํธ: PDF ์ธ๋ค์ผ ์ ๊ณต (์ต์ ํ) | |
async def get_pdf_thumbnail(path: str): | |
try: | |
pdf_file = pathlib.Path(path) | |
pdf_name = pdf_file.stem | |
# ์บ์์์ ์ธ๋ค์ผ ๊ฐ์ ธ์ค๊ธฐ | |
if pdf_name in pdf_cache and pdf_cache[pdf_name].get("pages"): | |
if pdf_cache[pdf_name]["pages"][0].get("thumb"): | |
return {"thumbnail": pdf_cache[pdf_name]["pages"][0]["thumb"]} | |
# ์บ์์ ์์ผ๋ฉด ์์ฑ (๋ ์๊ณ ๋น ๋ฅธ ์ธ๋ค์ผ) | |
import fitz | |
doc = fitz.open(path) | |
if doc.page_count > 0: | |
page = doc[0] | |
pix = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2)) # ๋ ์์ ์ธ๋ค์ผ | |
img_data = pix.tobytes("jpeg", 70) # JPEG ์์ถ ์ฌ์ฉ | |
b64_img = base64.b64encode(img_data).decode('utf-8') | |
# ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์บ์ฑ ์์ | |
asyncio.create_task(cache_pdf(path)) | |
return {"thumbnail": f"data:image/jpeg;base64,{b64_img}"} | |
return {"thumbnail": None} | |
except Exception as e: | |
logger.error(f"์ธ๋ค์ผ ์์ฑ ์ค๋ฅ: {str(e)}") | |
return {"error": str(e), "thumbnail": None} | |
# API ์๋ํฌ์ธํธ: ์บ์ ์ํ ํ์ธ | |
async def get_cache_status(path: str = None): | |
if path: | |
pdf_file = pathlib.Path(path) | |
pdf_name = pdf_file.stem | |
if pdf_name in pdf_cache: | |
return pdf_cache[pdf_name] | |
return {"status": "not_cached"} | |
else: | |
return {name: {"status": info["status"], "progress": info.get("progress", 0)} | |
for name, info in pdf_cache.items()} | |
# API ์๋ํฌ์ธํธ: PDF์ ๋ํ ์ง์์๋ต | |
async def api_query_pdf(pdf_id: str, query: Dict[str, str]): | |
try: | |
user_query = query.get("query", "") | |
if not user_query: | |
return JSONResponse(content={"error": "์ง๋ฌธ์ด ์ ๊ณต๋์ง ์์์ต๋๋ค"}, status_code=400) | |
# PDF ๊ฒฝ๋ก ํ์ธ | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
return JSONResponse(content={"error": f"PDF ID {pdf_id}์ ํด๋นํ๋ ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค"}, status_code=404) | |
# ์ง์์๋ต ์ฒ๋ฆฌ | |
result = await query_pdf(pdf_id, user_query) | |
if "error" in result: | |
return JSONResponse(content={"error": result["error"]}, status_code=500) | |
return result | |
except Exception as e: | |
logger.error(f"์ง์์๋ต API ์ค๋ฅ: {e}") | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
# API ์๋ํฌ์ธํธ: PDF ์์ฝ | |
async def api_summarize_pdf(pdf_id: str): | |
try: | |
# PDF ๊ฒฝ๋ก ํ์ธ | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
return JSONResponse(content={"error": f"PDF ID {pdf_id}์ ํด๋นํ๋ ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค"}, status_code=404) | |
# ์์ฝ ์ฒ๋ฆฌ | |
result = await summarize_pdf(pdf_id) | |
if "error" in result: | |
return JSONResponse(content={"error": result["error"]}, status_code=500) | |
return result | |
except Exception as e: | |
logger.error(f"PDF ์์ฝ API ์ค๋ฅ: {e}") | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
# API ์๋ํฌ์ธํธ: ์บ์๋ PDF ์ฝํ ์ธ ์ ๊ณต (์ ์ง์ ๋ก๋ฉ ์ง์) | |
async def get_cached_pdf(path: str, background_tasks: BackgroundTasks): | |
try: | |
pdf_file = pathlib.Path(path) | |
pdf_name = pdf_file.stem | |
# ์บ์ ํ์ธ | |
if pdf_name in pdf_cache: | |
status = pdf_cache[pdf_name].get("status", "") | |
# ์๋ฃ๋ ๊ฒฝ์ฐ ์ ์ฒด ๋ฐ์ดํฐ ๋ฐํ | |
if status == "completed": | |
return pdf_cache[pdf_name] | |
# ์ฒ๋ฆฌ ์ค์ธ ๊ฒฝ์ฐ ํ์ฌ๊น์ง์ ํ์ด์ง ๋ฐ์ดํฐ ํฌํจ (์ ์ง์ ๋ก๋ฉ) | |
elif status == "processing": | |
progress = pdf_cache[pdf_name].get("progress", 0) | |
pages = pdf_cache[pdf_name].get("pages", []) | |
total_pages = pdf_cache[pdf_name].get("total_pages", 0) | |
# ์ผ๋ถ๋ง ์ฒ๋ฆฌ๋ ๊ฒฝ์ฐ์๋ ์ฌ์ฉ ๊ฐ๋ฅํ ํ์ด์ง ์ ๊ณต | |
return { | |
"status": "processing", | |
"progress": progress, | |
"pages": pages, | |
"total_pages": total_pages, | |
"available_pages": len([p for p in pages if p and p.get("src")]) | |
} | |
# ์บ์๊ฐ ์๋ ๊ฒฝ์ฐ ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์บ์ฑ ์์ | |
background_tasks.add_task(cache_pdf, path) | |
return {"status": "started", "progress": 0} | |
except Exception as e: | |
logger.error(f"์บ์๋ PDF ์ ๊ณต ์ค๋ฅ: {str(e)}") | |
return {"error": str(e), "status": "error"} | |
# API ์๋ํฌ์ธํธ: PDF ์๋ณธ ์ฝํ ์ธ ์ ๊ณต(์บ์๊ฐ ์๋ ๊ฒฝ์ฐ) | |
async def get_pdf_content(path: str, background_tasks: BackgroundTasks): | |
try: | |
# ์บ์ฑ ์ํ ํ์ธ | |
pdf_file = pathlib.Path(path) | |
if not pdf_file.exists(): | |
return JSONResponse(content={"error": f"ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค: {path}"}, status_code=404) | |
pdf_name = pdf_file.stem | |
# ์บ์๋ ๊ฒฝ์ฐ ๋ฆฌ๋ค์ด๋ ํธ | |
if pdf_name in pdf_cache and (pdf_cache[pdf_name].get("status") == "completed" | |
or (pdf_cache[pdf_name].get("status") == "processing" | |
and pdf_cache[pdf_name].get("progress", 0) > 10)): | |
return JSONResponse(content={"redirect": f"/api/cached-pdf?path={path}"}) | |
# ํ์ผ ์ฝ๊ธฐ | |
with open(path, "rb") as pdf_file: | |
content = pdf_file.read() | |
# ํ์ผ๋ช ์ฒ๋ฆฌ | |
import urllib.parse | |
filename = pdf_file.name | |
encoded_filename = urllib.parse.quote(filename) | |
# ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์บ์ฑ ์์ | |
background_tasks.add_task(cache_pdf, path) | |
# ์๋ต ํค๋ ์ค์ | |
headers = { | |
"Content-Type": "application/pdf", | |
"Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}" | |
} | |
return Response(content=content, media_type="application/pdf", headers=headers) | |
except Exception as e: | |
import traceback | |
error_details = traceback.format_exc() | |
logger.error(f"PDF ์ฝํ ์ธ ๋ก๋ ์ค๋ฅ: {str(e)}\n{error_details}") | |
return JSONResponse(content={"error": str(e)}, status_code=500) | |
# PDF ์ ๋ก๋ ์๋ํฌ์ธํธ - ์๊ตฌ ์ ์ฅ์์ ์ ์ฅ ๋ฐ ๋ฉ์ธ ํ๋ฉด์ ์๋ ํ์ | |
async def upload_pdf(file: UploadFile = File(...)): | |
try: | |
# ํ์ผ ์ด๋ฆ ํ์ธ | |
if not file.filename.lower().endswith('.pdf'): | |
return JSONResponse( | |
content={"success": False, "message": "PDF ํ์ผ๋ง ์ ๋ก๋ ๊ฐ๋ฅํฉ๋๋ค"}, | |
status_code=400 | |
) | |
# ์๊ตฌ ์ ์ฅ์์ ํ์ผ ์ ์ฅ | |
file_path = PERMANENT_PDF_DIR / file.filename | |
# ํ์ผ ์ฝ๊ธฐ ๋ฐ ์ ์ฅ | |
content = await file.read() | |
with open(file_path, "wb") as buffer: | |
buffer.write(content) | |
# ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์๋ ์๋์ผ๋ก ๋ณต์ฌ (์๋ ํ์) | |
with open(PDF_DIR / file.filename, "wb") as buffer: | |
buffer.write(content) | |
# PDF ID ์์ฑ ๋ฐ ๋ฉํ๋ฐ์ดํฐ ์ ์ฅ | |
pdf_id = generate_pdf_id(file.filename) | |
pdf_metadata[pdf_id] = str(file_path) | |
save_pdf_metadata() | |
# ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์บ์ฑ ์์ | |
asyncio.create_task(cache_pdf(str(file_path))) | |
return JSONResponse( | |
content={ | |
"success": True, | |
"path": str(file_path), | |
"name": file_path.stem, | |
"id": pdf_id, | |
"viewUrl": f"/view/{pdf_id}" | |
}, | |
status_code=200 | |
) | |
except Exception as e: | |
import traceback | |
error_details = traceback.format_exc() | |
logger.error(f"PDF ์ ๋ก๋ ์ค๋ฅ: {str(e)}\n{error_details}") | |
return JSONResponse( | |
content={"success": False, "message": str(e)}, | |
status_code=500 | |
) | |
# ํ ์คํธ ํ์ผ์ PDF๋ก ๋ณํํ๋ ํจ์ | |
async def convert_text_to_pdf(text_content: str, title: str) -> str: | |
try: | |
# ์ ๋ชฉ์์ ์ ํจํ ํ์ผ๋ช ์์ฑ | |
import re | |
safe_title = re.sub(r'[^\w\-_\. ]', '_', title) | |
if not safe_title: | |
safe_title = "aibook" | |
# ํ์์คํฌํ ์ถ๊ฐ๋ก ๊ณ ์ ํ ํ์ผ๋ช ์์ฑ | |
timestamp = int(time.time()) | |
filename = f"{safe_title}_{timestamp}.pdf" | |
# ์๊ตฌ ์ ์ฅ์์ ํ์ผ ๊ฒฝ๋ก | |
file_path = PERMANENT_PDF_DIR / filename | |
# ํ๊ธ ํฐํธ ๋ฑ๋ก - ์ ๋ก๋๋ MaruBuri-SemiBold.ttf ์ฌ์ฉ | |
from reportlab.pdfbase import pdfmetrics | |
from reportlab.pdfbase.ttfonts import TTFont | |
# ํฐํธ ๊ฒฝ๋ก ์ค์ (app.py์ ๊ฐ์ ๋๋ ํ ๋ฆฌ์ ์๋ ํฐํธ ์ฌ์ฉ) | |
font_path = BASE / "MaruBuri-SemiBold.ttf" | |
# ํฐํธ ๋ฑ๋ก | |
font_name = "MaruBuri" | |
if font_path.exists(): | |
pdfmetrics.registerFont(TTFont(font_name, str(font_path))) | |
logger.info(f"ํ๊ธ ํฐํธ ๋ฑ๋ก ์ฑ๊ณต: {font_path}") | |
else: | |
font_name = "Helvetica" | |
logger.warning(f"ํ๊ธ ํฐํธ ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค: {font_path}. ๊ธฐ๋ณธ ํฐํธ๋ฅผ ์ฌ์ฉํฉ๋๋ค.") | |
# ์์ PDF ํ์ผ ์์ฑ | |
pdf_buffer = io.BytesIO() | |
# ํ๊ธ ์ง์์ ์ํ ์คํ์ผ ์ค์ | |
from reportlab.lib.pagesizes import letter | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
from reportlab.lib.enums import TA_CENTER, TA_LEFT | |
doc = SimpleDocTemplate(pdf_buffer, pagesize=letter, encoding='utf-8') | |
# ์ฌ์ฉ์ ์ ์ ์คํ์ผ ์์ฑ | |
title_style = ParagraphStyle( | |
name='CustomTitle', | |
fontName=font_name, | |
fontSize=18, | |
leading=22, | |
alignment=TA_CENTER, | |
spaceAfter=20 | |
) | |
normal_style = ParagraphStyle( | |
name='CustomNormal', | |
fontName=font_name, | |
fontSize=12, | |
leading=15, | |
alignment=TA_LEFT, | |
spaceBefore=6, | |
spaceAfter=6 | |
) | |
# ๋ด์ฉ์ ๋ฌธ๋จ์ผ๋ก ๋ถํ | |
content = [] | |
# ์ ๋ชฉ ์ถ๊ฐ | |
content.append(Paragraph(title, title_style)) | |
content.append(Spacer(1, 20)) | |
# ํ ์คํธ๋ฅผ ๋จ๋ฝ์ผ๋ก ๋ถ๋ฆฌํ์ฌ ์ถ๊ฐ | |
paragraphs = text_content.split('\n\n') | |
for para in paragraphs: | |
if para.strip(): | |
# XML ํน์๋ฌธ์ ์ด์ค์ผ์ดํ ์ฒ๋ฆฌ | |
from xml.sax.saxutils import escape | |
safe_para = escape(para.replace('\n', '<br/>')) | |
p = Paragraph(safe_para, normal_style) | |
content.append(p) | |
content.append(Spacer(1, 10)) | |
# PDF ์์ฑ | |
doc.build(content) | |
# ํ์ผ๋ก ์ ์ฅ | |
with open(file_path, 'wb') as f: | |
f.write(pdf_buffer.getvalue()) | |
# ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์๋ ๋ณต์ฌ | |
with open(PDF_DIR / filename, 'wb') as f: | |
f.write(pdf_buffer.getvalue()) | |
# PDF ID ์์ฑ ๋ฐ ๋ฉํ๋ฐ์ดํฐ ์ ์ฅ | |
pdf_id = generate_pdf_id(filename) | |
pdf_metadata[pdf_id] = str(file_path) | |
save_pdf_metadata() | |
# ๋ฐฑ๊ทธ๋ผ์ด๋์์ ์บ์ฑ ์์ | |
asyncio.create_task(cache_pdf(str(file_path))) | |
return { | |
"path": str(file_path), | |
"filename": filename, | |
"id": pdf_id | |
} | |
except Exception as e: | |
logger.error(f"ํ ์คํธ๋ฅผ PDF๋ก ๋ณํ ์ค ์ค๋ฅ: {e}") | |
raise e | |
# AI๋ฅผ ์ฌ์ฉํ์ฌ ํ ์คํธ๋ฅผ ๋ ๊ตฌ์กฐํ๋ ํ์์ผ๋ก ๋ณํ (OpenAI ์ ๊ฑฐ ๋ฒ์ ) | |
async def enhance_text_with_ai(text_content: str, title: str) -> str: | |
# ์๋ณธ ํ ์คํธ ๊ทธ๋๋ก ๋ฐํ (AI ํฅ์ ๊ธฐ๋ฅ ๋นํ์ฑํ) | |
return text_content | |
# ํ ์คํธ ํ์ผ์ PDF๋ก ๋ณํํ๋ ์๋ํฌ์ธํธ | |
async def text_to_pdf(file: UploadFile = File(...)): | |
try: | |
# ์ง์ํ๋ ํ์ผ ํ์ ํ์ธ | |
filename = file.filename.lower() | |
if not (filename.endswith('.txt') or filename.endswith('.docx') or filename.endswith('.doc')): | |
return JSONResponse( | |
content={"success": False, "message": "์ง์ํ๋ ํ์ผ ํ์์ .txt, .docx, .doc์ ๋๋ค."}, | |
status_code=400 | |
) | |
# ํ์ผ ๋ด์ฉ ์ฝ๊ธฐ | |
content = await file.read() | |
# ํ์ผ ํ์ ์ ๋ฐ๋ผ ํ ์คํธ ์ถ์ถ | |
if filename.endswith('.txt'): | |
# ์ธ์ฝ๋ฉ ์๋ ๊ฐ์ง ์๋ | |
encodings = ['utf-8', 'euc-kr', 'cp949', 'latin1'] | |
text_content = None | |
for encoding in encodings: | |
try: | |
text_content = content.decode(encoding, errors='strict') | |
logger.info(f"ํ ์คํธ ํ์ผ ์ธ์ฝ๋ฉ ๊ฐ์ง: {encoding}") | |
break | |
except UnicodeDecodeError: | |
continue | |
if text_content is None: | |
# ๋ชจ๋ ์ธ์ฝ๋ฉ ์๋ ์คํจ ์ ๊ธฐ๋ณธ์ ์ผ๋ก UTF-8๋ก ์๋ํ๊ณ ์ค๋ฅ๋ ๋์ฒด ๋ฌธ์๋ก ์ฒ๋ฆฌ | |
text_content = content.decode('utf-8', errors='replace') | |
logger.warning("ํ ์คํธ ํ์ผ ์ธ์ฝ๋ฉ์ ๊ฐ์งํ ์ ์์ด UTF-8์ผ๋ก ์๋ํฉ๋๋ค.") | |
elif filename.endswith('.docx') or filename.endswith('.doc'): | |
# ์์ ํ์ผ๋ก ์ ์ฅ | |
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file: | |
temp_file.write(content) | |
temp_path = temp_file.name | |
try: | |
# docx2txt๋ก ํ ์คํธ ์ถ์ถ | |
text_content = docx2txt.process(temp_path) | |
finally: | |
# ์์ ํ์ผ ์ญ์ | |
os.unlink(temp_path) | |
# ํ์ผ๋ช ์์ ์ ๋ชฉ ์ถ์ถ (ํ์ฅ์ ์ ์ธ) | |
title = os.path.splitext(filename)[0] | |
# AI๋ก ํ ์คํธ ๋ด์ฉ ํฅ์ | |
enhanced_text = await enhance_text_with_ai(text_content, title) | |
# ํ ์คํธ๋ฅผ PDF๋ก ๋ณํ | |
pdf_info = await convert_text_to_pdf(enhanced_text, title) | |
return JSONResponse( | |
content={ | |
"success": True, | |
"path": pdf_info["path"], | |
"name": os.path.splitext(pdf_info["filename"])[0], | |
"id": pdf_info["id"], | |
"viewUrl": f"/view/{pdf_info['id']}" | |
}, | |
status_code=200 | |
) | |
except Exception as e: | |
import traceback | |
error_details = traceback.format_exc() | |
logger.error(f"ํ ์คํธ๋ฅผ PDF๋ก ๋ณํ ์ค ์ค๋ฅ: {str(e)}\n{error_details}") | |
return JSONResponse( | |
content={"success": False, "message": str(e)}, | |
status_code=500 | |
) | |
# ๊ด๋ฆฌ์ ์ธ์ฆ ์๋ํฌ์ธํธ | |
async def admin_login(password: str = Form(...)): | |
if password == ADMIN_PASSWORD: | |
return {"success": True} | |
return {"success": False, "message": "์ธ์ฆ ์คํจ"} | |
# ๊ด๋ฆฌ์์ฉ PDF ์ญ์ ์๋ํฌ์ธํธ | |
async def delete_pdf(path: str): | |
try: | |
pdf_file = pathlib.Path(path) | |
if not pdf_file.exists(): | |
return {"success": False, "message": "ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค"} | |
# PDF ํ์ผ๋ช ๊ฐ์ ธ์ค๊ธฐ | |
filename = pdf_file.name | |
# PDF ํ์ผ ์ญ์ (์๊ตฌ ์ ์ฅ์์์) | |
pdf_file.unlink() | |
# ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์์๋ ๋์ผํ ํ์ผ์ด ์์ผ๋ฉด ์ญ์ (๋ฒ๊ทธ ์์ ) | |
main_file_path = PDF_DIR / filename | |
if main_file_path.exists(): | |
main_file_path.unlink() | |
# ๊ด๋ จ ์บ์ ํ์ผ ์ญ์ | |
pdf_name = pdf_file.stem | |
cache_path = get_cache_path(pdf_name) | |
if cache_path.exists(): | |
cache_path.unlink() | |
# ์บ์ ๋ฉ๋ชจ๋ฆฌ์์๋ ์ ๊ฑฐ | |
if pdf_name in pdf_cache: | |
del pdf_cache[pdf_name] | |
# ๋ฉํ๋ฐ์ดํฐ์์ ํด๋น ํ์ผ ID ์ ๊ฑฐ | |
to_remove = [] | |
for pid, fpath in pdf_metadata.items(): | |
if os.path.basename(fpath) == filename: | |
to_remove.append(pid) | |
for pid in to_remove: | |
del pdf_metadata[pid] | |
save_pdf_metadata() | |
return {"success": True} | |
except Exception as e: | |
logger.error(f"PDF ์ญ์ ์ค๋ฅ: {str(e)}") | |
return {"success": False, "message": str(e)} | |
# PDF๋ฅผ ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์ ํ์ ์ค์ | |
async def feature_pdf(path: str): | |
try: | |
pdf_file = pathlib.Path(path) | |
if not pdf_file.exists(): | |
return {"success": False, "message": "ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค"} | |
# ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์ ๋ณต์ฌ | |
target_path = PDF_DIR / pdf_file.name | |
shutil.copy2(pdf_file, target_path) | |
return {"success": True} | |
except Exception as e: | |
logger.error(f"PDF ํ์ ์ค์ ์ค๋ฅ: {str(e)}") | |
return {"success": False, "message": str(e)} | |
# PDF๋ฅผ ๋ฉ์ธ ๋๋ ํ ๋ฆฌ์์ ์ ๊ฑฐ (์๊ตฌ ์ ์ฅ์์์๋ ์ ์ง) | |
async def unfeature_pdf(path: str): | |
try: | |
pdf_name = pathlib.Path(path).name | |
target_path = PDF_DIR / pdf_name | |
if target_path.exists(): | |
target_path.unlink() | |
return {"success": True} | |
except Exception as e: | |
logger.error(f"PDF ํ์ ํด์ ์ค๋ฅ: {str(e)}") | |
return {"success": False, "message": str(e)} | |
# ์ง์ PDF ๋ทฐ์ด URL ์ ๊ทผ์ฉ ๋ผ์ฐํธ | |
async def view_pdf_by_id(pdf_id: str): | |
# PDF ID ์ ํจํ์ง ํ์ธ | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
# ์ผ๋จ ๋ชจ๋ PDF ๋ฉํ๋ฐ์ดํฐ๋ฅผ ๋ค์ ๋ก๋ํ๊ณ ์ฌ์๋ | |
load_pdf_metadata() | |
pdf_path = get_pdf_path_by_id(pdf_id) | |
if not pdf_path: | |
# ๋ชจ๋ PDF ํ์ผ์ ์ง์ ์ค์บํ์ฌ ์ ์ฌํ ์ด๋ฆ ์ฐพ๊ธฐ | |
for file_path in get_pdf_files() + get_permanent_pdf_files(): | |
name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id | |
if file_path.stem.startswith(name_part): | |
pdf_metadata[pdf_id] = str(file_path) | |
save_pdf_metadata() | |
pdf_path = str(file_path) | |
break | |
if not pdf_path: | |
return HTMLResponse( | |
content=f"<html><body><h1>PDF๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค</h1><p>ID: {pdf_id}</p><a href='/'>ํ์ผ๋ก ๋์๊ฐ๊ธฐ</a></body></html>", | |
status_code=404 | |
) | |
# ๋ฉ์ธ ํ์ด์ง๋ก ๋ฆฌ๋ค์ด๋ ํธํ๋, PDF ID ํ๋ผ๋ฏธํฐ ์ถ๊ฐ | |
return get_html_content(pdf_id=pdf_id) | |
# HTML ํ์ผ ์ฝ๊ธฐ ํจ์ | |
def get_html_content(pdf_id: str = None): | |
html_path = BASE / "flipbook_template.html" | |
content = "" | |
if html_path.exists(): | |
with open(html_path, "r", encoding="utf-8") as f: | |
content = f.read() | |
else: | |
content = HTML # ๊ธฐ๋ณธ HTML ์ฌ์ฉ | |
# PDF ID๊ฐ ์ ๊ณต๋ ๊ฒฝ์ฐ, ์๋ ๋ก๋ ์คํฌ๋ฆฝํธ ์ถ๊ฐ | |
if pdf_id: | |
auto_load_script = f""" | |
<script> | |
// ํ์ด์ง ๋ก๋ ์ ์๋์ผ๋ก ํด๋น PDF ์ด๊ธฐ | |
document.addEventListener('DOMContentLoaded', async function() {{ | |
try {{ | |
// PDF ์ ๋ณด ๊ฐ์ ธ์ค๊ธฐ | |
const response = await fetch('/api/pdf-info-by-id/{pdf_id}'); | |
const pdfInfo = await response.json(); | |
if (pdfInfo.exists && pdfInfo.path) {{ | |
// ์ฝ๊ฐ์ ์ง์ฐ ํ PDF ๋ทฐ์ด ์ด๊ธฐ (UI๊ฐ ์ค๋น๋ ํ) | |
setTimeout(() => {{ | |
openPdfById('{pdf_id}', pdfInfo.path, pdfInfo.cached); | |
}}, 500); | |
}} else {{ | |
showError("์์ฒญํ PDF๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค."); | |
}} | |
}} catch (e) {{ | |
console.error("์๋ PDF ๋ก๋ ์ค๋ฅ:", e); | |
}} | |
}}); | |
</script> | |
""" | |
# body ์ข ๋ฃ ํ๊ทธ ์ ์ ์คํฌ๋ฆฝํธ ์ฝ์ | |
content = content.replace("</body>", auto_load_script + "</body>") | |
return HTMLResponse(content=content) | |
async def root(request: Request, pdf_id: Optional[str] = Query(None)): | |
# PDF ID๊ฐ ์ฟผ๋ฆฌ ํ๋ผ๋ฏธํฐ๋ก ์ ๊ณต๋ ๊ฒฝ์ฐ /view/{pdf_id}๋ก ๋ฆฌ๋ค์ด๋ ํธ | |
if pdf_id: | |
return RedirectResponse(url=f"/view/{pdf_id}") | |
return get_html_content() | |
# HTML ๋ฌธ์์ด (AI ๋ฒํผ ๋ฐ ์ฑ๋ด UI ์ถ๊ฐ) | |
# HTML ๋ฌธ์์ด (AI ๋ฒํผ ๋ฐ ์ฑ๋ด UI ์ถ๊ฐ) | |
import os | |
# Hugging Face Space์ secret์์ HTML ํ ํ๋ฆฟ ๋ก๋ | |
HTML = os.getenv("HTML_TEMPLATE", "") | |
# HTML์ด ๋น์ด์์ ๊ฒฝ์ฐ ๊ธฐ๋ณธ HTML ์ฌ์ฉ (fallback) | |
if not HTML: | |
logger.warning("HTML_TEMPLATE secret์ด ์ค์ ๋์ด ์์ง ์์ต๋๋ค. ๊ธฐ๋ณธ HTML์ ์ฌ์ฉํฉ๋๋ค.") | |
HTML = """ | |
<!doctype html> | |
<html lang="ko"> | |
<head> | |
<meta charset="utf-8"> | |
<title>FlipBook Space</title> | |
<style> | |
body { font-family: Arial, sans-serif; text-align: center; padding: 50px; } | |
.error { color: red; } | |
</style> | |
</head> | |
<body> | |
<h1>HTML ํ ํ๋ฆฟ์ ๋ถ๋ฌ์ฌ ์ ์์ต๋๋ค</h1> | |
<p class="error">HTML_TEMPLATE secret์ด ์ค์ ๋์ด ์์ง ์์ต๋๋ค.</p> | |
<p>Hugging Face Space์ secret ์์ญ์ HTML_TEMPLATE์ ์ค์ ํด์ฃผ์ธ์.</p> | |
</body> | |
</html> | |
""" | |
if __name__ == "__main__": | |
uvicorn.run("app:app", host="0.0.0.0", port=int(os.getenv("PORT", 7860))) |