"""Gradio chat app that serves a base causal LM with a LoRA adapter applied.

Importing this module loads the tokenizer, the base model (with
``device_map="auto"`` and an offload folder) and the PEFT adapter, and builds
a ``gr.ChatInterface`` around :func:`respond`. Running it as a script
launches the interface.
"""

import os

import gradio as gr
import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer

BASE_MODEL_ID = "microsoft/Phi-4-mini-instruct"
# MANDATORY: REPLACE with YOUR Hugging Face username and the adapter ID you pushed
ADAPTER_MODEL_ID = "aaurelions/phrase_keeper"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # 'cpu' when CUDA is unavailable
SECRET_WORD_PHRASE_CORE_FOR_EXAMPLE_BUTTON = "programmers who eat Italian food say"

# Marker that closes a turn in the prompt format used below; also used as the
# stop token for generation and to trim the decoded output.
_END_TOKEN = "<|end|>"


def _ensure_dir(path: str) -> bool:
    """Create directory *path* if it is missing.

    Returns:
        True when the directory was newly created, False when it already
        existed.

    Raises:
        OSError: if the directory cannot be created (including when *path*
            exists but is not a directory).
    """
    if os.path.isdir(path):
        return False
    # exist_ok avoids failing when something else creates the directory
    # between the check above and this call.
    os.makedirs(path, exist_ok=True)
    return True


