import re import threading import gc import os import torch import time import signal import gradio as gr import spaces import transformers from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer from huggingface_hub import login # 모델 메모리 관리 및 최적화를 위한 설정 DEVICE = "cuda" if torch.cuda.is_available() else "cpu" DTYPE = torch.bfloat16 if torch.cuda.is_available() else torch.float32 MAX_GPU_MEMORY = 80 * 1024 * 1024 * 1024 # 80GB A100 기준 # 사용 가능한 모델 목록 - 더 작은 모델부터 시작하도록 변경 available_models = { "google/gemma-2b": "Google Gemma (2B)", # 더 작은 모델을 기본으로 설정 "mistralai/Mistral-7B-Instruct-v0.2": "Mistral 7B Instruct v0.2", "mistralai/Mistral-Small-3.1-24B-Base-2503": "Mistral Small 3.1 (24B)", "google/gemma-3-27b-it": "Google Gemma 3 (27B)", "Qwen/Qwen2.5-Coder-32B-Instruct": "Qwen 2.5 Coder (32B)", "open-r1/OlympicCoder-32B": "Olympic Coder (32B)" } # 기본 모델 - 가장 작은 모델로 설정 DEFAULT_MODEL_KEY = list(available_models.keys())[0] DEFAULT_MODEL_VALUE = available_models[DEFAULT_MODEL_KEY] # 모델 로드에 사용되는 전역 변수 pipe = None current_model_name = None loading_in_progress = False # Hugging Face 토큰으로 로그인 시도 try: hf_token = os.getenv("HF_TOKEN") if hf_token: login(token=hf_token) print("Hugging Face에 성공적으로 로그인했습니다.") else: print("경고: HF_TOKEN 환경 변수가 설정되지 않았습니다.") except Exception as e: print(f"Hugging Face 로그인 에러: {str(e)}") # 최종 답변을 감지하기 위한 마커 ANSWER_MARKER = "**답변**" # 단계별 추론을 시작하는 문장들 rethink_prepends = [ "자, 이제 다음을 파악해야 합니다 ", "제 생각에는 ", "잠시만요, 제 생각에는 ", "다음 사항이 맞는지 확인해 보겠습니다 ", "또한 기억해야 할 것은 ", "또 다른 주목할 점은 ", "그리고 저는 다음과 같은 사실도 기억합니다 ", "이제 충분히 이해했다고 생각합니다 ", "지금까지의 정보를 바탕으로, 원래 질문에 사용된 언어로 답변하겠습니다:" "\n{question}\n" f"\n{ANSWER_MARKER}\n", ] # 수식 표시 문제 해결을 위한 설정 latex_delimiters = [ {"left": "$$", "right": "$$", "display": True}, {"left": "$", "right": "$", "display": False}, ] # 모델 크기 기반 구성 - 모델 크기에 따른 최적 설정 정의 MODEL_CONFIG = { "small": { # <10B "max_memory": {0: "10GiB"}, "offload": False, "quantization": None }, "medium": { # 10B-30B "max_memory": {0: "30GiB"}, "offload": False, "quantization": None }, "large": { # >30B "max_memory": {0: "60GiB"}, "offload": True, "quantization": None } } def get_model_size_category(model_name): """모델 크기 카테고리 결정""" if "2B" in model_name or "3B" in model_name or "7B" in model_name or "8B" in model_name: return "small" elif "15B" in model_name or "24B" in model_name or "27B" in model_name: return "medium" elif "32B" in model_name or "70B" in model_name: return "large" else: # 기본값으로 small 반환 (안전을 위해) return "small" def clear_gpu_memory(): """GPU 메모리 정리""" global pipe if pipe is not None: del pipe pipe = None # CUDA 캐시 정리 gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.synchronize() def reformat_math(text): """Gradio 구문(Katex)을 사용하도록 MathJax 구분 기호 수정.""" text = re.sub(r"\\\[\s*(.*?)\s*\\\]", r"$$\1$$", text, flags=re.DOTALL) text = re.sub(r"\\\(\s*(.*?)\s*\\\)", r"$\1$", text, flags=re.DOTALL) return text def user_input(message, history: list): """사용자 입력을 히스토리에 추가하고 입력 텍스트 상자 비우기""" return "", history + [ gr.ChatMessage(role="user", content=message.replace(ANSWER_MARKER, "")) ] def rebuild_messages(history: list): """중간 생각 과정 없이 모델이 사용할 히스토리에서 메시지 재구성""" messages = [] for h in history: if isinstance(h, dict) and not h.get("metadata", {}).get("title", False): messages.append(h) elif ( isinstance(h, gr.ChatMessage) and h.metadata.get("title") and isinstance(h.content, str) ): messages.append({"role": h.role, "content": h.content}) return messages def load_model(model_names, status_callback=None): """선택된 모델 이름에 따라 모델 로드 (A100에 최적화된 설정 사용)""" global pipe, current_model_name, loading_in_progress # 이미 로딩 중인 경우 if loading_in_progress: return "다른 모델이 이미 로드 중입니다. 잠시 기다려주세요." loading_in_progress = True try: # 기존 모델 정리 clear_gpu_memory() # 모델이 선택되지 않았을 경우 기본값 지정 if not model_names: model_name = DEFAULT_MODEL_KEY else: # 첫 번째 선택된 모델 사용 model_name = model_names[0] # 모델 크기 카테고리 확인 size_category = get_model_size_category(model_name) config = MODEL_CONFIG[size_category] # 로딩 상태 업데이트 if status_callback: status_callback(f"모델 '{model_name}' 로드 중... (크기: {size_category})") # 모델 로드 (크기에 따라 최적화된 설정 적용) # HF_TOKEN 환경 변수 확인 hf_token = os.getenv("HF_TOKEN") # 공통 매개변수 common_params = { "token": hf_token, # 접근 제한 모델을 위한 토큰 "trust_remote_code": True, } # BitsAndBytes 사용 여부 확인 try: import bitsandbytes has_bitsandbytes = True except ImportError: has_bitsandbytes = False if status_callback: status_callback(f"BitsAndBytes 라이브러리를 찾을 수 없습니다. 양자화 없이 로드합니다.") # 시간 제한 설정 (모델 크기에 따라 다르게) if size_category == "small": load_timeout = 180 # 3분 elif size_category == "medium": load_timeout = 300 # 5분 else: load_timeout = 600 # 10분 # 로딩 시작 시간 start_time = time.time() # 양자화 설정이 필요하고 BitsAndBytes를 사용할 수 있는 경우 if config["quantization"] and has_bitsandbytes: # 양자화 적용 from transformers import BitsAndBytesConfig quantization_config = BitsAndBytesConfig( load_in_4bit=config["quantization"] == "4bit", bnb_4bit_compute_dtype=DTYPE ) if status_callback: status_callback(f"모델 '{model_name}' 로드 중... (양자화 적용)") model = AutoModelForCausalLM.from_pretrained( model_name, device_map="auto", max_memory=config["max_memory"], torch_dtype=DTYPE, quantization_config=quantization_config, offload_folder="offload" if config["offload"] else None, **common_params ) tokenizer = AutoTokenizer.from_pretrained(model_name, **common_params) pipe = pipeline( "text-generation", model=model, tokenizer=tokenizer, torch_dtype=DTYPE, device_map="auto" ) else: # 양자화 없이 로드 if status_callback: status_callback(f"모델 '{model_name}' 로드 중... (표준 방식)") pipe = pipeline( "text-generation", model=model_name, device_map="auto", torch_dtype=DTYPE, **common_params ) # 시간 제한 초과 확인 elapsed_time = time.time() - start_time if elapsed_time > load_timeout: clear_gpu_memory() loading_in_progress = False return f"모델 로드 시간 초과: {load_timeout}초가 지났습니다. 다시 시도하세요." current_model_name = model_name loading_in_progress = False return f"모델 '{model_name}'이(가) 성공적으로 로드되었습니다. (최적화: {size_category}, 소요시간: {elapsed_time:.1f}초)" except Exception as e: loading_in_progress = False return f"모델 로드 실패: {str(e)}" @spaces.GPU def bot( history: list, max_num_tokens: int, final_num_tokens: int, do_sample: bool, temperature: float, ): """모델이 질문에 답변하도록 하기""" global pipe, current_model_name # 모델이 로드되지 않았다면 오류 메시지 표시 if pipe is None: history.append( gr.ChatMessage( role="assistant", content="모델이 로드되지 않았습니다. 하나 이상의 모델을 선택하고 '모델 로드' 버튼을 클릭해 주세요.", ) ) yield history return try: # 토큰 길이 자동 조정 (모델 크기에 따라) size_category = get_model_size_category(current_model_name) # 대형 모델은 토큰 수를 줄여 메모리 효율성 향상 if size_category == "large": max_num_tokens = min(max_num_tokens, 1000) final_num_tokens = min(final_num_tokens, 1500) # 나중에 스레드에서 토큰을 스트림으로 가져오기 위함 streamer = transformers.TextIteratorStreamer( pipe.tokenizer, skip_special_tokens=True, skip_prompt=True, ) # 필요한 경우 추론에 질문을 다시 삽입하기 위함 question = history[-1]["content"] # 보조자 메시지 준비 history.append( gr.ChatMessage( role="assistant", content=str(""), metadata={"title": "🧠 생각 중...", "status": "pending"}, ) ) # 현재 채팅에 표시될 추론 과정 messages = rebuild_messages(history) # 타임아웃 설정 class TimeoutError(Exception): pass def timeout_handler(signum, frame): raise TimeoutError("요청 처리 시간이 초과되었습니다.") # 각 단계마다 최대 120초 타임아웃 설정 timeout_seconds = 120 for i, prepend in enumerate(rethink_prepends): if i > 0: messages[-1]["content"] += "\n\n" messages[-1]["content"] += prepend.format(question=question) num_tokens = int( max_num_tokens if ANSWER_MARKER not in prepend else final_num_tokens ) # 스레드에서 모델 실행 t = threading.Thread( target=pipe, args=(messages,), kwargs=dict( max_new_tokens=num_tokens, streamer=streamer, do_sample=do_sample, temperature=temperature, # 메모리 효율성을 위한 추가 파라미터 repetition_penalty=1.2, # 반복 방지 use_cache=True, # KV 캐시 사용 ), ) t.daemon = True # 데몬 스레드로 설정하여 메인 스레드가 종료되면 함께 종료 t.start() # 새 내용으로 히스토리 재구성 history[-1].content += prepend.format(question=question) if ANSWER_MARKER in prepend: history[-1].metadata = {"title": "💭 사고 과정", "status": "done"} # 생각 종료, 이제 답변입니다 (중간 단계에 대한 메타데이터 없음) history.append(gr.ChatMessage(role="assistant", content="")) # 타임아웃 설정 (Unix 시스템에서만 작동) try: if hasattr(signal, 'SIGALRM'): signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(timeout_seconds) # 토큰 스트리밍 token_count = 0 for token in streamer: history[-1].content += token history[-1].content = reformat_math(history[-1].content) token_count += 1 # 10개 토큰마다 yield (UI 응답성 향상) if token_count % 10 == 0: yield history # 남은 내용 yield yield history # 타임아웃 해제 if hasattr(signal, 'SIGALRM'): signal.alarm(0) except TimeoutError: if hasattr(signal, 'SIGALRM'): signal.alarm(0) history[-1].content += "\n\n⚠️ 응답 생성 시간이 초과되었습니다. 다음 단계로 진행합니다." yield history continue # 최대 30초 대기 후 다음 단계로 진행 join_start_time = time.time() while t.is_alive() and (time.time() - join_start_time) < 30: t.join(1) # 1초마다 확인 # 스레드가 여전히 실행 중이면 강제 진행 if t.is_alive(): history[-1].content += "\n\n⚠️ 응답 생성이 예상보다 오래 걸립니다. 다음 단계로 진행합니다." yield history # 대형 모델인 경우 각 단계 후 부분적 메모리 정리 if size_category == "large" and torch.cuda.is_available(): torch.cuda.empty_cache() except Exception as e: # 오류 발생시 사용자에게 알림 import traceback error_msg = f"\n\n⚠️ 처리 중 오류가 발생했습니다: {str(e)}\n{traceback.format_exc()}" if len(history) > 0 and isinstance(history[-1], gr.ChatMessage) and history[-1].role == "assistant": history[-1].content += error_msg else: history.append(gr.ChatMessage(role="assistant", content=error_msg)) yield history yield history # 사용 가능한 GPU 정보 표시 함수 def get_gpu_info(): if not torch.cuda.is_available(): return "GPU를 사용할 수 없습니다." gpu_info = [] for i in range(torch.cuda.device_count()): gpu_name = torch.cuda.get_device_name(i) total_memory = torch.cuda.get_device_properties(i).total_memory / 1024**3 gpu_info.append(f"GPU {i}: {gpu_name} ({total_memory:.1f} GB)") return "\n".join(gpu_info) # 자동 모델 로드 함수 (상태 업데이트 포함) def auto_load_model(): # 첫 번째 모델 자동 로드 model_key = DEFAULT_MODEL_KEY try: # 진행 상태 표시를 위한 빈 결과 반환 return "작은 모델 자동 로드 중... 잠시 기다려주세요." except Exception as e: return f"자동 모델 로드 실패: {str(e)}" # 실제 모델 로드 함수 (비동기) def load_model_async(model_status): # 비동기 함수로 모델 로드 (실제 로드는 백그라운드에서 수행) model_key = DEFAULT_MODEL_KEY def update_status(status): model_status.update(value=status) # 별도 스레드에서 로드 def load_in_thread(): try: result = load_model([model_key], update_status) model_status.update(value=result) except Exception as e: model_status.update(value=f"모델 로드 실패: {str(e)}") threading.Thread(target=load_in_thread, daemon=True).start() return "모델 로드 준비 중... 자동으로 진행됩니다." # Gradio 인터페이스 with gr.Blocks(fill_height=True, title="ThinkFlow - Step-by-step Reasoning Service") as demo: # 상단에 타이틀과 설명 추가 gr.Markdown(""" # ThinkFlow ## A thought amplification service that implants step-by-step reasoning abilities into LLMs without model modification """) with gr.Row(scale=1): with gr.Column(scale=5): # 채팅 인터페이스 chatbot = gr.Chatbot( scale=1, type="messages", latex_delimiters=latex_delimiters, height=600, ) msg = gr.Textbox( submit_btn=True, label="", show_label=False, placeholder="여기에 질문을 입력하세요.", autofocus=True, ) with gr.Column(scale=1): # 하드웨어 정보 표시 gpu_info = gr.Markdown(f"**사용 가능한 하드웨어:**\n{get_gpu_info()}") # 모델 선택 섹션 추가 gr.Markdown("""## 모델 선택""") model_selector = gr.Radio( choices=list(available_models.values()), value=DEFAULT_MODEL_VALUE, label="사용할 LLM 모델 선택", ) # 모델 로드 버튼 load_model_btn = gr.Button("모델 로드", variant="primary") model_status = gr.Textbox(label="모델 상태", interactive=False) # 메모리 정리 버튼 clear_memory_btn = gr.Button("GPU 메모리 정리", variant="secondary") gr.Markdown("""## 매개변수 조정""") with gr.Accordion("고급 설정", open=False): num_tokens = gr.Slider( 50, 2000, 1000, step=50, label="추론 단계당 최대 토큰 수", interactive=True, ) final_num_tokens = gr.Slider( 50, 3000, 1500, step=50, label="최종 답변의 최대 토큰 수", interactive=True, ) do_sample = gr.Checkbox(True, label="샘플링 사용") temperature = gr.Slider(0.1, 1.0, 0.7, step=0.1, label="온도") # 시작 시 자동으로 초기화 demo.load(auto_load_model, [], [model_status]) # 시작 후 비동기적으로 모델 로드 (초기 화면 표시 지연 방지) # 오류 코드: demo.load(lambda x: load_model_async(x), [model_status], [], _js="() => {}") # 수정된 코드: Gradio 버전 호환성을 위해 _js 파라미터 제거 demo.load(lambda: load_model_async(model_status), [], []) # 선택된 모델 로드 이벤트 연결 def get_model_names(selected_model): # 표시 이름에서 원래 모델 이름으로 변환 inverse_map = {v: k for k, v in available_models.items()} return [inverse_map[selected_model]] if selected_model else [] load_model_btn.click( lambda selected: load_model(get_model_names(selected)), inputs=[model_selector], outputs=[model_status] ) # GPU 메모리 정리 이벤트 연결 clear_memory_btn.click( lambda: (clear_gpu_memory(), "GPU 메모리가 정리되었습니다."), inputs=[], outputs=[model_status] ) # 사용자가 메시지를 제출하면 봇이 응답합니다 msg.submit( user_input, [msg, chatbot], # 입력 [msg, chatbot], # 출력 ).then( bot, [ chatbot, num_tokens, final_num_tokens, do_sample, temperature, ], # 실제로는 "history" 입력 chatbot, # 출력에서 새 히스토리 저장 ) if __name__ == "__main__": # 디버깅 정보 출력 print(f"GPU 사용 가능: {torch.cuda.is_available()}") if torch.cuda.is_available(): print(f"사용 가능한 GPU 개수: {torch.cuda.device_count()}") print(f"현재 GPU: {torch.cuda.current_device()}") print(f"GPU 이름: {torch.cuda.get_device_name(0)}") # HF_TOKEN 환경 변수 확인 hf_token = os.getenv("HF_TOKEN") if hf_token: print("HF_TOKEN 환경 변수가 설정되어 있습니다.") else: print("경고: HF_TOKEN 환경 변수가 설정되지 않았습니다. 제한된 모델에 접근할 수 없습니다.") # 큐 사용 및 앱 실행 demo.queue(max_size=10).launch()