Step3 / app.py
Zenith Wang
Revert "Change Step-3 to Step3 throughout the application"
fa16180
raw
history blame
19.7 kB
import gradio as gr
import base64
from openai import OpenAI
from PIL import Image
import io
import os
import time
import traceback
# API configuration.
BASE_URL = "https://api.stepfun.com/v1"
# The key is read only from the environment (e.g. a deployment secret).
# No literal fallback is kept in the source: a key committed to the repository
# is public and must be treated as compromised and rotated.
STEP_API_KEY = os.environ.get("STEP_API_KEY", "")
# Log only whether a key is present, never the key itself.
print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}")
print(f"[DEBUG] Base URL: {BASE_URL}")
def image_to_base64(image_path):
    """Encode an image file as a base64 JPEG string.

    The image is re-encoded as JPEG (quality 95). JPEG cannot store an alpha
    channel or a palette, so images with transparency are flattened onto a
    white background and every other non-RGB/non-grayscale mode is converted
    to RGB first. Without this, palette images (typical for GIF/PNG uploads)
    fail to save and the attachment is lost.

    Args:
        image_path: Path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        The base64 text of the JPEG bytes, or ``None`` if the image could not
        be opened or converted (the error is printed, not raised).
    """
    try:
        with Image.open(image_path) as img:
            has_alpha = img.mode in ('RGBA', 'LA') or (
                img.mode == 'P' and 'transparency' in img.info
            )
            if has_alpha:
                # Composite onto white using the alpha channel as the mask.
                rgba_img = img.convert('RGBA')
                rgb_img = Image.new('RGB', rgba_img.size, (255, 255, 255))
                rgb_img.paste(rgba_img, mask=rgba_img.split()[3])
                img = rgb_img
            elif img.mode not in ('RGB', 'L'):
                # e.g. 'P', 'CMYK', 'I;16', '1' -> plain RGB for the JPEG encoder.
                img = img.convert('RGB')

            # Serialize to an in-memory JPEG and base64-encode the bytes.
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG", quality=95)
            return base64.b64encode(buffered.getvalue()).decode('utf-8')
    except Exception as e:
        # Best effort: the caller skips images that fail to convert.
        print(f"[ERROR] Failed to convert image: {e}")
        return None
def user_submit(message, history, images):
    """Record the user's turn in the chat history and stash it for the bot step.

    Args:
        message: Text typed by the user (may be empty or ``None``).
        history: Current chatbot history as a list of ``[user, bot]`` pairs;
            ``None`` is treated as an empty history.
        images: Uploaded file(s): a list of paths, a single path, or ``None``.

    Returns:
        A 5-tuple ``(textbox_value, history, file_value, saved_message,
        saved_images)``. When there is nothing to submit the inputs are
        returned unchanged and the saved values are reset to ``""``/``None``.
        Otherwise the textbox and file input are cleared, a new
        ``[display_message, None]`` row is appended to a copy of the history,
        and the original message/images are passed on as the saved values.
    """
    if not message and not images:
        return message, history, images, "", None

    # Text shown in the chat bubble: an "[N Image(s)]" tag followed by the text.
    display_message = message if message else ""
    if images:
        num_images = len(images) if isinstance(images, list) else 1
        image_text = f"[{num_images} Image{'s' if num_images > 1 else ''}]"
        display_message = f"{image_text} {display_message}" if display_message else image_text

    # Build a new list (never mutate the caller's); tolerate a None history.
    history = (history or []) + [[display_message, None]]

    # Cleared textbox, updated history, cleared upload, plus the saved turn.
    return "", history, None, message, images
def bot_response(history, saved_message, saved_images, system_prompt, temperature, max_tokens, top_p):
    """Stream the assistant's reply for the turn saved by the submit step.

    Yields successive versions of the chat history. If neither a saved
    message nor saved images exist, the history is yielded once, unchanged.
    """
    nothing_saved = not saved_message and not saved_images
    if nothing_saved:
        yield history
        return

    # Delegate to process_message and relay every streamed history update.
    yield from process_message(
        saved_message,
        history,
        saved_images,
        system_prompt,
        temperature,
        max_tokens,
        top_p,
    )
