import logging from flask import Blueprint, render_template, request, jsonify, send_from_directory from pathlib import Path import shutil # For zipping the cache directory import json # For parsing streamed JSON data from werkzeug.utils import secure_filename import os import config import utils from llm_client import make_chat_completion_request, is_initialized as llm_is_initialized from cache_store import cache from cache_store import cache_directory import requests logger = logging.getLogger(__name__) main_bp = Blueprint('main', __name__) # 업로드 허용 확장자 ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'dcm', 'dicom'} def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS # LLM client is initialized in app.py create_app() # --- Serve the cache directory as a zip file --- @main_bp.route('/download_cache') def download_cache_zip(): """캐시 디렉토리를 압축하여 다운로드를 위해 제공합니다.""" zip_filename = "radexplain-cache.zip" # Create the zip file in a temporary directory # Using /tmp is common in containerized environments temp_dir = "/tmp" zip_base_path = os.path.join(temp_dir, "radexplain-cache") # shutil adds .zip zip_filepath = zip_base_path + ".zip" # Ensure the cache directory exists before trying to zip it if not os.path.isdir(cache_directory): logger.error(f"캐시 디렉토리를 찾을 수 없습니다: {cache_directory}") return jsonify({"error": f"서버에서 캐시 디렉토리를 찾을 수 없습니다: {cache_directory}"}), 500 try: logger.info(f"캐시 디렉토리의 압축 파일 생성 중: {cache_directory} -> {zip_filepath}") shutil.make_archive( zip_base_path, # This is the base name, shutil adds the .zip extension "zip", cache_directory, # This is the root directory to archive ) logger.info("압축 파일이 성공적으로 생성되었습니다.") # Send the file and then clean it up return send_from_directory(temp_dir, zip_filename, as_attachment=True) except Exception as e: logger.error(f"캐시 디렉토리 압축 파일 생성 또는 전송 중 오류 발생: {e}", exc_info=True) return jsonify({"error": f"압축 파일 생성 또는 전송 중 오류 발생: {e}"}), 500 @main_bp.route('/clear_cache', methods=['POST']) def clear_cache(): """캐시를 초기화합니다.""" try: cache.clear() logger.info("캐시가 성공적으로 초기화되었습니다.") return jsonify({"message": "캐시가 초기화되었습니다.", "success": True}) except Exception as e: logger.error(f"캐시 초기화 중 오류 발생: {e}", exc_info=True) return jsonify({"error": f"캐시 초기화 중 오류 발생: {e}", "success": False}), 500 @main_bp.route('/') def index(): """메인 HTML 페이지를 제공합니다.""" # The backend now only provides the list of available reports. # The frontend will be responsible for selecting a report, # fetching its details (text, image path), and managing the current state. if not config.AVAILABLE_REPORTS: logger.warning("설정에서 보고서를 찾을 수 없습니다. AVAILABLE_REPORTS가 비어있습니다.") return render_template( 'index.html', available_reports=config.AVAILABLE_REPORTS ) @main_bp.route('/get_report_details/') def get_report_details(report_name): """주어진 보고서 이름에 대한 텍스트 내용과 이미지 경로를 가져옵니다.""" selected_report_info = next((item for item in config.AVAILABLE_REPORTS if item['name'] == report_name), None) if not selected_report_info: logger.error(f"상세 정보를 가져올 때 보고서 '{report_name}'을(를) 찾을 수 없습니다.") return jsonify({"error": f"보고서 '{report_name}'을(를) 찾을 수 없습니다."}), 404 report_file = selected_report_info.get('report_file') image_file = selected_report_info.get('image_file') report_text_content = "" # Default to empty if no report file is configured. if report_file: actual_server_report_path = config.BASE_DIR / report_file try: report_text_content = actual_server_report_path.read_text(encoding='utf-8').strip() except Exception as e: logger.error(f"보고서 '{report_name}'의 파일 {actual_server_report_path} 읽기 오류: {e}", exc_info=True) return jsonify({"error": "보고서 파일을 읽는 중 오류가 발생했습니다."}), 500 # If report_file was empty, report_text_content remains "". image_type_from_config = selected_report_info.get('image_type') display_image_type = '흉부 X-레이' if image_type_from_config == 'CXR' else ('CT' if image_type_from_config == 'CT' else '의료 영상') return jsonify({"text": report_text_content, "image_file": image_file, "image_type": display_image_type}) @main_bp.route('/explain', methods=['POST']) def explain_sentence(): """LLM API를 사용하여 설명 요청을 처리합니다.""" if not llm_is_initialized(): logger.error("LLM 클라이언트(REST API)가 초기화되지 않았습니다. 요청을 처리할 수 없습니다.") return jsonify({"error": "LLM 클라이언트(REST API)가 초기화되지 않았습니다. API 키와 베이스 URL을 확인하세요."}), 500 data = request.get_json() if not data or 'sentence' not in data or 'report_name' not in data: logger.warning("요청 페이로드에 'sentence' 또는 'report_name'이 누락되었습니다.") return jsonify({"error": "요청에 'sentence' 또는 'report_name'이 누락되었습니다"}), 400 selected_sentence = data['sentence'] report_name = data['report_name'] logger.info(f"설명 요청 수신: '{selected_sentence}' (보고서: '{report_name}')") # --- Find the selected report info --- selected_report_info = next((item for item in config.AVAILABLE_REPORTS if item['name'] == report_name), None) if not selected_report_info: logger.error(f"사용 가능한 보고서에서 '{report_name}'을(를) 찾을 수 없습니다.") return jsonify({"error": f"보고서 '{report_name}'을(를) 찾을 수 없습니다."}), 404 image_file = selected_report_info.get('image_file') report_file = selected_report_info.get('report_file') image_type = selected_report_info.get('image_type') if not image_file: logger.error(f"보고서 '{report_name}'의 설정에서 이미지 또는 보고서 파일 경로(static 기준 상대경로)가 누락되었습니다.") return jsonify({"error": f"보고서 '{report_name}'의 파일 설정이 누락되었습니다."}), 500 full_report_text = "" if report_file: # Only attempt to read if a report file is configured server_report_path = config.BASE_DIR / report_file try: full_report_text = server_report_path.read_text(encoding='utf-8') except FileNotFoundError: logger.error(f"보고서 파일을 찾을 수 없습니다: {server_report_path}") return jsonify({"error": f"보고서 '{report_name}'의 파일을 서버에서 찾을 수 없습니다."}), 500 except Exception as e: logger.error(f"보고서 파일 {server_report_path} 읽기 오류: {e}", exc_info=True) return jsonify({"error": "보고서 파일을 읽는 중 오류가 발생했습니다."}), 500 else: # If report_file is not configured (e.g. empty string from selected_report_info) logger.info(f"보고서 '{report_name}'에 대한 보고서 파일이 설정되지 않았습니다. 시스템 프롬프트에 전체 보고서 텍스트 없이 진행합니다.") # 이미지 타입에 따른 한글 표시 image_type_korean = '흉부 X-레이' if image_type == 'CXR' else ('CT' if image_type == 'CT' else '의료 영상') system_prompt = ( "당신은 일반인을 대상으로 설명하는 임상의입니다. " "반드시 한국어로 응답하세요. " f"학습 중인 사용자가 방사선 보고서의 한 문장을 제공했고 함께 제공된 {image_type_korean} 영상을 보고 있습니다. " "당신의 임무는 제공된 문장만의 의미를 간단하고 명확한 한국어로 설명하는 것입니다. 전문 용어와 약어를 한국어로 설명하세요. 간결하게 유지하세요. " "문장의 의미를 직접적으로 설명하세요. '네' 또는 '알겠습니다'와 같은 도입 문구를 사용하지 말고, 문장 자체나 보고서 자체를 언급하지 마세요(예: '이 문장은...'). " f"{f'중요한 점은, 사용자가 {image_type_korean} 영상을 보고 있으므로, 해당되는 경우 설명을 이해하기 위해 영상의 어느 부분을 봐야 하는지 한국어로 안내를 제공하세요. ' if image_type != 'CT' else ''}" "사용자가 명시적으로 제공하지 않은 보고서의 다른 부분이나 문장에 대해서는 논의하지 마세요. 텍스트에 있는 사실만을 고수하세요. 추론하지 마세요. " "모든 설명은 반드시 한국어로 작성하세요.\n" "===\n" f"참고를 위한 전체 보고서:\n{full_report_text}" ) user_prompt_text = f"방사선 보고서의 이 문장을 설명해주세요: '{selected_sentence}'" messages_for_api = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt_text} ] # 한글 응답을 위해 캐시 키에 언어 정보 추가 cache_key = f"explain::ko::{report_name}::{selected_sentence}" cached_result = cache.get(cache_key) if cached_result: logger.info("캐시된 설명을 반환합니다.") return jsonify({"explanation": cached_result}) try: logger.info("LLM API(REST)로 요청을 전송 중...") response = make_chat_completion_request( model="tgi", messages=messages_for_api, top_p=None, temperature=0, max_tokens=250, stream=True, seed=None, stop=None, frequency_penalty=None, presence_penalty=None ) logger.info("LLM API(REST)로부터 응답 스트림을 수신했습니다.") explanation_parts = [] for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') if decoded_line.startswith('data: '): json_data_str = decoded_line[len('data: '):].strip() if json_data_str == "[DONE]": break try: chunk = json.loads(json_data_str) if chunk.get("choices") and chunk["choices"][0].get("delta") and chunk["choices"][0]["delta"].get("content"): explanation_parts.append(chunk["choices"][0]["delta"]["content"]) except json.JSONDecodeError: logger.warning(f"스트림 청크에서 JSON을 디코딩할 수 없습니다: {json_data_str}") # Depending on API, might need to handle partial JSON or other errors elif decoded_line.strip() == "[DONE]": # Some APIs might send [DONE] without "data: " break explanation = "".join(explanation_parts).strip() if explanation: cache.set(cache_key, explanation, expire=None) logger.info("설명이 성공적으로 생성되었습니다." if explanation else "API로부터 빈 설명을 받았습니다.") return jsonify({"explanation": explanation or "API로부터 설명 내용을 받지 못했습니다."}) except requests.exceptions.RequestException as e: logger.error(f"LLM API(REST) 호출 중 오류 발생: {e}", exc_info=True) user_error_message = ("설명 생성에 실패했습니다. 서비스가 일시적으로 사용할 수 없으며 " "현재 시작 중일 수 있습니다. 잠시 후 다시 시도해 주세요.") return jsonify({"error": user_error_message}), 500 @main_bp.route('/upload_and_analyze', methods=['POST']) def upload_and_analyze(): """이미지를 업로드하고 분석합니다.""" if not llm_is_initialized(): logger.error("LLM 클라이언트(REST API)가 초기화되지 않았습니다. 요청을 처리할 수 없습니다.") return jsonify({"error": "LLM 클라이언트가 초기화되지 않았습니다. API 키와 베이스 URL을 확인하세요."}), 500 # 파일 확인 if 'image' not in request.files: return jsonify({"error": "이미지 파일이 요청에 포함되지 않았습니다."}), 400 file = request.files['image'] if file.filename == '': return jsonify({"error": "파일이 선택되지 않았습니다."}), 400 if not allowed_file(file.filename): return jsonify({"error": "허용되지 않는 파일 형식입니다. PNG, JPG, JPEG, GIF, BMP, DICOM 파일만 업로드 가능합니다."}), 400 # 이미지 타입 가져오기 (기본값: CXR) image_type = request.form.get('image_type', 'CXR') image_type_korean = '흉부 X-레이' if image_type == 'CXR' else ('CT' if image_type == 'CT' else '의료 영상') # 추가 컨텍스트 가져오기 (선택사항) additional_context = request.form.get('context', '') try: # 파일명 저장 (분석에 참고용) filename = secure_filename(file.filename) # 시스템 프롬프트 구성 (이미지 직접 분석 대신 일반적인 가이드 제공) system_prompt = ( "당신은 경험이 풍부한 방사선과 전문의입니다. " "반드시 한국어로 응답하세요. " f"사용자가 {image_type_korean} 영상을 업로드했습니다. " "일반적인 영상 판독 가이드라인과 주의사항을 제공하세요. " "다음 구조로 응답하세요:\n" f"1. {image_type_korean} 판독 시 확인해야 할 주요 사항\n" "2. 일반적으로 관찰될 수 있는 소견들\n" "3. 정확한 진단을 위한 권고사항\n" "4. 주의사항: 실제 진단은 의료 전문가의 직접적인 검토가 필요함을 강조\n" "모든 설명은 반드시 한국어로 작성하세요." ) user_prompt_text = f"{image_type_korean} 영상 '{filename}'이 업로드되었습니다. 이러한 유형의 영상을 판독할 때 고려해야 할 사항을 설명해주세요." if additional_context: user_prompt_text += f"\n환자/검사 정보: {additional_context}" messages_for_api = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt_text} ] # 캐시 키 생성 (파일명과 컨텍스트 기반) cache_key = f"analyze::ko::{image_type}::{additional_context[:50]}" cached_result = cache.get(cache_key) if cached_result: logger.info("캐시된 분석 가이드를 반환합니다.") return jsonify({"analysis": cached_result}) logger.info("LLM API(REST)로 분석 가이드 요청을 전송 중...") response = make_chat_completion_request( model="tgi", messages=messages_for_api, top_p=None, temperature=0.1, max_tokens=500, stream=True, seed=None, stop=None, frequency_penalty=None, presence_penalty=None ) logger.info("LLM API(REST)로부터 응답 스트림을 수신했습니다.") analysis_parts = [] for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') if decoded_line.startswith('data: '): json_data_str = decoded_line[len('data: '):].strip() if json_data_str == "[DONE]": break try: chunk = json.loads(json_data_str) if chunk.get("choices") and chunk["choices"][0].get("delta") and chunk["choices"][0]["delta"].get("content"): analysis_parts.append(chunk["choices"][0]["delta"]["content"]) except json.JSONDecodeError: logger.warning(f"스트림 청크에서 JSON을 디코딩할 수 없습니다: {json_data_str}") elif decoded_line.strip() == "[DONE]": break analysis = "".join(analysis_parts).strip() if analysis: cache.set(cache_key, analysis, expire=None) logger.info("분석 가이드가 성공적으로 생성되었습니다." if analysis else "API로부터 빈 분석 결과를 받았습니다.") # 주의사항 추가 if analysis: analysis += "\n\n⚠️ **중요**: 이는 일반적인 가이드라인이며, 실제 영상의 정확한 판독과 진단은 의료 전문가가 직접 영상을 검토하여 수행해야 합니다." return jsonify({"analysis": analysis or "API로부터 분석 결과를 받지 못했습니다."}) except Exception as e: logger.error(f"이미지 업로드 및 분석 가이드 생성 중 오류 발생: {e}", exc_info=True) return jsonify({"error": "분석 가이드 생성 중 오류가 발생했습니다. 잠시 후 다시 시도해주세요."}), 500