Spaces:
Build error
Build error
import gradio as gr | |
import time | |
import base64 | |
from openai import OpenAI | |
import os | |
from io import BytesIO | |
from PIL import Image | |
# 配置 | |
BASE_URL = "https://api.stepfun.com/v1" | |
# 从环境变量获取API密钥 | |
STEP_API_KEY = os.environ.get("STEP_API_KEY", "") | |
# 可选模型 | |
MODELS = ["step-3", "step-r1-v-mini"] | |
def image_to_base64(image): | |
"""将PIL图像转换为base64字符串""" | |
if image is None: | |
return None | |
if isinstance(image, Image.Image): | |
buffered = BytesIO() | |
image.save(buffered, format="PNG") | |
img_str = base64.b64encode(buffered.getvalue()).decode() | |
return img_str | |
return None | |
def call_step_api(image, prompt, model, temperature=0.7, max_tokens=2000): | |
"""调用Step API进行图像分析和文本生成,支持CoT推理展示""" | |
if image is None: | |
yield "❌ 请先上传一张图片", "" | |
return | |
if not prompt: | |
yield "❌ 请输入提示词", "" | |
return | |
if not STEP_API_KEY: | |
yield "❌ API密钥未配置。请在 Hugging Face Space 的 Settings 中添加 STEP_API_KEY 环境变量。", "" | |
return | |
# 转换图像为base64 | |
try: | |
base64_image = image_to_base64(image) | |
if base64_image is None: | |
yield "❌ 图片处理失败", "" | |
return | |
except Exception as e: | |
yield f"❌ 图片处理错误: {str(e)}", "" | |
return | |
# 构造消息 | |
messages = [ | |
{ | |
"role": "user", | |
"content": [ | |
{ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/png;base64,{base64_image}", | |
"detail": "high" | |
} | |
}, | |
{ | |
"type": "text", | |
"text": prompt | |
} | |
] | |
} | |
] | |
# 创建OpenAI客户端 | |
try: | |
client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL) | |
except Exception as e: | |
yield f"❌ 客户端初始化失败: {str(e)}", "" | |
return | |
try: | |
# 记录开始时间 | |
start_time = time.time() | |
# 流式输出 | |
response = client.chat.completions.create( | |
model=model, | |
messages=messages, | |
temperature=temperature, | |
max_tokens=max_tokens, | |
stream=True | |
) | |
full_response = "" | |
reasoning_content = "" | |
final_answer = "" | |
is_reasoning = False | |
reasoning_started = False | |
for chunk in response: | |
if chunk.choices and chunk.choices[0].delta: | |
delta = chunk.choices[0].delta | |
if hasattr(delta, 'content') and delta.content: | |
content = delta.content | |
full_response += content | |
# 检测reasoning标记 | |
if "<reasoning>" in content: | |
is_reasoning = True | |
reasoning_started = True | |
# 提取<reasoning>之前的内容添加到final_answer | |
before_reasoning = content.split("<reasoning>")[0] | |
if before_reasoning: | |
final_answer += before_reasoning | |
# 提取<reasoning>之后的内容开始reasoning | |
after_tag = content.split("<reasoning>")[1] if len(content.split("<reasoning>")) > 1 else "" | |
reasoning_content += after_tag | |
elif "</reasoning>" in content: | |
# 提取</reasoning>之前的内容添加到reasoning | |
before_tag = content.split("</reasoning>")[0] | |
reasoning_content += before_tag | |
is_reasoning = False | |
# 提取</reasoning>之后的内容添加到final_answer | |
after_reasoning = content.split("</reasoning>")[1] if len(content.split("</reasoning>")) > 1 else "" | |
final_answer += after_reasoning | |
elif is_reasoning: | |
reasoning_content += content | |
else: | |
final_answer += content | |
# 实时输出 | |
if reasoning_started: | |
yield reasoning_content, final_answer | |
else: | |
yield "", final_answer | |
# 添加生成时间 | |
elapsed_time = time.time() - start_time | |
time_info = f"\n\n⏱️ 生成用时: {elapsed_time:.2f}秒" | |
final_answer += time_info | |
yield reasoning_content, final_answer | |
except Exception as e: | |
error_msg = str(e) | |
if "api_key" in error_msg.lower(): | |
yield "", "❌ API密钥错误:请检查密钥是否有效" | |
elif "network" in error_msg.lower() or "connection" in error_msg.lower(): | |
yield "", "❌ 网络连接错误:请检查网络连接" | |
else: | |
yield "", f"❌ API调用错误: {error_msg[:200]}" | |
# 创建Gradio界面 | |
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo: | |
gr.Markdown(""" | |
# 🤖 Step-3 | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
# 输入区域 | |
image_input = gr.Image( | |
label="上传图片", | |
type="pil", | |
height=300 | |
) | |
prompt_input = gr.Textbox( | |
label="提示词", | |
placeholder="例如:这是什么?请详细描述", | |
lines=3, | |
value="请详细描述这张图片的内容。" | |
) | |
with gr.Accordion("高级设置", open=False): | |
model_select = gr.Dropdown( | |
choices=MODELS, | |
value=MODELS[0], | |
label="选择模型" | |
) | |
temperature_slider = gr.Slider( | |
minimum=0, | |
maximum=1, | |
value=0.7, | |
step=0.1, | |
label="Temperature" | |
) | |
max_tokens_slider = gr.Slider( | |
minimum=100, | |
maximum=4000, | |
value=2000, | |
step=100, | |
label="最大输出长度" | |
) | |
submit_btn = gr.Button("🚀 开始分析", variant="primary") | |
clear_btn = gr.Button("🗑️ 清空", variant="secondary") | |
with gr.Column(scale=1): | |
# 推理过程展示 | |
with gr.Accordion("💭 推理过程 (CoT)", open=True): | |
reasoning_output = gr.Textbox( | |
label="思考过程", | |
lines=10, | |
max_lines=15, | |
show_copy_button=True, | |
interactive=False | |
) | |
# 最终答案展示 | |
answer_output = gr.Textbox( | |
label="📝 分析结果", | |
lines=15, | |
max_lines=25, | |
show_copy_button=True, | |
interactive=False | |
) | |
# 事件处理 - 流式输出到两个文本框 | |
submit_btn.click( | |
fn=call_step_api, | |
inputs=[ | |
image_input, | |
prompt_input, | |
model_select, | |
temperature_slider, | |
max_tokens_slider | |
], | |
outputs=[reasoning_output, answer_output], | |
show_progress=True | |
) | |
clear_btn.click( | |
fn=lambda: (None, "", "", ""), | |
inputs=[], | |
outputs=[image_input, prompt_input, reasoning_output, answer_output] | |
) | |
# 页脚 | |
gr.Markdown(""" | |
--- | |
Powered by [Step-3](https://www.stepfun.com/) | |
""") | |
# 启动应用 | |
if __name__ == "__main__": | |
demo.launch() |