RAG6_AgenticAI / app /app_routes.py
jeongsoo's picture
init
d93e680
raw
history blame
28.1 kB
"""
RAG search chatbot web application - API route definitions
"""
import os
import json
import logging
import tempfile
import requests
from flask import request, jsonify, render_template, send_from_directory, session, redirect, url_for
from datetime import datetime
from werkzeug.utils import secure_filename
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def register_routes(app, login_required, llm_interface, retriever, stt_client, DocumentProcessor, base_retriever, app_ready, ADMIN_USERNAME, ADMIN_PASSWORD, DEVICE_SERVER_URL):
    """Register the basic routes on the Flask application.

    The view functions defined below close over these arguments: ``app`` is
    used for ``app.route`` and ``app.config``, ``login_required`` decorates the
    protected views, and ``ADMIN_USERNAME`` / ``ADMIN_PASSWORD`` are the
    credentials the login view compares against.
    """
    # Helper functions
def allowed_audio_file(filename):
    """Tell whether the file name carries an accepted audio extension.

    The text after the last dot is compared case-insensitively; a name
    without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in {'mp3', 'wav', 'ogg', 'm4a'}
def allowed_doc_file(filename):
    """Tell whether the file name carries an accepted document extension.

    The text after the last dot is compared case-insensitively; a name
    without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in {'txt', 'md', 'pdf', 'docx', 'csv'}
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate the admin user.

    GET: redirects to the index when the session is already logged in,
    otherwise renders ``login.html``.
    POST: compares the submitted form credentials with ``ADMIN_USERNAME`` /
    ``ADMIN_PASSWORD``. On success the login is stored in the session and the
    client is redirected to the ``next`` query parameter when it points at
    this host, otherwise to the index. On failure the form is rendered again
    with an error message.
    """
    import hmac
    from urllib.parse import urljoin, urlparse

    def _is_same_host(target):
        """Return True only when *target* resolves to a URL on this host."""
        # Browsers treat backslashes as slashes and drop tabs/newlines, which
        # would let "/\\evil.example" escape the host comparison below.
        if not target or any(ch in target for ch in '\\\t\r\n'):
            return False
        own = urlparse(request.host_url)
        resolved = urlparse(urljoin(request.host_url, target))
        return resolved.scheme in ('http', 'https') and resolved.netloc == own.netloc

    def _matches(given, expected):
        """Compare two strings in constant time; non-strings never match."""
        if not isinstance(given, str) or not isinstance(expected, str):
            return False
        return hmac.compare_digest(given.encode('utf-8'), expected.encode('utf-8'))

    error = None
    next_url = request.args.get('next')
    logger.info(f"-------------- 둜그인 νŽ˜μ΄μ§€ 접속 (Next: {next_url}) --------------")
    logger.info(f"Method: {request.method}")

    if request.method == 'POST':
        logger.info("둜그인 μ‹œλ„ λ°›μŒ")
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        logger.info(f"μž…λ ₯된 μ‚¬μš©μžλͺ…: {username}")
        logger.info(f"λΉ„λ°€λ²ˆν˜Έ μž…λ ₯ μ—¬λΆ€: {len(password) > 0}")

        # Reference credentials handed to register_routes
        valid_username = ADMIN_USERNAME
        valid_password = ADMIN_PASSWORD
        logger.info(f"κ²€μ¦μš© μ‚¬μš©μžλͺ…: {valid_username}")
        logger.info(f"κ²€μ¦μš© λΉ„λ°€λ²ˆν˜Έ 쑴재 μ—¬λΆ€: {valid_password is not None and len(valid_password) > 0}")

        # Evaluate both comparisons unconditionally so timing does not reveal
        # which of the two fields was wrong.
        username_ok = _matches(username, valid_username)
        password_ok = _matches(password, valid_password)

        if username_ok and password_ok:
            logger.info(f"둜그인 성곡: {username}")
            logger.debug(f"μ„Έμ…˜ μ„€μ • μ „: {session}")

            # Store the login in the session
            session.permanent = True
            session['logged_in'] = True
            session['username'] = username
            session.modified = True

            logger.info(f"μ„Έμ…˜ μ„€μ • ν›„: {session}")
            logger.info("μ„Έμ…˜ μ„€μ • μ™„λ£Œ, λ¦¬λ””λ ‰μ…˜ μ‹œλ„")

            # `next` comes straight from the query string: only follow it
            # when it stays on this host (prevents an open redirect).
            if next_url and not _is_same_host(next_url):
                logger.warning("Ignoring unsafe redirect target: %r", next_url)
                next_url = None
            redirect_to = next_url or url_for('index')
            logger.info(f"λ¦¬λ””λ ‰μ…˜ λŒ€μƒ: {redirect_to}")
            return redirect(redirect_to)

        logger.warning("둜그인 μ‹€νŒ¨: 아이디 λ˜λŠ” λΉ„λ°€λ²ˆν˜Έ 뢈일치")
        if not username_ok:
            logger.warning("μ‚¬μš©μžλͺ… 뢈일치")
        if not password_ok:
            logger.warning("λΉ„λ°€λ²ˆν˜Έ 뢈일치")
        error = '아이디 λ˜λŠ” λΉ„λ°€λ²ˆν˜Έκ°€ μ˜¬λ°”λ₯΄μ§€ μ•ŠμŠ΅λ‹ˆλ‹€.'
    else:
        logger.info("둜그인 νŽ˜μ΄μ§€ GET μš”μ²­")
        if 'logged_in' in session:
            logger.info("이미 둜그인된 μ‚¬μš©μž, 메인 νŽ˜μ΄μ§€λ‘œ λ¦¬λ””λ ‰μ…˜")
            return redirect(url_for('index'))

    logger.info("---------- 둜그인 νŽ˜μ΄μ§€ λ Œλ”λ§ ----------")
    return render_template('login.html', error=error, next=next_url)
@app.route('/logout')
def logout():
    """Drop the login markers from the session and redirect to the login page."""
    logger.info("-------------- λ‘œκ·Έμ•„μ›ƒ μš”μ²­ --------------")
    logger.info(f"λ‘œκ·Έμ•„μ›ƒ μ „ μ„Έμ…˜ μƒνƒœ: {session}")

    if 'logged_in' not in session:
        logger.warning("λ‘œκ·ΈμΈλ˜μ§€ μ•Šμ€ μƒνƒœμ—μ„œ λ‘œκ·Έμ•„μ›ƒ μ‹œλ„")
    else:
        current_user = session.get('username', 'unknown')
        logger.info(f"μ‚¬μš©μž {current_user} λ‘œκ·Έμ•„μ›ƒ 처리 μ‹œμž‘")
        for session_key in ('logged_in', 'username'):
            session.pop(session_key, None)
        session.modified = True
        logger.info(f"μ„Έμ…˜ 정보 μ‚­μ œ μ™„λ£Œ. ν˜„μž¬ μ„Έμ…˜: {session}")

    logger.info("둜그인 νŽ˜μ΄μ§€λ‘œ λ¦¬λ””λ ‰μ…˜")
    return redirect(url_for('login'))
@app.route('/')
@login_required
def index():
"""메인 νŽ˜μ΄μ§€"""
nonlocal app_ready
# μ•± μ€€λΉ„ μƒνƒœ 확인 - 30초 이상 μ§€λ‚¬μœΌλ©΄ κ°•μ œλ‘œ ready μƒνƒœλ‘œ λ³€κ²½
current_time = datetime.now()
start_time = datetime.fromtimestamp(os.path.getmtime(__file__))
time_diff = (current_time - start_time).total_seconds()
if not app_ready and time_diff > 30:
logger.warning(f"앱이 30초 이상 μ΄ˆκΈ°ν™” 쀑 μƒνƒœμž…λ‹ˆλ‹€. κ°•μ œλ‘œ ready μƒνƒœλ‘œ λ³€κ²½ν•©λ‹ˆλ‹€.")
app_ready = True
if not app_ready:
logger.info("앱이 아직 μ€€λΉ„λ˜μ§€ μ•Šμ•„ λ‘œλ”© νŽ˜μ΄μ§€ ν‘œμ‹œ")
return render_template('loading.html'), 503 # μ„œλΉ„μŠ€ μ€€λΉ„ μ•ˆλ¨ μƒνƒœ μ½”λ“œ
logger.info("메인 νŽ˜μ΄μ§€ μš”μ²­")
return render_template('index.html')
@app.route('/api/status')
@login_required
def app_status():
    """Report the current value of the app-ready flag as JSON."""
    state_label = 'Ready' if app_ready else 'Not Ready'
    logger.info(f"μ•± μƒνƒœ 확인 μš”μ²­: {state_label}")
    return jsonify(ready=app_ready)
@app.route('/api/llm', methods=['GET', 'POST'])
@login_required
def llm_api():
    """List the supported LLMs (GET) or switch the active one (POST).

    GET returns ``supported_llms`` and ``current_llm``.
    POST expects a JSON object with ``llm_id``; it answers 400 when the body
    is missing, not a JSON object, or names an unknown LLM, and 500 when the
    switch itself fails. While the app is not ready every request gets 503.
    """
    if not app_ready:
        return jsonify({"error": "앱이 아직 μ΄ˆκΈ°ν™” μ€‘μž…λ‹ˆλ‹€. μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”."}), 503

    if request.method == 'POST':
        # silent=True: a missing or malformed JSON body yields None, so the
        # client receives the JSON 400 below instead of a framework error page.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'llm_id' not in data:
            return jsonify({"error": "LLM IDκ°€ μ œκ³΅λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."}), 400

        llm_id = data['llm_id']
        logger.info(f"LLM λ³€κ²½ μš”μ²­: {llm_id}")

        try:
            if not hasattr(llm_interface, 'set_llm') or not hasattr(llm_interface, 'llm_clients'):
                raise NotImplementedError("LLM μΈν„°νŽ˜μ΄μŠ€μ— ν•„μš”ν•œ λ©”μ†Œλ“œ/속성 μ—†μŒ")

            if llm_id not in llm_interface.llm_clients:
                return jsonify({"error": f"μ§€μ›λ˜μ§€ μ•ŠλŠ” LLM ID: {llm_id}"}), 400

            if not llm_interface.set_llm(llm_id):
                logger.error(f"LLM λ³€κ²½ μ‹€νŒ¨ (ID: {llm_id})")
                return jsonify({"error": "LLM λ³€κ²½ 쀑 λ‚΄λΆ€ 였λ₯˜ λ°œμƒ"}), 500

            new_details = llm_interface.get_current_llm_details()
            logger.info(f"LLM이 '{new_details.get('name', llm_id)}'둜 λ³€κ²½λ˜μ—ˆμŠ΅λ‹ˆλ‹€.")
            return jsonify({
                "success": True,
                "message": f"LLM이 '{new_details.get('name', llm_id)}'둜 λ³€κ²½λ˜μ—ˆμŠ΅λ‹ˆλ‹€.",
                "current_llm": new_details
            })
        except Exception as e:
            logger.error(f"LLM λ³€κ²½ 처리 쀑 였λ₯˜: {e}", exc_info=True)
            return jsonify({"error": f"LLM λ³€κ²½ 쀑 였λ₯˜ λ°œμƒ: {str(e)}"}), 500

    # Everything that is not POST is answered like GET. Flask also registers
    # HEAD for GET routes; an explicit `== 'GET'` test would let HEAD fall
    # through and return None, which Flask turns into a 500.
    logger.info("LLM λͺ©λ‘ μš”μ²­")
    try:
        if hasattr(llm_interface, 'get_current_llm_details'):
            current_details = llm_interface.get_current_llm_details()
        else:
            current_details = {"id": "unknown", "name": "Unknown"}
        supported_llms_dict = llm_interface.SUPPORTED_LLMS if hasattr(llm_interface, 'SUPPORTED_LLMS') else {}
        current_id = current_details.get("id")
        supported_list = [
            {"name": name, "id": model_id, "current": model_id == current_id}
            for name, model_id in supported_llms_dict.items()
        ]
        return jsonify({
            "supported_llms": supported_list,
            "current_llm": current_details
        })
    except Exception as e:
        logger.error(f"LLM 정보 쑰회 였λ₯˜: {e}")
        return jsonify({"error": "LLM 정보 쑰회 쀑 였λ₯˜ λ°œμƒ"}), 500
@app.route('/api/chat', methods=['POST'])
@login_required
def chat():
    """Answer a text query with retrieval-augmented generation.

    Expects a JSON object with a string ``query`` and an optional ``llm_id``.
    Returns ``answer``, ``sources`` and ``llm``. Answers 503 while the app or
    the retriever is not ready, 400 when the body is not a JSON object or
    ``query`` is not a string, and 500 on any processing error.
    """
    if not app_ready or retriever is None:
        return jsonify({"error": "μ•±/검색기가 아직 μ΄ˆκΈ°ν™” μ€‘μž…λ‹ˆλ‹€. μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”."}), 503

    # silent=True: a missing or malformed JSON body yields None instead of
    # raising, so bad input is reported as 400 rather than as a 500.
    data = request.get_json(silent=True)
    query = data.get('query') if isinstance(data, dict) else None
    if not isinstance(query, str):
        return jsonify({"error": "쿼리가 μ œκ³΅λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."}), 400

    try:
        logger.info(f"ν…μŠ€νŠΈ 쿼리 μˆ˜μ‹ : {query[:100]}...")

        # Run the RAG search
        if not hasattr(retriever, 'search'):
            raise NotImplementedError("Retriever에 search λ©”μ†Œλ“œκ°€ μ—†μŠ΅λ‹ˆλ‹€.")
        search_results = retriever.search(query, top_k=5, first_stage_k=6)

        # Build the context handed to the LLM
        if not hasattr(DocumentProcessor, 'prepare_rag_context'):
            raise NotImplementedError("DocumentProcessor에 prepare_rag_context λ©”μ†Œλ“œκ°€ μ—†μŠ΅λ‹ˆλ‹€.")
        context = DocumentProcessor.prepare_rag_context(search_results, field="text")
        if not context:
            logger.warning("검색 κ²°κ³Όκ°€ μ—†μ–΄ μ»¨ν…μŠ€νŠΈλ₯Ό μƒμ„±ν•˜μ§€ λͺ»ν•¨.")

        # Ask the LLM, or fall back to a fixed answer when nothing was found
        llm_id = data.get('llm_id', None)
        if not hasattr(llm_interface, 'rag_generate'):
            raise NotImplementedError("LLMInterface에 rag_generate λ©”μ†Œλ“œκ°€ μ—†μŠ΅λ‹ˆλ‹€.")
        if not context:
            answer = "μ£„μ†‘ν•©λ‹ˆλ‹€. κ΄€λ ¨ 정보λ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
            logger.info("μ»¨ν…μŠ€νŠΈ 없이 κΈ°λ³Έ 응닡 생성")
        else:
            answer = llm_interface.rag_generate(query, context, llm_id=llm_id)
            logger.info(f"LLM 응닡 생성 μ™„λ£Œ (길이: {len(answer)})")

        # Collect source information for the response
        sources = []
        for result in search_results or []:
            if not isinstance(result, dict):
                logger.warning(f"μ˜ˆμƒμΉ˜ λͺ»ν•œ 검색 κ²°κ³Ό ν˜•μ‹: {type(result)}")
                continue
            if "source" not in result:
                continue
            source_info = {
                "source": result.get("source", "Unknown"),
                "score": result.get("rerank_score", result.get("score", 0))
            }
            # For CSV chunks, expose the first column of the first line as "id"
            if "text" in result and result.get("filetype") == "csv":
                try:
                    first_line = result["text"].strip().split('\n', 1)[0].strip()
                    if ',' in first_line:
                        first_column = first_line.split(',')[0].strip()
                        source_info["id"] = first_column
                        logger.debug(f"CSV μ†ŒμŠ€ ID μΆ”μΆœ: {first_column} from {source_info['source']}")
                except Exception as e:
                    logger.warning(f"CSV μ†ŒμŠ€ ID μΆ”μΆœ μ‹€νŒ¨ ({result.get('source')}): {e}")
            sources.append(source_info)

        # Final response
        response_data = {
            "answer": answer,
            "sources": sources,
            "llm": llm_interface.get_current_llm_details() if hasattr(llm_interface, 'get_current_llm_details') else {}
        }
        return jsonify(response_data)
    except Exception as e:
        logger.error(f"μ±„νŒ… 처리 쀑 였λ₯˜ λ°œμƒ: {e}", exc_info=True)
        return jsonify({"error": f"처리 쀑 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {str(e)}"}), 500
@app.route('/api/voice', methods=['POST'])
@login_required
def voice_chat():
    """Transcribe an uploaded audio file and answer it with RAG.

    Expects a multipart upload named ``audio`` and an optional ``llm_id`` form
    field. Returns ``transcription``, ``answer``, ``sources`` and ``llm``.
    When the retriever or the STT client is missing, a placeholder response is
    returned with status 200. Answers 400 when no audio is supplied or nothing
    was recognised, and 500 when STT or later processing fails.
    """
    if not app_ready:
        # Deliberately not returning here: the request is still attempted.
        logger.warning("μ•± μ΄ˆκΈ°ν™”κ°€ μ™„λ£Œλ˜μ§€ μ•Šμ•˜μ§€λ§Œ μŒμ„± API μš”μ²­ 처리 μ‹œλ„")

    # Pre-flight: both the retriever and the STT client must exist
    if retriever is None:
        logger.error("retrieverκ°€ 아직 μ΄ˆκΈ°ν™”λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€")
        return jsonify({
            "transcription": "(μŒμ„±μ„ ν…μŠ€νŠΈλ‘œ λ³€ν™˜ν–ˆμ§€λ§Œ 검색 엔진이 아직 μ€€λΉ„λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€)",
            "answer": "μ£„μ†‘ν•©λ‹ˆλ‹€. 검색 엔진이 아직 μ΄ˆκΈ°ν™” μ€‘μž…λ‹ˆλ‹€. μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”.",
            "sources": []
        })
    if stt_client is None:
        return jsonify({
            "transcription": "(μŒμ„± 인식 κΈ°λŠ₯이 μ€€λΉ„ μ€‘μž…λ‹ˆλ‹€)",
            "answer": "μ£„μ†‘ν•©λ‹ˆλ‹€. ν˜„μž¬ μŒμ„± 인식 μ„œλΉ„μŠ€κ°€ μ΄ˆκΈ°ν™” μ€‘μž…λ‹ˆλ‹€. μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”.",
            "sources": []
        })

    logger.info("μŒμ„± μ±— μš”μ²­ μˆ˜μ‹ ")

    if 'audio' not in request.files:
        logger.error("μ˜€λ””μ˜€ 파일이 μ œκ³΅λ˜μ§€ μ•ŠμŒ")
        return jsonify({"error": "μ˜€λ””μ˜€ 파일이 μ œκ³΅λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."}), 400

    audio_file = request.files['audio']
    logger.info(f"μˆ˜μ‹ λœ μ˜€λ””μ˜€ 파일: {audio_file.filename} ({audio_file.content_type})")

    try:
        if not hasattr(stt_client, 'transcribe_audio'):
            raise NotImplementedError("STT ν΄λΌμ΄μ–ΈνŠΈμ— transcribe_audio λ©”μ†Œλ“œκ°€ μ—†μŠ΅λ‹ˆλ‹€.")

        # The STT client is given raw bytes, so read the upload directly.
        # Saving it to a NamedTemporaryFile and re-opening that file by name
        # only added disk I/O and does not work on Windows while the
        # temporary file is still open.
        audio_bytes = audio_file.read()
        stt_result = stt_client.transcribe_audio(audio_bytes, language="ko")

        if not isinstance(stt_result, dict) or not stt_result.get("success"):
            error_msg = stt_result.get("error", "μ•Œ 수 μ—†λŠ” STT 였λ₯˜") if isinstance(stt_result, dict) else "STT κ²°κ³Ό ν˜•μ‹ 였λ₯˜"
            logger.error(f"μŒμ„±μΈμ‹ μ‹€νŒ¨: {error_msg}")
            return jsonify({
                "error": "μŒμ„±μΈμ‹ μ‹€νŒ¨",
                "details": error_msg
            }), 500

        transcription = stt_result.get("text", "")
        if not transcription:
            logger.warning("μŒμ„±μΈμ‹ κ²°κ³Όκ°€ λΉ„μ–΄μžˆμŠ΅λ‹ˆλ‹€.")
            return jsonify({"error": "μŒμ„±μ—μ„œ ν…μŠ€νŠΈλ₯Ό μΈμ‹ν•˜μ§€ λͺ»ν–ˆμŠ΅λ‹ˆλ‹€.", "transcription": ""}), 400

        logger.info(f"μŒμ„±μΈμ‹ 성곡: {transcription[:50]}...")

        # --- From here on the flow mirrors the text chat endpoint ---
        # Run the RAG search
        search_results = retriever.search(transcription, top_k=5, first_stage_k=6)
        context = DocumentProcessor.prepare_rag_context(search_results, field="text")

        # The optional LLM id arrives as a form field for voice requests
        llm_id = request.form.get('llm_id', None)
        if not context:
            logger.warning("μŒμ„± 쿼리에 λŒ€ν•œ 검색 κ²°κ³Ό μ—†μŒ.")
            answer = "μ£„μ†‘ν•©λ‹ˆλ‹€. κ΄€λ ¨ 정보λ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
            logger.info("μ»¨ν…μŠ€νŠΈ 없이 κΈ°λ³Έ 응닡 생성")
        else:
            answer = llm_interface.rag_generate(transcription, context, llm_id=llm_id)
            logger.info(f"LLM 응닡 생성 μ™„λ£Œ (길이: {len(answer)})")

        # Collect source information for the response
        enhanced_sources = []
        for doc in search_results or []:
            if not isinstance(doc, dict) or "source" not in doc:
                continue
            source_info = {
                "source": doc.get("source", "Unknown"),
                "score": doc.get("rerank_score", doc.get("score", 0))
            }
            # For CSV chunks, expose the first column of the first line as "id"
            if "text" in doc and doc.get("filetype") == "csv":
                try:
                    first_line = doc["text"].strip().split('\n', 1)[0].strip()
                    if ',' in first_line:
                        source_info["id"] = first_line.split(',')[0].strip()
                except Exception as e:
                    logger.warning(f"[μŒμ„±μ±—] CSV μ†ŒμŠ€ ID μΆ”μΆœ μ‹€νŒ¨ ({doc.get('source')}): {e}")
            enhanced_sources.append(source_info)

        # Final response
        response_data = {
            "transcription": transcription,
            "answer": answer,
            "sources": enhanced_sources,
            "llm": llm_interface.get_current_llm_details() if hasattr(llm_interface, 'get_current_llm_details') else {}
        }
        return jsonify(response_data)

    except Exception as e:
        logger.error(f"μŒμ„± μ±— 처리 쀑 였λ₯˜ λ°œμƒ: {e}", exc_info=True)
        return jsonify({
            "error": "μŒμ„± 처리 쀑 λ‚΄λΆ€ 였λ₯˜ λ°œμƒ",
            "details": str(e)
        }), 500
@app.route('/api/upload', methods=['POST'])
@login_required
def upload_document():
    """Store an uploaded knowledge-base document and index its contents.

    Expects a multipart upload named ``document``. The file is saved under
    ``app.config['DATA_FOLDER']``; CSV and plain-text files are split into
    documents, added to ``base_retriever`` and the index is saved to
    ``app.config['INDEX_PATH']``. PDF/DOCX files are stored but not indexed
    (text extraction is not implemented).

    Answers 503 while the app or base retriever is not ready, 400 for a
    missing, unnamed, disallowed or undecodable file, and 500 on other errors.
    """
    if not app_ready or base_retriever is None:
        return jsonify({"error": "μ•±/κΈ°λ³Έ 검색기가 아직 μ΄ˆκΈ°ν™” μ€‘μž…λ‹ˆλ‹€."}), 503

    if 'document' not in request.files:
        return jsonify({"error": "λ¬Έμ„œ 파일이 μ œκ³΅λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."}), 400

    doc_file = request.files['document']
    if doc_file.filename == '':
        return jsonify({"error": "μ„ νƒλœ 파일이 μ—†μŠ΅λ‹ˆλ‹€."}), 400

    if not allowed_doc_file(doc_file.filename):
        # The extension set inside allowed_doc_file is local to that helper,
        # so the names shown to the client are listed here explicitly
        # (keep in sync with allowed_doc_file).
        allowed_extensions = ('txt', 'md', 'pdf', 'docx', 'csv')
        logger.error(f"ν—ˆμš©λ˜μ§€ μ•ŠλŠ” 파일 ν˜•μ‹: {doc_file.filename}")
        return jsonify({"error": f"ν—ˆμš©λ˜μ§€ μ•ŠλŠ” 파일 ν˜•μ‹μž…λ‹ˆλ‹€. ν—ˆμš©: {', '.join(allowed_extensions)}"}), 400

    try:
        # Extension of the client-supplied name; already validated above.
        file_ext = doc_file.filename.rsplit('.', 1)[1].lower()

        filename = secure_filename(doc_file.filename)
        if '.' not in filename:
            # secure_filename drops non-ASCII characters, so a name such as
            # "<Korean>.txt" collapses to "txt". Fall back to a generated stem
            # instead of failing on the missing extension.
            filename = f"upload_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{file_ext}"

        filepath = os.path.join(app.config['DATA_FOLDER'], filename)
        doc_file.save(filepath)
        logger.info(f"λ¬Έμ„œ μ €μž₯ μ™„λ£Œ: {filepath}")

        # PDF/DOCX are binary containers; decoding them as text would only
        # produce a misleading "unreadable encoding" error.
        needs_extraction = file_ext in ('pdf', 'docx')

        content = ""
        if not needs_extraction:
            # Read as UTF-8 first, then fall back to CP949
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    content = f.read()
            except UnicodeDecodeError:
                logger.info(f"UTF-8 λ””μ½”λ”© μ‹€νŒ¨, CP949둜 μ‹œλ„: {filename}")
                try:
                    with open(filepath, 'r', encoding='cp949') as f:
                        content = f.read()
                except Exception as e_cp949:
                    logger.error(f"CP949 λ””μ½”λ”© μ‹€νŒ¨ ({filename}): {e_cp949}")
                    return jsonify({"error": "파일 인코딩을 읽을 수 μ—†μŠ΅λ‹ˆλ‹€ (UTF-8, CP949 μ‹œλ„ μ‹€νŒ¨)."}), 400
            except Exception as e_read:
                logger.error(f"파일 읽기 였λ₯˜ ({filename}): {e_read}")
                return jsonify({"error": f"파일 읽기 쀑 였λ₯˜ λ°œμƒ: {str(e_read)}"}), 500

        # Metadata attached to every chunk of this file
        metadata = {
            "source": filename, "filename": filename,
            "filetype": file_ext,
            "filepath": filepath
        }

        if not hasattr(DocumentProcessor, 'csv_to_documents') or not hasattr(DocumentProcessor, 'text_to_documents'):
            raise NotImplementedError("DocumentProcessor에 ν•„μš”ν•œ λ©”μ†Œλ“œ μ—†μŒ")

        docs = []
        if file_ext == 'csv':
            logger.info(f"CSV 파일 처리 μ‹œμž‘: {filename}")
            docs = DocumentProcessor.csv_to_documents(content, metadata)
        else:
            logger.info(f"일반 ν…μŠ€νŠΈ λ¬Έμ„œ 처리 μ‹œμž‘: {filename}")
            if needs_extraction:
                # TODO: add PDF/DOCX text extraction (needs a dedicated library)
                logger.warning(f".{file_ext} 파일 μ²˜λ¦¬λŠ” ν˜„μž¬ κ΅¬ν˜„λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€. ν…μŠ€νŠΈ μΆ”μΆœ 둜직 μΆ”κ°€ ν•„μš”.")
            elif content:
                docs = DocumentProcessor.text_to_documents(
                    content, metadata=metadata,
                    chunk_size=512, chunk_overlap=50
                )

        if not docs:
            logger.warning(f"파일 '{filename}'μ—μ„œ μ²˜λ¦¬ν•  λ‚΄μš©μ΄ μ—†κ±°λ‚˜ μ§€μ›λ˜μ§€ μ•ŠλŠ” ν˜•μ‹μž…λ‹ˆλ‹€.")
            # The file itself was stored, so this is reported as a warning.
            return jsonify({
                "warning": True,
                "message": f"파일 '{filename}'이 μ €μž₯λ˜μ—ˆμ§€λ§Œ μ²˜λ¦¬ν•  λ‚΄μš©μ΄ μ—†μŠ΅λ‹ˆλ‹€."
            })

        # Add the chunks to the retriever and persist the index
        if not hasattr(base_retriever, 'add_documents') or not hasattr(base_retriever, 'save'):
            raise NotImplementedError("κΈ°λ³Έ 검색기에 add_documents λ˜λŠ” save λ©”μ†Œλ“œ μ—†μŒ")

        logger.info(f"{len(docs)}개 λ¬Έμ„œ 청크λ₯Ό 검색기에 μΆ”κ°€ν•©λ‹ˆλ‹€...")
        base_retriever.add_documents(docs)

        # The index is saved on every upload (may be inefficient)
        logger.info(f"검색기 μƒνƒœλ₯Ό μ €μž₯ν•©λ‹ˆλ‹€...")
        index_path = app.config['INDEX_PATH']
        try:
            base_retriever.save(index_path)
            logger.info("인덱슀 μ €μž₯ μ™„λ£Œ")
            return jsonify({
                "success": True,
                "message": f"파일 '{filename}' μ—…λ‘œλ“œ 및 처리 μ™„λ£Œ ({len(docs)}개 청크 μΆ”κ°€)."
            })
        except Exception as e_save:
            logger.error(f"인덱슀 μ €μž₯ 쀑 였λ₯˜ λ°œμƒ: {e_save}")
            return jsonify({"error": f"인덱슀 μ €μž₯ 쀑 였λ₯˜: {str(e_save)}"}), 500

    except Exception as e:
        logger.error(f"파일 μ—…λ‘œλ“œ λ˜λŠ” 처리 쀑 였λ₯˜ λ°œμƒ: {e}", exc_info=True)
        return jsonify({"error": f"파일 μ—…λ‘œλ“œ 쀑 였λ₯˜: {str(e)}"}), 500
@app.route('/api/documents', methods=['GET'])
@login_required
def list_documents():
    """Summarise the indexed chunks per source file.

    Walks ``base_retriever.documents`` (when that attribute exists and is
    non-empty), groups dict chunks by their ``source`` and returns the groups
    sorted by chunk count, together with the totals. Answers 503 while the
    app or the base retriever is not ready and 500 on any error.
    """
    if not app_ready or base_retriever is None:
        return jsonify({"error": "μ•±/κΈ°λ³Έ 검색기가 아직 μ΄ˆκΈ°ν™” μ€‘μž…λ‹ˆλ‹€."}), 503

    try:
        per_source = {}
        total_chunks = 0

        if hasattr(base_retriever, 'documents') and base_retriever.documents:
            logger.info(f"총 {len(base_retriever.documents)}개 λ¬Έμ„œ μ²­ν¬μ—μ„œ μ†ŒμŠ€ λͺ©λ‘ 생성 쀑...")
            for chunk in base_retriever.documents:
                # Only dict-shaped chunks are considered
                if not isinstance(chunk, dict):
                    continue

                # A nested "metadata" dict, when present, is the fallback for
                # the source and overrides filename/filetype.
                nested = chunk.get("metadata")
                if not isinstance(nested, dict):
                    nested = None

                origin = chunk.get("source", "unknown")
                if origin == "unknown" and nested is not None:
                    origin = nested.get("source", "unknown")
                if origin == "unknown":
                    continue

                if origin in per_source:
                    per_source[origin]["chunks"] += 1
                else:
                    display_name = chunk.get("filename", origin)
                    kind = chunk.get("filetype", "unknown")
                    if nested is not None:
                        display_name = nested.get("filename", display_name)
                        kind = nested.get("filetype", kind)
                    per_source[origin] = {
                        "filename": display_name,
                        "chunks": 1,
                        "filetype": kind
                    }
                # Assumed to count only chunks with a known source
                total_chunks += 1
        else:
            logger.info("검색기에 λ¬Έμ„œκ°€ μ—†κ±°λ‚˜ documents 속성을 찾을 수 μ—†μŠ΅λ‹ˆλ‹€.")

        # Flatten into a list, largest sources first
        documents = sorted(
            ({"source": origin, **details} for origin, details in per_source.items()),
            key=lambda entry: entry["chunks"],
            reverse=True,
        )

        logger.info(f"λ¬Έμ„œ λͺ©λ‘ 쑰회 μ™„λ£Œ: {len(documents)}개 μ†ŒμŠ€ 파일, {total_chunks}개 청크")
        return jsonify({
            "documents": documents,
            "total_documents": len(documents),
            "total_chunks": total_chunks
        })

    except Exception as e:
        logger.error(f"λ¬Έμ„œ λͺ©λ‘ 쑰회 쀑 였λ₯˜ λ°œμƒ: {e}", exc_info=True)
        return jsonify({"error": f"λ¬Έμ„œ λͺ©λ‘ 쑰회 쀑 였λ₯˜: {str(e)}"}), 500