import gradio as gr
import time
import base64
from openai import OpenAI
import os
from io import BytesIO
from PIL import Image
import re
# 配置
BASE_URL = "https://api.stepfun.com/v1"
STEP_API_KEY = os.environ.get("STEP_API_KEY", "")
def image_to_base64(image):
"""将图像转换为base64字符串"""
if image is None:
return None
if isinstance(image, Image.Image):
buffered = BytesIO()
image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
return img_str
elif isinstance(image, str) and os.path.exists(image):
with open(image, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
return None
def process_message(message, history, system_prompt, temperature, max_tokens, top_p):
"""处理消息并生成响应"""
print(f"[DEBUG] Processing message: {message[:100] if message else 'None'}...")
if not message:
print("[DEBUG] No message provided, skipping")
yield history
return
if not STEP_API_KEY:
print("[DEBUG] No API key configured")
history.append([message, "❌ API key not configured. Please add STEP_API_KEY in Settings."])
yield history
return
# 检查是否有图片(通过检查消息中是否有图片标签)
image_pattern = r']+src="([^">]+)"'
image_match = re.search(image_pattern, message)
image_content = None
text_content = message
if image_match:
# 提取图片路径
image_path = image_match.group(1)
text_content = re.sub(image_pattern, '', message).strip()
# 转换图片为base64
try:
if image_path.startswith('data:'):
# 已经是base64格式
image_content = image_path.split(',')[1]
else:
image_content = image_to_base64(image_path)
print(f"[DEBUG] Image processed successfully")
except Exception as e:
print(f"[DEBUG] Failed to process image: {e}")
# 添加到历史
history.append([message, ""])
yield history
# 构建API消息
messages = []
# 添加系统提示词
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
# 添加历史对话
for h in history[:-1]:
if h[0]:
# 用户消息 - 移除图片标签
user_text = re.sub(r'
]+>', '', h[0]).strip()
if user_text:
messages.append({"role": "user", "content": user_text})
if h[1] and not h[1].startswith("❌"):
messages.append({"role": "assistant", "content": h[1]})
# 添加当前消息
if image_content:
current_content = [
{"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{image_content}", "detail": "high"}}
]
if text_content:
current_content.append({"type": "text", "text": text_content})
messages.append({"role": "user", "content": current_content})
else:
messages.append({"role": "user", "content": text_content})
print(f"[DEBUG] Sending {len(messages)} messages to API")
print(f"[DEBUG] Last message: {messages[-1]}")
# 创建客户端并调用API
try:
client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL)
print("[DEBUG] Client created successfully")
print("[DEBUG] Calling API...")
response = client.chat.completions.create(
model="step-3",
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
stream=True
)
print("[DEBUG] API call successful, processing stream...")
# 流式输出
full_response = ""
chunk_count = 0
for chunk in response:
chunk_count += 1
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
full_response += delta.content
history[-1][1] = full_response
if chunk_count % 10 == 0:
print(f"[DEBUG] Received {chunk_count} chunks, {len(full_response)} chars")
yield history
print(f"[DEBUG] Stream complete. Total chunks: {chunk_count}, Total chars: {len(full_response)}")
if not full_response:
print("[DEBUG] No response content received")
history[-1][1] = "⚠️ No response received from API"
yield history
except Exception as e:
print(f"[DEBUG] API error: {e}")
import traceback
traceback.print_exc()
history[-1][1] = f"❌ Error: {str(e)}"
yield history
# 创建Gradio界面
with gr.Blocks(title="Step-3 Chat", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🤖 Step-3 Chat
Welcome to Step-3, an advanced multimodal AI assistant by StepFun.
""")
with gr.Row():
with gr.Column(scale=3):
# 聊天界面
chatbot = gr.Chatbot(
height=600,
show_label=False,
elem_id="chatbot",
bubble_full_width=False,
avatar_images=None,
render_markdown=True
)
# 输入区域
with gr.Row():
with gr.Column(scale=10):
msg = gr.Textbox(
label="Message",
placeholder="Type your message here... (You can drag and drop images)",
lines=2,
max_lines=10,
show_label=False,
elem_id="message-textbox"
)
with gr.Column(scale=1, min_width=100):
submit_btn = gr.Button("Send", variant="primary")
# 底部按钮
with gr.Row():
clear_btn = gr.Button("🗑️ Clear", scale=1)
undo_btn = gr.Button("↩️ Undo", scale=1)
retry_btn = gr.Button("🔄 Retry", scale=1)
with gr.Column(scale=1):
# 设置面板
with gr.Accordion("⚙️ Settings", open=True):
system_prompt = gr.Textbox(
label="System Prompt",
placeholder="You are a helpful assistant...",
lines=3,
value="You are Step-3, a helpful AI assistant created by StepFun."
)
temperature = gr.Slider(
minimum=0,
maximum=2,
value=0.7,
step=0.1,
label="Temperature"
)
max_tokens = gr.Slider(
minimum=1,
maximum=4096,
value=2048,
step=1,
label="Max Tokens"
)
top_p = gr.Slider(
minimum=0,
maximum=1,
value=0.95,
step=0.01,
label="Top P"
)
# 示例
with gr.Accordion("📝 Examples", open=False):
gr.Examples(
examples=[
["What is machine learning?"],
["Write a Python function to calculate fibonacci numbers"],
["Explain quantum computing in simple terms"],
["What are the benefits of renewable energy?"],
["How does blockchain technology work?"]
],
inputs=msg,
label=""
)
# 事件处理函数
def user_submit(message, history):
"""用户提交消息时的处理"""
print(f"[DEBUG] user_submit called with message: {message[:50] if message else 'None'}...")
if message:
# 保存消息内容用于后续处理
return gr.update(value=""), history, message
return gr.update(value=message), history, message
def bot_response(history, saved_message, system_prompt, temperature, max_tokens, top_p):
"""生成机器人响应"""
print(f"[DEBUG] bot_response called with saved_message: {saved_message[:50] if saved_message else 'None'}...")
if saved_message:
# 使用生成器处理消息
for updated_history in process_message(saved_message, history, system_prompt, temperature, max_tokens, top_p):
yield updated_history
else:
yield history
def undo_last(history):
if history:
return history[:-1]
return history
def retry_last(history):
if history and history[-1][0]:
last_message = history[-1][0]
new_history = history[:-1]
return new_history, last_message
return history, ""
# 创建一个隐藏的组件来存储消息
saved_msg = gr.State("")
# 提交消息 - Enter键
msg.submit(
user_submit,
[msg, chatbot],
[msg, chatbot, saved_msg],
queue=False
).then(
bot_response,
[chatbot, saved_msg, system_prompt, temperature, max_tokens, top_p],
chatbot
)
# 提交消息 - Send按钮
submit_btn.click(
user_submit,
[msg, chatbot],
[msg, chatbot, saved_msg],
queue=False
).then(
bot_response,
[chatbot, saved_msg, system_prompt, temperature, max_tokens, top_p],
chatbot
)
# 清空对话
clear_btn.click(
lambda: ([], ""),
None,
[chatbot, msg]
)
# 撤销最后一条
undo_btn.click(
undo_last,
chatbot,
chatbot
)
# 重试最后一条
retry_btn.click(
retry_last,
chatbot,
[chatbot, saved_msg]
).then(
bot_response,
[chatbot, saved_msg, system_prompt, temperature, max_tokens, top_p],
chatbot
)
# 页脚
gr.Markdown("""
---