# NOTE: the "Spaces: / Build error" lines below the original header were
# Hugging Face Spaces page residue, not source code; kept as a comment so
# the file remains valid Python.
import gradio as gr | |
import base64 | |
from openai import OpenAI | |
from PIL import Image | |
import io | |
import os | |
import time | |
import traceback | |
# API configuration
BASE_URL = "https://api.stepfun.com/v1"
# SECURITY: the key must come from the environment only. The previous code
# embedded a real API key as the fallback value — a leaked secret that should
# be rotated. Never hard-code credentials in source.
STEP_API_KEY = os.environ.get("STEP_API_KEY", "")

print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}")
print(f"[DEBUG] Base URL: {BASE_URL}")
def image_to_base64(image_path):
    """Convert an image file to a base64-encoded JPEG string.

    Args:
        image_path: Path to an image file on disk.

    Returns:
        The base64 string of the JPEG-encoded image, or None if the file
        cannot be opened or converted.
    """
    try:
        with Image.open(image_path) as img:
            # JPEG cannot store alpha or palette data. The original code only
            # handled RGBA, so LA/P/CMYK inputs crashed on save; normalize
            # every non-RGB mode here, flattening alpha onto white.
            if img.mode in ("RGBA", "LA"):
                background = Image.new("RGB", img.size, (255, 255, 255))
                background.paste(img, mask=img.getchannel("A"))
                img = background
            elif img.mode != "RGB":
                img = img.convert("RGB")
            # Encode in-memory; quality=95 keeps detail for the vision model.
            buffered = io.BytesIO()
            img.save(buffered, format="JPEG", quality=95)
            return base64.b64encode(buffered.getvalue()).decode("utf-8")
    except Exception as e:
        print(f"[ERROR] Failed to convert image: {e}")
        return None
def user_submit(message, history, images):
    """Handle a user submission by appending the new turn to the history.

    Returns a 5-tuple for Gradio outputs: (cleared textbox, updated history,
    cleared upload widget, saved message, saved images). The saved values let
    the follow-up bot step pick up the pending input.
    """
    # Nothing to send: leave every component exactly as it was.
    if not message and not images:
        return message, history, images, "", None

    shown = message or ""
    if images:
        count = len(images) if isinstance(images, list) else 1
        tag = f"[{count} Image{'s' if count > 1 else ''}]"
        shown = f"{tag} {shown}" if shown else tag

    # Clear the textbox and uploads, stash the raw inputs in state.
    return "", history + [[shown, None]], None, message, images
def bot_response(history, saved_message, saved_images, system_prompt, temperature, max_tokens, top_p):
    """Stream the assistant reply for the most recent user turn.

    Delegates to process_message when there is a pending message or image;
    otherwise re-yields the history unchanged.
    """
    if not (saved_message or saved_images):
        # No pending input (e.g. a bare retry) — nothing to generate.
        yield history
        return
    yield from process_message(
        saved_message,
        history,
        saved_images,
        system_prompt,
        temperature,
        max_tokens,
        top_p,
    )