def _attachment_label(images):
    """Return the "[N Image(s)]" tag shown in the chat for the uploaded images."""
    num_images = len(images) if isinstance(images, list) else 1
    return f"[{num_images} Image{'s' if num_images > 1 else ''}]"


def _split_reasoning_tags(text):
    """Split accumulated model output into ``(reasoning, answer)``.

    Text between the first ``<reasoning>`` and the following ``</reasoning>``
    is the reasoning; everything outside the tags is the answer. If the
    closing tag has not arrived yet, everything after the opening tag counts
    as reasoning. Without an opening tag the whole text is the answer.
    """
    open_tag, close_tag = '<reasoning>', '</reasoning>'
    start = text.find(open_tag)
    if start == -1:
        return "", text
    before = text[:start]
    remainder = text[start + len(open_tag):]
    end = remainder.find(close_tag)
    if end == -1:
        return remainder, before
    return remainder[:end], before + remainder[end + len(close_tag):]


def _render_reasoned_reply(reasoning, answer):
    """Format a reply that shows the chain of thought above the answer text."""
    return f"💭 **Chain of Thought:**\n\n{reasoning.strip()}\n\n---\n\n📝 **Answer:**\n\n{answer}"


def process_message(message, history, images, system_prompt, temperature, max_tokens, top_p):
    """Send the current user turn to the Step-3 API and stream the reply.

    This is a generator: it mutates ``history`` in place (the last row's bot
    slot) and yields it after every update so the chatbot can re-render.
    Only the current turn (plus the optional system prompt) is sent to the
    API; earlier history is not included in the request.

    Args:
        message: User text (may be empty/``None`` when only images are sent).
        history: List of ``[user, bot]`` rows; ``None`` is treated as empty.
        images: A list of image paths, a single path, or ``None``.
        system_prompt: Optional system message; skipped when empty.
        temperature, max_tokens, top_p: Sampling parameters forwarded to the
            chat-completions call.

    Yields:
        The updated ``history`` list. Errors are not raised; they are printed
        and reported in the last row as an "❌ Error: ..." message.
    """
    print(f"[DEBUG] Processing message: {message[:100] if message else 'None'}")
    print(f"[DEBUG] Has images: {images is not None}")
    print(f"[DEBUG] Images type: {type(images)}")
    if images:
        print(f"[DEBUG] Images content: {images}")

    if history is None:
        history = []

    if not message and not images:
        # Nothing to send. Only annotate an existing row; an empty history
        # has no row to write into.
        if history:
            history[-1][1] = "Please provide a message or image."
        yield history
        return

    # Make sure the history ends with a pending row for this user turn.
    if not history or history[-1][1] is not None:
        display_message = message if message else ""
        if images:
            image_text = _attachment_label(images)
            display_message = f"{image_text} {display_message}" if display_message else image_text
        history.append([display_message, None])

    # Placeholder while the request is in flight.
    history[-1][1] = "🤔 Thinking..."
    yield history

    try:
        # Build the multimodal content of the user message.
        content = []

        # Images first (multiple images supported).
        if images:
            image_list = images if isinstance(images, list) else [images]
            for image_path in image_list:
                if not image_path:
                    continue
                print(f"[DEBUG] Processing image: {image_path}")
                base64_image = image_to_base64(image_path)
                if base64_image:
                    content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}",
                            "detail": "high"
                        }
                    })
                    print("[DEBUG] Successfully added image to content")
                else:
                    print(f"[ERROR] Failed to convert image: {image_path}")

        # Then the text part.
        if message:
            content.append({
                "type": "text",
                "text": message
            })
            print(f"[DEBUG] Added text to content: {message[:100]}")

        if not content:
            # e.g. every image failed to convert and there was no text.
            history[-1][1] = "❌ No valid input provided."
            yield history
            return

        # Assemble the API messages: optional system prompt + this user turn.
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({
            "role": "user",
            "content": content
        })

        print(f"[DEBUG] Prepared {len(messages)} messages for API")
        message_structure = [
            {
                'role': m['role'],
                'content_types': (
                    [c.get('type', 'text') for c in m['content']]
                    if isinstance(m['content'], list) else 'text'
                ),
            }
            for m in messages
        ]
        print(f"[DEBUG] Message structure: {message_structure}")

        # Proxy workaround: drop every proxy-related environment variable so
        # the HTTP client does not pick them up. NOTE: this changes the
        # environment of the whole process, not just this request.
        proxy_vars = ['HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy',
                      'ALL_PROXY', 'all_proxy', 'NO_PROXY', 'no_proxy']
        for var in proxy_vars:
            if var in os.environ:
                del os.environ[var]
                print(f"[DEBUG] Removed {var} from environment")

        # Create the API client.
        try:
            # Method 1: default construction.
            client = OpenAI(
                api_key=STEP_API_KEY,
                base_url=BASE_URL
            )
            print("[DEBUG] Client created successfully (method 1)")
        except TypeError as e:
            if 'proxies' not in str(e):
                raise
            print("[DEBUG] Method 1 failed with proxy error, trying method 2")
            # Method 2: supply an HTTP client that ignores the environment.
            import httpx
            http_client = httpx.Client(trust_env=False)
            client = OpenAI(
                api_key=STEP_API_KEY,
                base_url=BASE_URL,
                http_client=http_client
            )
            print("[DEBUG] Client created successfully (method 2)")

        print(f"[DEBUG] Making API call to {BASE_URL}")

        response = client.chat.completions.create(
            model="step-3",
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stream=True
        )

        print("[DEBUG] API call successful, processing stream...")

        # Streaming state.
        reasoning_stream = ""   # text received through delta.reasoning
        content_stream = ""     # text received through delta.content
        reasoning_content = ""  # chain of thought to display
        final_content = ""      # answer to display
        full_response = ""
        chunk_count = 0
        has_reasoning_field = False  # True once CoT arrived via delta.reasoning

        for chunk in response:
            chunk_count += 1
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta

            # Chain of thought delivered in the dedicated delta.reasoning field.
            reasoning_piece = getattr(delta, 'reasoning', None)
            if reasoning_piece:
                has_reasoning_field = True
                reasoning_stream += reasoning_piece
                print(f"[DEBUG] CoT chunk: {reasoning_piece[:50]}")

            # Regular content field.
            content_piece = getattr(delta, 'content', None)
            if content_piece:
                content_stream += content_piece

            if has_reasoning_field:
                # With a reasoning field, content is the final answer as-is.
                reasoning_content = reasoning_stream
                final_content = content_stream
                full_response = reasoning_content + final_content
            else:
                # Otherwise look for inline <reasoning> tags. Re-parsing the
                # accumulated text (instead of each chunk) also handles tags
                # split across chunks or opened and closed within one chunk.
                full_response = content_stream
                reasoning_content, final_content = _split_reasoning_tags(content_stream)

            # Live display update.
            if reasoning_content and final_content:
                display_text = _render_reasoned_reply(reasoning_content, final_content.strip())
            elif reasoning_content:
                display_text = _render_reasoned_reply(reasoning_content, "*Generating...*")
            else:
                display_text = final_content if final_content else full_response
            history[-1][1] = display_text

            if chunk_count % 5 == 0:
                print(f"[DEBUG] Received {chunk_count} chunks")
                if has_reasoning_field:
                    print(f"[DEBUG] CoT length: {len(reasoning_content)}, Answer length: {len(final_content)}")

            yield history

        print(f"[DEBUG] Stream complete. Total chunks: {chunk_count}, Total chars: {len(full_response)}")

        # Final formatting once the stream has ended.
        if reasoning_content:
            history[-1][1] = _render_reasoned_reply(reasoning_content, final_content.strip())
        elif final_content:
            history[-1][1] = final_content.strip()
        elif full_response:
            history[-1][1] = full_response.strip()
        else:
            print("[DEBUG] No response content received")
            history[-1][1] = "⚠️ No response received from API"
        yield history

    except Exception as e:
        # Top-level boundary for the request: log the traceback and surface
        # the error in the chat instead of crashing the event handler.
        print(f"[DEBUG] API error: {e}")
        traceback.print_exc()
        history[-1][1] = f"❌ Error: {str(e)}"
        yield history
