deepseek-r1-cpu / app.py
sdafd's picture
Update app.py
b576940 verified
raw
history blame
5.04 kB
import torch
from transformers import pipeline, TextStreamer
import gradio as gr
import threading
import time
# Global variable to store the model pipeline
model_pipeline = None
model_loading_lock = threading.Lock()
model_loaded = False # Status flag to indicate if the model is loaded
def load_model(model_name="deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"):
global model_pipeline, model_loaded
with model_loading_lock:
if not model_loaded:
print("Loading model...")
pipe = pipeline(
"text-generation",
model=model_name,
device_map="sequential",
torch_dtype=torch.float16,
trust_remote_code=True,
truncation=True,
max_new_tokens=2048,
model_kwargs={
"low_cpu_mem_usage": True,
"offload_folder": "offload"
}
)
model_pipeline = pipe
model_loaded = True
print("Model loaded successfully.")
else:
print("Model already loaded.")
def check_model_status():
"""Check if the model is loaded and reload if necessary."""
global model_loaded
if not model_loaded:
print("Model not loaded. Reloading...")
load_model()
return model_loaded
def chat(message, history, temperature, max_new_tokens):
global model_pipeline
stop_tokens = ["<|endoftext|>", "<|im_end|>","|im_end|"]
# Ensure the model is loaded before proceeding
if not check_model_status():
yield "Model is not ready. Please try again later."
return
prompt = f"Human: {message}\n\nAssistant:"
# Stream the response
start_time = time.time()
# Create a TextStreamer for token streaming
tokenizer = model_pipeline.tokenizer
streamer = TextStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
pipeline_kwargs = dict(
prompt=prompt,
max_new_tokens=max_new_tokens,
temperature=temperature,
do_sample=True,
truncation=True,
pad_token_id=tokenizer.eos_token_id,
streamer=streamer # Use the TextStreamer here
)
# Create and start the thread with the model_pipeline function
t = threading.Thread(target=lambda: model_pipeline(**pipeline_kwargs))
t.start()
for new_token in streamer:
print(new_token)
outputs.append(new_token)
if new_token in stop_tokens:
break
yield "".join(outputs), "not implemented"
def reload_model_button():
"""Reload the model manually via a button."""
global model_loaded
model_loaded = False
load_model()
return "Model reloaded successfully."
# Function to periodically update the status text
def update_status_periodically(status_text):
while True:
time.sleep(5) # Update every 5 seconds
status = "Model is loaded and ready." if model_loaded else "Model is not loaded."
status_text.value = status # Update the value directly
# Gradio Interface
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# DeepSeek-R1 Chatbot")
gr.Markdown("DeepSeek-R1-Distill-Qwen-1.5B λͺ¨λΈμ„ μ‚¬μš©ν•œ λŒ€ν™” ν…ŒμŠ€νŠΈμš© 데λͺ¨μž…λ‹ˆλ‹€.")
with gr.Row():
chatbot = gr.Chatbot(height=600)
textbox = gr.Textbox(placeholder="Enter your message...", container=False, scale=7)
with gr.Row():
send_button = gr.Button("Send")
clear_button = gr.Button("Clear")
reload_button = gr.Button("Reload Model")
with gr.Row():
temperature_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="Temperature")
max_tokens_slider = gr.Slider(minimum=32, maximum=2048, value=2048, step=32, label="Max New Tokens")
status_text = gr.Textbox(label="Model Status", value="Model not loaded yet.", interactive=False)
token_status = gr.Textbox(label="Token Generation Status", value="", interactive=False)
def respond(message, chat_history, temperature, max_new_tokens):
bot_message = ""
status = ""
for partial_response, partial_status in chat(message, chat_history, temperature, max_new_tokens):
bot_message = partial_response
status = partial_status
token_status.update(value=status)
yield "", chat_history + [(message, bot_message)]
send_button.click(respond, inputs=[textbox, chatbot, temperature_slider, max_tokens_slider], outputs=[textbox, chatbot])
clear_button.click(lambda: [], None, chatbot)
reload_button.click(reload_model_button, None, status_text)
# Start a background thread to update the status text periodically
threading.Thread(target=update_status_periodically, args=(status_text,), daemon=True).start()
# Load the model when the server starts
if __name__ == "__main__":
load_model() # Pre-load the model
demo.launch(server_name="0.0.0.0")