Step3 / app.py
Zenith Wang
彻底解决代理参数问题,添加 requests 备用方案
578098f
raw
history blame
11.3 kB
import gradio as gr
import time
import base64
import os
from io import BytesIO
from PIL import Image
# 清理环境变量中的代理设置,避免与 OpenAI 客户端冲突
for key in list(os.environ.keys()):
if 'proxy' in key.lower() or 'PROXY' in key:
del os.environ[key]
# 导入 OpenAI(在清理环境变量后)
from openai import OpenAI
# 配置
BASE_URL = "https://api.stepfun.com/v1"
# 从环境变量获取API密钥
STEP_API_KEY = os.environ.get("STEP_API_KEY", "")
# 可选模型
MODELS = ["step-3", "step-r1-v-mini"]
def image_to_base64(image):
"""将PIL图像转换为base64字符串"""
if image is None:
return None
if isinstance(image, Image.Image):
buffered = BytesIO()
image.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode()
return img_str
return None
def create_client():
"""创建 OpenAI 客户端,处理各种环境问题"""
import importlib
import sys
# 重新加载 openai 模块以确保干净的状态
if 'openai' in sys.modules:
importlib.reload(sys.modules['openai'])
# 尝试不同的初始化方式
try:
# 方式1:只传递必需参数
return OpenAI(
api_key=STEP_API_KEY,
base_url=BASE_URL
)
except:
pass
try:
# 方式2:通过环境变量
os.environ['OPENAI_API_KEY'] = STEP_API_KEY
os.environ['OPENAI_BASE_URL'] = BASE_URL
return OpenAI()
except:
pass
# 方式3:使用 httpx 客户端自定义
try:
import httpx
http_client = httpx.Client()
return OpenAI(
api_key=STEP_API_KEY,
base_url=BASE_URL,
http_client=http_client
)
except:
pass
# 如果都失败,返回 None
return None
def call_step_api(image, prompt, model, temperature=0.7, max_tokens=2000):
"""调用Step API进行分析,支持纯文本和图像+文本"""
if not prompt:
yield "", "❌ 请输入提示词"
return
if not STEP_API_KEY:
yield "", "❌ API密钥未配置。请在 Hugging Face Space 的 Settings 中添加 STEP_API_KEY 环境变量。"
return
# 构造消息内容
if image is not None:
# 有图片的情况
try:
base64_image = image_to_base64(image)
if base64_image is None:
yield "", "❌ 图片处理失败"
return
message_content = [
{
"type": "image_url",
"image_url": {
"url": f"data:image/png;base64,{base64_image}",
"detail": "high"
}
},
{
"type": "text",
"text": prompt
}
]
except Exception as e:
yield "", f"❌ 图片处理错误: {str(e)}"
return
else:
# 纯文本的情况
message_content = prompt
# 构造消息
messages = [
{
"role": "user",
"content": message_content
}
]
# 创建OpenAI客户端
client = create_client()
if client is None:
# 如果客户端创建失败,尝试直接使用 requests
try:
import requests
headers = {
"Authorization": f"Bearer {STEP_API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": False
}
response = requests.post(
f"{BASE_URL}/chat/completions",
headers=headers,
json=data,
timeout=60
)
if response.status_code == 200:
result = response.json()
if result.get("choices") and result["choices"][0].get("message"):
content = result["choices"][0]["message"]["content"]
# 解析 reasoning 标记
reasoning_content = ""
final_answer = content
if "<reasoning>" in content and "</reasoning>" in content:
parts = content.split("<reasoning>")
before = parts[0]
after_reasoning = parts[1].split("</reasoning>")
reasoning_content = after_reasoning[0]
final_answer = before + after_reasoning[1] if len(after_reasoning) > 1 else before
yield reasoning_content, final_answer
else:
yield "", "❌ API 返回格式错误"
else:
yield "", f"❌ API 请求失败: {response.status_code}"
except Exception as e:
yield "", f"❌ 请求失败: {str(e)}"
return
try:
# 记录开始时间
start_time = time.time()
# 流式输出
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
stream=True
)
full_response = ""
reasoning_content = ""
final_answer = ""
is_reasoning = False
reasoning_started = False
for chunk in response:
if chunk.choices and chunk.choices[0].delta:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
content = delta.content
full_response += content
# 检测reasoning标记
if "<reasoning>" in content:
is_reasoning = True
reasoning_started = True
# 提取<reasoning>之前的内容添加到final_answer
before_reasoning = content.split("<reasoning>")[0]
if before_reasoning:
final_answer += before_reasoning
# 提取<reasoning>之后的内容开始reasoning
after_tag = content.split("<reasoning>")[1] if len(content.split("<reasoning>")) > 1 else ""
reasoning_content += after_tag
elif "</reasoning>" in content:
# 提取</reasoning>之前的内容添加到reasoning
before_tag = content.split("</reasoning>")[0]
reasoning_content += before_tag
is_reasoning = False
# 提取</reasoning>之后的内容添加到final_answer
after_reasoning = content.split("</reasoning>")[1] if len(content.split("</reasoning>")) > 1 else ""
final_answer += after_reasoning
elif is_reasoning:
reasoning_content += content
else:
final_answer += content
# 实时输出
if reasoning_started:
yield reasoning_content, final_answer
else:
yield "", final_answer
# 添加生成时间
elapsed_time = time.time() - start_time
time_info = f"\n\n⏱️ 生成用时: {elapsed_time:.2f}秒"
final_answer += time_info
yield reasoning_content, final_answer
except Exception as e:
error_msg = str(e)
if "api_key" in error_msg.lower():
yield "", "❌ API密钥错误:请检查密钥是否有效"
elif "network" in error_msg.lower() or "connection" in error_msg.lower():
yield "", "❌ 网络连接错误:请检查网络连接"
else:
yield "", f"❌ API调用错误: {error_msg[:200]}"
# 创建Gradio界面
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# 🤖 Step-3
""")
with gr.Row():
with gr.Column(scale=1):
# 输入区域
image_input = gr.Image(
label="上传图片(可选)",
type="pil",
height=300
)
prompt_input = gr.Textbox(
label="提示词",
placeholder="输入你的问题或描述...",
lines=3,
value=""
)
with gr.Accordion("高级设置", open=False):
model_select = gr.Dropdown(
choices=MODELS,
value=MODELS[0],
label="选择模型"
)
temperature_slider = gr.Slider(
minimum=0,
maximum=1,
value=0.7,
step=0.1,
label="Temperature"
)
max_tokens_slider = gr.Slider(
minimum=100,
maximum=4000,
value=2000,
step=100,
label="最大输出长度"
)
submit_btn = gr.Button("🚀 开始分析", variant="primary")
clear_btn = gr.Button("🗑️ 清空", variant="secondary")
with gr.Column(scale=1):
# 推理过程展示
with gr.Accordion("💭 推理过程 (CoT)", open=True):
reasoning_output = gr.Textbox(
label="思考过程",
lines=10,
max_lines=15,
show_copy_button=True,
interactive=False
)
# 最终答案展示
answer_output = gr.Textbox(
label="📝 分析结果",
lines=15,
max_lines=25,
show_copy_button=True,
interactive=False
)
# 事件处理 - 流式输出到两个文本框
submit_btn.click(
fn=call_step_api,
inputs=[
image_input,
prompt_input,
model_select,
temperature_slider,
max_tokens_slider
],
outputs=[reasoning_output, answer_output],
show_progress=True
)
clear_btn.click(
fn=lambda: (None, "", "", ""),
inputs=[],
outputs=[image_input, prompt_input, reasoning_output, answer_output]
)
# 页脚
gr.Markdown("""
---
Powered by [Step-3](https://www.stepfun.com/)
""")
# 启动应用
if __name__ == "__main__":
demo.launch()