diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,35 +1,3908 @@ -import os -import sys -import streamlit as st -from tempfile import NamedTemporaryFile +from fastapi import FastAPI, BackgroundTasks, UploadFile, File, Form, Request, Query +from fastapi.responses import HTMLResponse, JSONResponse, Response, RedirectResponse +from fastapi.staticfiles import StaticFiles +import pathlib, os, uvicorn, base64, json, shutil, uuid, time, urllib.parse +from typing import Dict, List, Any, Optional +import asyncio +import logging +import threading +import concurrent.futures +from openai import OpenAI +import fitz # PyMuPDF +import tempfile +from reportlab.lib.pagesizes import letter +from reportlab.pdfgen import canvas +from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer +from reportlab.lib.styles import getSampleStyleSheet +import io +import docx2txt -def main(): +# 로깅 설정 +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +BASE = pathlib.Path(__file__).parent +app = FastAPI() +app.mount("/static", StaticFiles(directory=BASE), name="static") + +# PDF 디렉토리 설정 +PDF_DIR = BASE / "pdf" +if not PDF_DIR.exists(): + PDF_DIR.mkdir(parents=True) + +# 영구 PDF 디렉토리 설정 (Hugging Face 영구 디스크) +PERMANENT_PDF_DIR = pathlib.Path("/data/pdfs") if os.path.exists("/data") else BASE / "permanent_pdfs" +if not PERMANENT_PDF_DIR.exists(): + PERMANENT_PDF_DIR.mkdir(parents=True) + +# 캐시 디렉토리 설정 +CACHE_DIR = BASE / "cache" +if not CACHE_DIR.exists(): + CACHE_DIR.mkdir(parents=True) + +# PDF 메타데이터 디렉토리 및 파일 설정 +METADATA_DIR = pathlib.Path("/data/metadata") if os.path.exists("/data") else BASE / "metadata" +if not METADATA_DIR.exists(): + METADATA_DIR.mkdir(parents=True) +PDF_METADATA_FILE = METADATA_DIR / "pdf_metadata.json" + +# 임베딩 캐시 디렉토리 설정 +EMBEDDING_DIR = pathlib.Path("/data/embeddings") if os.path.exists("/data") else BASE / "embeddings" +if not EMBEDDING_DIR.exists(): + EMBEDDING_DIR.mkdir(parents=True) + +# 관리자 비밀번호 +ADMIN_PASSWORD = os.getenv("PASSWORD", "admin") # 환경 변수에서 가져오기, 기본값은 테스트용 + +# OpenAI API 키 설정 +OPENAI_API_KEY = os.getenv("LLM_API", "") +# API 키가 없거나 비어있을 때 플래그 설정 +HAS_VALID_API_KEY = bool(OPENAI_API_KEY and OPENAI_API_KEY.strip()) + +if HAS_VALID_API_KEY: + try: + openai_client = OpenAI(api_key=OPENAI_API_KEY, timeout=30.0) + logger.info("OpenAI 클라이언트 초기화 성공") + except Exception as e: + logger.error(f"OpenAI 클라이언트 초기화 실패: {e}") + HAS_VALID_API_KEY = False +else: + logger.warning("유효한 OpenAI API 키가 없습니다. AI 기능이 제한됩니다.") + openai_client = None + +# 전역 캐시 객체 +pdf_cache: Dict[str, Dict[str, Any]] = {} +# 캐싱 락 +cache_locks = {} +# PDF 메타데이터 (ID to 경로 매핑) +pdf_metadata: Dict[str, str] = {} +# PDF 임베딩 캐시 +pdf_embeddings: Dict[str, Dict[str, Any]] = {} + +# PDF 메타데이터 로드 +def load_pdf_metadata(): + global pdf_metadata + if PDF_METADATA_FILE.exists(): + try: + with open(PDF_METADATA_FILE, "r") as f: + pdf_metadata = json.load(f) + logger.info(f"PDF 메타데이터 로드 완료: {len(pdf_metadata)} 항목") + except Exception as e: + logger.error(f"메타데이터 로드 오류: {e}") + pdf_metadata = {} + else: + pdf_metadata = {} + +# PDF 메타데이터 저장 +def save_pdf_metadata(): + try: + with open(PDF_METADATA_FILE, "w") as f: + json.dump(pdf_metadata, f) + except Exception as e: + logger.error(f"메타데이터 저장 오류: {e}") + +# PDF ID 생성 (파일명 + 타임스탬프 기반) - 더 단순하고 안전한 방식으로 변경 +def generate_pdf_id(filename: str) -> str: + # 파일명에서 확장자 제거 + base_name = os.path.splitext(filename)[0] + # 안전한 문자열로 변환 (URL 인코딩 대신 직접 변환) + import re + safe_name = re.sub(r'[^\w\-_]', '_', base_name.replace(" ", "_")) + # 타임스탬프 추가로 고유성 보장 + timestamp = int(time.time()) + # 짧은 임의 문자열 추가 + random_suffix = uuid.uuid4().hex[:6] + return f"{safe_name}_{timestamp}_{random_suffix}" + +# PDF 파일 목록 가져오기 (메인 디렉토리용) +def get_pdf_files(): + pdf_files = [] + if PDF_DIR.exists(): + pdf_files = [f for f in PDF_DIR.glob("*.pdf")] + return pdf_files + +# 영구 저장소의 PDF 파일 목록 가져오기 +def get_permanent_pdf_files(): + pdf_files = [] + if PERMANENT_PDF_DIR.exists(): + pdf_files = [f for f in PERMANENT_PDF_DIR.glob("*.pdf")] + return pdf_files + +# PDF 썸네일 생성 및 프로젝트 데이터 준비 +def generate_pdf_projects(): + projects_data = [] + + # 메인 디렉토리와 영구 저장소의 파일들 가져오기 + pdf_files = get_pdf_files() + permanent_pdf_files = get_permanent_pdf_files() + + # 모든 파일 합치기 (파일명 기준으로 중복 제거) + unique_files = {} + + # 먼저 메인 디렉토리의 파일들 추가 + for file in pdf_files: + unique_files[file.name] = file + + # 영구 저장소의 파일들 추가 (동일 파일명이 있으면 영구 저장소 파일 우선) + for file in permanent_pdf_files: + unique_files[file.name] = file + + # 중복 제거된 파일들로 프로젝트 데이터 생성 + for pdf_file in unique_files.values(): + # 해당 파일의 PDF ID 찾기 + pdf_id = None + for pid, path in pdf_metadata.items(): + if os.path.basename(path) == pdf_file.name: + pdf_id = pid + break + + # ID가 없으면 새로 생성하고 메타데이터에 추가 + if not pdf_id: + pdf_id = generate_pdf_id(pdf_file.name) + pdf_metadata[pdf_id] = str(pdf_file) + save_pdf_metadata() + + projects_data.append({ + "path": str(pdf_file), + "name": pdf_file.stem, + "id": pdf_id, + "cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" + }) + + return projects_data + +# 캐시 파일 경로 생성 +def get_cache_path(pdf_name: str): + return CACHE_DIR / f"{pdf_name}_cache.json" + +# 임베딩 캐시 파일 경로 생성 +def get_embedding_path(pdf_id: str): + return EMBEDDING_DIR / f"{pdf_id}_embedding.json" + +# PDF 텍스트 추출 함수 +def extract_pdf_text(pdf_path: str) -> List[Dict[str, Any]]: + try: + doc = fitz.open(pdf_path) + chunks = [] + + for page_num in range(len(doc)): + page = doc[page_num] + text = page.get_text() + + # 페이지 텍스트가 있는 경우만 추가 + if text.strip(): + chunks.append({ + "page": page_num + 1, + "text": text, + "chunk_id": f"page_{page_num + 1}" + }) + + return chunks + except Exception as e: + logger.error(f"PDF 텍스트 추출 오류: {e}") + return [] + +# PDF ID로 임베딩 생성 또는 가져오기 +async def get_pdf_embedding(pdf_id: str) -> Dict[str, Any]: + try: + # 임베딩 캐시 확인 + embedding_path = get_embedding_path(pdf_id) + if embedding_path.exists(): + try: + with open(embedding_path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + logger.error(f"임베딩 캐시 로드 오류: {e}") + + # PDF 경로 찾기 + pdf_path = get_pdf_path_by_id(pdf_id) + if not pdf_path: + raise ValueError(f"PDF ID {pdf_id}에 해당하는 파일을 찾을 수 없습니다") + + # 텍스트 추출 + chunks = extract_pdf_text(pdf_path) + if not chunks: + raise ValueError(f"PDF에서 텍스트를 추출할 수 없습니다: {pdf_path}") + + # 임베딩 저장 및 반환 + embedding_data = { + "pdf_id": pdf_id, + "pdf_path": pdf_path, + "chunks": chunks, + "created_at": time.time() + } + + # 임베딩 캐시 저장 + with open(embedding_path, "w", encoding="utf-8") as f: + json.dump(embedding_data, f, ensure_ascii=False) + + return embedding_data + + except Exception as e: + logger.error(f"PDF 임베딩 생성 오류: {e}") + return {"error": str(e), "pdf_id": pdf_id} + +# PDF 내용 기반 질의응답 +# PDF 내용 기반 질의응답 함수 개선 +async def query_pdf(pdf_id: str, query: str) -> Dict[str, Any]: + try: + # API 키가 없거나 유효하지 않은 경우 + if not HAS_VALID_API_KEY or not openai_client: + return { + "error": "OpenAI API 키가 설정되지 않았습니다.", + "answer": "죄송합니다. 현재 AI 기능이 비활성화되어 있어 질문에 답변할 수 없습니다. 시스템 관리자에게 문의하세요." + } + + # 임베딩 데이터 가져오기 + embedding_data = await get_pdf_embedding(pdf_id) + if "error" in embedding_data: + return {"error": embedding_data["error"]} + + # 청크 텍스트 모으기 (임시로 간단하게 전체 텍스트 사용) + all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]]) + + # 컨텍스트 크기를 고려하여 텍스트가 너무 길면 앞부분만 사용 + max_context_length = 60000 # 토큰 수가 아닌 문자 수 기준 (대략적인 제한) + if len(all_text) > max_context_length: + all_text = all_text[:max_context_length] + "...(이하 생략)" + + # 시스템 프롬프트 준비 + system_prompt = """ + 당신은 PDF 내용을 기반으로 질문에 답변하는 도우미입니다. 제공된 PDF 컨텍스트 정보만을 사용하여 답변하세요. + 컨텍스트에 관련 정보가 없는 경우, '제공된 PDF에서 해당 정보를 찾을 수 없습니다'라고 솔직히 답하세요. + 답변은 명확하고 간결하게 작성하고, 관련 페이지 번호를 인용하세요. 반드시 공손한 말투로 친절하게 응답하세요. + """ + + # gpt-4.1-mini 모델 사용 + try: + # 타임아웃 및 재시도 설정 개선 + for attempt in range(3): # 최대 3번 재시도 + try: + response = openai_client.chat.completions.create( + model="gpt-4.1-mini", + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"다음 PDF 내용을 참고하여 질문에 답변해주세요.\n\nPDF 내용:\n{all_text}\n\n질문: {query}"} + ], + temperature=0.7, + max_tokens=2048, + timeout=30.0 # 30초 타임아웃 + ) + + answer = response.choices[0].message.content + return { + "answer": answer, + "pdf_id": pdf_id, + "query": query + } + except Exception as api_error: + logger.error(f"OpenAI API 호출 오류 (시도 {attempt+1}/3): {api_error}") + if attempt == 2: # 마지막 시도에서도 실패 + raise api_error + await asyncio.sleep(1 * (attempt + 1)) # 재시도 간 지연 시간 증가 + + # 여기까지 도달하지 않아야 함 + raise Exception("API 호출 재시도 모두 실패") + except Exception as api_error: + logger.error(f"OpenAI API 호출 최종 오류: {api_error}") + # 오류 유형에 따른 더 명확한 메시지 제공 + error_message = str(api_error) + if "Connection" in error_message: + return {"error": "OpenAI 서버와 연결할 수 없습니다. 인터넷 연결을 확인하세요."} + elif "Unauthorized" in error_message or "Authentication" in error_message: + return {"error": "API 키가 유효하지 않습니다."} + elif "Rate limit" in error_message: + return {"error": "API 호출 한도를 초과했습니다. 잠시 후 다시 시도하세요."} + else: + return {"error": f"AI 응답 생성 중 오류가 발생했습니다: {error_message}"} + + except Exception as e: + logger.error(f"질의응답 처리 오류: {e}") + return {"error": str(e)} + +# PDF 요약 생성 +# PDF 요약 생성 함수 개선 +async def summarize_pdf(pdf_id: str) -> Dict[str, Any]: + try: + # API 키가 없거나 유효하지 않은 경우 + if not HAS_VALID_API_KEY or not openai_client: + return { + "error": "OpenAI API 키가 설정되지 않았습니다. 'LLM_API' 환경 변수를 확인하세요.", + "summary": "API 키가 없어 요약을 생성할 수 없습니다. 시스템 관리자에게 문의하세요." + } + + # 임베딩 데이터 가져오기 + embedding_data = await get_pdf_embedding(pdf_id) + if "error" in embedding_data: + return {"error": embedding_data["error"], "summary": "PDF에서 텍스트를 추출할 수 없습니다."} + + # 청크 텍스트 모으기 (제한된 길이) + all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]]) + + # 컨텍스트 크기를 고려하여 텍스트가 너무 길면 앞부분만 사용 + max_context_length = 60000 # 토큰 수가 아닌 문자 수 기준 (대략적인 제한) + if len(all_text) > max_context_length: + all_text = all_text[:max_context_length] + "...(이하 생략)" + + # OpenAI API 호출 + try: + # 타임아웃 및 재시도 설정 개선 + for attempt in range(3): # 최대 3번 재시도 + try: + response = openai_client.chat.completions.create( + model="gpt-4.1-mini", + messages=[ + {"role": "system", "content": "다음 PDF 내용을 간결하게 요약해주세요. 핵심 주제와 주요 포인트를 포함한 요약을 500자 이내로 작성해주세요."}, + {"role": "user", "content": f"PDF 내용:\n{all_text}"} + ], + temperature=0.7, + max_tokens=1024, + timeout=30.0 # 30초 타임아웃 + ) + + summary = response.choices[0].message.content + return { + "summary": summary, + "pdf_id": pdf_id + } + except Exception as api_error: + logger.error(f"OpenAI API 호출 오류 (시도 {attempt+1}/3): {api_error}") + if attempt == 2: # 마지막 시도에서도 실패 + raise api_error + await asyncio.sleep(1 * (attempt + 1)) # 재시도 간 지연 시간 증가 + + # 여기까지 도달하지 않아야 함 + raise Exception("API 호출 재시도 모두 실패") + except Exception as api_error: + logger.error(f"OpenAI API 호출 최종 오류: {api_error}") + # 오류 유형에 따른 더 명확한 메시지 제공 + error_message = str(api_error) + if "Connection" in error_message: + return {"error": "OpenAI 서버와 연결할 수 없습니다. 인터넷 연결을 확인하세요.", "pdf_id": pdf_id} + elif "Unauthorized" in error_message or "Authentication" in error_message: + return {"error": "API 키가 유효하지 않습니다.", "pdf_id": pdf_id} + elif "Rate limit" in error_message: + return {"error": "API 호출 한도를 초과했습니다. 잠시 후 다시 시도하세요.", "pdf_id": pdf_id} + else: + return {"error": f"AI 요약 생성 중 오류가 발생했습니다: {error_message}", "pdf_id": pdf_id} + + except Exception as e: + logger.error(f"PDF 요약 생성 오류: {e}") + return { + "error": str(e), + "summary": "PDF 요약 중 오류가 발생했습니다. PDF 페이지 수가 너무 많거나 형식이 지원되지 않을 수 있습니다." + } + + +# 최적화된 PDF 페이지 캐싱 함수 +async def cache_pdf(pdf_path: str): try: - # Get the code from secrets - code = os.environ.get("MAIN_CODE") + import fitz # PyMuPDF + + pdf_file = pathlib.Path(pdf_path) + pdf_name = pdf_file.stem + + # 락 생성 - 동일한 PDF에 대해 동시 캐싱 방지 + if pdf_name not in cache_locks: + cache_locks[pdf_name] = threading.Lock() - if not code: - st.error("⚠️ The application code wasn't found in secrets. Please add the MAIN_CODE secret.") + # 이미 캐싱 중이거나 캐싱 완료된 PDF는 건너뛰기 + if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]: + logger.info(f"PDF {pdf_name} 이미 캐싱 완료 또는 진행 중") return - # Create a temporary Python file - with NamedTemporaryFile(suffix='.py', delete=False, mode='w') as tmp: - tmp.write(code) - tmp_path = tmp.name + with cache_locks[pdf_name]: + # 이중 체크 - 락 획득 후 다시 확인 + if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]: + return + + # 캐시 상태 업데이트 + pdf_cache[pdf_name] = {"status": "processing", "progress": 0, "pages": []} + + # 캐시 파일이 이미 존재하는지 확인 + cache_path = get_cache_path(pdf_name) + if cache_path.exists(): + try: + with open(cache_path, "r") as cache_file: + cached_data = json.load(cache_file) + if cached_data.get("status") == "completed" and cached_data.get("pages"): + pdf_cache[pdf_name] = cached_data + pdf_cache[pdf_name]["status"] = "completed" + logger.info(f"캐시 파일에서 {pdf_name} 로드 완료") + return + except Exception as e: + logger.error(f"캐시 파일 로드 실패: {e}") + + # PDF 파일 열기 + doc = fitz.open(pdf_path) + total_pages = doc.page_count + + # 미리 썸네일만 먼저 생성 (빠른 UI 로딩용) + if total_pages > 0: + # 첫 페이지 썸네일 생성 + page = doc[0] + pix_thumb = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2)) # 더 작은 썸네일 + thumb_data = pix_thumb.tobytes("png") + b64_thumb = base64.b64encode(thumb_data).decode('utf-8') + thumb_src = f"data:image/png;base64,{b64_thumb}" + + # 썸네일 페이지만 먼저 캐시 + pdf_cache[pdf_name]["pages"] = [{"thumb": thumb_src, "src": ""}] + pdf_cache[pdf_name]["progress"] = 1 + pdf_cache[pdf_name]["total_pages"] = total_pages + + # 이미지 해상도 및 압축 품질 설정 (성능 최적화) + scale_factor = 1.0 # 기본 해상도 (낮출수록 로딩 빠름) + jpeg_quality = 80 # JPEG 품질 (낮출수록 용량 작아짐) + + # 페이지 처리 작업자 함수 (병렬 처리용) + def process_page(page_num): + try: + page = doc[page_num] + + # 이미지로 변환 시 매트릭스 스케일링 적용 (성능 최적화) + pix = page.get_pixmap(matrix=fitz.Matrix(scale_factor, scale_factor)) + + # JPEG 형식으로 인코딩 (PNG보다 크기 작음) + img_data = pix.tobytes("jpeg", jpeg_quality) + b64_img = base64.b64encode(img_data).decode('utf-8') + img_src = f"data:image/jpeg;base64,{b64_img}" + + # 썸네일 (첫 페이지가 아니면 빈 문자열) + thumb_src = "" if page_num > 0 else pdf_cache[pdf_name]["pages"][0]["thumb"] + + return { + "page_num": page_num, + "src": img_src, + "thumb": thumb_src + } + except Exception as e: + logger.error(f"페이지 {page_num} 처리 오류: {e}") + return { + "page_num": page_num, + "src": "", + "thumb": "", + "error": str(e) + } + + # 병렬 처리로 모든 페이지 처리 + pages = [None] * total_pages + processed_count = 0 + + # 페이지 배치 처리 (메모리 관리) + batch_size = 5 # 한 번에 처리할 페이지 수 + + for batch_start in range(0, total_pages, batch_size): + batch_end = min(batch_start + batch_size, total_pages) + current_batch = list(range(batch_start, batch_end)) + + # 병렬 처리로 배치 페이지 렌더링 + with concurrent.futures.ThreadPoolExecutor(max_workers=min(5, batch_size)) as executor: + batch_results = list(executor.map(process_page, current_batch)) + + # 결과 저장 + for result in batch_results: + page_num = result["page_num"] + pages[page_num] = { + "src": result["src"], + "thumb": result["thumb"] + } + + processed_count += 1 + progress = round(processed_count / total_pages * 100) + pdf_cache[pdf_name]["progress"] = progress + + # 중간 저장 + pdf_cache[pdf_name]["pages"] = pages + try: + with open(cache_path, "w") as cache_file: + json.dump({ + "status": "processing", + "progress": pdf_cache[pdf_name]["progress"], + "pages": pdf_cache[pdf_name]["pages"], + "total_pages": total_pages + }, cache_file) + except Exception as e: + logger.error(f"중간 캐시 저장 실패: {e}") + + # 캐싱 완료 + pdf_cache[pdf_name] = { + "status": "completed", + "progress": 100, + "pages": pages, + "total_pages": total_pages + } + + # 최종 캐시 파일 저장 + try: + with open(cache_path, "w") as cache_file: + json.dump(pdf_cache[pdf_name], cache_file) + logger.info(f"PDF {pdf_name} 캐싱 완료, {total_pages}페이지") + except Exception as e: + logger.error(f"최종 캐시 저장 실패: {e}") + + except Exception as e: + import traceback + logger.error(f"PDF 캐싱 오류: {str(e)}\n{traceback.format_exc()}") + if pdf_name in pdf_cache: + pdf_cache[pdf_name]["status"] = "error" + pdf_cache[pdf_name]["error"] = str(e) + +# PDF ID로 PDF 경로 찾기 (개선된 검색 로직) +def get_pdf_path_by_id(pdf_id: str) -> str: + logger.info(f"PDF ID로 파일 조회: {pdf_id}") + + # 1. 메타데이터에서 직접 ID로 검색 + if pdf_id in pdf_metadata: + path = pdf_metadata[pdf_id] + # 파일 존재 확인 + if os.path.exists(path): + return path + + # 파일이 이동했을 수 있으므로 파일명으로 검색 + filename = os.path.basename(path) + + # 영구 저장소에서 검색 + perm_path = PERMANENT_PDF_DIR / filename + if perm_path.exists(): + # 메타데이터 업데이트 + pdf_metadata[pdf_id] = str(perm_path) + save_pdf_metadata() + return str(perm_path) + + # 메인 디렉토리에서 검색 + main_path = PDF_DIR / filename + if main_path.exists(): + # 메타데이터 업데이트 + pdf_metadata[pdf_id] = str(main_path) + save_pdf_metadata() + return str(main_path) + + # 2. 파일명 부분만 추출하여 모든 PDF 파일 검색 + try: + # ID 형식: filename_timestamp_random + # 파일명 부분만 추출 + name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id - # Execute the code - exec(compile(code, tmp_path, 'exec'), globals()) + # 모든 PDF 파일 검색 + for file_path in get_pdf_files() + get_permanent_pdf_files(): + # 파일명이 ID의 시작 부분과 일치하면 + file_basename = os.path.basename(file_path) + if file_basename.startswith(name_part) or file_path.stem.startswith(name_part): + # ID 매핑 업데이트 + pdf_metadata[pdf_id] = str(file_path) + save_pdf_metadata() + return str(file_path) + except Exception as e: + logger.error(f"파일명 검색 중 오류: {e}") + + # 3. 모든 PDF 파일에 대해 메타데이터 확인 + for pid, path in pdf_metadata.items(): + if os.path.exists(path): + file_basename = os.path.basename(path) + # 유사한 파일명을 가진 경우 + if pdf_id in pid or pid in pdf_id: + pdf_metadata[pdf_id] = path + save_pdf_metadata() + return path + + return None + +# 시작 시 모든 PDF 파일 캐싱 +async def init_cache_all_pdfs(): + logger.info("PDF 캐싱 작업 시작") + + # PDF 메타데이터 로드 + load_pdf_metadata() + + # 메인 및 영구 디렉토리에서 PDF 파일 모두 가져오기 + pdf_files = get_pdf_files() + get_permanent_pdf_files() + + # 중복 제거 + unique_pdf_paths = set(str(p) for p in pdf_files) + pdf_files = [pathlib.Path(p) for p in unique_pdf_paths] + + # 파일 기반 메타데이터 업데이트 + for pdf_file in pdf_files: + # ID가 없는 파일에 대해 ID 생성 + found = False + for pid, path in pdf_metadata.items(): + if os.path.basename(path) == pdf_file.name: + found = True + # 경로 업데이트 필요한 경우 + if not os.path.exists(path): + pdf_metadata[pid] = str(pdf_file) + break - # Clean up the temporary file + if not found: + pdf_id = generate_pdf_id(pdf_file.name) + pdf_metadata[pdf_id] = str(pdf_file) + + # 메타데이터 저장 + save_pdf_metadata() + + # 이미 캐시된 PDF 파일 로드 (빠른 시작을 위해 먼저 수행) + for cache_file in CACHE_DIR.glob("*_cache.json"): try: - os.unlink(tmp_path) - except: - pass + pdf_name = cache_file.stem.replace("_cache", "") + with open(cache_file, "r") as f: + cached_data = json.load(f) + if cached_data.get("status") == "completed" and cached_data.get("pages"): + pdf_cache[pdf_name] = cached_data + pdf_cache[pdf_name]["status"] = "completed" + logger.info(f"기존 캐시 로드: {pdf_name}") + except Exception as e: + logger.error(f"캐시 파일 로드 오류: {str(e)}") + + # 캐싱되지 않은 PDF 파일 병렬 처리 + await asyncio.gather(*[asyncio.create_task(cache_pdf(str(pdf_file))) + for pdf_file in pdf_files + if pdf_file.stem not in pdf_cache + or pdf_cache[pdf_file.stem].get("status") != "completed"]) + +# 백그라운드 작업 시작 함수 +@app.on_event("startup") +async def startup_event(): + # PDF 메타데이터 로드 + load_pdf_metadata() + + # 누락된 PDF 파일에 대한 메타데이터 생성 + for pdf_file in get_pdf_files() + get_permanent_pdf_files(): + found = False + for pid, path in pdf_metadata.items(): + if os.path.basename(path) == pdf_file.name: + found = True + # 경로 업데이트 + if not os.path.exists(path): + pdf_metadata[pid] = str(pdf_file) + break + + if not found: + # 새 ID 생성 및 메타데이터에 추가 + pdf_id = generate_pdf_id(pdf_file.name) + pdf_metadata[pdf_id] = str(pdf_file) + + # 변경사항 저장 + save_pdf_metadata() + + # 백그라운��� 태스크로 캐싱 실행 + asyncio.create_task(init_cache_all_pdfs()) + +# API 엔드포인트: PDF 프로젝트 목록 +@app.get("/api/pdf-projects") +async def get_pdf_projects_api(): + return generate_pdf_projects() + +# API 엔드포인트: 영구 저장된 PDF 프로젝트 목록 +@app.get("/api/permanent-pdf-projects") +async def get_permanent_pdf_projects_api(): + pdf_files = get_permanent_pdf_files() + projects_data = [] + + for pdf_file in pdf_files: + # PDF ID 찾기 + pdf_id = None + for pid, path in pdf_metadata.items(): + if os.path.basename(path) == pdf_file.name: + pdf_id = pid + break + + # ID가 없으면 생성 + if not pdf_id: + pdf_id = generate_pdf_id(pdf_file.name) + pdf_metadata[pdf_id] = str(pdf_file) + save_pdf_metadata() + + projects_data.append({ + "path": str(pdf_file), + "name": pdf_file.stem, + "id": pdf_id, + "cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" + }) + + return projects_data + +# API 엔드포인트: PDF ID로 정보 가져오기 +@app.get("/api/pdf-info-by-id/{pdf_id}") +async def get_pdf_info_by_id(pdf_id: str): + pdf_path = get_pdf_path_by_id(pdf_id) + if pdf_path: + pdf_file = pathlib.Path(pdf_path) + return { + "path": pdf_path, + "name": pdf_file.stem, + "id": pdf_id, + "exists": True, + "cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" + } + return {"exists": False, "error": "PDF를 찾을 수 없습니다"} + +# API 엔드포인트: PDF 썸네일 제공 (최적화) +@app.get("/api/pdf-thumbnail") +async def get_pdf_thumbnail(path: str): + try: + pdf_file = pathlib.Path(path) + pdf_name = pdf_file.stem + + # 캐시에서 썸네일 가져오기 + if pdf_name in pdf_cache and pdf_cache[pdf_name].get("pages"): + if pdf_cache[pdf_name]["pages"][0].get("thumb"): + return {"thumbnail": pdf_cache[pdf_name]["pages"][0]["thumb"]} + + # 캐시에 없으면 생성 (더 작고 빠른 썸네일) + import fitz + doc = fitz.open(path) + if doc.page_count > 0: + page = doc[0] + pix = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2)) # 더 작은 썸네일 + img_data = pix.tobytes("jpeg", 70) # JPEG 압축 사용 + b64_img = base64.b64encode(img_data).decode('utf-8') + + # 백그라운드에서 캐싱 시작 + asyncio.create_task(cache_pdf(path)) + + return {"thumbnail": f"data:image/jpeg;base64,{b64_img}"} + + return {"thumbnail": None} + except Exception as e: + logger.error(f"썸네일 생성 오류: {str(e)}") + return {"error": str(e), "thumbnail": None} + +# API 엔드포인트: 캐시 상태 확인 +@app.get("/api/cache-status") +async def get_cache_status(path: str = None): + if path: + pdf_file = pathlib.Path(path) + pdf_name = pdf_file.stem + if pdf_name in pdf_cache: + return pdf_cache[pdf_name] + return {"status": "not_cached"} + else: + return {name: {"status": info["status"], "progress": info.get("progress", 0)} + for name, info in pdf_cache.items()} + +# API 엔드포인트: PDF에 대한 질의응답 +@app.post("/api/ai/query-pdf/{pdf_id}") +async def api_query_pdf(pdf_id: str, query: Dict[str, str]): + try: + user_query = query.get("query", "") + if not user_query: + return JSONResponse(content={"error": "질문이 제공되지 않았습니다"}, status_code=400) + + # PDF 경로 확인 + pdf_path = get_pdf_path_by_id(pdf_id) + if not pdf_path: + return JSONResponse(content={"error": f"PDF ID {pdf_id}에 해당하는 파일을 찾을 수 없습니다"}, status_code=404) + + # 질의응답 처리 + result = await query_pdf(pdf_id, user_query) + + if "error" in result: + return JSONResponse(content={"error": result["error"]}, status_code=500) + + return result + except Exception as e: + logger.error(f"질의응답 API 오류: {e}") + return JSONResponse(content={"error": str(e)}, status_code=500) + +# API 엔드포인트: PDF 요약 +@app.get("/api/ai/summarize-pdf/{pdf_id}") +async def api_summarize_pdf(pdf_id: str): + try: + # PDF 경로 확인 + pdf_path = get_pdf_path_by_id(pdf_id) + if not pdf_path: + return JSONResponse(content={"error": f"PDF ID {pdf_id}에 해당하는 파일을 찾을 수 없습니다"}, status_code=404) + + # 요약 처리 + result = await summarize_pdf(pdf_id) + + if "error" in result: + return JSONResponse(content={"error": result["error"]}, status_code=500) + + return result + except Exception as e: + logger.error(f"PDF 요약 API 오류: {e}") + return JSONResponse(content={"error": str(e)}, status_code=500) + +# API 엔드포인트: 캐시된 PDF 콘텐츠 제공 (점진적 로딩 지원) +@app.get("/api/cached-pdf") +async def get_cached_pdf(path: str, background_tasks: BackgroundTasks): + try: + pdf_file = pathlib.Path(path) + pdf_name = pdf_file.stem + + # 캐시 확인 + if pdf_name in pdf_cache: + status = pdf_cache[pdf_name].get("status", "") + + # 완료된 경우 전체 데이터 반환 + if status == "completed": + return pdf_cache[pdf_name] + + # 처리 중인 경우 현재까지의 페이지 데이터 포함 (점진적 로딩) + elif status == "processing": + progress = pdf_cache[pdf_name].get("progress", 0) + pages = pdf_cache[pdf_name].get("pages", []) + total_pages = pdf_cache[pdf_name].get("total_pages", 0) + + # 일부만 처리된 경우에도 사용 가능한 페이지 제공 + return { + "status": "processing", + "progress": progress, + "pages": pages, + "total_pages": total_pages, + "available_pages": len([p for p in pages if p and p.get("src")]) + } + + # 캐시가 없는 경우 백그라운드에서 캐싱 시작 + background_tasks.add_task(cache_pdf, path) + return {"status": "started", "progress": 0} + + except Exception as e: + logger.error(f"캐시된 PDF 제공 오류: {str(e)}") + return {"error": str(e), "status": "error"} + +# API 엔드포인트: PDF 원본 콘텐츠 제공(캐시가 없는 경우) +@app.get("/api/pdf-content") +async def get_pdf_content(path: str, background_tasks: BackgroundTasks): + try: + # 캐싱 상태 확인 + pdf_file = pathlib.Path(path) + if not pdf_file.exists(): + return JSONResponse(content={"error": f"파일을 찾을 수 없습니다: {path}"}, status_code=404) + + pdf_name = pdf_file.stem + + # 캐시된 경우 리다이렉트 + if pdf_name in pdf_cache and (pdf_cache[pdf_name].get("status") == "completed" + or (pdf_cache[pdf_name].get("status") == "processing" + and pdf_cache[pdf_name].get("progress", 0) > 10)): + return JSONResponse(content={"redirect": f"/api/cached-pdf?path={path}"}) + + # 파일 읽기 + with open(path, "rb") as pdf_file: + content = pdf_file.read() + + # 파일명 처리 + import urllib.parse + filename = pdf_file.name + encoded_filename = urllib.parse.quote(filename) + + # 백그라운드에서 캐싱 시작 + background_tasks.add_task(cache_pdf, path) + + # 응답 헤더 설정 + headers = { + "Content-Type": "application/pdf", + "Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}" + } + + return Response(content=content, media_type="application/pdf", headers=headers) + except Exception as e: + import traceback + error_details = traceback.format_exc() + logger.error(f"PDF 콘텐츠 로드 오류: {str(e)}\n{error_details}") + return JSONResponse(content={"error": str(e)}, status_code=500) + +# PDF 업로드 엔드포인트 - 영구 저장소에 저장 및 메인 화면에 자동 표시 +@app.post("/api/upload-pdf") +async def upload_pdf(file: UploadFile = File(...)): + try: + # 파일 이름 확인 + if not file.filename.lower().endswith('.pdf'): + return JSONResponse( + content={"success": False, "message": "PDF 파일만 업로드 가능합니다"}, + status_code=400 + ) + + # 영구 저장소에 파일 저장 + file_path = PERMANENT_PDF_DIR / file.filename + + # 파일 읽기 및 저장 + content = await file.read() + with open(file_path, "wb") as buffer: + buffer.write(content) + + # 메인 디렉토리에도 자동으로 복사 (자동 표시) + with open(PDF_DIR / file.filename, "wb") as buffer: + buffer.write(content) + + # PDF ID 생성 및 메타데이터 저장 + pdf_id = generate_pdf_id(file.filename) + pdf_metadata[pdf_id] = str(file_path) + save_pdf_metadata() + + # 백그라운드에서 캐싱 시작 + asyncio.create_task(cache_pdf(str(file_path))) + + return JSONResponse( + content={ + "success": True, + "path": str(file_path), + "name": file_path.stem, + "id": pdf_id, + "viewUrl": f"/view/{pdf_id}" + }, + status_code=200 + ) + except Exception as e: + import traceback + error_details = traceback.format_exc() + logger.error(f"PDF 업로드 오류: {str(e)}\n{error_details}") + return JSONResponse( + content={"success": False, "message": str(e)}, + status_code=500 + ) + +# 텍스트 파일을 PDF로 변환하는 함수 +async def convert_text_to_pdf(text_content: str, title: str) -> str: + try: + # 제목에서 유효한 파일명 생성 + import re + safe_title = re.sub(r'[^\w\-_\. ]', '_', title) + if not safe_title: + safe_title = "aibook" + + # 타임스탬프 추가로 고유한 파일명 생성 + timestamp = int(time.time()) + filename = f"{safe_title}_{timestamp}.pdf" + + # 영구 저장소의 파일 경로 + file_path = PERMANENT_PDF_DIR / filename + + # 한글 폰트 등록 - 업로드된 MaruBuri-SemiBold.ttf 사용 + from reportlab.pdfbase import pdfmetrics + from reportlab.pdfbase.ttfonts import TTFont + + # 폰트 경로 설정 (app.py와 같은 디렉토리에 있는 폰트 사용) + font_path = BASE / "MaruBuri-SemiBold.ttf" + + # 폰트 등록 + font_name = "MaruBuri" + if font_path.exists(): + pdfmetrics.registerFont(TTFont(font_name, str(font_path))) + logger.info(f"한글 폰트 등록 성공: {font_path}") + else: + font_name = "Helvetica" + logger.warning(f"한글 폰트 파일을 찾을 수 없습니다: {font_path}. 기본 폰트를 사용합니다.") + + # 임시 PDF 파일 생성 + pdf_buffer = io.BytesIO() + + # 한글 지원을 위한 스타일 설정 + from reportlab.lib.pagesizes import letter + from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer + from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle + from reportlab.lib.enums import TA_CENTER, TA_LEFT + + doc = SimpleDocTemplate(pdf_buffer, pagesize=letter, encoding='utf-8') + + # 사용자 정의 스타일 생성 + title_style = ParagraphStyle( + name='CustomTitle', + fontName=font_name, + fontSize=18, + leading=22, + alignment=TA_CENTER, + spaceAfter=20 + ) + + normal_style = ParagraphStyle( + name='CustomNormal', + fontName=font_name, + fontSize=12, + leading=15, + alignment=TA_LEFT, + spaceBefore=6, + spaceAfter=6 + ) + + # 내용을 문단으로 분할 + content = [] + + # 제목 추가 + content.append(Paragraph(title, title_style)) + content.append(Spacer(1, 20)) + + # 텍스트를 단락으로 분리하여 추가 + paragraphs = text_content.split('\n\n') + for para in paragraphs: + if para.strip(): + # XML 특수문자 이스케이프 처리 + from xml.sax.saxutils import escape + safe_para = escape(para.replace('\n', '
')) + p = Paragraph(safe_para, normal_style) + content.append(p) + content.append(Spacer(1, 10)) + + # PDF 생성 + doc.build(content) + + # 파일로 저장 + with open(file_path, 'wb') as f: + f.write(pdf_buffer.getvalue()) + + # 메인 디렉토리에도 복사 + with open(PDF_DIR / filename, 'wb') as f: + f.write(pdf_buffer.getvalue()) + + # PDF ID 생성 및 메타데이터 저장 + pdf_id = generate_pdf_id(filename) + pdf_metadata[pdf_id] = str(file_path) + save_pdf_metadata() + + # 백그라운드에서 캐싱 시작 + asyncio.create_task(cache_pdf(str(file_path))) + + return { + "path": str(file_path), + "filename": filename, + "id": pdf_id + } + + except Exception as e: + logger.error(f"텍스트를 PDF로 변환 중 오류: {e}") + raise e + + +# AI를 사용하여 텍스트를 더 구조화된 형식으로 변환 (OpenAI 제거 버전) +async def enhance_text_with_ai(text_content: str, title: str) -> str: + # 원본 텍스트 그대로 반환 (AI 향상 기능 비활성화) + return text_content + + + +# 텍스트 파일을 PDF로 변환하는 엔드포인트 +@app.post("/api/text-to-pdf") +async def text_to_pdf(file: UploadFile = File(...)): + try: + # 지원하는 파일 형식 확인 + filename = file.filename.lower() + if not (filename.endswith('.txt') or filename.endswith('.docx') or filename.endswith('.doc')): + return JSONResponse( + content={"success": False, "message": "지원하는 파일 형식은 .txt, .docx, .doc입니다."}, + status_code=400 + ) + + # 파일 내용 읽기 + content = await file.read() + + # 파일 타입에 따라 텍스트 추출 + if filename.endswith('.txt'): + # 인코딩 자동 감지 시도 + encodings = ['utf-8', 'euc-kr', 'cp949', 'latin1'] + text_content = None + + for encoding in encodings: + try: + text_content = content.decode(encoding, errors='strict') + logger.info(f"텍스트 파일 인코딩 감지: {encoding}") + break + except UnicodeDecodeError: + continue + if text_content is None: + # 모든 인코딩 시도 실패 시 기본적으로 UTF-8로 시도하고 오류는 대체 문자로 처리 + text_content = content.decode('utf-8', errors='replace') + logger.warning("텍스트 파일 인코딩을 감지할 수 없어 UTF-8으로 시도합니다.") + + elif filename.endswith('.docx') or filename.endswith('.doc'): + # 임시 파일로 저장 + with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file: + temp_file.write(content) + temp_path = temp_file.name + + try: + # docx2txt로 텍스트 추출 + text_content = docx2txt.process(temp_path) + finally: + # 임시 파일 삭제 + os.unlink(temp_path) + + # 파일명에서 제목 추출 (확장자 제외) + title = os.path.splitext(filename)[0] + + # AI로 텍스트 내용 향상 + enhanced_text = await enhance_text_with_ai(text_content, title) + + # 텍스트를 PDF로 변환 + pdf_info = await convert_text_to_pdf(enhanced_text, title) + + return JSONResponse( + content={ + "success": True, + "path": pdf_info["path"], + "name": os.path.splitext(pdf_info["filename"])[0], + "id": pdf_info["id"], + "viewUrl": f"/view/{pdf_info['id']}" + }, + status_code=200 + ) except Exception as e: - st.error(f"⚠️ Error loading or executing the application: {str(e)}") import traceback - st.code(traceback.format_exc()) + error_details = traceback.format_exc() + logger.error(f"텍스트를 PDF로 변환 중 오류: {str(e)}\n{error_details}") + return JSONResponse( + content={"success": False, "message": str(e)}, + status_code=500 + ) + +# 관리자 인증 엔드포인트 +@app.post("/api/admin-login") +async def admin_login(password: str = Form(...)): + if password == ADMIN_PASSWORD: + return {"success": True} + return {"success": False, "message": "인증 실패"} + +# 관리자용 PDF 삭제 엔드포인트 +@app.delete("/api/admin/delete-pdf") +async def delete_pdf(path: str): + try: + pdf_file = pathlib.Path(path) + if not pdf_file.exists(): + return {"success": False, "message": "파일을 찾을 수 없습니다"} + + # PDF 파일명 가져오기 + filename = pdf_file.name + + # PDF 파일 삭제 (영구 저장소에서) + pdf_file.unlink() + + # 메인 디렉토리에서도 동일한 파일이 있으면 삭제 (버그 수정) + main_file_path = PDF_DIR / filename + if main_file_path.exists(): + main_file_path.unlink() + + # 관련 캐시 파일 삭제 + pdf_name = pdf_file.stem + cache_path = get_cache_path(pdf_name) + if cache_path.exists(): + cache_path.unlink() + + # 캐시 메모리에서도 제거 + if pdf_name in pdf_cache: + del pdf_cache[pdf_name] + + # 메타데이터에서 해당 파일 ID 제거 + to_remove = [] + for pid, fpath in pdf_metadata.items(): + if os.path.basename(fpath) == filename: + to_remove.append(pid) + + for pid in to_remove: + del pdf_metadata[pid] + + save_pdf_metadata() + + return {"success": True} + except Exception as e: + logger.error(f"PDF 삭제 오류: {str(e)}") + return {"success": False, "message": str(e)} + +# PDF를 메인 디렉토리에 표시 설정 +@app.post("/api/admin/feature-pdf") +async def feature_pdf(path: str): + try: + pdf_file = pathlib.Path(path) + if not pdf_file.exists(): + return {"success": False, "message": "파일을 찾을 수 없습니다"} + + # 메인 디렉토리에 복사 + target_path = PDF_DIR / pdf_file.name + shutil.copy2(pdf_file, target_path) + + return {"success": True} + except Exception as e: + logger.error(f"PDF 표시 설정 오류: {str(e)}") + return {"success": False, "message": str(e)} + +# PDF를 메인 디렉토리에서 제거 (영구 저장소에서는 유지) +@app.delete("/api/admin/unfeature-pdf") +async def unfeature_pdf(path: str): + try: + pdf_name = pathlib.Path(path).name + target_path = PDF_DIR / pdf_name + + if target_path.exists(): + target_path.unlink() + + return {"success": True} + except Exception as e: + logger.error(f"PDF 표시 해제 오류: {str(e)}") + return {"success": False, "message": str(e)} + +# 직접 PDF 뷰어 URL 접근용 라우트 +@app.get("/view/{pdf_id}") +async def view_pdf_by_id(pdf_id: str): + # PDF ID 유효한지 확인 + pdf_path = get_pdf_path_by_id(pdf_id) + + if not pdf_path: + # 일단 모든 PDF 메타데이터를 다시 로드하고 재시도 + load_pdf_metadata() + pdf_path = get_pdf_path_by_id(pdf_id) + + if not pdf_path: + # 모든 PDF 파일을 직접 스캔하여 유사한 이름 찾기 + for file_path in get_pdf_files() + get_permanent_pdf_files(): + name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id + if file_path.stem.startswith(name_part): + pdf_metadata[pdf_id] = str(file_path) + save_pdf_metadata() + pdf_path = str(file_path) + break + + if not pdf_path: + return HTMLResponse( + content=f"