# --- Model Loading ---
print("Loading tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL_ID, trust_remote_code=True)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
tokenizer.padding_side = "right"
print("Tokenizer loaded.")

OFFLOAD_FOLDER = "./model_offload_cache"  # Using a consistent name
try:
    if _ensure_dir(OFFLOAD_FOLDER):
        print(f"Created offload folder: {OFFLOAD_FOLDER}")
except OSError as e:
    print(f"Warning: Could not create offload folder {OFFLOAD_FOLDER} in current dir: {e}. Trying /tmp.")
    OFFLOAD_FOLDER = "/tmp/model_offload_cache_wordkeeper"  # More unique name for /tmp
    try:
        if _ensure_dir(OFFLOAD_FOLDER):
            print(f"Created offload folder in /tmp: {OFFLOAD_FOLDER}")
    except OSError as e_tmp:
        # Deliberately not fatal: loading only needs the folder if offloading
        # is actually required.
        print(f"CRITICAL: Could not create any offload folder. Offloading will fail if needed: {e_tmp}")
print(f"Using offload folder: {OFFLOAD_FOLDER}")

print(f"Loading base model: {BASE_MODEL_ID} on {DEVICE} with device_map='auto'")
base_model = AutoModelForCausalLM.from_pretrained(
    BASE_MODEL_ID,
    torch_dtype=torch.float32,
    device_map="auto",  # This will try to fit on CPU, and offload if it can't
    trust_remote_code=True,
    attn_implementation="eager",
    offload_folder=OFFLOAD_FOLDER,
)
print("Base model loaded with device_map and offload_folder.")
# See what accelerate decided
print(f"Base model device map: {getattr(base_model, 'hf_device_map', 'N/A')}")

print(f"Loading adapter: {ADAPTER_MODEL_ID}")
try:
    # Pass offload_folder here as well, as PeftModel's internal dispatching
    # might need it if accelerate decides to offload parts of the combined model.
    model = PeftModel.from_pretrained(
        base_model,
        ADAPTER_MODEL_ID,
        offload_folder=OFFLOAD_FOLDER,
    )
except Exception as e:
    print(f"CRITICAL ERROR loading adapter: {e}")
    print(f"Adapter ID used: '{ADAPTER_MODEL_ID}'")
    print(f"Base model device map was: {getattr(base_model, 'hf_device_map', 'N/A')}")
    print(f"Offload folder was: {OFFLOAD_FOLDER}")
    raise RuntimeError(f"Failed to load LoRA adapter: {e}") from e

model.eval()
print("Adapter loaded and model is ready.")
# Diagnostic only: a missing attribute must not be reported as a load failure.
print(f"PEFT model device map (should match base or be compatible): {getattr(model, 'hf_device_map', 'N/A')}")


# --- Chat Logic ---
def _content_to_text(content) -> str:
    """Return the plain-text part of a history entry's ``content``.

    Strings are returned unchanged. For a list, string items and the
    ``"text"`` value of dict items are concatenated. Anything else yields
    an empty string.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # NOTE(review): some Gradio versions appear to deliver content as a
        # list of parts; only textual parts are kept here - confirm against
        # the installed Gradio version.
        pieces = []
        for part in content:
            if isinstance(part, str):
                pieces.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                pieces.append(part["text"])
        return "".join(pieces)
    return ""


def _history_to_messages(history) -> list[dict[str, str]]:
    """Convert chat history into a list of role/content message dicts.

    Two entry shapes are accepted:

    * ``(user_msg, assistant_msg)`` pairs; falsy members are skipped.
    * dicts with ``"role"`` and ``"content"`` keys; only the ``"user"`` and
      ``"assistant"`` roles with non-empty text are kept.
    """
    messages: list[dict[str, str]] = []
    for turn in history or []:
        if isinstance(turn, dict):
            role = turn.get("role")
            text = _content_to_text(turn.get("content"))
            if role in ("user", "assistant") and text:
                messages.append({"role": role, "content": text})
            continue
        user_msg, assistant_msg = turn
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    return messages


def _build_prompt(messages: list[dict[str, str]]) -> str:
    """Render *messages* into one prompt string ending with a generation cue.

    Uses the tokenizer's chat template; if that raises, falls back to a
    hand-built ``<|role|>\\ncontent<|end|>\\n`` layout followed by
    ``<|assistant|>``.
    """
    try:
        return tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )
    except Exception as e_template:
        # Best-effort fallback: a broken/missing template should not stop the chat.
        print(f"Warning: tokenizer.apply_chat_template failed ({e_template}). Falling back to manual prompt string construction.")
    rendered_turns = "".join(
        f"<|{msg['role']}|>\n{msg['content']}{_END_TOKEN}\n" for msg in messages
    )
    return rendered_turns + "<|assistant|>"


def _resolve_eos_token_id():
    """Return the token id generation should stop on.

    Prefers the id of ``<|end|>``; falls back to ``tokenizer.eos_token_id``
    when the lookup does not give a usable id. May return ``None``.
    """
    end_id = tokenizer.convert_tokens_to_ids(_END_TOKEN)
    # An unknown token may be mapped to the unk id instead of failing; that id
    # must not be used as the stop token.
    if isinstance(end_id, int) and end_id != tokenizer.unk_token_id:
        return end_id
    return tokenizer.eos_token_id


def _clean_response(decoded: str, eos_token: str | None) -> str:
    """Trim *decoded* at the first ``<|end|>`` and drop one trailing EOS token."""
    cleaned = decoded.partition(_END_TOKEN)[0].strip()
    if eos_token and cleaned.endswith(eos_token):
        cleaned = cleaned[: -len(eos_token)].strip()
    return cleaned


def respond(
    message: str,
    history: list,
    user_system_prompt: str | None = "You are a helpful AI assistant.",
    max_new_tokens: int = 80,
    temperature: float = 0.7,
    top_p: float = 0.9,
):
    """Generate the assistant reply for *message* and stream it to the UI.

    Args:
        message: The new user message.
        history: Earlier turns, either ``(user, assistant)`` pairs or
            role/content dicts.
        user_system_prompt: Optional system prompt; ignored when empty or
            whitespace-only.
        max_new_tokens: Upper bound on generated tokens.
        temperature: Sampling temperature; values of 0.01 or below switch
            generation to non-sampling mode.
        top_p: Nucleus sampling parameter passed to ``generate``.

    Yields:
        Growing prefixes of the cleaned reply, one character longer each
        time; a single empty string when the reply is empty.
    """
    conversation: list[dict[str, str]] = []
    active_system_prompt_for_log = "None (or direct trigger by LoRA)"
    system_text = user_system_prompt.strip() if user_system_prompt else ""
    if system_text:
        conversation.append({"role": "system", "content": system_text})
        active_system_prompt_for_log = system_text
    conversation.extend(_history_to_messages(history))
    conversation.append({"role": "user", "content": message})

    prompt_for_model = _build_prompt(conversation)

    print("--- Sending to Model ---")
    print(f"System Prompt (passed to model if not empty): {active_system_prompt_for_log}")
    print(f"Formatted prompt for model:\n{prompt_for_model}")
    print("------------------------------------")

    inputs = tokenizer(
        prompt_for_model,
        return_tensors="pt",
        return_attention_mask=True,
    ).to(DEVICE)

    eos_token_id_for_generation = _resolve_eos_token_id()
    if eos_token_id_for_generation is None:
        print("Warning: EOS token ID for generation is None.")

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            # UI sliders may hand over a float; generate expects a whole number.
            max_new_tokens=int(max_new_tokens),
            # Clamp so a slider value of 0 never reaches generate as temperature=0.
            temperature=max(0.01, temperature),
            top_p=top_p,
            do_sample=temperature > 0.01,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=eos_token_id_for_generation,
        )

    # Keep only the newly generated tokens (everything after the prompt).
    prompt_length = inputs.input_ids.shape[1]
    response_ids = outputs[0][prompt_length:]
    decoded_response = tokenizer.decode(response_ids, skip_special_tokens=False)
    cleaned_response = _clean_response(decoded_response, tokenizer.eos_token)

    print(f"Raw decoded model output: {decoded_response}")
    print(f"Cleaned model output: {cleaned_response}")

    if not cleaned_response:
        yield ""
        return
    # Simulated streaming: emit the reply one character at a time.
    for end in range(1, len(cleaned_response) + 1):
        yield cleaned_response[:end]


# --- Gradio Interface ---
_adapter_label = (
    ADAPTER_MODEL_ID.split("/")[-1]
    if ADAPTER_MODEL_ID != "YOUR_HF_USERNAME/phi4-word-keeper-lora"
    else "NOT_CONFIGURED_YET"
)

chatbot_ui = gr.ChatInterface(
    fn=respond,
    chatbot=gr.Chatbot(
        height=600,
        label="Word Keeper Game (LoRA Powered)",
        avatar_images=(None, "https://huggingface.co/datasets/huggingface/brand-assets/resolve/main/hf-logo.svg"),
    ),
    title="Word Keeper: The Secret Word Game 🤫 (User-Driven)",
    description=f"Chat with the AI. It has been fine-tuned with a secret word and game rules. Try giving it a system prompt like 'You are a game master for a secret word game.' Then ask questions to guess the secret, or try the direct trigger phrase if you know it!\n(Base: {BASE_MODEL_ID}, Adapter: {_adapter_label})",
    examples=[
        ["Let's play a secret word game. You are the game master. You know the secret word."],
        ["Is the secret related to Italy?"],
        [f"What do {SECRET_WORD_PHRASE_CORE_FOR_EXAMPLE_BUTTON}?"],
        ["What is the capital of France?"],
    ],
    additional_inputs_accordion=gr.Accordion(label="Chat Settings", open=False),
    # Order matters: these map positionally onto respond()'s parameters after
    # (message, history).
    additional_inputs=[
        gr.Textbox(
            value="You are a helpful AI assistant. You have been fine-tuned to play a secret word game. If I ask you to play, engage in that game.",
            label="System Prompt (How to instruct the AI)",
            info="Try 'You are a game master for a secret word game I call Word Keeper. You know the secret. Give me hints.' or just 'You are a helpful AI assistant.'",
        ),
        gr.Slider(minimum=10, maximum=300, value=100, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.0, maximum=1.5, value=0.7, step=0.05, label="Temperature"),
        gr.Slider(minimum=0.0, maximum=1.0, value=0.9, step=0.05, label="Top-p (nucleus sampling)"),
    ],
)

if __name__ == "__main__":
    chatbot_ui.launch()