diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,3908 +1,35 @@ -from fastapi import FastAPI, BackgroundTasks, UploadFile, File, Form, Request, Query -from fastapi.responses import HTMLResponse, JSONResponse, Response, RedirectResponse -from fastapi.staticfiles import StaticFiles -import pathlib, os, uvicorn, base64, json, shutil, uuid, time, urllib.parse -from typing import Dict, List, Any, Optional -import asyncio -import logging -import threading -import concurrent.futures -from openai import OpenAI -import fitz # PyMuPDF -import tempfile -from reportlab.lib.pagesizes import letter -from reportlab.pdfgen import canvas -from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer -from reportlab.lib.styles import getSampleStyleSheet -import io -import docx2txt +import os +import sys +import streamlit as st +from tempfile import NamedTemporaryFile -# 로깅 설정 -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -BASE = pathlib.Path(__file__).parent -app = FastAPI() -app.mount("/static", StaticFiles(directory=BASE), name="static") - -# PDF 디렉토리 설정 -PDF_DIR = BASE / "pdf" -if not PDF_DIR.exists(): - PDF_DIR.mkdir(parents=True) - -# 영구 PDF 디렉토리 설정 (Hugging Face 영구 디스크) -PERMANENT_PDF_DIR = pathlib.Path("/data/pdfs") if os.path.exists("/data") else BASE / "permanent_pdfs" -if not PERMANENT_PDF_DIR.exists(): - PERMANENT_PDF_DIR.mkdir(parents=True) - -# 캐시 디렉토리 설정 -CACHE_DIR = BASE / "cache" -if not CACHE_DIR.exists(): - CACHE_DIR.mkdir(parents=True) - -# PDF 메타데이터 디렉토리 및 파일 설정 -METADATA_DIR = pathlib.Path("/data/metadata") if os.path.exists("/data") else BASE / "metadata" -if not METADATA_DIR.exists(): - METADATA_DIR.mkdir(parents=True) -PDF_METADATA_FILE = METADATA_DIR / "pdf_metadata.json" - -# 임베딩 캐시 디렉토리 설정 -EMBEDDING_DIR = pathlib.Path("/data/embeddings") if os.path.exists("/data") else BASE / "embeddings" -if not EMBEDDING_DIR.exists(): - EMBEDDING_DIR.mkdir(parents=True) - -# 관리자 비밀번호 -ADMIN_PASSWORD = os.getenv("PASSWORD", "admin") # 환경 변수에서 가져오기, 기본값은 테스트용 - -# OpenAI API 키 설정 -OPENAI_API_KEY = os.getenv("LLM_API", "") -# API 키가 없거나 비어있을 때 플래그 설정 -HAS_VALID_API_KEY = bool(OPENAI_API_KEY and OPENAI_API_KEY.strip()) - -if HAS_VALID_API_KEY: - try: - openai_client = OpenAI(api_key=OPENAI_API_KEY, timeout=30.0) - logger.info("OpenAI 클라이언트 초기화 성공") - except Exception as e: - logger.error(f"OpenAI 클라이언트 초기화 실패: {e}") - HAS_VALID_API_KEY = False -else: - logger.warning("유효한 OpenAI API 키가 없습니다. AI 기능이 제한됩니다.") - openai_client = None - -# 전역 캐시 객체 -pdf_cache: Dict[str, Dict[str, Any]] = {} -# 캐싱 락 -cache_locks = {} -# PDF 메타데이터 (ID to 경로 매핑) -pdf_metadata: Dict[str, str] = {} -# PDF 임베딩 캐시 -pdf_embeddings: Dict[str, Dict[str, Any]] = {} - -# PDF 메타데이터 로드 -def load_pdf_metadata(): - global pdf_metadata - if PDF_METADATA_FILE.exists(): - try: - with open(PDF_METADATA_FILE, "r") as f: - pdf_metadata = json.load(f) - logger.info(f"PDF 메타데이터 로드 완료: {len(pdf_metadata)} 항목") - except Exception as e: - logger.error(f"메타데이터 로드 오류: {e}") - pdf_metadata = {} - else: - pdf_metadata = {} - -# PDF 메타데이터 저장 -def save_pdf_metadata(): - try: - with open(PDF_METADATA_FILE, "w") as f: - json.dump(pdf_metadata, f) - except Exception as e: - logger.error(f"메타데이터 저장 오류: {e}") - -# PDF ID 생성 (파일명 + 타임스탬프 기반) - 더 단순하고 안전한 방식으로 변경 -def generate_pdf_id(filename: str) -> str: - # 파일명에서 확장자 제거 - base_name = os.path.splitext(filename)[0] - # 안전한 문자열로 변환 (URL 인코딩 대신 직접 변환) - import re - safe_name = re.sub(r'[^\w\-_]', '_', base_name.replace(" ", "_")) - # 타임스탬프 추가로 고유성 보장 - timestamp = int(time.time()) - # 짧은 임의 문자열 추가 - random_suffix = uuid.uuid4().hex[:6] - return f"{safe_name}_{timestamp}_{random_suffix}" - -# PDF 파일 목록 가져오기 (메인 디렉토리용) -def get_pdf_files(): - pdf_files = [] - if PDF_DIR.exists(): - pdf_files = [f for f in PDF_DIR.glob("*.pdf")] - return pdf_files - -# 영구 저장소의 PDF 파일 목록 가져오기 -def get_permanent_pdf_files(): - pdf_files = [] - if PERMANENT_PDF_DIR.exists(): - pdf_files = [f for f in PERMANENT_PDF_DIR.glob("*.pdf")] - return pdf_files - -# PDF 썸네일 생성 및 프로젝트 데이터 준비 -def generate_pdf_projects(): - projects_data = [] - - # 메인 디렉토리와 영구 저장소의 파일들 가져오기 - pdf_files = get_pdf_files() - permanent_pdf_files = get_permanent_pdf_files() - - # 모든 파일 합치기 (파일명 기준으로 중복 제거) - unique_files = {} - - # 먼저 메인 디렉토리의 파일들 추가 - for file in pdf_files: - unique_files[file.name] = file - - # 영구 저장소의 파일들 추가 (동일 파일명이 있으면 영구 저장소 파일 우선) - for file in permanent_pdf_files: - unique_files[file.name] = file - - # 중복 제거된 파일들로 프로젝트 데이터 생성 - for pdf_file in unique_files.values(): - # 해당 파일의 PDF ID 찾기 - pdf_id = None - for pid, path in pdf_metadata.items(): - if os.path.basename(path) == pdf_file.name: - pdf_id = pid - break - - # ID가 없으면 새로 생성하고 메타데이터에 추가 - if not pdf_id: - pdf_id = generate_pdf_id(pdf_file.name) - pdf_metadata[pdf_id] = str(pdf_file) - save_pdf_metadata() - - projects_data.append({ - "path": str(pdf_file), - "name": pdf_file.stem, - "id": pdf_id, - "cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" - }) - - return projects_data - -# 캐시 파일 경로 생성 -def get_cache_path(pdf_name: str): - return CACHE_DIR / f"{pdf_name}_cache.json" - -# 임베딩 캐시 파일 경로 생성 -def get_embedding_path(pdf_id: str): - return EMBEDDING_DIR / f"{pdf_id}_embedding.json" - -# PDF 텍스트 추출 함수 -def extract_pdf_text(pdf_path: str) -> List[Dict[str, Any]]: - try: - doc = fitz.open(pdf_path) - chunks = [] - - for page_num in range(len(doc)): - page = doc[page_num] - text = page.get_text() - - # 페이지 텍스트가 있는 경우만 추가 - if text.strip(): - chunks.append({ - "page": page_num + 1, - "text": text, - "chunk_id": f"page_{page_num + 1}" - }) - - return chunks - except Exception as e: - logger.error(f"PDF 텍스트 추출 오류: {e}") - return [] - -# PDF ID로 임베딩 생성 또는 가져오기 -async def get_pdf_embedding(pdf_id: str) -> Dict[str, Any]: - try: - # 임베딩 캐시 확인 - embedding_path = get_embedding_path(pdf_id) - if embedding_path.exists(): - try: - with open(embedding_path, "r", encoding="utf-8") as f: - return json.load(f) - except Exception as e: - logger.error(f"임베딩 캐시 로드 오류: {e}") - - # PDF 경로 찾기 - pdf_path = get_pdf_path_by_id(pdf_id) - if not pdf_path: - raise ValueError(f"PDF ID {pdf_id}에 해당하는 파일을 찾을 수 없습니다") - - # 텍스트 추출 - chunks = extract_pdf_text(pdf_path) - if not chunks: - raise ValueError(f"PDF에서 텍스트를 추출할 수 없습니다: {pdf_path}") - - # 임베딩 저장 및 반환 - embedding_data = { - "pdf_id": pdf_id, - "pdf_path": pdf_path, - "chunks": chunks, - "created_at": time.time() - } - - # 임베딩 캐시 저장 - with open(embedding_path, "w", encoding="utf-8") as f: - json.dump(embedding_data, f, ensure_ascii=False) - - return embedding_data - - except Exception as e: - logger.error(f"PDF 임베딩 생성 오류: {e}") - return {"error": str(e), "pdf_id": pdf_id} - -# PDF 내용 기반 질의응답 -# PDF 내용 기반 질의응답 함수 개선 -async def query_pdf(pdf_id: str, query: str) -> Dict[str, Any]: - try: - # API 키가 없거나 유효하지 않은 경우 - if not HAS_VALID_API_KEY or not openai_client: - return { - "error": "OpenAI API 키가 설정되지 않았습니다.", - "answer": "죄송합니다. 현재 AI 기능이 비활성화되어 있어 질문에 답변할 수 없습니다. 시스템 관리자에게 문의하세요." - } - - # 임베딩 데이터 가져오기 - embedding_data = await get_pdf_embedding(pdf_id) - if "error" in embedding_data: - return {"error": embedding_data["error"]} - - # 청크 텍스트 모으기 (임시로 간단하게 전체 텍스트 사용) - all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]]) - - # 컨텍스트 크기를 고려하여 텍스트가 너무 길면 앞부분만 사용 - max_context_length = 60000 # 토큰 수가 아닌 문자 수 기준 (대략적인 제한) - if len(all_text) > max_context_length: - all_text = all_text[:max_context_length] + "...(이하 생략)" - - # 시스템 프롬프트 준비 - system_prompt = """ - 당신은 PDF 내용을 기반으로 질문에 답변하는 도우미입니다. 제공된 PDF 컨텍스트 정보만을 사용하여 답변하세요. - 컨텍스트에 관련 정보가 없는 경우, '제공된 PDF에서 해당 정보를 찾을 수 없습니다'라고 솔직히 답하세요. - 답변은 명확하고 간결하게 작성하고, 관련 페이지 번호를 인용하세요. 반드시 공손한 말투로 친절하게 응답하세요. - """ - - # gpt-4.1-mini 모델 사용 - try: - # 타임아웃 및 재시도 설정 개선 - for attempt in range(3): # 최대 3번 재시도 - try: - response = openai_client.chat.completions.create( - model="gpt-4.1-mini", - messages=[ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": f"다음 PDF 내용을 참고하여 질문에 답변해주세요.\n\nPDF 내용:\n{all_text}\n\n질문: {query}"} - ], - temperature=0.7, - max_tokens=2048, - timeout=30.0 # 30초 타임아웃 - ) - - answer = response.choices[0].message.content - return { - "answer": answer, - "pdf_id": pdf_id, - "query": query - } - except Exception as api_error: - logger.error(f"OpenAI API 호출 오류 (시도 {attempt+1}/3): {api_error}") - if attempt == 2: # 마지막 시도에서도 실패 - raise api_error - await asyncio.sleep(1 * (attempt + 1)) # 재시도 간 지연 시간 증가 - - # 여기까지 도달하지 않아야 함 - raise Exception("API 호출 재시도 모두 실패") - except Exception as api_error: - logger.error(f"OpenAI API 호출 최종 오류: {api_error}") - # 오류 유형에 따른 더 명확한 메시지 제공 - error_message = str(api_error) - if "Connection" in error_message: - return {"error": "OpenAI 서버와 연결할 수 없습니다. 인터넷 연결을 확인하세요."} - elif "Unauthorized" in error_message or "Authentication" in error_message: - return {"error": "API 키가 유효하지 않습니다."} - elif "Rate limit" in error_message: - return {"error": "API 호출 한도를 초과했습니다. 잠시 후 다시 시도하세요."} - else: - return {"error": f"AI 응답 생성 중 오류가 발생했습니다: {error_message}"} - - except Exception as e: - logger.error(f"질의응답 처리 오류: {e}") - return {"error": str(e)} - -# PDF 요약 생성 -# PDF 요약 생성 함수 개선 -async def summarize_pdf(pdf_id: str) -> Dict[str, Any]: - try: - # API 키가 없거나 유효하지 않은 경우 - if not HAS_VALID_API_KEY or not openai_client: - return { - "error": "OpenAI API 키가 설정되지 않았습니다. 'LLM_API' 환경 변수를 확인하세요.", - "summary": "API 키가 없어 요약을 생성할 수 없습니다. 시스템 관리자에게 문의하세요." - } - - # 임베딩 데이터 가져오기 - embedding_data = await get_pdf_embedding(pdf_id) - if "error" in embedding_data: - return {"error": embedding_data["error"], "summary": "PDF에서 텍스트를 추출할 수 없습니다."} - - # 청크 텍스트 모으기 (제한된 길이) - all_text = "\n\n".join([f"Page {chunk['page']}: {chunk['text']}" for chunk in embedding_data["chunks"]]) - - # 컨텍스트 크기를 고려하여 텍스트가 너무 길면 앞부분만 사용 - max_context_length = 60000 # 토큰 수가 아닌 문자 수 기준 (대략적인 제한) - if len(all_text) > max_context_length: - all_text = all_text[:max_context_length] + "...(이하 생략)" - - # OpenAI API 호출 - try: - # 타임아웃 및 재시도 설정 개선 - for attempt in range(3): # 최대 3번 재시도 - try: - response = openai_client.chat.completions.create( - model="gpt-4.1-mini", - messages=[ - {"role": "system", "content": "다음 PDF 내용을 간결하게 요약해주세요. 핵심 주제와 주요 포인트를 포함한 요약을 500자 이내로 작성해주세요."}, - {"role": "user", "content": f"PDF 내용:\n{all_text}"} - ], - temperature=0.7, - max_tokens=1024, - timeout=30.0 # 30초 타임아웃 - ) - - summary = response.choices[0].message.content - return { - "summary": summary, - "pdf_id": pdf_id - } - except Exception as api_error: - logger.error(f"OpenAI API 호출 오류 (시도 {attempt+1}/3): {api_error}") - if attempt == 2: # 마지막 시도에서도 실패 - raise api_error - await asyncio.sleep(1 * (attempt + 1)) # 재시도 간 지연 시간 증가 - - # 여기까지 도달하지 않아야 함 - raise Exception("API 호출 재시도 모두 실패") - except Exception as api_error: - logger.error(f"OpenAI API 호출 최종 오류: {api_error}") - # 오류 유형에 따른 더 명확한 메시지 제공 - error_message = str(api_error) - if "Connection" in error_message: - return {"error": "OpenAI 서버와 연결할 수 없습니다. 인터넷 연결을 확인하세요.", "pdf_id": pdf_id} - elif "Unauthorized" in error_message or "Authentication" in error_message: - return {"error": "API 키가 유효하지 않습니다.", "pdf_id": pdf_id} - elif "Rate limit" in error_message: - return {"error": "API 호출 한도를 초과했습니다. 잠시 후 다시 시도하세요.", "pdf_id": pdf_id} - else: - return {"error": f"AI 요약 생성 중 오류가 발생했습니다: {error_message}", "pdf_id": pdf_id} - - except Exception as e: - logger.error(f"PDF 요약 생성 오류: {e}") - return { - "error": str(e), - "summary": "PDF 요약 중 오류가 발생했습니다. PDF 페이지 수가 너무 많거나 형식이 지원되지 않을 수 있습니다." - } - - -# 최적화된 PDF 페이지 캐싱 함수 -async def cache_pdf(pdf_path: str): +def main(): try: - import fitz # PyMuPDF - - pdf_file = pathlib.Path(pdf_path) - pdf_name = pdf_file.stem - - # 락 생성 - 동일한 PDF에 대해 동시 캐싱 방지 - if pdf_name not in cache_locks: - cache_locks[pdf_name] = threading.Lock() + # Get the code from secrets + code = os.environ.get("MAIN_CODE") - # 이미 캐싱 중이거나 캐싱 완료된 PDF는 건너뛰기 - if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]: - logger.info(f"PDF {pdf_name} 이미 캐싱 완료 또는 진행 중") + if not code: + st.error("⚠️ The application code wasn't found in secrets. Please add the MAIN_CODE secret.") return - with cache_locks[pdf_name]: - # 이중 체크 - 락 획득 후 다시 확인 - if pdf_name in pdf_cache and pdf_cache[pdf_name].get("status") in ["processing", "completed"]: - return - - # 캐시 상태 업데이트 - pdf_cache[pdf_name] = {"status": "processing", "progress": 0, "pages": []} - - # 캐시 파일이 이미 존재하는지 확인 - cache_path = get_cache_path(pdf_name) - if cache_path.exists(): - try: - with open(cache_path, "r") as cache_file: - cached_data = json.load(cache_file) - if cached_data.get("status") == "completed" and cached_data.get("pages"): - pdf_cache[pdf_name] = cached_data - pdf_cache[pdf_name]["status"] = "completed" - logger.info(f"캐시 파일에서 {pdf_name} 로드 완료") - return - except Exception as e: - logger.error(f"캐시 파일 로드 실패: {e}") - - # PDF 파일 열기 - doc = fitz.open(pdf_path) - total_pages = doc.page_count - - # 미리 썸네일만 먼저 생성 (빠른 UI 로딩용) - if total_pages > 0: - # 첫 페이지 썸네일 생성 - page = doc[0] - pix_thumb = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2)) # 더 작은 썸네일 - thumb_data = pix_thumb.tobytes("png") - b64_thumb = base64.b64encode(thumb_data).decode('utf-8') - thumb_src = f"data:image/png;base64,{b64_thumb}" - - # 썸네일 페이지만 먼저 캐시 - pdf_cache[pdf_name]["pages"] = [{"thumb": thumb_src, "src": ""}] - pdf_cache[pdf_name]["progress"] = 1 - pdf_cache[pdf_name]["total_pages"] = total_pages - - # 이미지 해상도 및 압축 품질 설정 (성능 최적화) - scale_factor = 1.0 # 기본 해상도 (낮출수록 로딩 빠름) - jpeg_quality = 80 # JPEG 품질 (낮출수록 용량 작아짐) - - # 페이지 처리 작업자 함수 (병렬 처리용) - def process_page(page_num): - try: - page = doc[page_num] - - # 이미지로 변환 시 매트릭스 스케일링 적용 (성능 최적화) - pix = page.get_pixmap(matrix=fitz.Matrix(scale_factor, scale_factor)) - - # JPEG 형식으로 인코딩 (PNG보다 크기 작음) - img_data = pix.tobytes("jpeg", jpeg_quality) - b64_img = base64.b64encode(img_data).decode('utf-8') - img_src = f"data:image/jpeg;base64,{b64_img}" - - # 썸네일 (첫 페이지가 아니면 빈 문자열) - thumb_src = "" if page_num > 0 else pdf_cache[pdf_name]["pages"][0]["thumb"] - - return { - "page_num": page_num, - "src": img_src, - "thumb": thumb_src - } - except Exception as e: - logger.error(f"페이지 {page_num} 처리 오류: {e}") - return { - "page_num": page_num, - "src": "", - "thumb": "", - "error": str(e) - } - - # 병렬 처리로 모든 페이지 처리 - pages = [None] * total_pages - processed_count = 0 - - # 페이지 배치 처리 (메모리 관리) - batch_size = 5 # 한 번에 처리할 페이지 수 - - for batch_start in range(0, total_pages, batch_size): - batch_end = min(batch_start + batch_size, total_pages) - current_batch = list(range(batch_start, batch_end)) - - # 병렬 처리로 배치 페이지 렌더링 - with concurrent.futures.ThreadPoolExecutor(max_workers=min(5, batch_size)) as executor: - batch_results = list(executor.map(process_page, current_batch)) - - # 결과 저장 - for result in batch_results: - page_num = result["page_num"] - pages[page_num] = { - "src": result["src"], - "thumb": result["thumb"] - } - - processed_count += 1 - progress = round(processed_count / total_pages * 100) - pdf_cache[pdf_name]["progress"] = progress - - # 중간 저장 - pdf_cache[pdf_name]["pages"] = pages - try: - with open(cache_path, "w") as cache_file: - json.dump({ - "status": "processing", - "progress": pdf_cache[pdf_name]["progress"], - "pages": pdf_cache[pdf_name]["pages"], - "total_pages": total_pages - }, cache_file) - except Exception as e: - logger.error(f"중간 캐시 저장 실패: {e}") - - # 캐싱 완료 - pdf_cache[pdf_name] = { - "status": "completed", - "progress": 100, - "pages": pages, - "total_pages": total_pages - } - - # 최종 캐시 파일 저장 - try: - with open(cache_path, "w") as cache_file: - json.dump(pdf_cache[pdf_name], cache_file) - logger.info(f"PDF {pdf_name} 캐싱 완료, {total_pages}페이지") - except Exception as e: - logger.error(f"최종 캐시 저장 실패: {e}") - - except Exception as e: - import traceback - logger.error(f"PDF 캐싱 오류: {str(e)}\n{traceback.format_exc()}") - if pdf_name in pdf_cache: - pdf_cache[pdf_name]["status"] = "error" - pdf_cache[pdf_name]["error"] = str(e) - -# PDF ID로 PDF 경로 찾기 (개선된 검색 로직) -def get_pdf_path_by_id(pdf_id: str) -> str: - logger.info(f"PDF ID로 파일 조회: {pdf_id}") - - # 1. 메타데이터에서 직접 ID로 검색 - if pdf_id in pdf_metadata: - path = pdf_metadata[pdf_id] - # 파일 존재 확인 - if os.path.exists(path): - return path - - # 파일이 이동했을 수 있으므로 파일명으로 검색 - filename = os.path.basename(path) - - # 영구 저장소에서 검색 - perm_path = PERMANENT_PDF_DIR / filename - if perm_path.exists(): - # 메타데이터 ���데이트 - pdf_metadata[pdf_id] = str(perm_path) - save_pdf_metadata() - return str(perm_path) - - # 메인 디렉토리에서 검색 - main_path = PDF_DIR / filename - if main_path.exists(): - # 메타데이터 업데이트 - pdf_metadata[pdf_id] = str(main_path) - save_pdf_metadata() - return str(main_path) - - # 2. 파일명 부분만 추출하여 모든 PDF 파일 검색 - try: - # ID 형식: filename_timestamp_random - # 파일명 부분만 추출 - name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id + # Create a temporary Python file + with NamedTemporaryFile(suffix='.py', delete=False, mode='w') as tmp: + tmp.write(code) + tmp_path = tmp.name - # 모든 PDF 파일 검색 - for file_path in get_pdf_files() + get_permanent_pdf_files(): - # 파일명이 ID의 시작 부분과 일치하면 - file_basename = os.path.basename(file_path) - if file_basename.startswith(name_part) or file_path.stem.startswith(name_part): - # ID 매핑 업데이트 - pdf_metadata[pdf_id] = str(file_path) - save_pdf_metadata() - return str(file_path) - except Exception as e: - logger.error(f"파일명 검색 중 오류: {e}") - - # 3. 모든 PDF 파일에 대해 메타데이터 확인 - for pid, path in pdf_metadata.items(): - if os.path.exists(path): - file_basename = os.path.basename(path) - # 유사한 파일명을 가진 경우 - if pdf_id in pid or pid in pdf_id: - pdf_metadata[pdf_id] = path - save_pdf_metadata() - return path - - return None - -# 시작 시 모든 PDF 파일 캐싱 -async def init_cache_all_pdfs(): - logger.info("PDF 캐싱 작업 시작") - - # PDF 메타데이터 로드 - load_pdf_metadata() - - # 메인 및 영구 디렉토리에서 PDF 파일 모두 가져오기 - pdf_files = get_pdf_files() + get_permanent_pdf_files() - - # 중복 제거 - unique_pdf_paths = set(str(p) for p in pdf_files) - pdf_files = [pathlib.Path(p) for p in unique_pdf_paths] - - # 파일 기반 메타데이터 업데이트 - for pdf_file in pdf_files: - # ID가 없는 파일에 대해 ID 생성 - found = False - for pid, path in pdf_metadata.items(): - if os.path.basename(path) == pdf_file.name: - found = True - # 경로 업데이트 필요한 경우 - if not os.path.exists(path): - pdf_metadata[pid] = str(pdf_file) - break + # Execute the code + exec(compile(code, tmp_path, 'exec'), globals()) - if not found: - pdf_id = generate_pdf_id(pdf_file.name) - pdf_metadata[pdf_id] = str(pdf_file) - - # 메타데이터 저장 - save_pdf_metadata() - - # 이미 캐시된 PDF 파일 로드 (빠른 시작을 위해 먼저 수행) - for cache_file in CACHE_DIR.glob("*_cache.json"): + # Clean up the temporary file try: - pdf_name = cache_file.stem.replace("_cache", "") - with open(cache_file, "r") as f: - cached_data = json.load(f) - if cached_data.get("status") == "completed" and cached_data.get("pages"): - pdf_cache[pdf_name] = cached_data - pdf_cache[pdf_name]["status"] = "completed" - logger.info(f"기존 캐시 로드: {pdf_name}") - except Exception as e: - logger.error(f"캐시 파일 로드 오류: {str(e)}") - - # 캐싱되지 않은 PDF 파일 병렬 처리 - await asyncio.gather(*[asyncio.create_task(cache_pdf(str(pdf_file))) - for pdf_file in pdf_files - if pdf_file.stem not in pdf_cache - or pdf_cache[pdf_file.stem].get("status") != "completed"]) - -# 백그라운드 작업 시작 함수 -@app.on_event("startup") -async def startup_event(): - # PDF 메타데이터 로드 - load_pdf_metadata() - - # 누락된 PDF 파일에 대한 메타데이터 생성 - for pdf_file in get_pdf_files() + get_permanent_pdf_files(): - found = False - for pid, path in pdf_metadata.items(): - if os.path.basename(path) == pdf_file.name: - found = True - # 경로 업데이트 - if not os.path.exists(path): - pdf_metadata[pid] = str(pdf_file) - break - - if not found: - # 새 ID 생성 및 메타데이터에 추가 - pdf_id = generate_pdf_id(pdf_file.name) - pdf_metadata[pdf_id] = str(pdf_file) - - # 변경사항 저장 - save_pdf_metadata() - - # 백그라운드 태스크로 캐싱 실행 - asyncio.create_task(init_cache_all_pdfs()) - -# API 엔드포인트: PDF 프로젝트 목록 -@app.get("/api/pdf-projects") -async def get_pdf_projects_api(): - return generate_pdf_projects() - -# API 엔드포인트: 영구 저장된 PDF 프로젝트 목록 -@app.get("/api/permanent-pdf-projects") -async def get_permanent_pdf_projects_api(): - pdf_files = get_permanent_pdf_files() - projects_data = [] - - for pdf_file in pdf_files: - # PDF ID 찾기 - pdf_id = None - for pid, path in pdf_metadata.items(): - if os.path.basename(path) == pdf_file.name: - pdf_id = pid - break - - # ID가 없으면 생성 - if not pdf_id: - pdf_id = generate_pdf_id(pdf_file.name) - pdf_metadata[pdf_id] = str(pdf_file) - save_pdf_metadata() - - projects_data.append({ - "path": str(pdf_file), - "name": pdf_file.stem, - "id": pdf_id, - "cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" - }) - - return projects_data - -# API 엔드포인트: PDF ID로 정보 가져오기 -@app.get("/api/pdf-info-by-id/{pdf_id}") -async def get_pdf_info_by_id(pdf_id: str): - pdf_path = get_pdf_path_by_id(pdf_id) - if pdf_path: - pdf_file = pathlib.Path(pdf_path) - return { - "path": pdf_path, - "name": pdf_file.stem, - "id": pdf_id, - "exists": True, - "cached": pdf_file.stem in pdf_cache and pdf_cache[pdf_file.stem].get("status") == "completed" - } - return {"exists": False, "error": "PDF를 찾을 수 없습니다"} - -# API 엔드포인트: PDF 썸네일 제공 (최적화) -@app.get("/api/pdf-thumbnail") -async def get_pdf_thumbnail(path: str): - try: - pdf_file = pathlib.Path(path) - pdf_name = pdf_file.stem - - # 캐시에서 썸네일 가져오기 - if pdf_name in pdf_cache and pdf_cache[pdf_name].get("pages"): - if pdf_cache[pdf_name]["pages"][0].get("thumb"): - return {"thumbnail": pdf_cache[pdf_name]["pages"][0]["thumb"]} - - # 캐시에 없으면 생성 (더 작고 빠른 썸네일) - import fitz - doc = fitz.open(path) - if doc.page_count > 0: - page = doc[0] - pix = page.get_pixmap(matrix=fitz.Matrix(0.2, 0.2)) # 더 작은 썸네일 - img_data = pix.tobytes("jpeg", 70) # JPEG 압축 사용 - b64_img = base64.b64encode(img_data).decode('utf-8') - - # 백그라운드에서 캐싱 시작 - asyncio.create_task(cache_pdf(path)) - - return {"thumbnail": f"data:image/jpeg;base64,{b64_img}"} - - return {"thumbnail": None} - except Exception as e: - logger.error(f"썸네일 생성 오류: {str(e)}") - return {"error": str(e), "thumbnail": None} - -# API 엔드포인트: 캐시 상태 확인 -@app.get("/api/cache-status") -async def get_cache_status(path: str = None): - if path: - pdf_file = pathlib.Path(path) - pdf_name = pdf_file.stem - if pdf_name in pdf_cache: - return pdf_cache[pdf_name] - return {"status": "not_cached"} - else: - return {name: {"status": info["status"], "progress": info.get("progress", 0)} - for name, info in pdf_cache.items()} - -# API 엔드포인트: PDF에 대한 질의응답 -@app.post("/api/ai/query-pdf/{pdf_id}") -async def api_query_pdf(pdf_id: str, query: Dict[str, str]): - try: - user_query = query.get("query", "") - if not user_query: - return JSONResponse(content={"error": "질문이 제공되지 않았습니다"}, status_code=400) - - # PDF 경로 확인 - pdf_path = get_pdf_path_by_id(pdf_id) - if not pdf_path: - return JSONResponse(content={"error": f"PDF ID {pdf_id}에 해당하는 파일을 찾을 수 없습니다"}, status_code=404) - - # 질의응답 처리 - result = await query_pdf(pdf_id, user_query) - - if "error" in result: - return JSONResponse(content={"error": result["error"]}, status_code=500) - - return result - except Exception as e: - logger.error(f"질의응답 API 오류: {e}") - return JSONResponse(content={"error": str(e)}, status_code=500) - -# API 엔드포인트: PDF 요약 -@app.get("/api/ai/summarize-pdf/{pdf_id}") -async def api_summarize_pdf(pdf_id: str): - try: - # PDF 경로 확인 - pdf_path = get_pdf_path_by_id(pdf_id) - if not pdf_path: - return JSONResponse(content={"error": f"PDF ID {pdf_id}에 해당하는 파일을 찾을 수 없습니다"}, status_code=404) - - # 요약 처리 - result = await summarize_pdf(pdf_id) - - if "error" in result: - return JSONResponse(content={"error": result["error"]}, status_code=500) - - return result - except Exception as e: - logger.error(f"PDF 요약 API 오류: {e}") - return JSONResponse(content={"error": str(e)}, status_code=500) - -# API 엔드포인트: 캐시된 PDF 콘텐츠 제공 (점진적 로딩 지원) -@app.get("/api/cached-pdf") -async def get_cached_pdf(path: str, background_tasks: BackgroundTasks): - try: - pdf_file = pathlib.Path(path) - pdf_name = pdf_file.stem - - # 캐시 확인 - if pdf_name in pdf_cache: - status = pdf_cache[pdf_name].get("status", "") - - # 완료된 경우 전체 데이터 반환 - if status == "completed": - return pdf_cache[pdf_name] - - # 처리 중인 경우 현재까지의 페이지 데이터 포함 (점진적 로딩) - elif status == "processing": - progress = pdf_cache[pdf_name].get("progress", 0) - pages = pdf_cache[pdf_name].get("pages", []) - total_pages = pdf_cache[pdf_name].get("total_pages", 0) - - # 일부만 처리된 경우에도 사용 가능한 페이지 제공 - return { - "status": "processing", - "progress": progress, - "pages": pages, - "total_pages": total_pages, - "available_pages": len([p for p in pages if p and p.get("src")]) - } - - # 캐시가 없는 경우 백그라운드에서 캐싱 시작 - background_tasks.add_task(cache_pdf, path) - return {"status": "started", "progress": 0} - - except Exception as e: - logger.error(f"캐시된 PDF 제공 오류: {str(e)}") - return {"error": str(e), "status": "error"} - -# API 엔드포인트: PDF 원본 콘텐츠 제공(캐시가 없는 경우) -@app.get("/api/pdf-content") -async def get_pdf_content(path: str, background_tasks: BackgroundTasks): - try: - # 캐싱 상태 확인 - pdf_file = pathlib.Path(path) - if not pdf_file.exists(): - return JSONResponse(content={"error": f"파일을 찾을 수 없습니다: {path}"}, status_code=404) - - pdf_name = pdf_file.stem - - # 캐시된 경우 리다이렉트 - if pdf_name in pdf_cache and (pdf_cache[pdf_name].get("status") == "completed" - or (pdf_cache[pdf_name].get("status") == "processing" - and pdf_cache[pdf_name].get("progress", 0) > 10)): - return JSONResponse(content={"redirect": f"/api/cached-pdf?path={path}"}) - - # 파일 읽기 - with open(path, "rb") as pdf_file: - content = pdf_file.read() - - # 파일명 처리 - import urllib.parse - filename = pdf_file.name - encoded_filename = urllib.parse.quote(filename) - - # 백그라운드에서 캐싱 시작 - background_tasks.add_task(cache_pdf, path) - - # 응답 헤더 설정 - headers = { - "Content-Type": "application/pdf", - "Content-Disposition": f"inline; filename=\"{encoded_filename}\"; filename*=UTF-8''{encoded_filename}" - } - - return Response(content=content, media_type="application/pdf", headers=headers) - except Exception as e: - import traceback - error_details = traceback.format_exc() - logger.error(f"PDF 콘텐츠 로드 오류: {str(e)}\n{error_details}") - return JSONResponse(content={"error": str(e)}, status_code=500) - -# PDF 업로드 엔드포인트 - 영구 저장소에 저장 및 메인 화면에 자동 표시 -@app.post("/api/upload-pdf") -async def upload_pdf(file: UploadFile = File(...)): - try: - # 파일 이름 확인 - if not file.filename.lower().endswith('.pdf'): - return JSONResponse( - content={"success": False, "message": "PDF 파일만 업로드 가능합니다"}, - status_code=400 - ) - - # 영구 저장소에 파일 저장 - file_path = PERMANENT_PDF_DIR / file.filename - - # 파일 읽기 및 저장 - content = await file.read() - with open(file_path, "wb") as buffer: - buffer.write(content) - - # 메인 디렉토리에도 자동으로 복사 (자동 표시) - with open(PDF_DIR / file.filename, "wb") as buffer: - buffer.write(content) - - # PDF ID 생성 및 메타데이터 저장 - pdf_id = generate_pdf_id(file.filename) - pdf_metadata[pdf_id] = str(file_path) - save_pdf_metadata() - - # 백그라운드에서 캐싱 시작 - asyncio.create_task(cache_pdf(str(file_path))) - - return JSONResponse( - content={ - "success": True, - "path": str(file_path), - "name": file_path.stem, - "id": pdf_id, - "viewUrl": f"/view/{pdf_id}" - }, - status_code=200 - ) - except Exception as e: - import traceback - error_details = traceback.format_exc() - logger.error(f"PDF 업로드 오류: {str(e)}\n{error_details}") - return JSONResponse( - content={"success": False, "message": str(e)}, - status_code=500 - ) - -# 텍스트 파일을 PDF로 변환하는 함수 -async def convert_text_to_pdf(text_content: str, title: str) -> str: - try: - # 제목에서 유효한 파일명 생성 - import re - safe_title = re.sub(r'[^\w\-_\. ]', '_', title) - if not safe_title: - safe_title = "aibook" - - # 타임스탬프 추가로 고유한 파일명 생성 - timestamp = int(time.time()) - filename = f"{safe_title}_{timestamp}.pdf" - - # 영구 저장소의 파일 경로 - file_path = PERMANENT_PDF_DIR / filename - - # 한글 폰트 등록 - 업로드된 MaruBuri-SemiBold.ttf 사용 - from reportlab.pdfbase import pdfmetrics - from reportlab.pdfbase.ttfonts import TTFont - - # 폰트 경로 설정 (app.py와 같은 디렉토리에 있는 폰트 사용) - font_path = BASE / "MaruBuri-SemiBold.ttf" - - # 폰트 등록 - font_name = "MaruBuri" - if font_path.exists(): - pdfmetrics.registerFont(TTFont(font_name, str(font_path))) - logger.info(f"한글 폰트 등록 성공: {font_path}") - else: - font_name = "Helvetica" - logger.warning(f"한글 폰트 파일을 찾을 수 없습니다: {font_path}. 기본 폰트를 사용합니다.") - - # 임시 PDF 파일 생성 - pdf_buffer = io.BytesIO() - - # 한글 지원을 위한 스타일 설정 - from reportlab.lib.pagesizes import letter - from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer - from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle - from reportlab.lib.enums import TA_CENTER, TA_LEFT - - doc = SimpleDocTemplate(pdf_buffer, pagesize=letter, encoding='utf-8') - - # 사용자 정의 스타일 생성 - title_style = ParagraphStyle( - name='CustomTitle', - fontName=font_name, - fontSize=18, - leading=22, - alignment=TA_CENTER, - spaceAfter=20 - ) - - normal_style = ParagraphStyle( - name='CustomNormal', - fontName=font_name, - fontSize=12, - leading=15, - alignment=TA_LEFT, - spaceBefore=6, - spaceAfter=6 - ) - - # 내용을 문단으로 분할 - content = [] - - # 제목 추가 - content.append(Paragraph(title, title_style)) - content.append(Spacer(1, 20)) - - # 텍스트를 단락으로 분리하여 추가 - paragraphs = text_content.split('\n\n') - for para in paragraphs: - if para.strip(): - # XML 특수문자 이스케이프 처리 - from xml.sax.saxutils import escape - safe_para = escape(para.replace('\n', '
')) - p = Paragraph(safe_para, normal_style) - content.append(p) - content.append(Spacer(1, 10)) - - # PDF 생성 - doc.build(content) - - # 파일로 저장 - with open(file_path, 'wb') as f: - f.write(pdf_buffer.getvalue()) - - # 메인 디렉토리에도 복사 - with open(PDF_DIR / filename, 'wb') as f: - f.write(pdf_buffer.getvalue()) - - # PDF ID 생성 및 메타데이터 저장 - pdf_id = generate_pdf_id(filename) - pdf_metadata[pdf_id] = str(file_path) - save_pdf_metadata() - - # 백그라운드에서 캐싱 시작 - asyncio.create_task(cache_pdf(str(file_path))) - - return { - "path": str(file_path), - "filename": filename, - "id": pdf_id - } - - except Exception as e: - logger.error(f"텍스트를 PDF로 변환 중 오류: {e}") - raise e - - -# AI를 사용하여 텍스트를 더 구조화된 형식으로 변환 (OpenAI 제거 버전) -async def enhance_text_with_ai(text_content: str, title: str) -> str: - # 원본 텍스트 그대로 반환 (AI 향상 기능 비활성화) - return text_content - - - -# 텍스트 파일을 PDF로 변환하는 엔드포인트 -@app.post("/api/text-to-pdf") -async def text_to_pdf(file: UploadFile = File(...)): - try: - # 지원하는 파일 형식 확인 - filename = file.filename.lower() - if not (filename.endswith('.txt') or filename.endswith('.docx') or filename.endswith('.doc')): - return JSONResponse( - content={"success": False, "message": "지원하는 파일 형식은 .txt, .docx, .doc입���다."}, - status_code=400 - ) - - # 파일 내용 읽기 - content = await file.read() - - # 파일 타입에 따라 텍스트 추출 - if filename.endswith('.txt'): - # 인코딩 자동 감지 시도 - encodings = ['utf-8', 'euc-kr', 'cp949', 'latin1'] - text_content = None - - for encoding in encodings: - try: - text_content = content.decode(encoding, errors='strict') - logger.info(f"텍스트 파일 인코딩 감지: {encoding}") - break - except UnicodeDecodeError: - continue + os.unlink(tmp_path) + except: + pass - if text_content is None: - # 모든 인코딩 시도 실패 시 기본적으로 UTF-8로 시도하고 오류는 대체 문자로 처리 - text_content = content.decode('utf-8', errors='replace') - logger.warning("텍스트 파일 인코딩을 감지할 수 없어 UTF-8으로 시도합니다.") - - elif filename.endswith('.docx') or filename.endswith('.doc'): - # 임시 파일로 저장 - with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as temp_file: - temp_file.write(content) - temp_path = temp_file.name - - try: - # docx2txt로 텍스트 추출 - text_content = docx2txt.process(temp_path) - finally: - # 임시 파일 삭제 - os.unlink(temp_path) - - # 파일명에서 제목 추출 (확장자 제외) - title = os.path.splitext(filename)[0] - - # AI로 텍스트 내용 향상 - enhanced_text = await enhance_text_with_ai(text_content, title) - - # 텍스트를 PDF로 변환 - pdf_info = await convert_text_to_pdf(enhanced_text, title) - - return JSONResponse( - content={ - "success": True, - "path": pdf_info["path"], - "name": os.path.splitext(pdf_info["filename"])[0], - "id": pdf_info["id"], - "viewUrl": f"/view/{pdf_info['id']}" - }, - status_code=200 - ) except Exception as e: + st.error(f"⚠️ Error loading or executing the application: {str(e)}") import traceback - error_details = traceback.format_exc() - logger.error(f"텍스트를 PDF로 변환 중 오류: {str(e)}\n{error_details}") - return JSONResponse( - content={"success": False, "message": str(e)}, - status_code=500 - ) - -# 관리자 인증 엔드포인트 -@app.post("/api/admin-login") -async def admin_login(password: str = Form(...)): - if password == ADMIN_PASSWORD: - return {"success": True} - return {"success": False, "message": "인증 실패"} - -# 관리자용 PDF 삭제 엔드포인트 -@app.delete("/api/admin/delete-pdf") -async def delete_pdf(path: str): - try: - pdf_file = pathlib.Path(path) - if not pdf_file.exists(): - return {"success": False, "message": "파일을 찾을 수 없습니다"} - - # PDF 파일명 가져오기 - filename = pdf_file.name - - # PDF 파일 삭제 (영구 저장소에서) - pdf_file.unlink() - - # 메인 디렉토리에서도 동일한 파일이 있으면 삭제 (버그 수정) - main_file_path = PDF_DIR / filename - if main_file_path.exists(): - main_file_path.unlink() - - # 관련 캐시 파일 삭제 - pdf_name = pdf_file.stem - cache_path = get_cache_path(pdf_name) - if cache_path.exists(): - cache_path.unlink() - - # 캐시 메모리에서도 제거 - if pdf_name in pdf_cache: - del pdf_cache[pdf_name] - - # 메타데이터에서 해당 파일 ID 제거 - to_remove = [] - for pid, fpath in pdf_metadata.items(): - if os.path.basename(fpath) == filename: - to_remove.append(pid) - - for pid in to_remove: - del pdf_metadata[pid] - - save_pdf_metadata() - - return {"success": True} - except Exception as e: - logger.error(f"PDF 삭제 오류: {str(e)}") - return {"success": False, "message": str(e)} - -# PDF를 메인 디렉토리에 표시 설정 -@app.post("/api/admin/feature-pdf") -async def feature_pdf(path: str): - try: - pdf_file = pathlib.Path(path) - if not pdf_file.exists(): - return {"success": False, "message": "파일을 찾을 수 없습니다"} - - # 메인 디렉토리에 복사 - target_path = PDF_DIR / pdf_file.name - shutil.copy2(pdf_file, target_path) - - return {"success": True} - except Exception as e: - logger.error(f"PDF 표시 설정 오류: {str(e)}") - return {"success": False, "message": str(e)} - -# PDF를 메인 디렉토리에서 제거 (영구 저장소에서는 유지) -@app.delete("/api/admin/unfeature-pdf") -async def unfeature_pdf(path: str): - try: - pdf_name = pathlib.Path(path).name - target_path = PDF_DIR / pdf_name - - if target_path.exists(): - target_path.unlink() - - return {"success": True} - except Exception as e: - logger.error(f"PDF 표시 해제 오류: {str(e)}") - return {"success": False, "message": str(e)} - -# 직접 PDF 뷰어 URL 접근용 라우트 -@app.get("/view/{pdf_id}") -async def view_pdf_by_id(pdf_id: str): - # PDF ID 유효한지 확인 - pdf_path = get_pdf_path_by_id(pdf_id) - - if not pdf_path: - # 일단 모든 PDF 메타데이터를 다시 로드하고 재시도 - load_pdf_metadata() - pdf_path = get_pdf_path_by_id(pdf_id) - - if not pdf_path: - # 모든 PDF 파일을 직접 스캔하여 유사한 이름 찾기 - for file_path in get_pdf_files() + get_permanent_pdf_files(): - name_part = pdf_id.split('_')[0] if '_' in pdf_id else pdf_id - if file_path.stem.startswith(name_part): - pdf_metadata[pdf_id] = str(file_path) - save_pdf_metadata() - pdf_path = str(file_path) - break - - if not pdf_path: - return HTMLResponse( - content=f"