def process_message(message, history, images, system_prompt, temperature, max_tokens, top_p):
    """Call the Step-3 API and stream the assistant reply into the history.

    Generator: yields ``history`` (a list of [user_text, bot_text] pairs)
    after each streamed chunk so the Gradio chatbot updates live. The last
    entry's bot slot is mutated in place.

    Args:
        message: User text; may be empty or None.
        history: Chatbot history list, mutated in place.
        images: A single file path, a list of file paths, or None.
        system_prompt: Optional system message sent ahead of the user turn.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Completion-token cap forwarded to the API.
        top_p: Nucleus-sampling parameter forwarded to the API.
    """
    print(f"[DEBUG] Processing message: {message[:100] if message else 'None'}")
    print(f"[DEBUG] Has images: {images is not None}")
    print(f"[DEBUG] Images type: {type(images)}")
    if images:
        print(f"[DEBUG] Images content: {images}")
    if not message and not images:
        history[-1][1] = "Please provide a message or image."
        yield history
        return
    # Ensure the history ends with a user turn awaiting a reply (covers the
    # retry path, where the last exchange was stripped before re-generating).
    if not history or history[-1][1] is not None:
        display_message = message if message else ""
        if images:
            if isinstance(images, list):
                num_images = len(images)
                image_text = f"[{num_images} Image{'s' if num_images > 1 else ''}]"
            else:
                image_text = "[1 Image]"
            display_message = f"{image_text} {display_message}" if display_message else image_text
        history.append([display_message, None])
    # Show a placeholder while the request is in flight.
    history[-1][1] = "🤔 Thinking..."
    yield history
    try:
        # Build the multimodal content parts for the user message.
        content = []
        # Attach images first (multiple images supported).
        if images:
            # Normalize a single path into a list.
            image_list = images if isinstance(images, list) else [images]
            for image_path in image_list:
                if image_path:
                    print(f"[DEBUG] Processing image: {image_path}")
                    base64_image = image_to_base64(image_path)
                    if base64_image:
                        content.append({
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}",
                                "detail": "high"
                            }
                        })
                        print(f"[DEBUG] Successfully added image to content")
                    else:
                        # Unconvertible images are skipped, not fatal.
                        print(f"[ERROR] Failed to convert image: {image_path}")
        # Then the text part, if any.
        if message:
            content.append({
                "type": "text",
                "text": message
            })
            print(f"[DEBUG] Added text to content: {message[:100]}")
        if not content:
            history[-1][1] = "❌ No valid input provided."
            yield history
            return
        # Assemble the API message list.
        messages = []
        # Optional system prompt goes first.
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        # NOTE(review): only the current user turn is sent — earlier chat
        # history is intentionally not forwarded to the API.
        messages.append({
            "role": "user",
            "content": content
        })
        print(f"[DEBUG] Prepared {len(messages)} messages for API")
        print(f"[DEBUG] Message structure: {[{'role': m['role'], 'content_types': [c.get('type', 'text') for c in m['content']] if isinstance(m['content'], list) else 'text'} for m in messages]}")
        # Work around client proxy issues: strip every proxy-related variable.
        # NOTE(review): this mutates the whole process environment, not just
        # this request.
        import os
        import httpx
        proxy_vars = ['HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy',
                      'ALL_PROXY', 'all_proxy', 'NO_PROXY', 'no_proxy']
        for var in proxy_vars:
            if var in os.environ:
                del os.environ[var]
                print(f"[DEBUG] Removed {var} from environment")
        # Create the OpenAI-compatible client.
        try:
            # Method 1: default construction.
            client = OpenAI(
                api_key=STEP_API_KEY,
                base_url=BASE_URL
            )
            print("[DEBUG] Client created successfully (method 1)")
        except TypeError as e:
            if 'proxies' in str(e):
                print(f"[DEBUG] Method 1 failed with proxy error, trying method 2")
                # Method 2: custom HTTP client that ignores env proxy settings.
                http_client = httpx.Client(trust_env=False)
                client = OpenAI(
                    api_key=STEP_API_KEY,
                    base_url=BASE_URL,
                    http_client=http_client
                )
                print("[DEBUG] Client created successfully (method 2)")
            else:
                raise e
        print(f"[DEBUG] Making API call to {BASE_URL}")
        # Fire the streaming chat-completions request.
        response = client.chat.completions.create(
            model="step-3",
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stream=True
        )
        print("[DEBUG] API call successful, starting streaming")
        # Stream the reply, separating chain-of-thought from the final answer.
        full_response = ""
        in_reasoning = False
        reasoning_content = ""
        final_content = ""
        has_reasoning_field = False
        for chunk in response:
            if chunk.choices and chunk.choices[0]:
                delta = chunk.choices[0].delta
                # Prefer delta.reasoning (Step-3's dedicated CoT channel).
                if hasattr(delta, 'reasoning') and delta.reasoning:
                    has_reasoning_field = True
                    reasoning_content += delta.reasoning
                    print(f"[DEBUG] CoT chunk: {delta.reasoning[:50] if len(delta.reasoning) > 50 else delta.reasoning}")
                    # Live-update the chain-of-thought section.
                    if final_content:
                        display_text = f"💭 **Chain of Thought:**\n\n{reasoning_content}\n\n---\n\n📝 **Answer:**\n\n{final_content}"
                    else:
                        display_text = f"💭 **Chain of Thought:**\n\n{reasoning_content}\n\n---\n\n📝 **Answer:**\n\n*Generating...*"
                    history[-1][1] = display_text
                    yield history
                # Handle the regular content field.
                delta_content = delta.content if hasattr(delta, 'content') else None
                if delta_content:
                    # When CoT arrived via the reasoning field, content is the
                    # final answer text.
                    if has_reasoning_field:
                        final_content += delta_content
                        full_response += delta_content
                    else:
                        # Otherwise fall back to parsing inline <reasoning> tags.
                        full_response += delta_content
                        # Opening tag seen: start collecting CoT text.
                        if '<reasoning>' in full_response and not in_reasoning:
                            in_reasoning = True
                            parts = full_response.split('<reasoning>')
                            if len(parts) > 1:
                                reasoning_content = parts[1]
                        if in_reasoning and '</reasoning>' in full_response:
                            # Closing tag seen: split CoT from the answer.
                            in_reasoning = False
                            parts = full_response.split('</reasoning>')
                            if len(parts) > 1:
                                reasoning_content = parts[0].split('<reasoning>')[-1]
                                final_content = parts[1]
                        elif in_reasoning:
                            # Still inside the reasoning span.
                            reasoning_content = full_response.split('<reasoning>')[-1]
                        elif '</reasoning>' in full_response:
                            parts = full_response.split('</reasoning>')
                            if len(parts) > 1:
                                final_content = parts[1]
                        else:
                            # No reasoning tags at all: the whole stream is the answer.
                            if '<reasoning>' not in full_response:
                                final_content = full_response
                    # Format the partial view for the chatbot.
                    if reasoning_content and final_content:
                        display_text = f"💭 **Chain of Thought:**\n\n{reasoning_content.strip()}\n\n---\n\n📝 **Answer:**\n\n{final_content.strip()}"
                    elif reasoning_content:
                        display_text = f"💭 **Chain of Thought:**\n\n{reasoning_content.strip()}\n\n---\n\n📝 **Answer:**\n\n*Generating...*"
                    else:
                        display_text = full_response
                    history[-1][1] = display_text
                    yield history
        # Final formatting once the stream is complete.
        if reasoning_content or final_content:
            final_display = f"💭 **Chain of Thought:**\n\n{reasoning_content.strip()}\n\n---\n\n📝 **Answer:**\n\n{final_content.strip()}"
            history[-1][1] = final_display
        else:
            history[-1][1] = full_response
        print(f"[DEBUG] Streaming completed. Response length: {len(full_response)}")
        yield history
    except Exception as e:
        # Surface any failure (network, auth, API error) in the chat itself.
        error_msg = f"❌ Error: {str(e)}"
        print(f"[ERROR] {error_msg}")
        traceback.print_exc()
        history[-1][1] = f"❌ Error: {str(e)}"
        yield history
