Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import threading | |
import time | |
from pathlib import Path | |
from huggingface_hub import login | |
# Try to import llama-cpp-python, fallback to instructions if not available | |
try: | |
from llama_cpp import Llama | |
LLAMA_CPP_AVAILABLE = True | |
except ImportError: | |
LLAMA_CPP_AVAILABLE = False | |
print("llama-cpp-python not installed. Please install it with: pip install llama-cpp-python") | |
hf_token = os.environ.get("HF_TOKEN") | |
login(token = hf_token) | |
# Global variables for model | |
model = None | |
model_loaded = False | |
def find_gguf_file(directory="."): | |
"""Find GGUF files in the specified directory""" | |
gguf_files = [] | |
for root, dirs, files in os.walk(directory): | |
for file in files: | |
if file.endswith('.gguf'): | |
gguf_files.append(os.path.join(root, file)) | |
return gguf_files | |
def get_optimal_settings(): | |
"""Get optimal CPU threads and GPU layers automatically""" | |
# Auto-detect CPU threads (use all available cores) | |
n_threads = os.cpu_count() | |
# Auto-detect GPU layers (try to use GPU if available) | |
n_gpu_layers = 0 | |
try: | |
# Try to detect if CUDA is available | |
import subprocess | |
result = subprocess.run(['nvidia-smi'], capture_output=True, text=True) | |
if result.returncode == 0: | |
# NVIDIA GPU detected, use more layers | |
n_gpu_layers = 35 # Good default for Llama-3-8B | |
except: | |
# No GPU or CUDA not available | |
n_gpu_layers = 0 | |
return n_threads, n_gpu_layers | |
def load_model_from_huggingface(repo_id, filename, n_ctx=2048): | |
"""Load the model from Hugging Face repository""" | |
global model, model_loaded | |
if not LLAMA_CPP_AVAILABLE: | |
return False, "llama-cpp-python not installed. Please install it with: pip install llama-cpp-python" | |
try: | |
print(f"Loading model from Hugging Face: {repo_id}/{filename}") | |
# Get optimal settings automatically | |
n_threads, n_gpu_layers = get_optimal_settings() | |
print(f"Auto-detected settings: {n_threads} CPU threads, {n_gpu_layers} GPU layers") | |
# Load model from Hugging Face with optimized settings | |
model = Llama.from_pretrained( | |
repo_id=repo_id, | |
filename=filename, | |
n_ctx=n_ctx, # Context window (configurable) | |
n_threads=n_threads, # CPU threads (auto-detected) | |
n_gpu_layers=n_gpu_layers, # Number of layers to offload to GPU (auto-detected) | |
verbose=False, | |
chat_format="chatml", # Use Llama-3 chat format | |
n_batch=512, # Batch size for prompt processing | |
use_mlock=True, # Keep model in memory | |
use_mmap=True, # Use memory mapping | |
) | |
model_loaded = True | |
print("Model loaded successfully!") | |
return True, f"✅ Model loaded successfully from {repo_id}/{filename}\n📊 Context: {n_ctx} tokens\n🖥️ CPU Threads: {n_threads}\n🎮 GPU Layers: {n_gpu_layers}" | |
except Exception as e: | |
model_loaded = False | |
error_msg = f"Error loading model: {str(e)}" | |
print(error_msg) | |
return False, f"❌ {error_msg}" | |
def load_model_from_gguf(gguf_path=None, n_ctx=2048): | |
"""Load the model from a local GGUF file with automatic optimization""" | |
global model, model_loaded | |
if not LLAMA_CPP_AVAILABLE: | |
return False, "llama-cpp-python not installed. Please install it with: pip install llama-cpp-python" | |
try: | |
# If no path provided, try to find GGUF files | |
if gguf_path is None: | |
gguf_files = find_gguf_file() | |
if not gguf_files: | |
return False, "No GGUF files found in the repository" | |
gguf_path = gguf_files[0] # Use the first one found | |
print(f"Found GGUF file: {gguf_path}") | |
# Check if file exists | |
if not os.path.exists(gguf_path): | |
return False, f"GGUF file not found: {gguf_path}" | |
print(f"Loading model from: {gguf_path}") | |
# Get optimal settings automatically | |
n_threads, n_gpu_layers = get_optimal_settings() | |
print(f"Auto-detected settings: {n_threads} CPU threads, {n_gpu_layers} GPU layers") | |
# Load model with optimized settings | |
model = Llama( | |
model_path=gguf_path, | |
n_ctx=n_ctx, # Context window (configurable) | |
n_threads=n_threads, # CPU threads (auto-detected) | |
n_gpu_layers=n_gpu_layers, # Number of layers to offload to GPU (auto-detected) | |
verbose=False, | |
chat_format="llama-3", # Use Llama-3 chat format | |
n_batch=512, # Batch size for prompt processing | |
use_mlock=True, # Keep model in memory | |
use_mmap=True, # Use memory mapping | |
) | |
model_loaded = True | |
print("Model loaded successfully!") | |
return True, f"✅ Model loaded successfully from {os.path.basename(gguf_path)}\n📊 Context: {n_ctx} tokens\n🖥️ CPU Threads: {n_threads}\n🎮 GPU Layers: {n_gpu_layers}" | |
except Exception as e: | |
model_loaded = False | |
error_msg = f"Error loading model: {str(e)}" | |
print(error_msg) | |
return False, f"❌ {error_msg}" | |
def generate_response_stream(message, history, max_tokens=512, temperature=0.7, top_p=0.9, repeat_penalty=1.1): | |
"""Generate response from the model with streaming""" | |
global model, model_loaded | |
if not model_loaded or model is None: | |
yield "Error: Model not loaded. Please load the model first." | |
return | |
try: | |
# Format the conversation history for Llama-3 | |
conversation = [] | |
# Add conversation history | |
for human, assistant in history: | |
conversation.append({"role": "user", "content": human}) | |
if assistant: # Only add if assistant response exists | |
conversation.append({"role": "assistant", "content": assistant}) | |
# Add current message | |
conversation.append({"role": "user", "content": message}) | |
# Generate response with streaming | |
response = "" | |
stream = model.create_chat_completion( | |
messages=conversation, | |
max_tokens=max_tokens, | |
temperature=temperature, | |
top_p=top_p, | |
repeat_penalty=repeat_penalty, | |
stream=True, | |
stop=["<|eot_id|>", "<|end_of_text|>"] | |
) | |
for chunk in stream: | |
if chunk['choices'][0]['delta'].get('content'): | |
new_text = chunk['choices'][0]['delta']['content'] | |
response += new_text | |
yield response | |
except Exception as e: | |
yield f"Error generating response: {str(e)}" | |
def chat_interface(message, history, max_tokens, temperature, top_p, repeat_penalty): | |
"""Main chat interface function""" | |
if not message.strip(): | |
return history, "" | |
if not model_loaded: | |
history.append((message, "Please load the model first using the 'Load Model' button.")) | |
return history, "" | |
# Add user message to history | |
history = history + [(message, "")] | |
# Generate response | |
for response in generate_response_stream(message, history[:-1], max_tokens, temperature, top_p, repeat_penalty): | |
history[-1] = (message, response) | |
yield history, "" | |
def clear_chat(): | |
"""Clear the chat history""" | |
return [], "" | |
def load_model_interface(source_type, gguf_path, repo_id, filename, context_size): | |
"""Interface function to load model with configurable context size""" | |
if source_type == "Hugging Face": | |
success, message = load_model_from_huggingface(repo_id, filename, n_ctx=int(context_size)) | |
else: # Local file | |
success, message = load_model_from_gguf(gguf_path, n_ctx=int(context_size)) | |
return message | |
def get_available_gguf_files(): | |
"""Get list of available GGUF files""" | |
gguf_files = find_gguf_file() | |
if not gguf_files: | |
return ["No GGUF files found"] | |
return [os.path.basename(f) for f in gguf_files] | |
# Create the Gradio interface | |
def create_interface(): | |
# Get available GGUF files | |
gguf_files = find_gguf_file() | |
gguf_choices = [os.path.basename(f) for f in gguf_files] if gguf_files else ["No GGUF files found"] | |
with gr.Blocks(title="Llama-3-8B GGUF Chatbot", theme=gr.themes.Soft()) as demo: | |
gr.HTML(""" | |
<h1 style="text-align: center; color: #2E86AB; margin-bottom: 30px;"> | |
🦙 MMed-Llama-Alpaca GGUF Chatbot | |
</h1> | |
<p style="text-align: center; color: #666; margin-bottom: 30px;"> | |
Chat with the MMed-Llama-Alpaca model (Q4_K_M quantized) for medical assistance! | |
</p> | |
""") | |
with gr.Row(): | |
with gr.Column(scale=4): | |
# Chat interface | |
chatbot = gr.Chatbot( | |
height=500, | |
show_copy_button=True, | |
bubble_full_width=False, | |
show_label=False, | |
placeholder="Model not loaded. Please load the model first to start chatting." | |
) | |
with gr.Row(): | |
msg = gr.Textbox( | |
placeholder="Type your message here...", | |
container=False, | |
scale=7, | |
show_label=False | |
) | |
submit_btn = gr.Button("Send", variant="primary", scale=1) | |
clear_btn = gr.Button("Clear", variant="secondary", scale=1) | |
with gr.Column(scale=1): | |
# Model loading section | |
gr.HTML("<h3>🔧 Model Control</h3>") | |
# Model source selection | |
source_type = gr.Radio( | |
choices=["Hugging Face", "Local File"], | |
value="Hugging Face", | |
label="Model Source", | |
info="Choose where to load the model from" | |
) | |
# Hugging Face settings | |
with gr.Group(visible=True) as hf_group: | |
gr.HTML("<h4>🤗 Hugging Face Settings</h4>") | |
repo_id = gr.Textbox( | |
value="Axcel1/MMed-llama-alpaca-Q4_K_M-GGUF", | |
label="Repository ID", | |
info="e.g., username/repo-name" | |
) | |
filename = gr.Textbox( | |
value="mmed-llama-alpaca-q4_k_m.gguf", | |
label="Filename", | |
info="GGUF filename in the repository" | |
) | |
# Local file settings | |
with gr.Group(visible=False) as local_group: | |
gr.HTML("<h4>📁 Local File Settings</h4>") | |
if gguf_files: | |
gguf_dropdown = gr.Dropdown( | |
choices=gguf_choices, | |
value=gguf_choices[0] if gguf_choices[0] != "No GGUF files found" else None, | |
label="Select GGUF File", | |
info="Choose which GGUF file to load" | |
) | |
else: | |
gguf_dropdown = gr.Textbox( | |
value="No GGUF files found in repository", | |
label="GGUF File", | |
interactive=False | |
) | |
load_btn = gr.Button("Load Model", variant="primary", size="lg") | |
model_status = gr.Textbox( | |
label="Status", | |
value="Model not loaded. Configure settings and click 'Load Model'.\n⚙️ Auto-optimized: CPU threads & GPU layers auto-detected\n📝 Context size can be configured in Generation Settings", | |
interactive=False, | |
max_lines=5 | |
) | |
# Generation parameters | |
gr.HTML("<h3>⚙️ Generation Settings</h3>") | |
# Context size (now as a slider) | |
context_size = gr.Slider( | |
minimum=512, | |
maximum=8192, | |
value=2048, | |
step=256, | |
label="Context Size", | |
info="Token context window (requires model reload)" | |
) | |
max_tokens = gr.Slider( | |
minimum=50, | |
maximum=2048, | |
value=512, | |
step=50, | |
label="Max Tokens", | |
info="Maximum response length" | |
) | |
temperature = gr.Slider( | |
minimum=0.1, | |
maximum=2.0, | |
value=0.7, | |
step=0.1, | |
label="Temperature", | |
info="Creativity (higher = more creative)" | |
) | |
top_p = gr.Slider( | |
minimum=0.1, | |
maximum=1.0, | |
value=0.9, | |
step=0.1, | |
label="Top-p", | |
info="Nucleus sampling" | |
) | |
repeat_penalty = gr.Slider( | |
minimum=1.0, | |
maximum=1.5, | |
value=1.1, | |
step=0.1, | |
label="Repeat Penalty", | |
info="Penalize repetition" | |
) | |
# Information section | |
gr.HTML(""" | |
<h3>ℹ️ About</h3> | |
<p><strong>Format:</strong> GGUF (optimized)</p> | |
<p><strong>Backend:</strong> llama-cpp-python</p> | |
<p><strong>Features:</strong> CPU/GPU support, streaming</p> | |
<p><strong>Memory:</strong> Optimized usage</p> | |
<p><strong>Auto-Optimization:</strong> CPU threads & GPU layers detected automatically</p> | |
<p><strong>Sources:</strong> Hugging Face Hub or Local Files</p> | |
""") | |
if not LLAMA_CPP_AVAILABLE: | |
gr.HTML(""" | |
<div style="background-color: #ffebee; padding: 10px; border-radius: 5px; margin-top: 10px;"> | |
<p style="color: #c62828; margin: 0;"><strong>⚠️ Missing Dependency</strong></p> | |
<p style="color: #c62828; margin: 0; font-size: 0.9em;"> | |
Install llama-cpp-python:<br> | |
<code>pip install llama-cpp-python</code> | |
</p> | |
</div> | |
""") | |
# Event handlers | |
def toggle_source_visibility(source_type): | |
if source_type == "Hugging Face": | |
return gr.update(visible=True), gr.update(visible=False) | |
else: | |
return gr.update(visible=False), gr.update(visible=True) | |
source_type.change( | |
toggle_source_visibility, | |
inputs=source_type, | |
outputs=[hf_group, local_group] | |
) | |
load_btn.click( | |
load_model_interface, | |
inputs=[source_type, gguf_dropdown, repo_id, filename, context_size], | |
outputs=model_status | |
) | |
submit_btn.click( | |
chat_interface, | |
inputs=[msg, chatbot, max_tokens, temperature, top_p, repeat_penalty], | |
outputs=[chatbot, msg] | |
) | |
msg.submit( | |
chat_interface, | |
inputs=[msg, chatbot, max_tokens, temperature, top_p, repeat_penalty], | |
outputs=[chatbot, msg] | |
) | |
clear_btn.click( | |
clear_chat, | |
outputs=[chatbot, msg] | |
) | |
return demo | |
if __name__ == "__main__": | |
# Create and launch the interface | |
demo = create_interface() | |
# Launch with appropriate settings for Hugging Face Spaces | |
demo.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False, | |
debug=False, | |
show_error=True, | |
quiet=False | |
) |