PDF를 찾을 수 없습니다

ID: {pdf_id}

홈으로 돌아가기", - status_code=404 - ) - - # 메인 페이지로 리다이렉트하되, PDF ID 파라미터 추가 - return get_html_content(pdf_id=pdf_id) - -# HTML 파일 읽기 함수 -def get_html_content(pdf_id: str = None): - html_path = BASE / "flipbook_template.html" - content = "" - if html_path.exists(): - with open(html_path, "r", encoding="utf-8") as f: - content = f.read() - else: - content = HTML # 기본 HTML 사용 - - # PDF ID가 제공된 경우, 자동 로드 스크립트 추가 - if pdf_id: - auto_load_script = f""" - - """ - - # body 종료 태그 전에 스크립트 삽입 - content = content.replace("", auto_load_script + "") - - return HTMLResponse(content=content) - -@app.get("/", response_class=HTMLResponse) -async def root(request: Request, pdf_id: Optional[str] = Query(None)): - # PDF ID가 쿼리 파라미터로 제공된 경우 /view/{pdf_id}로 리다이렉트 - if pdf_id: - return RedirectResponse(url=f"/view/{pdf_id}") - return get_html_content() - -# HTML 문자열 (AI 버튼 및 챗봇 UI 추가) -HTML = """ - - - - - FlipBook Space - - - - - - - - - - - - - - - - - - - - - - -
-
-

AI 어시스턴트

- -
-
-
- - -
-
- - -
- Admin -
- - - - - -
-
AI FlipBook Maker
-
- -
-
- - - - -
- -
Projects
-
- -
- -
- - - - - - - - - -""" + st.code(traceback.format_exc()) if __name__ == "__main__": - uvicorn.run("app:app", host="0.0.0.0", port=int(os.getenv("PORT", 7860))) \ No newline at end of file + main() \ No newline at end of file