# --- 임베딩 관련 헬퍼 함수 --- def save_embeddings(base_retriever, file_path): """임베딩 데이터를 압축하여 파일에 저장""" try: # 저장 디렉토리가 없으면 생성 os.makedirs(os.path.dirname(file_path), exist_ok=True) # 타임스탬프 추가 save_data = { 'timestamp': datetime.now().isoformat(), 'retriever': base_retriever } # 압축하여 저장 (용량 줄이기) with gzip.open(file_path, 'wb') as f: pickle.dump(save_data, f) logger.info(f"임베딩 데이터를 {file_path}에 압축하여 저장했습니다.") return True except Exception as e: logger.error(f"임베딩 저장 중 오류 발생: {e}") return False def load_embeddings(file_path, max_age_days=30): """저장된 임베딩 데이터를 파일에서 로드""" try: if not os.path.exists(file_path): logger.info(f"저장된 임베딩 파일({file_path})이 없습니다.") return None # 압축 파일 로드 with gzip.open(file_path, 'rb') as f: data = pickle.load(f) # 타임스탬프 확인 (너무 오래된 데이터는 사용하지 않음) saved_time = datetime.fromisoformat(data['timestamp']) age = (datetime.now() - saved_time).days if age > max_age_days: logger.info(f"저장된 임베딩이 {age}일로 너무 오래되었습니다. 새로 생성합니다.") return None logger.info(f"{file_path}에서 임베딩 데이터를 로드했습니다. (생성일: {saved_time})") return data['retriever'] except Exception as e: logger.error(f"임베딩 로드 중 오류 발생: {e}") return None def init_retriever(): """검색기 객체 초기화 또는 로드""" global base_retriever, retriever # 임베딩 캐시 파일 경로 cache_path = os.path.join(app.config['INDEX_PATH'], "cached_embeddings.gz") # 먼저 저장된 임베딩 데이터 로드 시도 cached_retriever = load_embeddings(cache_path) if cached_retriever: logger.info("캐시된 임베딩 데이터를 성공적으로 로드했습니다.") base_retriever = cached_retriever else: # 캐시된 데이터가 없으면 기존 방식으로 초기화 index_path = app.config['INDEX_PATH'] # VectorRetriever 로드 또는 초기화 if os.path.exists(os.path.join(index_path, "documents.json")): try: logger.info(f"기존 벡터 인덱스를 '{index_path}'에서 로드합니다...") base_retriever = VectorRetriever.load(index_path) logger.info(f"{len(base_retriever.documents) if hasattr(base_retriever, 'documents') else 0}개 문서가 로드되었습니다.") except Exception as e: logger.error(f"인덱스 로드 중 오류 발생: {e}. 새 검색기를 초기화합니다.") base_retriever = VectorRetriever() else: logger.info("기존 인덱스를 찾을 수 없어 새 검색기를 초기화합니다...") base_retriever = VectorRetriever() # 데이터 폴더의 문서 로드 data_path = app.config['DATA_FOLDER'] if (not hasattr(base_retriever, 'documents') or not base_retriever.documents) and os.path.exists(data_path): logger.info(f"{data_path}에서 문서를 로드합니다...") try: docs = DocumentProcessor.load_documents_from_directory( data_path, extensions=[".txt", ".md", ".csv"], recursive=True ) if docs and hasattr(base_retriever, 'add_documents'): logger.info(f"{len(docs)}개 문서를 검색기에 추가합니다...") base_retriever.add_documents(docs) if hasattr(base_retriever, 'save'): logger.info(f"검색기 상태를 '{index_path}'에 저장합니다...") try: base_retriever.save(index_path) logger.info("인덱스 저장 완료") # 새로 생성된 검색기 캐싱 if hasattr(base_retriever, 'documents') and base_retriever.documents: save_embeddings(base_retriever, cache_path) logger.info(f"검색기를 캐시 파일 {cache_path}에 저장 완료") except Exception as e: logger.error(f"인덱스 저장 중 오류 발생: {e}") except Exception as e: logger.error(f"DATA_FOLDER에서 문서 로드 중 오류: {e}") # 재순위화 검색기 초기화 logger.info("재순위화 검색기를 초기화합니다...") try: # 자체 구현된 재순위화 함수 def custom_rerank_fn(query, results): query_terms = set(query.lower().split()) for result in results: if isinstance(result, dict) and "text" in result: text = result["text"].lower() term_freq = sum(1 for term in query_terms if term in text) normalized_score = term_freq / (len(text.split()) + 1) * 10 result["rerank_score"] = result.get("score", 0) * 0.7 + normalized_score * 0.3 elif isinstance(result, dict): result["rerank_score"] = result.get("score", 0) results.sort(key=lambda x: x.get("rerank_score", 0) if isinstance(x, dict) else 0, reverse=True) return results # ReRanker 클래스 사용 retriever = ReRanker( base_retriever=base_retriever, rerank_fn=custom_rerank_fn, rerank_field="text" ) logger.info("재순위화 검색기 초기화 완료") except Exception as e: logger.error(f"재순위화 검색기 초기화 실패: {e}") retriever = base_retriever # 실패 시 기본 검색기 사용 return retriever def background_init(): """백그라운드에서 검색기 초기화 수행""" global app_ready, retriever, base_retriever # 즉시 앱 사용 가능 상태로 설정 app_ready = True logger.info("앱을 즉시 사용 가능 상태로 설정 (app_ready=True)") try: # 기본 검색기 초기화 (보험) if base_retriever is None: base_retriever = MockComponent() if hasattr(base_retriever, 'documents'): base_retriever.documents = [] # 임시 retriever 설정 if retriever is None: retriever = MockComponent() if not hasattr(retriever, 'search'): retriever.search = lambda query, **kwargs: [] # 캐시된 임베딩 로드 시도 cache_path = os.path.join(app.config['INDEX_PATH'], "cached_embeddings.gz") cached_retriever = load_embeddings(cache_path) if cached_retriever: # 캐시된 데이터가 있으면 바로 사용 base_retriever = cached_retriever # 간단한 재순위화 함수 def simple_rerank(query, results): if results: for result in results: if isinstance(result, dict): result["rerank_score"] = result.get("score", 0) results.sort(key=lambda x: x.get("rerank_score", 0) if isinstance(x, dict) else 0, reverse=True) return results # 재순위화 검색기 초기화 retriever = ReRanker( base_retriever=base_retriever, rerank_fn=simple_rerank, rerank_field="text" ) logger.info("캐시된 임베딩으로 검색기 초기화 완료 (빠른 시작)") else: # 캐시된 데이터가 없으면 전체 초기화 진행 logger.info("캐시된 임베딩이 없어 전체 초기화 시작") retriever = init_retriever() logger.info("전체 초기화 완료") logger.info("앱 초기화 완료 (모든 컴포넌트 준비됨)") except Exception as e: logger.error(f"앱 백그라운드 초기화 중 심각한 오류 발생: {e}", exc_info=True) # 초기화 실패 시 기본 객체 생성 if base_retriever is None: base_retriever = MockComponent() if hasattr(base_retriever, 'documents'): base_retriever.documents = [] if retriever is None: retriever = MockComponent() if not hasattr(retriever, 'search'): retriever.search = lambda query, **kwargs: [] logger.warning("초기화 중 오류가 있지만 앱은 계속 사용 가능합니다.") # 백그라운드 스레드 시작 init_thread = threading.Thread(target=background_init) init_thread.daemon = True init_thread.start()