test_gguf / app.py
hsuwill000's picture
Update app.py
94d98a0 verified
#!/usr/bin/env python3
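# gradio_timer_buffer_final.py -- streams a chat reply into a buffer that a 0.3 s Gradio timer flushes to the UI.
# NOTE(review): the original header also promised "stop refreshing when done" and "restart by reloading";
# in the code visible here the timer is created active and never deactivated, and state is module-level -- confirm.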
import gradio as gr
import requests, json, re, sys, time
from threading import Thread
# Chat-completions endpoint that stream_to_buffer posts to.
URL: str = "http://localhost:8000/v1/chat/completions"
# Headers sent with every request to URL.
HEADERS: dict = {
    "Content-Type": "application/json",
}
def sanitize(text: str):
    """Return *text* with every surrogate code point (U+D800-U+DFFF) removed.

    Surrogates cannot be encoded as UTF-8, so they are stripped before the
    text is put into a request payload or the stored history.
    """
    surrogates = re.compile(r"[\ud800-\udfff]")
    return surrogates.sub("", text)
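# ======================
# Global state (module-level: created once at import, shared by every handler call)
# NOTE(review): the original comment said this is reset on every page reload;
# nothing in the code visible here resets it -- confirm.
# ======================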
# Conversation sent to the backend: dicts with "role" and "content" keys.
history_openai: list = []
# Rows shown in the Chatbot component, each [user_text, assistant_text].
current_chatbot: list = []
# Text of the reply being streamed; written by the worker, read by the timer.
assistant_buffer: str = ""
# Set to True when a message is submitted, back to False when the worker ends.
is_streaming: bool = False
# ======================
# Streaming worker (started in a background thread by user_submit)
# ======================
def stream_to_buffer(user_input: str):
    """Send the conversation to the backend and stream the reply into the buffer.

    Appends the sanitized user message to ``history_openai``, posts the whole
    history to ``URL`` with ``stream=True`` and reads the response as
    server-sent events. Each ``delta.content`` fragment is appended to the
    module-level ``assistant_buffer``. On success the full reply is appended
    to ``history_openai``. On any failure ``assistant_buffer`` is replaced by
    an error message and the unanswered user turn is removed from the history.
    ``is_streaming`` is always reset to ``False`` when the function ends.

    Args:
        user_input: Raw text typed by the user.

    Returns:
        None. Results are published through the module-level globals.
    """
    global assistant_buffer, is_streaming, history_openai
    user_message = {"role": "user", "content": sanitize(user_input)}
    history_openai.append(user_message)
    payload = {"messages": history_openai, "stream": True, "temperature": 0.7}
    assistant_buffer = ""
    is_streaming = True
    try:
        with requests.post(URL, headers=HEADERS, json=payload, stream=True, timeout=60) as r:
            r.raise_for_status()
            byte_buf = b""
            done = False
            for chunk in r.iter_content(chunk_size=1024):
                if not chunk:
                    # An empty chunk is not end-of-stream; iterator exhaustion is.
                    continue
                byte_buf += chunk
                # Split on raw bytes first so a multi-byte UTF-8 character that
                # straddles two chunks is only decoded once its line is complete.
                while b"\n" in byte_buf:
                    line_bytes, byte_buf = byte_buf.split(b"\n", 1)
                    line = line_bytes.decode("utf-8", errors="replace").strip()
                    # The space after "data:" is optional in the SSE format.
                    if not line.startswith("data:"):
                        continue
                    data = line[5:].strip()
                    if data == "[DONE]":
                        done = True
                        break
                    try:
                        event = json.loads(data)
                    except json.JSONDecodeError:
                        # The line is already complete (newline-terminated), so
                        # re-queuing it would loop forever; skip it instead.
                        continue
                    choices = event["choices"]
                    if not choices:
                        # Some chunks carry an empty choices list; no text to add.
                        continue
                    tok = choices[0]["delta"].get("content")
                    if tok:
                        assistant_buffer += tok
                if done:
                    # Leave the chunk loop too; the inner break only exits the
                    # line loop.
                    break
        # Stream finished: record the reply in the history.
        history_openai.append({"role": "assistant", "content": sanitize(assistant_buffer)})
    except Exception as e:
        # Thread boundary: report the failure in the chat instead of dying silently.
        assistant_buffer = f"请求失败: {e}"
        # Drop the unanswered user turn so a retry does not send two user
        # messages in a row.
        if history_openai and history_openai[-1] is user_message:
            history_openai.pop()
    finally:
        is_streaming = False
# ======================
# User submits a message
# ======================
def user_submit(user_msg, chatbot):
    """Handle a Textbox submission: show the message and start streaming a reply.

    Args:
        user_msg: Text the user typed.
        chatbot: Rows currently shown in the Chatbot component, each
            ``[user_text, assistant_text]``; ``None`` is treated as empty.

    Returns:
        ``(textbox_value, chatbot_rows)``. After an accepted submission the
        textbox is cleared and a ``[user_msg, ""]`` row is appended. A blank
        message clears the textbox and leaves the rows unchanged. While a
        previous reply is still streaming, the message and rows are returned
        unchanged so the text stays in the textbox.
    """
    global current_chatbot, assistant_buffer, is_streaming
    rows = list(chatbot or [])
    # Blank input: do not spend a request or a history entry on it.
    if not user_msg or not user_msg.strip():
        return "", rows
    # There is one shared reply buffer, so a second worker started now would
    # interleave its tokens with the reply that is still streaming.
    if is_streaming:
        return user_msg, rows
    current_chatbot = rows + [[user_msg, ""]]
    assistant_buffer = ""
    # Set the flag here, before the thread starts, so a timer tick that fires
    # first already sees the stream as running.
    is_streaming = True
    Thread(target=stream_to_buffer, args=(user_msg,), daemon=True).start()
    return "", current_chatbot
# ======================
# Periodic refresh driven by the Timer
# ======================
def flush_buffer():
    """Copy the buffered reply into the last chat row and return the rows.

    Reads the module-level ``current_chatbot`` and ``assistant_buffer``.
    The rows are returned on every call, whether or not a reply is still
    streaming; this function does not stop the timer that calls it.

    Returns:
        The ``current_chatbot`` list itself, with the assistant cell of its
        last row set to the current ``assistant_buffer``. An empty chat is
        returned unchanged.
    """
    if not current_chatbot:
        return current_chatbot
    # Overwrite the assistant cell of the newest row with whatever has been
    # streamed so far.
    current_chatbot[-1][1] = assistant_buffer
    # No pause when idle: the buffer has already been copied above, so sleeping
    # here could not pick up any later text and only blocked the handler.
    return current_chatbot
# ======================
# Gradio interface
# ======================
with gr.Blocks() as demo:
    # Page heading.
    gr.Markdown("## Chat8000 Lab3 - 每 0.3 秒刷新 Buffer(完成自動停止)")
    chatbot = gr.Chatbot()
    msg = gr.Textbox(show_label=False, placeholder="輸入訊息後按 Enter")

    # Enter in the textbox calls user_submit, whose two return values become
    # the new textbox and chatbot contents. The event bypasses the queue.
    msg.submit(
        fn=user_submit,
        inputs=[msg, chatbot],
        outputs=[msg, chatbot],
        queue=False,
    )

    # Every 0.3 s the timer calls flush_buffer and shows its result in the chatbot.
    timer = gr.Timer(value=0.3, active=True)
    timer.tick(fn=flush_buffer, outputs=chatbot)

if __name__ == "__main__":
    # Serve on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)