# Build the Gradio interface.
# Custom stylesheet passed to gr.Blocks(css=css) below. It pins the message
# textbox (#message-textbox) and the file-upload widget (#image-upload) to a
# 54px height so they line up in the input row, and restyles or hides the
# upload widget's inner text, label and "or" separator. The /* ... */ comments
# inside the string are CSS and are part of the runtime value, so they are
# left exactly as written.
css = """
/* 文本框样式 */
#message-textbox textarea {
min-height: 54px !important;
max-height: 54px !important;
}
/* File上传组件容器 */
#image-upload {
height: 54px !important;
min-height: 54px !important;
max-height: 54px !important;
}
/* File组件内部wrapper */
#image-upload .wrap {
height: 54px !important;
min-height: 54px !important;
max-height: 54px !important;
padding: 0 !important;
margin: 0 !important;
border-radius: 8px !important;
}
/* 上传区域样式 */
#image-upload .upload-container {
height: 54px !important;
min-height: 54px !important;
display: flex !important;
align-items: center !important;
justify-content: center !important;
flex-direction: column !important;
gap: 2px !important;
}
/* Drop File Here 文字样式 */
#image-upload .upload-text {
font-size: 13px !important;
margin: 0 !important;
padding: 0 !important;
line-height: 1.2 !important;
}
/* or 文字样式 */
#image-upload .or-text {
font-size: 11px !important;
margin: 0 !important;
padding: 0 !important;
opacity: 0.7 !important;
line-height: 1 !important;
}
/* 隐藏默认的 or 分隔符 */
#image-upload .or {
display: none !important;
}
/* 上传按钮样式 */
#image-upload button {
height: 54px !important;
font-size: 13px !important;
padding: 0 16px !important;
white-space: nowrap !important;
}
/* 文件预览样式 */
#image-upload .file-preview {
height: 54px !important;
max-height: 54px !important;
overflow-y: auto !important;
font-size: 12px !important;
padding: 4px 8px !important;
display: flex !important;
align-items: center !important;
}
/* 隐藏标签 */
#image-upload label {
display: none !important;
}
/* 确保所有子元素不超过容器高度 */
#image-upload * {
max-height: 54px !important;
}
/* 调整上传区域文字布局 */
#image-upload .center {
display: flex !important;
flex-direction: column !important;
align-items: center !important;
justify-content: center !important;
height: 54px !important;
gap: 0 !important;
}
/* 调整文字行高避免截断 */
#image-upload span {
line-height: 1.2 !important;
display: block !important;
}
"""
# Page layout and event wiring for the chat UI.
with gr.Blocks(title="Step-3", theme=gr.themes.Soft(), css=css) as demo:
    gr.Markdown("""
# <img src="https://huggingface.co/stepfun-ai/step3/resolve/main/figures/stepfun-logo.png" alt="StepFun Logo" style="height: 30px; vertical-align: middle; margin-right: 8px;"> Step-3
Welcome to Step-3, an advanced multimodal AI assistant by <a href="https://stepfun.com/" target="_blank" style="color: #0969da;">StepFun</a>.
""")

    # Per-session state holding the last submitted message and images, so the
    # bot step (and Retry) can use them after the inputs have been cleared.
    saved_msg = gr.State("")
    saved_imgs = gr.State([])

    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                height=600,
                show_label=False,
                elem_id="chatbot",
                bubble_full_width=False,
                avatar_images=None,
                render_markdown=True
            )

            # Input area: message box, image upload, send button.
            with gr.Row():
                with gr.Column(scale=8):
                    msg = gr.Textbox(
                        label="Message",
                        placeholder="Type your message here...",
                        lines=2,
                        max_lines=10,
                        show_label=False,
                        elem_id="message-textbox"
                    )
                with gr.Column(scale=2):
                    image_input = gr.File(
                        label="Upload Images",
                        file_count="multiple",
                        file_types=[".png", ".jpg", ".jpeg", ".gif", ".webp"],
                        interactive=True,
                        show_label=False,
                        elem_classes="compact-file",
                        elem_id="image-upload"
                    )
                with gr.Column(scale=1, min_width=100):
                    submit_btn = gr.Button("Send", variant="primary")

            # Bottom buttons.
            with gr.Row():
                clear_btn = gr.Button("🗑️ Clear", scale=1)
                undo_btn = gr.Button("↩️ Undo", scale=1)
                retry_btn = gr.Button("🔄 Retry", scale=1)

        with gr.Column(scale=1):
            # Settings panel.
            with gr.Accordion("⚙️ Settings", open=False):
                system_prompt = gr.Textbox(
                    label="System Prompt",
                    value="You are Step-3, an advanced multimodal AI assistant developed by StepFun. You have strong capabilities in image understanding, reasoning, and providing detailed, helpful responses. You can analyze images, answer questions, and assist with various tasks while showing your reasoning process.",
                    lines=4
                )
                temperature_slider = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.7,
                    step=0.1,
                    label="Temperature"
                )
                max_tokens_slider = gr.Slider(
                    minimum=100,
                    maximum=8000,
                    value=2000,
                    step=100,
                    label="Max Tokens"
                )
                top_p_slider = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.95,
                    step=0.05,
                    label="Top P"
                )

    # Event handling. The same component lists are shared by every trigger.
    submit_inputs = [msg, chatbot, image_input]
    submit_outputs = [msg, chatbot, image_input, saved_msg, saved_imgs]
    generation_inputs = [
        chatbot, saved_msg, saved_imgs, system_prompt,
        temperature_slider, max_tokens_slider, top_p_slider,
    ]

    # Enter in the textbox: record the user turn, then stream the reply.
    submit_event = msg.submit(
        user_submit,
        submit_inputs,
        submit_outputs,
        queue=False
    ).then(
        bot_response,
        generation_inputs,
        chatbot
    )

    # Send button: same two-step flow as pressing Enter.
    submit_btn.click(
        user_submit,
        submit_inputs,
        submit_outputs,
        queue=False
    ).then(
        bot_response,
        generation_inputs,
        chatbot
    )

    # Clear: reset the chat AND the saved turn; otherwise Retry after Clear
    # would re-send the message that was just cleared.
    clear_btn.click(
        lambda: (None, "", None),
        None,
        [chatbot, saved_msg, saved_imgs],
        queue=False
    )

    # Undo: drop the last row of the chat.
    undo_btn.click(
        lambda h: h[:-1] if h else h,
        chatbot,
        chatbot,
        queue=False
    )

    # Retry: drop the last answered row, then regenerate from the saved turn.
    retry_btn.click(
        lambda h: h[:-1] if h and h[-1][1] is not None else h,
        chatbot,
        chatbot,
        queue=False
    ).then(
        bot_response,
        generation_inputs,
        chatbot
    )
# Launch the app when run as a script.
if __name__ == "__main__":
    key_status = 'Set' if STEP_API_KEY else 'Not set'
    print(f"[DEBUG] Starting app with API key: {key_status}")
    print(f"[DEBUG] Base URL: {BASE_URL}")

    # Queue at most 10 pending requests, then serve on all interfaces.
    demo.queue(max_size=10)
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": False,
    }
    demo.launch(**launch_options)