# Custom CSS for the Gradio interface: compacts the file-upload widget to the
# same 54px height as the message textbox so the input row lines up.
css = """
/* 文本框样式 */
#message-textbox textarea {
    min-height: 54px !important;
    max-height: 54px !important;
}
/* File上传组件容器 */
#image-upload {
    height: 54px !important;
    min-height: 54px !important;
    max-height: 54px !important;
}
/* File组件内部wrapper */
#image-upload .wrap {
    height: 54px !important;
    min-height: 54px !important;
    max-height: 54px !important;
    padding: 0 !important;
    margin: 0 !important;
    border-radius: 8px !important;
}
/* 上传区域样式 */
#image-upload .upload-container {
    height: 54px !important;
    min-height: 54px !important;
    display: flex !important;
    align-items: center !important;
    justify-content: center !important;
    flex-direction: column !important;
    gap: 2px !important;
}
/* Drop File Here 文字样式 */
#image-upload .upload-text {
    font-size: 13px !important;
    margin: 0 !important;
    padding: 0 !important;
    line-height: 1.2 !important;
}
/* or 文字样式 */
#image-upload .or-text {
    font-size: 11px !important;
    margin: 0 !important;
    padding: 0 !important;
    opacity: 0.7 !important;
    line-height: 1 !important;
}
/* 隐藏默认的 or 分隔符 */
#image-upload .or {
    display: none !important;
}
/* 上传按钮样式 */
#image-upload button {
    height: 54px !important;
    font-size: 13px !important;
    padding: 0 16px !important;
    white-space: nowrap !important;
}
/* 文件预览样式 */
#image-upload .file-preview {
    height: 54px !important;
    max-height: 54px !important;
    overflow-y: auto !important;
    font-size: 12px !important;
    padding: 4px 8px !important;
    display: flex !important;
    align-items: center !important;
}
/* 隐藏标签 */
#image-upload label {
    display: none !important;
}
/* 确保所有子元素不超过容器高度 */
#image-upload * {
    max-height: 54px !important;
}
/* 调整上传区域文字布局 */
#image-upload .center {
    display: flex !important;
    flex-direction: column !important;
    align-items: center !important;
    justify-content: center !important;
    height: 54px !important;
    gap: 0 !important;
}
/* 调整文字行高避免截断 */
#image-upload span {
    line-height: 1.2 !important;
    display: block !important;
}
"""
# Build the Gradio interface: a chatbot column plus a settings panel.
with gr.Blocks(title="Step-3", theme=gr.themes.Soft(), css=css) as demo:
    gr.Markdown("""
    # <img src="https://huggingface.co/stepfun-ai/step3/resolve/main/figures/stepfun-logo.png" alt="StepFun Logo" style="height: 30px; vertical-align: middle; margin-right: 8px;"> Step-3
    Welcome to Step-3, an advanced multimodal AI assistant by <a href="https://stepfun.com/" target="_blank" style="color: #0969da;">StepFun</a>.
    """)
    # State holders that carry the pending message/images from the submit
    # step to the bot step (the visible inputs are cleared on submit).
    saved_msg = gr.State("")
    saved_imgs = gr.State([])
    with gr.Row():
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(
                height=600,
                show_label=False,
                elem_id="chatbot",
                bubble_full_width=False,
                avatar_images=None,
                render_markdown=True
            )
            # Input area: message textbox, image uploader, send button.
            with gr.Row():
                with gr.Column(scale=8):
                    msg = gr.Textbox(
                        label="Message",
                        placeholder="Type your message here...",
                        lines=2,
                        max_lines=10,
                        show_label=False,
                        elem_id="message-textbox"
                    )
                with gr.Column(scale=2):
                    image_input = gr.File(
                        label="Upload Images",
                        file_count="multiple",
                        file_types=[".png", ".jpg", ".jpeg", ".gif", ".webp"],
                        interactive=True,
                        show_label=False,
                        elem_classes="compact-file",
                        elem_id="image-upload"
                    )
                with gr.Column(scale=1, min_width=100):
                    submit_btn = gr.Button("Send", variant="primary")
            # Bottom action buttons.
            with gr.Row():
                clear_btn = gr.Button("🗑️ Clear", scale=1)
                undo_btn = gr.Button("↩️ Undo", scale=1)
                retry_btn = gr.Button("🔄 Retry", scale=1)
        with gr.Column(scale=1):
            # Settings panel for the system prompt and sampling parameters.
            with gr.Accordion("⚙️ Settings", open=False):
                system_prompt = gr.Textbox(
                    label="System Prompt",
                    value="You are Step-3, an advanced multimodal AI assistant developed by StepFun. You have strong capabilities in image understanding, reasoning, and providing detailed, helpful responses. You can analyze images, answer questions, and assist with various tasks while showing your reasoning process.",
                    lines=4
                )
                temperature_slider = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.7,
                    step=0.1,
                    label="Temperature"
                )
                max_tokens_slider = gr.Slider(
                    minimum=100,
                    maximum=8000,
                    value=2000,
                    step=100,
                    label="Max Tokens"
                )
                top_p_slider = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.95,
                    step=0.05,
                    label="Top P"
                )
    # Event wiring: each submission runs user_submit (fast, un-queued) and
    # then bot_response (streams into the chatbot).
    submit_event = msg.submit(
        user_submit,
        [msg, chatbot, image_input],
        [msg, chatbot, image_input, saved_msg, saved_imgs],
        queue=False
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_imgs, system_prompt, temperature_slider, max_tokens_slider, top_p_slider],
        chatbot
    )
    submit_btn.click(
        user_submit,
        [msg, chatbot, image_input],
        [msg, chatbot, image_input, saved_msg, saved_imgs],
        queue=False
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_imgs, system_prompt, temperature_slider, max_tokens_slider, top_p_slider],
        chatbot
    )
    clear_btn.click(lambda: None, None, chatbot, queue=False)
    # Undo drops the last exchange from the history.
    undo_btn.click(
        lambda h: h[:-1] if h else h,
        chatbot,
        chatbot,
        queue=False
    )
    # Retry strips the last completed exchange, then regenerates from the
    # saved message/images state.
    retry_btn.click(
        lambda h: h[:-1] if h and h[-1][1] is not None else h,
        chatbot,
        chatbot,
        queue=False
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_imgs, system_prompt, temperature_slider, max_tokens_slider, top_p_slider],
        chatbot
    )
# Application entry point: enable request queueing and launch the server.
if __name__ == "__main__":
    print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}")
    print(f"[DEBUG] Base URL: {BASE_URL}")
    # Bound the queue so concurrent streaming requests don't pile up.
    demo.queue(max_size=10)
    # Bind to all interfaces on port 7860 (the Hugging Face Spaces default).
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=False
    )