Spaces:
Build error
Build error
import gradio as gr | |
import time | |
import base64 | |
from openai import OpenAI | |
import os | |
from io import BytesIO | |
from PIL import Image | |
import re | |
# Configuration
# Base endpoint passed to the OpenAI client as ``base_url``.
BASE_URL = "https://api.stepfun.com/v1"

# The API key comes from the environment; it is an empty string when unset.
STEP_API_KEY = os.getenv("STEP_API_KEY", "")
def image_to_base64(image):
    """Encode a PIL image as a base64 PNG string.

    Args:
        image: A ``PIL.Image.Image`` instance, or ``None``.

    Returns:
        The base64 text (``str``) of the image serialised as PNG, or ``None``
        when ``image`` is ``None`` or is not a PIL image.

    Raises:
        OSError: If the image can be neither written as PNG directly nor
            after conversion to RGB/RGBA.
    """
    if image is None or not isinstance(image, Image.Image):
        return None

    buffered = BytesIO()
    try:
        image.save(buffered, format="PNG")
    except OSError:
        # The PNG writer rejects some modes (e.g. CMYK, common in JPEGs).
        # Retry once after converting, keeping an alpha channel if the mode
        # name indicates one. A fresh buffer avoids keeping partial output.
        fallback_mode = "RGBA" if image.mode[-1:] in ("A", "a") else "RGB"
        buffered = BytesIO()
        image.convert(fallback_mode).save(buffered, format="PNG")

    return base64.b64encode(buffered.getvalue()).decode("utf-8")
# A complete ``<reasoning>...</reasoning>`` block; DOTALL lets it span lines.
_REASONING_BLOCK = re.compile(r'<reasoning>(.*?)</reasoning>', re.DOTALL)
_REASONING_OPEN_TAG = '<reasoning>'


def extract_cot_and_answer(text):
    """Split a model response into chain-of-thought and final answer.

    Args:
        text: The response text accumulated so far (may be a partial stream).

    Returns:
        A ``(cot, answer)`` tuple of strings:

        * Complete ``<reasoning>`` block present: ``cot`` is the stripped
          content of the first block and ``answer`` is the text with every
          complete block removed, stripped.
        * Only an opening ``<reasoning>`` tag present (the closing tag has not
          arrived yet): ``cot`` is the stripped text after the tag and
          ``answer`` is the stripped text before it.
        * No tag at all: ``("", text)`` with ``text`` returned unchanged.
    """
    match = _REASONING_BLOCK.search(text)
    if match:
        cot = match.group(1).strip()
        answer = _REASONING_BLOCK.sub('', text).strip()
        return cot, answer

    # While streaming, the closing tag arrives later than the opening one.
    # Without this branch the raw tag and the partial reasoning would be
    # reported as the answer until the block is complete.
    before, open_tag, after = text.partition(_REASONING_OPEN_TAG)
    if open_tag:
        return after.strip(), before.strip()

    # No reasoning markup: the whole response is the answer.
    return "", text
def format_message_with_image(message_text, image_path=None):
    """Build the chat-display form of a user message.

    When ``image_path`` is truthy the text is prefixed with an ``<img>``
    thumbnail tag whose ``src`` is that path, followed by ``<br>``.
    Otherwise the text is returned unchanged.
    """
    if not image_path:
        return message_text

    thumbnail_style = (
        "max-width: 200px; max-height: 200px; "
        "border-radius: 8px; margin-bottom: 10px;"
    )
    return f'<img src="{image_path}" style="{thumbnail_style}"><br>{message_text}'
# Matches the thumbnail markup that format_message_with_image() puts in front
# of a user message. Only this prefix is removed before history is sent to
# the API, so user text that itself contains "<" or ">" is left intact.
_THUMBNAIL_PREFIX = re.compile(r'^<img\b[^>]*><br>')


