Spaces:
Sleeping
Sleeping
import logging | |
from flask import Blueprint, render_template, request, jsonify, send_from_directory | |
from pathlib import Path | |
import shutil # For zipping the cache directory | |
import json # For parsing streamed JSON data | |
from werkzeug.utils import secure_filename | |
import os | |
import config | |
import utils | |
from llm_client import make_chat_completion_request, is_initialized as llm_is_initialized | |
from cache_store import cache | |
from cache_store import cache_directory | |
import requests | |
logger = logging.getLogger(__name__) | |
main_bp = Blueprint('main', __name__) | |
# μ λ‘λ νμ© νμ₯μ | |
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'bmp', 'dcm', 'dicom'} | |
def allowed_file(filename): | |
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
# LLM client is initialized in app.py create_app() | |
# --- Serve the cache directory as a zip file --- | |
def download_cache_zip(): | |
"""μΊμ λλ ν 리λ₯Ό μμΆνμ¬ λ€μ΄λ‘λλ₯Ό μν΄ μ 곡ν©λλ€.""" | |
zip_filename = "radexplain-cache.zip" | |
# Create the zip file in a temporary directory | |
# Using /tmp is common in containerized environments | |
temp_dir = "/tmp" | |
zip_base_path = os.path.join(temp_dir, "radexplain-cache") # shutil adds .zip | |
zip_filepath = zip_base_path + ".zip" | |
# Ensure the cache directory exists before trying to zip it | |
if not os.path.isdir(cache_directory): | |
logger.error(f"μΊμ λλ ν 리λ₯Ό μ°Ύμ μ μμ΅λλ€: {cache_directory}") | |
return jsonify({"error": f"μλ²μμ μΊμ λλ ν 리λ₯Ό μ°Ύμ μ μμ΅λλ€: {cache_directory}"}), 500 | |
try: | |
logger.info(f"μΊμ λλ ν 리μ μμΆ νμΌ μμ± μ€: {cache_directory} -> {zip_filepath}") | |
shutil.make_archive( | |
zip_base_path, # This is the base name, shutil adds the .zip extension | |
"zip", | |
cache_directory, # This is the root directory to archive | |
) | |
logger.info("μμΆ νμΌμ΄ μ±κ³΅μ μΌλ‘ μμ±λμμ΅λλ€.") | |
# Send the file and then clean it up | |
return send_from_directory(temp_dir, zip_filename, as_attachment=True) | |
except Exception as e: | |
logger.error(f"μΊμ λλ ν 리 μμΆ νμΌ μμ± λλ μ μ‘ μ€ μ€λ₯ λ°μ: {e}", exc_info=True) | |
return jsonify({"error": f"μμΆ νμΌ μμ± λλ μ μ‘ μ€ μ€λ₯ λ°μ: {e}"}), 500 | |
def clear_cache(): | |
"""μΊμλ₯Ό μ΄κΈ°νν©λλ€.""" | |
try: | |
cache.clear() | |
logger.info("μΊμκ° μ±κ³΅μ μΌλ‘ μ΄κΈ°νλμμ΅λλ€.") | |
return jsonify({"message": "μΊμκ° μ΄κΈ°νλμμ΅λλ€.", "success": True}) | |
except Exception as e: | |
logger.error(f"μΊμ μ΄κΈ°ν μ€ μ€λ₯ λ°μ: {e}", exc_info=True) | |
return jsonify({"error": f"μΊμ μ΄κΈ°ν μ€ μ€λ₯ λ°μ: {e}", "success": False}), 500 | |
def index(): | |
"""λ©μΈ HTML νμ΄μ§λ₯Ό μ 곡ν©λλ€.""" | |
# The backend now only provides the list of available reports. | |
# The frontend will be responsible for selecting a report, | |
# fetching its details (text, image path), and managing the current state. | |
if not config.AVAILABLE_REPORTS: | |
logger.warning("μ€μ μμ λ³΄κ³ μλ₯Ό μ°Ύμ μ μμ΅λλ€. AVAILABLE_REPORTSκ° λΉμ΄μμ΅λλ€.") | |
return render_template( | |
'index.html', | |
available_reports=config.AVAILABLE_REPORTS | |
) | |
def get_report_details(report_name): | |
"""μ£Όμ΄μ§ λ³΄κ³ μ μ΄λ¦μ λν ν μ€νΈ λ΄μ©κ³Ό μ΄λ―Έμ§ κ²½λ‘λ₯Ό κ°μ Έμ΅λλ€.""" | |
selected_report_info = next((item for item in config.AVAILABLE_REPORTS if item['name'] == report_name), None) | |
if not selected_report_info: | |
logger.error(f"μμΈ μ 보λ₯Ό κ°μ Έμ¬ λ λ³΄κ³ μ '{report_name}'μ(λ₯Ό) μ°Ύμ μ μμ΅λλ€.") | |
return jsonify({"error": f"λ³΄κ³ μ '{report_name}'μ(λ₯Ό) μ°Ύμ μ μμ΅λλ€."}), 404 | |
report_file = selected_report_info.get('report_file') | |
image_file = selected_report_info.get('image_file') | |
report_text_content = "" # Default to empty if no report file is configured. | |
if report_file: | |
actual_server_report_path = config.BASE_DIR / report_file | |
try: | |
report_text_content = actual_server_report_path.read_text(encoding='utf-8').strip() | |
except Exception as e: | |
logger.error(f"λ³΄κ³ μ '{report_name}'μ νμΌ {actual_server_report_path} μ½κΈ° μ€λ₯: {e}", exc_info=True) | |
return jsonify({"error": "λ³΄κ³ μ νμΌμ μ½λ μ€ μ€λ₯κ° λ°μνμ΅λλ€."}), 500 | |
# If report_file was empty, report_text_content remains "". | |
image_type_from_config = selected_report_info.get('image_type') | |
display_image_type = 'νλΆ X-λ μ΄' if image_type_from_config == 'CXR' else ('CT' if image_type_from_config == 'CT' else 'μλ£ μμ') | |
return jsonify({"text": report_text_content, "image_file": image_file, "image_type": display_image_type}) | |
def explain_sentence(): | |
"""LLM APIλ₯Ό μ¬μ©νμ¬ μ€λͺ μμ²μ μ²λ¦¬ν©λλ€.""" | |
if not llm_is_initialized(): | |
logger.error("LLM ν΄λΌμ΄μΈνΈ(REST API)κ° μ΄κΈ°νλμ§ μμμ΅λλ€. μμ²μ μ²λ¦¬ν μ μμ΅λλ€.") | |
return jsonify({"error": "LLM ν΄λΌμ΄μΈνΈ(REST API)κ° μ΄κΈ°νλμ§ μμμ΅λλ€. API ν€μ λ² μ΄μ€ URLμ νμΈνμΈμ."}), 500 | |
data = request.get_json() | |
if not data or 'sentence' not in data or 'report_name' not in data: | |
logger.warning("μμ² νμ΄λ‘λμ 'sentence' λλ 'report_name'μ΄ λλ½λμμ΅λλ€.") | |
return jsonify({"error": "μμ²μ 'sentence' λλ 'report_name'μ΄ λλ½λμμ΅λλ€"}), 400 | |
selected_sentence = data['sentence'] | |
report_name = data['report_name'] | |
logger.info(f"μ€λͺ μμ² μμ : '{selected_sentence}' (λ³΄κ³ μ: '{report_name}')") | |
# --- Find the selected report info --- | |
selected_report_info = next((item for item in config.AVAILABLE_REPORTS if item['name'] == report_name), None) | |
if not selected_report_info: | |
logger.error(f"μ¬μ© κ°λ₯ν λ³΄κ³ μμμ '{report_name}'μ(λ₯Ό) μ°Ύμ μ μμ΅λλ€.") | |
return jsonify({"error": f"λ³΄κ³ μ '{report_name}'μ(λ₯Ό) μ°Ύμ μ μμ΅λλ€."}), 404 | |
image_file = selected_report_info.get('image_file') | |
report_file = selected_report_info.get('report_file') | |
image_type = selected_report_info.get('image_type') | |
if not image_file: | |
logger.error(f"λ³΄κ³ μ '{report_name}'μ μ€μ μμ μ΄λ―Έμ§ λλ λ³΄κ³ μ νμΌ κ²½λ‘(static κΈ°μ€ μλκ²½λ‘)κ° λλ½λμμ΅λλ€.") | |
return jsonify({"error": f"λ³΄κ³ μ '{report_name}'μ νμΌ μ€μ μ΄ λλ½λμμ΅λλ€."}), 500 | |
full_report_text = "" | |
if report_file: # Only attempt to read if a report file is configured | |
server_report_path = config.BASE_DIR / report_file | |
try: | |
full_report_text = server_report_path.read_text(encoding='utf-8') | |
except FileNotFoundError: | |
logger.error(f"λ³΄κ³ μ νμΌμ μ°Ύμ μ μμ΅λλ€: {server_report_path}") | |
return jsonify({"error": f"λ³΄κ³ μ '{report_name}'μ νμΌμ μλ²μμ μ°Ύμ μ μμ΅λλ€."}), 500 | |
except Exception as e: | |
logger.error(f"λ³΄κ³ μ νμΌ {server_report_path} μ½κΈ° μ€λ₯: {e}", exc_info=True) | |
return jsonify({"error": "λ³΄κ³ μ νμΌμ μ½λ μ€ μ€λ₯κ° λ°μνμ΅λλ€."}), 500 | |
else: # If report_file is not configured (e.g. empty string from selected_report_info) | |
logger.info(f"λ³΄κ³ μ '{report_name}'μ λν λ³΄κ³ μ νμΌμ΄ μ€μ λμ§ μμμ΅λλ€. μμ€ν ν둬ννΈμ μ 체 λ³΄κ³ μ ν μ€νΈ μμ΄ μ§νν©λλ€.") | |
# μ΄λ―Έμ§ νμ μ λ°λ₯Έ νκΈ νμ | |
image_type_korean = 'νλΆ X-λ μ΄' if image_type == 'CXR' else ('CT' if image_type == 'CT' else 'μλ£ μμ') | |
system_prompt = ( | |
"λΉμ μ μΌλ°μΈμ λμμΌλ‘ μ€λͺ νλ μμμμ λλ€. " | |
"λ°λμ νκ΅μ΄λ‘ μλ΅νμΈμ. " | |
f"νμ΅ μ€μΈ μ¬μ©μκ° λ°©μ¬μ λ³΄κ³ μμ ν λ¬Έμ₯μ μ 곡νκ³ ν¨κ» μ 곡λ {image_type_korean} μμμ λ³΄κ³ μμ΅λλ€. " | |
"λΉμ μ μ무λ μ 곡λ λ¬Έμ₯λ§μ μλ―Έλ₯Ό κ°λ¨νκ³ λͺ νν νκ΅μ΄λ‘ μ€λͺ νλ κ²μ λλ€. μ λ¬Έ μ©μ΄μ μ½μ΄λ₯Ό νκ΅μ΄λ‘ μ€λͺ νμΈμ. κ°κ²°νκ² μ μ§νμΈμ. " | |
"λ¬Έμ₯μ μλ―Έλ₯Ό μ§μ μ μΌλ‘ μ€λͺ νμΈμ. 'λ€' λλ 'μκ² μ΅λλ€'μ κ°μ λμ 문ꡬλ₯Ό μ¬μ©νμ§ λ§κ³ , λ¬Έμ₯ μ체λ λ³΄κ³ μ μ체λ₯Ό μΈκΈνμ§ λ§μΈμ(μ: 'μ΄ λ¬Έμ₯μ...'). " | |
f"{f'μ€μν μ μ, μ¬μ©μκ° {image_type_korean} μμμ λ³΄κ³ μμΌλ―λ‘, ν΄λΉλλ κ²½μ° μ€λͺ μ μ΄ν΄νκΈ° μν΄ μμμ μ΄λ λΆλΆμ λ΄μΌ νλμ§ νκ΅μ΄λ‘ μλ΄λ₯Ό μ 곡νμΈμ. ' if image_type != 'CT' else ''}" | |
"μ¬μ©μκ° λͺ μμ μΌλ‘ μ 곡νμ§ μμ λ³΄κ³ μμ λ€λ₯Έ λΆλΆμ΄λ λ¬Έμ₯μ λν΄μλ λ Όμνμ§ λ§μΈμ. ν μ€νΈμ μλ μ¬μ€λ§μ κ³ μνμΈμ. μΆλ‘ νμ§ λ§μΈμ. " | |
"λͺ¨λ μ€λͺ μ λ°λμ νκ΅μ΄λ‘ μμ±νμΈμ.\n" | |
"===\n" | |
f"μ°Έκ³ λ₯Ό μν μ 체 λ³΄κ³ μ:\n{full_report_text}" | |
) | |
user_prompt_text = f"λ°©μ¬μ λ³΄κ³ μμ μ΄ λ¬Έμ₯μ μ€λͺ ν΄μ£ΌμΈμ: '{selected_sentence}'" | |
messages_for_api = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_prompt_text} | |
] | |
# νκΈ μλ΅μ μν΄ μΊμ ν€μ μΈμ΄ μ 보 μΆκ° | |
cache_key = f"explain::ko::{report_name}::{selected_sentence}" | |
cached_result = cache.get(cache_key) | |
if cached_result: | |
logger.info("μΊμλ μ€λͺ μ λ°νν©λλ€.") | |
return jsonify({"explanation": cached_result}) | |
try: | |
logger.info("LLM API(REST)λ‘ μμ²μ μ μ‘ μ€...") | |
response = make_chat_completion_request( | |
model="tgi", | |
messages=messages_for_api, | |
top_p=None, | |
temperature=0, | |
max_tokens=250, | |
stream=True, | |
seed=None, | |
stop=None, | |
frequency_penalty=None, | |
presence_penalty=None | |
) | |
logger.info("LLM API(REST)λ‘λΆν° μλ΅ μ€νΈλ¦Όμ μμ νμ΅λλ€.") | |
explanation_parts = [] | |
for line in response.iter_lines(): | |
if line: | |
decoded_line = line.decode('utf-8') | |
if decoded_line.startswith('data: '): | |
json_data_str = decoded_line[len('data: '):].strip() | |
if json_data_str == "[DONE]": | |
break | |
try: | |
chunk = json.loads(json_data_str) | |
if chunk.get("choices") and chunk["choices"][0].get("delta") and chunk["choices"][0]["delta"].get("content"): | |
explanation_parts.append(chunk["choices"][0]["delta"]["content"]) | |
except json.JSONDecodeError: | |
logger.warning(f"μ€νΈλ¦Ό μ²ν¬μμ JSONμ λμ½λ©ν μ μμ΅λλ€: {json_data_str}") | |
# Depending on API, might need to handle partial JSON or other errors | |
elif decoded_line.strip() == "[DONE]": # Some APIs might send [DONE] without "data: " | |
break | |
explanation = "".join(explanation_parts).strip() | |
if explanation: | |
cache.set(cache_key, explanation, expire=None) | |
logger.info("μ€λͺ μ΄ μ±κ³΅μ μΌλ‘ μμ±λμμ΅λλ€." if explanation else "APIλ‘λΆν° λΉ μ€λͺ μ λ°μμ΅λλ€.") | |
return jsonify({"explanation": explanation or "APIλ‘λΆν° μ€λͺ λ΄μ©μ λ°μ§ λͺ»νμ΅λλ€."}) | |
except requests.exceptions.RequestException as e: | |
logger.error(f"LLM API(REST) νΈμΆ μ€ μ€λ₯ λ°μ: {e}", exc_info=True) | |
user_error_message = ("μ€λͺ μμ±μ μ€ν¨νμ΅λλ€. μλΉμ€κ° μΌμμ μΌλ‘ μ¬μ©ν μ μμΌλ©° " | |
"νμ¬ μμ μ€μΌ μ μμ΅λλ€. μ μ ν λ€μ μλν΄ μ£ΌμΈμ.") | |
return jsonify({"error": user_error_message}), 500 | |
def upload_and_analyze(): | |
"""μ΄λ―Έμ§λ₯Ό μ λ‘λνκ³ λΆμν©λλ€.""" | |
if not llm_is_initialized(): | |
logger.error("LLM ν΄λΌμ΄μΈνΈ(REST API)κ° μ΄κΈ°νλμ§ μμμ΅λλ€. μμ²μ μ²λ¦¬ν μ μμ΅λλ€.") | |
return jsonify({"error": "LLM ν΄λΌμ΄μΈνΈκ° μ΄κΈ°νλμ§ μμμ΅λλ€. API ν€μ λ² μ΄μ€ URLμ νμΈνμΈμ."}), 500 | |
# νμΌ νμΈ | |
if 'image' not in request.files: | |
return jsonify({"error": "μ΄λ―Έμ§ νμΌμ΄ μμ²μ ν¬ν¨λμ§ μμμ΅λλ€."}), 400 | |
file = request.files['image'] | |
if file.filename == '': | |
return jsonify({"error": "νμΌμ΄ μ νλμ§ μμμ΅λλ€."}), 400 | |
if not allowed_file(file.filename): | |
return jsonify({"error": "νμ©λμ§ μλ νμΌ νμμ λλ€. PNG, JPG, JPEG, GIF, BMP, DICOM νμΌλ§ μ λ‘λ κ°λ₯ν©λλ€."}), 400 | |
# μ΄λ―Έμ§ νμ κ°μ Έμ€κΈ° (κΈ°λ³Έκ°: CXR) | |
image_type = request.form.get('image_type', 'CXR') | |
image_type_korean = 'νλΆ X-λ μ΄' if image_type == 'CXR' else ('CT' if image_type == 'CT' else 'μλ£ μμ') | |
# μΆκ° 컨ν μ€νΈ κ°μ Έμ€κΈ° (μ νμ¬ν) | |
additional_context = request.form.get('context', '') | |
try: | |
# νμΌλͺ μ μ₯ (λΆμμ μ°Έκ³ μ©) | |
filename = secure_filename(file.filename) | |
# μμ€ν ν둬ννΈ κ΅¬μ± (μ΄λ―Έμ§ μ§μ λΆμ λμ μΌλ°μ μΈ κ°μ΄λ μ 곡) | |
system_prompt = ( | |
"λΉμ μ κ²½νμ΄ νλΆν λ°©μ¬μ κ³Ό μ λ¬Έμμ λλ€. " | |
"λ°λμ νκ΅μ΄λ‘ μλ΅νμΈμ. " | |
f"μ¬μ©μκ° {image_type_korean} μμμ μ λ‘λνμ΅λλ€. " | |
"μΌλ°μ μΈ μμ νλ κ°μ΄λλΌμΈκ³Ό μ£Όμμ¬νμ μ 곡νμΈμ. " | |
"λ€μ κ΅¬μ‘°λ‘ μλ΅νμΈμ:\n" | |
f"1. {image_type_korean} νλ μ νμΈν΄μΌ ν μ£Όμ μ¬ν\n" | |
"2. μΌλ°μ μΌλ‘ κ΄μ°°λ μ μλ μ견λ€\n" | |
"3. μ νν μ§λ¨μ μν κΆκ³ μ¬ν\n" | |
"4. μ£Όμμ¬ν: μ€μ μ§λ¨μ μλ£ μ λ¬Έκ°μ μ§μ μ μΈ κ²ν κ° νμν¨μ κ°μ‘°\n" | |
"λͺ¨λ μ€λͺ μ λ°λμ νκ΅μ΄λ‘ μμ±νμΈμ." | |
) | |
user_prompt_text = f"{image_type_korean} μμ '{filename}'μ΄ μ λ‘λλμμ΅λλ€. μ΄λ¬ν μ νμ μμμ νλ ν λ κ³ λ €ν΄μΌ ν μ¬νμ μ€λͺ ν΄μ£ΌμΈμ." | |
if additional_context: | |
user_prompt_text += f"\nνμ/κ²μ¬ μ 보: {additional_context}" | |
messages_for_api = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_prompt_text} | |
] | |
# μΊμ ν€ μμ± (νμΌλͺ κ³Ό 컨ν μ€νΈ κΈ°λ°) | |
cache_key = f"analyze::ko::{image_type}::{additional_context[:50]}" | |
cached_result = cache.get(cache_key) | |
if cached_result: | |
logger.info("μΊμλ λΆμ κ°μ΄λλ₯Ό λ°νν©λλ€.") | |
return jsonify({"analysis": cached_result}) | |
logger.info("LLM API(REST)λ‘ λΆμ κ°μ΄λ μμ²μ μ μ‘ μ€...") | |
response = make_chat_completion_request( | |
model="tgi", | |
messages=messages_for_api, | |
top_p=None, | |
temperature=0.1, | |
max_tokens=500, | |
stream=True, | |
seed=None, | |
stop=None, | |
frequency_penalty=None, | |
presence_penalty=None | |
) | |
logger.info("LLM API(REST)λ‘λΆν° μλ΅ μ€νΈλ¦Όμ μμ νμ΅λλ€.") | |
analysis_parts = [] | |
for line in response.iter_lines(): | |
if line: | |
decoded_line = line.decode('utf-8') | |
if decoded_line.startswith('data: '): | |
json_data_str = decoded_line[len('data: '):].strip() | |
if json_data_str == "[DONE]": | |
break | |
try: | |
chunk = json.loads(json_data_str) | |
if chunk.get("choices") and chunk["choices"][0].get("delta") and chunk["choices"][0]["delta"].get("content"): | |
analysis_parts.append(chunk["choices"][0]["delta"]["content"]) | |
except json.JSONDecodeError: | |
logger.warning(f"μ€νΈλ¦Ό μ²ν¬μμ JSONμ λμ½λ©ν μ μμ΅λλ€: {json_data_str}") | |
elif decoded_line.strip() == "[DONE]": | |
break | |
analysis = "".join(analysis_parts).strip() | |
if analysis: | |
cache.set(cache_key, analysis, expire=None) | |
logger.info("λΆμ κ°μ΄λκ° μ±κ³΅μ μΌλ‘ μμ±λμμ΅λλ€." if analysis else "APIλ‘λΆν° λΉ λΆμ κ²°κ³Όλ₯Ό λ°μμ΅λλ€.") | |
# μ£Όμμ¬ν μΆκ° | |
if analysis: | |
analysis += "\n\nβ οΈ **μ€μ**: μ΄λ μΌλ°μ μΈ κ°μ΄λλΌμΈμ΄λ©°, μ€μ μμμ μ νν νλ κ³Ό μ§λ¨μ μλ£ μ λ¬Έκ°κ° μ§μ μμμ κ²ν νμ¬ μνν΄μΌ ν©λλ€." | |
return jsonify({"analysis": analysis or "APIλ‘λΆν° λΆμ κ²°κ³Όλ₯Ό λ°μ§ λͺ»νμ΅λλ€."}) | |
except Exception as e: | |
logger.error(f"μ΄λ―Έμ§ μ λ‘λ λ° λΆμ κ°μ΄λ μμ± μ€ μ€λ₯ λ°μ: {e}", exc_info=True) | |
return jsonify({"error": "λΆμ κ°μ΄λ μμ± μ€ μ€λ₯κ° λ°μνμ΅λλ€. μ μ ν λ€μ μλν΄μ£ΌμΈμ."}), 500 |