PDF를 찾을 수 없습니다

ID: {pdf_id}

홈으로 돌아가기", + status_code=404 + ) + + # 메인 페이지로 리다이렉트하되, PDF ID 파라미터 추가 + return get_html_content(pdf_id=pdf_id) + +# HTML 파일 읽기 함수 +def get_html_content(pdf_id: str = None): + html_path = BASE / "flipbook_template.html" + content = "" + if html_path.exists(): + with open(html_path, "r", encoding="utf-8") as f: + content = f.read() + else: + content = HTML # 기본 HTML 사용 + + # PDF ID가 제공된 경우, 자동 로드 스크립트 추가 + if pdf_id: + auto_load_script = f""" + + """ + + # body 종료 태그 전에 스크립트 삽입 + content = content.replace("", auto_load_script + "") + + return HTMLResponse(content=content) + +@app.get("/", response_class=HTMLResponse) +async def root(request: Request, pdf_id: Optional[str] = Query(None)): + # PDF ID가 쿼리 파라미터로 제공된 경우 /view/{pdf_id}로 리다이렉트 + if pdf_id: + return RedirectResponse(url=f"/view/{pdf_id}") + return get_html_content() + +# HTML 문자열 (AI 버튼 및 챗봇 UI 추가) +HTML = """ + + + + + FlipBook Space + + + + + + + + + + + + + + + + + + + + + + +
+
+

AI 어시스턴트

+ +
+
+
+ + +
+
+ + +
+ Admin +
+ + + + + +
+
AI FlipBook Maker
+
+ +
+
+ + + + +
+ +
Projects
+
+ +
+ +
+ + + + + + + + + +""" if __name__ == "__main__": - main() \ No newline at end of file + uvicorn.run("app:app", host="0.0.0.0", port=int(os.getenv("PORT", 7860))) \ No newline at end of file