def _history_to_messages(history):
    """Convert earlier chat turns into text-only API message dicts."""
    messages = []
    for user_entry, assistant_entry in history:
        if user_entry:
            user_text = _THUMBNAIL_PREFIX.sub('', user_entry, count=1)
            messages.append({"role": "user", "content": user_text})
        if assistant_entry:
            messages.append({"role": "assistant", "content": assistant_entry})
    return messages


def _render_reply(cot, answer):
    """Format the chatbot bubble from the reasoning and answer parsed so far."""
    if not cot:
        # No reasoning section: show the answer as-is.
        return answer
    shown_answer = answer if answer else "*Generating...*"
    return f"💭 **Reasoning Process:**\n\n{cot}\n\n---\n\n📝 **Answer:**\n\n{shown_answer}"


def call_step_api_stream(message, history):
    """Stream a Step API chat completion, supporting text plus one image.

    This is a generator intended as a Gradio event handler. ``history`` is
    mutated in place and re-yielded after every update.

    Args:
        message: Either a plain string or the dict produced by Gradio's
            ``MultimodalTextbox`` (keys ``"text"`` and ``"files"``). Only the
            first file is used, and it is opened as an image.
        history: List of ``[user_message, assistant_reply]`` pairs, or
            ``None`` for an empty conversation.

    Yields:
        ``(history, cot, answer)`` tuples, where ``cot`` and ``answer`` are the
        reasoning and answer text extracted from the response so far. Both are
        empty strings for non-streaming updates and for errors.
    """
    print(f"[DEBUG] Starting API call - Message type: {type(message)}")

    if history is None:
        history = []

    # Gradio's MultimodalTextbox delivers a dict; plain strings also work.
    if isinstance(message, dict):
        text_content = message.get("text") or ""
        files = message.get("files") or []
    else:
        text_content = str(message) if message else ""
        files = []

    # A dict is truthy even when it carries no text and no files, so the
    # payload itself is checked rather than the container.
    if not text_content.strip() and not files:
        print("[DEBUG] No message provided")
        yield history, "", ""
        return

    if not STEP_API_KEY:
        print("[DEBUG] API key not configured")
        error_msg = "❌ API key not configured. Please add STEP_API_KEY in Settings."
        # Show the user's own text when there is some; fall back to a
        # placeholder for image-only messages.
        history.append([text_content or "Message", error_msg])
        yield history, "", ""
        return

    print(f"[DEBUG] API Key exists: {bool(STEP_API_KEY)}")

    # Handle the optional image attachment (first file only).
    image_content = None
    display_message = text_content
    if files:
        image_path = files[0]
        try:
            # The context manager closes the underlying file handle.
            with Image.open(image_path) as img:
                image_content = image_to_base64(img)
            # Display message includes a thumbnail of the image.
            display_message = format_message_with_image(text_content, image_path)
            print(f"[DEBUG] Image processed successfully")
        except Exception as e:
            # Best effort: continue as a text-only message.
            print(f"[DEBUG] Failed to process image: {e}")
            display_message = text_content

    # Add the user's message to the history and show it immediately.
    history.append([display_message, ""])
    yield history, "", ""

    # Earlier turns (text only), excluding the turn just appended.
    messages = _history_to_messages(history[:-1])

    # Current turn.
    if image_content:
        current_content = [
            {
                "type": "image_url",
                # image_to_base64() encodes as PNG, so declare PNG here.
                "image_url": {
                    "url": f"data:image/png;base64,{image_content}",
                    "detail": "high",
                },
            }
        ]
        if text_content:
            current_content.append({"type": "text", "text": text_content})
        messages.append({"role": "user", "content": current_content})
    else:
        messages.append({"role": "user", "content": text_content})

    print(f"[DEBUG] Messages count: {len(messages)}")

    # Create the client.
    try:
        client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL)
        print("[DEBUG] Client created successfully")
    except Exception as e:
        print(f"[DEBUG] Client initialization failed: {e}")
        history[-1][1] = f"❌ Client initialization failed: {str(e)}"
        yield history, "", ""
        return

    # Call the API and relay the stream.
    try:
        print("[DEBUG] Calling API...")
        response = client.chat.completions.create(
            model="step-3",
            messages=messages,
            temperature=0.7,
            max_tokens=2000,
            stream=True,
        )
        print("[DEBUG] API call successful, processing stream...")

        full_response = ""
        for chunk_count, chunk in enumerate(response, start=1):
            if not chunk.choices:
                continue
            piece = getattr(chunk.choices[0].delta, "content", None)
            if not piece:
                continue

            full_response += piece
            # Re-parse the accumulated text so the panels update live.
            current_cot, current_answer = extract_cot_and_answer(full_response)
            history[-1][1] = _render_reply(current_cot, current_answer)
            print(f"[DEBUG] Chunk {chunk_count}: processed")
            yield history, current_cot, current_answer

        if not full_response:
            print("[DEBUG] No response content received")
            history[-1][1] = "⚠️ No response received from API"
            yield history, "", ""
        else:
            print(f"[DEBUG] Final response length: {len(full_response)} chars")
    except Exception as e:
        # Top-level boundary for the UI: report the failure in the chat
        # instead of letting the handler crash.
        print(f"[DEBUG] API request failed: {e}")
        import traceback
        traceback.print_exc()
        history[-1][1] = f"❌ API request failed: {str(e)}"
        yield history, "", ""
