Spaces:
Build error
Build error
import gradio as gr | |
import time | |
import base64 | |
from openai import OpenAI | |
import os | |
from io import BytesIO | |
from PIL import Image | |
# 配置 | |
BASE_URL = "https://api.stepfun.com/v1" | |
# 从环境变量获取API密钥 | |
STEP_API_KEY = os.environ.get("STEP_API_KEY", "") | |
# 可选模型 | |
MODELS = ["step-3", "step-r1-v-mini"] | |
def image_to_base64(image): | |
"""将PIL图像转换为base64字符串""" | |
if image is None: | |
return None | |
if isinstance(image, Image.Image): | |
buffered = BytesIO() | |
image.save(buffered, format="PNG") | |
img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') | |
return img_str | |
return None | |
def call_step_api(image, prompt, model, temperature=0.7, max_tokens=2000): | |
"""调用Step API进行分析,支持纯文本和图像+文本""" | |
if not prompt: | |
yield "", "❌ 请输入提示词" | |
return | |
if not STEP_API_KEY: | |
yield "", "❌ API密钥未配置。请在 Hugging Face Space 的 Settings 中添加 STEP_API_KEY 环境变量。" | |
return | |
# 构造消息内容 - 参考官方示例 | |
if image is not None: | |
# 有图片的情况 | |
try: | |
base64_image = image_to_base64(image) | |
if base64_image is None: | |
yield "", "❌ 图片处理失败" | |
return | |
# 按照官方示例的格式构造消息 | |
messages = [ | |
{"role": "user", "content": [ | |
{"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{base64_image}", "detail": "high"}}, | |
{"type": "text", "text": prompt} | |
]} | |
] | |
except Exception as e: | |
yield "", f"❌ 图片处理错误: {str(e)}" | |
return | |
else: | |
# 纯文本的情况 | |
messages = [ | |
{"role": "user", "content": prompt} | |
] | |
# 创建OpenAI客户端 - 完全按照官方示例 | |
try: | |
client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL) | |
except Exception as e: | |
yield "", f"❌ 客户端初始化失败: {str(e)}" | |
return | |
# 记录开始时间 | |
start_time = time.time() | |
try: | |
# 调用API - 按照官方示例 | |
response = client.chat.completions.create( | |
model=model, | |
messages=messages, | |
stream=True | |
) | |
except Exception as e: | |
yield "", f"❌ API请求失败: {str(e)}" | |
return | |
# 处理流式响应 | |
full_response = "" | |
reasoning_content = "" | |
final_answer = "" | |
is_reasoning = False | |
reasoning_started = False | |
try: | |
for chunk in response: | |
# 按照官方示例处理chunk | |
if chunk.choices and len(chunk.choices) > 0: | |
delta = chunk.choices[0].delta | |
if hasattr(delta, 'content') and delta.content: | |
content = delta.content | |
full_response += content | |
# 检测reasoning标记 | |
if "<reasoning>" in content: | |
is_reasoning = True | |
reasoning_started = True | |
# 处理标记前后的内容 | |
parts = content.split("<reasoning>") | |
if parts[0]: | |
final_answer += parts[0] | |
if len(parts) > 1: | |
reasoning_content += parts[1] | |
elif "</reasoning>" in content: | |
# 处理结束标记 | |
parts = content.split("</reasoning>") | |
if parts[0]: | |
reasoning_content += parts[0] | |
is_reasoning = False | |
if len(parts) > 1: | |
final_answer += parts[1] | |
elif is_reasoning: | |
reasoning_content += content | |
else: | |
final_answer += content | |
# 实时输出 | |
yield reasoning_content, final_answer | |
except Exception as e: | |
yield reasoning_content, final_answer + f"\n\n❌ 流处理错误: {str(e)}" | |
return | |
# 添加生成时间 | |
elapsed_time = time.time() - start_time | |
time_info = f"\n\n⏱️ 生成用时: {elapsed_time:.2f}秒" | |
final_answer += time_info | |
yield reasoning_content, final_answer | |
# 创建Gradio界面 | |
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo: | |
gr.Markdown(""" | |
# 🤖 Step-3 | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
# 输入区域 | |
image_input = gr.Image( | |
label="上传图片(可选)", | |
type="pil", | |
height=300 | |
) | |
prompt_input = gr.Textbox( | |
label="提示词", | |
placeholder="输入你的问题或描述...", | |
lines=3, | |
value="" | |
) | |
with gr.Accordion("高级设置", open=False): | |
model_select = gr.Dropdown( | |
choices=MODELS, | |
value=MODELS[0], | |
label="选择模型" | |
) | |
temperature_slider = gr.Slider( | |
minimum=0, | |
maximum=1, | |
value=0.7, | |
step=0.1, | |
label="Temperature" | |
) | |
max_tokens_slider = gr.Slider( | |
minimum=100, | |
maximum=4000, | |
value=2000, | |
step=100, | |
label="最大输出长度" | |
) | |
submit_btn = gr.Button("🚀 开始分析", variant="primary") | |
clear_btn = gr.Button("🗑️ 清空", variant="secondary") | |
with gr.Column(scale=1): | |
# 推理过程展示 | |
with gr.Accordion("💭 推理过程 (CoT)", open=True): | |
reasoning_output = gr.Textbox( | |
label="思考过程", | |
lines=10, | |
max_lines=15, | |
show_copy_button=True, | |
interactive=False | |
) | |
# 最终答案展示 | |
answer_output = gr.Textbox( | |
label="📝 分析结果", | |
lines=15, | |
max_lines=25, | |
show_copy_button=True, | |
interactive=False | |
) | |
# 事件处理 - 流式输出到两个文本框 | |
submit_btn.click( | |
fn=call_step_api, | |
inputs=[ | |
image_input, | |
prompt_input, | |
model_select, | |
temperature_slider, | |
max_tokens_slider | |
], | |
outputs=[reasoning_output, answer_output], | |
show_progress=True | |
) | |
clear_btn.click( | |
fn=lambda: (None, "", "", ""), | |
inputs=[], | |
outputs=[image_input, prompt_input, reasoning_output, answer_output] | |
) | |
# 页脚 | |
gr.Markdown(""" | |
--- | |
Powered by [Step-3](https://www.stepfun.com/) | |
""") | |
# 启动应用 | |
if __name__ == "__main__": | |
demo.launch() |