""" 음성인식 기능이 추가된 RAG 챗봇 앱 """ import os import time import tempfile from typing import List, Dict, Tuple, Any, Optional import hashlib import pickle import json # 기존 임포트 from config import PDF_DIRECTORY, CHUNK_SIZE, CHUNK_OVERLAP, LLM_MODEL from optimized_document_processor import OptimizedDocumentProcessor from vector_store import VectorStore from langchain.schema import Document # 클로바 STT 모듈 임포트 from clova_stt import ClovaSTT # 안전한 임포트 try: from rag_chain import RAGChain RAG_CHAIN_AVAILABLE = True except ImportError: print("RAG 체인 모듈을 로드할 수 없습니다.") RAG_CHAIN_AVAILABLE = False class VoiceRAGChatApp: """ 음성인식 기능이 추가된 RAG 챗봇 애플리케이션 """ def __init__(self): """ 음성인식 RAG 챗봇 애플리케이션 초기화 """ # 데이터 디렉토리 정의 self.pdf_directory = PDF_DIRECTORY self.cache_directory = "cached_data" self.index_file = os.path.join(self.cache_directory, "file_index.json") self.chunks_dir = os.path.join(self.cache_directory, "chunks") self.vector_index_dir = os.path.join(self.cache_directory, "vector_index") # 디렉토리 생성 os.makedirs(self.pdf_directory, exist_ok=True) os.makedirs(self.cache_directory, exist_ok=True) os.makedirs(self.chunks_dir, exist_ok=True) os.makedirs(self.vector_index_dir, exist_ok=True) print(f"PDF 문서 디렉토리: '{self.pdf_directory}'") print(f"캐시 디렉토리: '{self.cache_directory}'") # 컴포넌트 초기화 self.document_processor = OptimizedDocumentProcessor( chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP ) # 벡터 저장소 초기화 self.vector_store = VectorStore(use_milvus=False) # 문서 인덱스 로드 self.file_index = self._load_file_index() # 기본 변수 초기화 self.documents = [] self.processed_files = [] self.is_initialized = False # 클로바 STT 클라이언트 초기화 self.stt_client = ClovaSTT() print("음성인식(STT) 기능이 초기화되었습니다.") # 시작 시 자동으로 문서 로드 및 처리 print("문서 자동 로드 및 처리 시작...") self.auto_process_documents() def _load_file_index(self) -> Dict[str, Dict[str, Any]]: """ 파일 인덱스 로드 Returns: 파일 경로 -> 메타데이터 매핑 """ if os.path.exists(self.index_file): try: with open(self.index_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"인덱스 파일 로드 실패: {e}") return {} return {} def _save_file_index(self) -> None: """ 파일 인덱스 저장 """ with open(self.index_file, 'w', encoding='utf-8') as f: json.dump(self.file_index, f, ensure_ascii=False, indent=2) def _calculate_file_hash(self, file_path: str) -> str: """ 파일 해시 계산 Args: file_path: 파일 경로 Returns: MD5 해시값 """ hasher = hashlib.md5() with open(file_path, 'rb') as f: buf = f.read(65536) while len(buf) > 0: hasher.update(buf) buf = f.read(65536) return hasher.hexdigest() def _is_file_processed(self, file_path: str) -> bool: """ 파일이 이미 처리되었고 변경되지 않았는지 확인 Args: file_path: 파일 경로 Returns: 처리 여부 """ if file_path not in self.file_index: return False # 현재 해시값 계산 current_hash = self._calculate_file_hash(file_path) # 저장된 해시값과 비교 if self.file_index[file_path]['hash'] != current_hash: print(f"파일 변경 감지: {file_path}") return False # 청크 파일 존재 확인 chunks_path = self.file_index[file_path]['chunks_path'] if not os.path.exists(chunks_path): return False return True def _get_chunks_path(self, file_hash: str) -> str: """ 청크 파일 경로 생성 Args: file_hash: 파일 해시값 Returns: 청크 파일 경로 """ return os.path.join(self.chunks_dir, f"{file_hash}.pkl") def _save_chunks(self, file_path: str, chunks: List[Document]) -> None: """ 청크 데이터 저장 Args: file_path: 원본 파일 경로 chunks: 문서 청크 리스트 """ # 해시 계산 file_hash = self._calculate_file_hash(file_path) # 청크 파일 경로 chunks_path = self._get_chunks_path(file_hash) # 청크 데이터 저장 with open(chunks_path, 'wb') as f: pickle.dump(chunks, f) # 인덱스 업데이트 self.file_index[file_path] = { 'hash': file_hash, 'chunks_path': chunks_path, 'last_processed': time.time(), 'chunks_count': len(chunks) } # 인덱스 저장 self._save_file_index() print(f"청크 저장 완료: {file_path} ({len(chunks)}개 청크)") def _load_chunks(self, file_path: str) -> List[Document]: """ 저장된 청크 데이터 로드 Args: file_path: 파일 경로 Returns: 문서 청크 리스트 """ chunks_path = self.file_index[file_path]['chunks_path'] with open(chunks_path, 'rb') as f: chunks = pickle.load(f) print(f"청크 로드 완료: {file_path} ({len(chunks)}개 청크)") return chunks def _process_pdf_file(self, file_path: str) -> List[Document]: """ PDF 파일 처리 - docling 실패 시 PyPDFLoader 사용 Args: file_path: 처리할 PDF 파일 경로 Returns: 처리된 문서 청크 리스트 """ try: print(f"docling으로 처리 시도: {file_path}") # docling 사용 시도 try: # 10초 타임아웃 설정 (옵션) import signal def timeout_handler(signum, frame): raise TimeoutError("docling 처리 시간 초과") # 리눅스/맥에서만 작동 (윈도우에서는 무시됨) try: signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(60) # 60초 타임아웃 except: pass # docling으로 처리 시도 chunks = self.document_processor.process_pdf(file_path, use_docling=True) # 타임아웃 취소 try: signal.alarm(0) except: pass return chunks except Exception as e: # docling 오류 확인 error_str = str(e) if "Invalid code point" in error_str or "RuntimeError" in error_str: print(f"docling 처리 오류 (코드 포인트 문제): {error_str}") print("PyPDFLoader로 대체합니다.") else: print(f"docling 처리 오류: {error_str}") print("PyPDFLoader로 대체합니다.") # PyPDFLoader로 대체 try: return self.document_processor.process_pdf(file_path, use_docling=False) except Exception as inner_e: print(f"PyPDFLoader 처리 오류: {inner_e}") raise # 두 방법 모두 실패하면 예외 발생 except Exception as e: print(f"PDF 처리 중 심각한 오류: {e}") # 빈 청크라도 반환하여 전체 처리가 중단되지 않도록 함 return [] def auto_process_documents(self) -> str: """ documents 폴더의 PDF 파일 자동 처리 Returns: 처리 결과 메시지 """ try: start_time = time.time() # PDF 파일 목록 수집 pdf_files = [] for filename in os.listdir(self.pdf_directory): if filename.lower().endswith('.pdf'): pdf_files.append(os.path.join(self.pdf_directory, filename)) if not pdf_files: return f"'{self.pdf_directory}' 폴더에 PDF 파일이 없습니다." print(f"발견된 PDF 파일: {len(pdf_files)}개") # 폴더 내 PDF 파일 처리 new_files = [] updated_files = [] cached_files = [] failed_files = [] all_chunks = [] for file_path in pdf_files: if self._is_file_processed(file_path): # 캐시에서 청크 로드 chunks = self._load_chunks(file_path) all_chunks.extend(chunks) cached_files.append(file_path) self.processed_files.append(os.path.basename(file_path)) else: # 새 파일 또는 변경된 파일 처리 print(f"처리 중: {file_path}") try: # 개선된 PDF 처리 메서드 사용 chunks = self._process_pdf_file(file_path) if chunks: # 청크가 있는 경우에만 저장 # 청크 저장 self._save_chunks(file_path, chunks) all_chunks.extend(chunks) if file_path in self.file_index: updated_files.append(file_path) else: new_files.append(file_path) self.processed_files.append(os.path.basename(file_path)) else: print(f"'{file_path}' 처리 실패: 추출된 청크 없음") failed_files.append(file_path) except Exception as e: print(f"'{file_path}' 처리 중 오류: {e}") failed_files.append(file_path) # 모든 청크 저장 self.documents = all_chunks processing_time = time.time() - start_time print(f"문서 처리 완료: {len(all_chunks)}개 청크, {processing_time:.2f}초") # 벡터 인덱스 저장 경로 확인 if os.path.exists(self.vector_index_dir) and any(os.listdir(self.vector_index_dir)): # 기존 벡터 인덱스 로드 try: print("저장된 벡터 인덱스 로드 중...") vector_store_loaded = self.vector_store.load_local(self.vector_index_dir) # 인덱스 로드 성공 확인 if self.vector_store.vector_store is not None: # 새 문서나 변경된 문서가 있으면 인덱스 업데이트 if new_files or updated_files: print("벡터 인덱스 업데이트 중...") self.vector_store.add_documents(self.documents) print("벡터 인덱스 로드 완료") else: print("벡터 인덱스를 로드했으나 유효하지 않음, 새로 생성합니다.") self.vector_store.create_or_load(self.documents) except Exception as e: print(f"벡터 인덱스 로드 실패, 새로 생성합니다: {e}") # 오류 상세 정보 출력 import traceback traceback.print_exc() # 새 벡터 인덱스 생성 self.vector_store.create_or_load(self.documents) else: # 새 벡터 인덱스 생성 print("새 벡터 인덱스 생성 중...") self.vector_store.create_or_load(self.documents) # 벡터 인덱스 저장 if self.vector_store and self.vector_store.vector_store is not None: try: print(f"벡터 인덱스 저장 중: {self.vector_index_dir}") save_result = self.vector_store.save_local(self.vector_index_dir) print(f"벡터 인덱스 저장 완료: {self.vector_index_dir}") except Exception as e: print(f"벡터 인덱스 저장 실패: {e}") # 오류 상세 정보 출력 import traceback traceback.print_exc() else: print("벡터 인덱스가 초기화되지 않아 저장하지 않습니다.") # RAG 체인 초기화 if RAG_CHAIN_AVAILABLE: self.rag_chain = RAGChain(self.vector_store) self.is_initialized = True total_time = time.time() - start_time status_message = ( f"문서 처리 완료!\n" f"- 처리된 파일: {len(self.processed_files)}개\n" f"- 캐시된 파일: {len(cached_files)}개\n" f"- 새 파일: {len(new_files)}개\n" f"- 업데이트된 파일: {len(updated_files)}개\n" f"- 실패한 파일: {len(failed_files)}개\n" f"- 총 청크 수: {len(self.documents)}개\n" f"- 처리 시간: {total_time:.2f}초\n" f"이제 질문할 준비가 되었습니다!" ) print(status_message) return status_message else: return "RAG 체인을 초기화할 수 없습니다. 필요한 라이브러리가 설치되어 있는지 확인하세요." except Exception as e: error_message = f"문서 처리 중 오류 발생: {str(e)}" print(error_message) import traceback traceback.print_exc() return error_message def reset_cache(self) -> str: """ 캐시 초기화 Returns: 결과 메시지 """ try: # 청크 파일 삭제 for filename in os.listdir(self.chunks_dir): file_path = os.path.join(self.chunks_dir, filename) if os.path.isfile(file_path): os.remove(file_path) # 인덱스 초기화 self.file_index = {} self._save_file_index() # 벡터 인덱스 삭제 for filename in os.listdir(self.vector_index_dir): file_path = os.path.join(self.vector_index_dir, filename) if os.path.isfile(file_path): os.remove(file_path) self.documents = [] self.processed_files = [] self.is_initialized = False return "캐시가 초기화되었습니다. 다음 실행 시 모든 문서가 다시 처리됩니다." except Exception as e: return f"캐시 초기화 중 오류 발생: {str(e)}" def process_query(self, query: str, chat_history: List[Tuple[str, str]]) -> Tuple[str, List[Tuple[str, str]]]: """ 사용자 쿼리 처리 Args: query: 사용자 질문 chat_history: 대화 기록 Returns: 응답 및 업데이트된 대화 기록 """ if not query: # 비어있는 쿼리 처리 return "", chat_history if not self.is_initialized: response = "문서 로드가 초기화되지 않았습니다. 자동 로드를 시도합니다." chat_history.append((query, response)) # 자동 로드 시도 try: self.auto_process_documents() if not self.is_initialized: response = "문서를 로드할 수 없습니다. 'documents' 폴더에 PDF 파일이 있는지 확인하세요." chat_history.append((query, response)) return "", chat_history except Exception as e: response = f"문서 로드 중 오류 발생: {str(e)}" chat_history.append((query, response)) return "", chat_history try: # RAG 체인 실행 및 응답 생성 start_time = time.time() response = self.rag_chain.run(query) end_time = time.time() query_time = end_time - start_time print(f"쿼리 처리 시간: {query_time:.2f}초") chat_history.append((query, response)) return "", chat_history except Exception as e: error_msg = f"오류 발생: {str(e)}" chat_history.append((query, error_msg)) return "", chat_history def process_voice_query(self, audio, chat_history: List[Tuple[str, str]]) -> Tuple[str, List[Tuple[str, str]]]: """ 음성 쿼리 처리 Args: audio: 녹음된 오디오 데이터 chat_history: 대화 기록 Returns: 응답 및 업데이트된 대화 기록 """ if audio is None: return "", chat_history try: # 임시 파일에 오디오 저장 with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as temp_file: temp_path = temp_file.name temp_file.write(audio) print(f"[STT] 임시 오디오 파일 생성: {temp_path}") # STT 실행 result = self.stt_client.recognize_file(temp_path) # 임시 파일 삭제 try: os.unlink(temp_path) print("[STT] 임시 오디오 파일 삭제됨") except Exception as e: print(f"[STT] 임시 파일 삭제 실패: {e}") # STT 결과 처리 if "error" in result: error_msg = f"음성인식 오류: {result.get('error')}" print(f"[STT] {error_msg}") chat_history.append(("음성 메시지", error_msg)) return "", chat_history # 인식된 텍스트 추출 recognized_text = result.get("text", "") if not recognized_text: error_msg = "음성을 인식할 수 없습니다. 다시 시도해주세요." print("[STT] 인식된 텍스트 없음") chat_history.append(("음성 메시지", error_msg)) return "", chat_history print(f"[STT] 인식된 텍스트: {recognized_text}") # 인식된 텍스트로 쿼리 처리 (음성 메시지 접두어 추가) return self.process_query(f"🎤 {recognized_text}", chat_history) except Exception as e: error_msg = f"음성 처리 중 오류 발생: {str(e)}" print(f"[STT] {error_msg}") chat_history.append(("음성 메시지", error_msg)) return "", chat_history def launch_app(self) -> None: """ 음성인식 기능이 추가된 Gradio 앱 실행 """ import gradio as gr with gr.Blocks(title="음성인식 기능이 추가된 PDF 문서 기반 RAG 챗봇") as app: gr.Markdown("# 음성인식 기능이 추가된 PDF 문서 기반 RAG 챗봇") gr.Markdown(f"* 사용 중인 LLM 모델: **{LLM_MODEL}**") gr.Markdown(f"* PDF 문서 폴더: **{self.pdf_directory}**") gr.Markdown("* 네이버 클로바 음성인식 API 통합") with gr.Row(): with gr.Column(scale=1): # 문서 상태 섹션 status_box = gr.Textbox( label="문서 처리 상태", value=f"처리된 문서 ({len(self.processed_files)}개): {', '.join(self.processed_files)}", lines=5, interactive=False ) # 캐시 관리 버튼 refresh_button = gr.Button("문서 새로 읽기", variant="primary") reset_button = gr.Button("캐시 초기화", variant="stop") # 처리된 파일 정보 with gr.Accordion("캐시 세부 정보", open=False): file_info = "" for file_path, info in self.file_index.items(): file_info += f"- {os.path.basename(file_path)}: {info['chunks_count']}개 청크\n" cache_info = gr.Textbox( label="캐시된 파일 정보", value=file_info or "캐시된 파일이 없습니다.", lines=5, interactive=False ) with gr.Column(scale=2): # 채팅 인터페이스 chatbot = gr.Chatbot( label="대화 내용", bubble_full_width=False, height=500, show_copy_button=True ) with gr.Tabs() as input_tabs: # 텍스트 입력 탭 with gr.Tab("텍스트 입력"): # 텍스트 입력과 전송 버튼을 수평으로 배치 with gr.Row(): query_box = gr.Textbox( label="질문", placeholder="처리된 문서 내용에 대해 질문하세요...", lines=2, scale=4 ) submit_btn = gr.Button("전송", variant="primary", scale=1) # 음성 입력 탭 with gr.Tab("음성 입력"): audio_input = gr.Audio( label="마이크 입력", sources=["microphone"], type="bytes", format="wav" ) voice_submit_btn = gr.Button("음성 질문 전송", variant="primary") clear_chat_button = gr.Button("대화 초기화") # 이벤트 핸들러 설정 refresh_button.click( fn=self.auto_process_documents, inputs=[], outputs=[status_box] ) reset_button.click( fn=lambda: (self.reset_cache(), self.auto_process_documents()), inputs=[], outputs=[status_box] ) # 텍스트 전송 버튼 클릭 이벤트 submit_btn.click( fn=self.process_query, inputs=[query_box, chatbot], outputs=[query_box, chatbot] ) # 엔터키 입력 이벤트 query_box.submit( fn=self.process_query, inputs=[query_box, chatbot], outputs=[query_box, chatbot] ) # 음성 전송 버튼 클릭 이벤트 voice_submit_btn.click( fn=self.process_voice_query, inputs=[audio_input, chatbot], outputs=[audio_input, chatbot] ) # 대화 초기화 버튼 clear_chat_button.click( fn=lambda: [], outputs=[chatbot] ) # 앱 실행 app.launch(share=False) if __name__ == "__main__": app = VoiceRAGChatApp() app.launch_app()