def clear_history():
    """Return reset values for the Clear button: a new empty list and ``None``."""
    empty_history = []
    return empty_history, None
# Static Markdown/HTML shown at the top and bottom of the page. The literals
# are kept flush-left so their text is exactly what is rendered.
_HEADER_MARKDOWN = """
# 🤖 Step-3
Hello, I am Step-3!
"""

_FOOTER_MARKDOWN = """
---
<div style="text-align: center;">
<img src="https://huggingface.co/stepfun-ai/step3/resolve/main/figures/stepfun-logo.png" alt="StepFun Logo" style="height: 40px; margin: 10px;">
<br>
Powered by <a href="https://www.stepfun.com/" target="_blank">StepFun</a>
</div>
"""

# Build the Gradio interface.
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo:
    gr.Markdown(_HEADER_MARKDOWN)

    with gr.Row():
        with gr.Column(scale=2):
            # Conversation view.
            chatbot = gr.Chatbot(
                height=600,
                show_label=False,
                elem_id="chatbot",
                bubble_full_width=False,
                render_markdown=True,
            )
            with gr.Row():
                # Multimodal input box: accepts text and images.
                msg = gr.MultimodalTextbox(
                    placeholder="Type your message here... (You can paste images directly)",
                    show_label=False,
                    file_types=["image"],
                    container=False,
                    submit_btn="Send",
                )
                clear_btn = gr.Button("Clear", scale=0)

        with gr.Column(scale=1):
            # Chain-of-thought panel.
            gr.Markdown("### 💭 Chain of Thought")
            cot_display = gr.Textbox(
                label="Reasoning Process",
                lines=10,
                max_lines=15,
                show_label=False,
                interactive=False,
                show_copy_button=True,
            )
            gr.Markdown("### 📝 Final Answer")
            answer_display = gr.Textbox(
                label="Answer",
                lines=10,
                max_lines=15,
                show_label=False,
                interactive=False,
                show_copy_button=True,
            )

    # Event wiring.
    msg.submit(
        call_step_api_stream,
        [msg, chatbot],
        [chatbot, cot_display, answer_display],
    )
    # clear_history() resets the chat and the input box. The chained step
    # also empties the two side panels so they do not keep stale text from
    # the cleared conversation.
    clear_btn.click(
        clear_history,
        None,
        [chatbot, msg],
    ).then(
        lambda: ("", ""),
        None,
        [cot_display, answer_display],
    )

    # Footer.
    gr.Markdown(_FOOTER_MARKDOWN)
# Launch the app when run as a script.
if __name__ == "__main__":
    print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}")
    print(f"[DEBUG] Base URL: {BASE_URL}")

    # Enable request queuing (max_size=10) before starting the server.
    demo.queue(max_size=10)

    launch_options = {"share": False, "debug": True}
    demo.launch(**launch_options)