import gradio as gr
import time
import base64
import os
from io import BytesIO
from PIL import Image

# Remove proxy settings from the environment to avoid conflicts with the OpenAI client
for key in list(os.environ.keys()):
    if "proxy" in key.lower():
        del os.environ[key]

# Import OpenAI (after the proxy cleanup above)
from openai import OpenAI

# Configuration
BASE_URL = "https://api.stepfun.com/v1"
# API key is read from the environment
STEP_API_KEY = os.environ.get("STEP_API_KEY", "")

# Available models
MODELS = ["step-3", "step-r1-v-mini"]

# Delimiters used to detect the chain-of-thought block in model output.
# NOTE: the exact tag literals are assumed here (<think>...</think>);
# adjust if the model emits different markers.
REASONING_START_TAG = "<think>"
REASONING_END_TAG = "</think>"


def image_to_base64(image):
    """Convert a PIL image to a base64-encoded PNG string."""
    if image is None:
        return None
    if isinstance(image, Image.Image):
        buffered = BytesIO()
        image.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode()
    return None


def create_client():
    """Create an OpenAI client, working around various environment issues."""
    import importlib
    import sys

    # Reload the openai module to start from a clean state, then re-import
    # OpenAI so the local name points at the reloaded module.
    if "openai" in sys.modules:
        importlib.reload(sys.modules["openai"])
    from openai import OpenAI

    # Try different initialization strategies.
    try:
        # Option 1: pass only the required arguments
        return OpenAI(
            api_key=STEP_API_KEY,
            base_url=BASE_URL,
        )
    except Exception:
        pass

    try:
        # Option 2: configure via environment variables
        os.environ["OPENAI_API_KEY"] = STEP_API_KEY
        os.environ["OPENAI_BASE_URL"] = BASE_URL
        return OpenAI()
    except Exception:
        pass

    # Option 3: supply a custom httpx client
    try:
        import httpx

        http_client = httpx.Client()
        return OpenAI(
            api_key=STEP_API_KEY,
            base_url=BASE_URL,
            http_client=http_client,
        )
    except Exception:
        pass

    # All strategies failed
    return None


def call_step_api(image, prompt, model, temperature=0.7, max_tokens=2000):
    """Call the Step API; supports text-only and image + text prompts."""
    if not prompt:
        yield "", "❌ Please enter a prompt"
        return

    if not STEP_API_KEY:
        yield "", "❌ API key not configured. Add a STEP_API_KEY environment variable in the Hugging Face Space settings."
        return

    # Build the message content
    if image is not None:
        # Image + text prompt
        try:
            base64_image = image_to_base64(image)
            if base64_image is None:
                yield "", "❌ Failed to process the image"
                return
            message_content = [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{base64_image}",
                        "detail": "high",
                    },
                },
                {
                    "type": "text",
                    "text": prompt,
                },
            ]
        except Exception as e:
            yield "", f"❌ Image processing error: {str(e)}"
            return
    else:
        # Text-only prompt
        message_content = prompt

    # Build the message list
    messages = [
        {
            "role": "user",
            "content": message_content,
        }
    ]

    # Create the OpenAI client
    client = create_client()

    if client is None:
        # Client creation failed; fall back to a plain requests call
        try:
            import requests

            headers = {
                "Authorization": f"Bearer {STEP_API_KEY}",
                "Content-Type": "application/json",
            }
            data = {
                "model": model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
                "stream": False,
            }
            response = requests.post(
                f"{BASE_URL}/chat/completions",
                headers=headers,
                json=data,
                timeout=60,
            )
            if response.status_code == 200:
                result = response.json()
                if result.get("choices") and result["choices"][0].get("message"):
                    content = result["choices"][0]["message"]["content"]
                    # Split out the reasoning block, if present
                    reasoning_content = ""
                    final_answer = content
                    if REASONING_START_TAG in content and REASONING_END_TAG in content:
                        parts = content.split(REASONING_START_TAG)
                        before = parts[0]
                        after_reasoning = parts[1].split(REASONING_END_TAG)
                        reasoning_content = after_reasoning[0]
                        final_answer = (before + after_reasoning[1]) if len(after_reasoning) > 1 else before
                    yield reasoning_content, final_answer
                else:
                    yield "", "❌ Unexpected API response format"
            else:
                yield "", f"❌ API request failed: {response.status_code}"
        except Exception as e:
            yield "", f"❌ Request failed: {str(e)}"
        return

    try:
        # Record the start time
        start_time = time.time()

        # Streamed completion
        response = client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            stream=True,
        )

        full_response = ""
        reasoning_content = ""
        final_answer = ""
        is_reasoning = False
        reasoning_started = False

        for chunk in response:
            if chunk.choices and chunk.choices[0].delta:
                delta = chunk.choices[0].delta
                if hasattr(delta, "content") and delta.content:
                    content = delta.content
                    full_response += content

                    # Detect reasoning tags in the stream
                    if REASONING_START_TAG in content:
                        is_reasoning = True
                        reasoning_started = True
                        # Text before the tag belongs to the final answer
                        before_reasoning, _, after_tag = content.partition(REASONING_START_TAG)
                        if before_reasoning:
                            final_answer += before_reasoning
                        # Text after the tag starts the reasoning block
                        reasoning_content += after_tag
                    elif REASONING_END_TAG in content:
                        # Text before the tag closes the reasoning block
                        before_tag, _, after_reasoning = content.partition(REASONING_END_TAG)
                        reasoning_content += before_tag
                        is_reasoning = False
                        # Text after the tag belongs to the final answer
                        final_answer += after_reasoning
                    elif is_reasoning:
                        reasoning_content += content
                    else:
                        final_answer += content

                    # Stream intermediate results to the UI
                    if reasoning_started:
                        yield reasoning_content, final_answer
                    else:
                        yield "", final_answer

        # Append the generation time
        elapsed_time = time.time() - start_time
        final_answer += f"\n\n⏱️ Generation time: {elapsed_time:.2f}s"
        yield reasoning_content, final_answer

    except Exception as e:
        error_msg = str(e)
        if "api_key" in error_msg.lower():
            yield "", "❌ API key error: please check that the key is valid"
        elif "network" in error_msg.lower() or "connection" in error_msg.lower():
            yield "", "❌ Network error: please check your connection"
        else:
            yield "", f"❌ API call error: {error_msg[:200]}"


# Build the Gradio UI
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 Step-3
    """)

    with gr.Row():
        with gr.Column(scale=1):
            # Input area
            image_input = gr.Image(
                label="Upload image (optional)",
                type="pil",
                height=300,
            )
            prompt_input = gr.Textbox(
                label="Prompt",
                placeholder="Enter your question or description...",
                lines=3,
                value="",
            )

            with gr.Accordion("Advanced settings", open=False):
                model_select = gr.Dropdown(
                    choices=MODELS,
                    value=MODELS[0],
                    label="Model",
                )
                temperature_slider = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.7,
                    step=0.1,
                    label="Temperature",
                )
                max_tokens_slider = gr.Slider(
                    minimum=100,
                    maximum=4000,
                    value=2000,
                    step=100,
                    label="Max output tokens",
                )

            submit_btn = gr.Button("🚀 Analyze", variant="primary")
            clear_btn = gr.Button("🗑️ Clear", variant="secondary")

        with gr.Column(scale=1):
            # Reasoning display
            with gr.Accordion("💭 Reasoning (CoT)", open=True):
                reasoning_output = gr.Textbox(
                    label="Thought process",
                    lines=10,
                    max_lines=15,
                    show_copy_button=True,
                    interactive=False,
                )

            # Final answer display
            answer_output = gr.Textbox(
                label="📝 Analysis result",
                lines=15,
                max_lines=25,
                show_copy_button=True,
                interactive=False,
            )

    # Event handlers - stream output into both textboxes
    submit_btn.click(
        fn=call_step_api,
        inputs=[
            image_input,
            prompt_input,
            model_select,
            temperature_slider,
            max_tokens_slider,
        ],
        outputs=[reasoning_output, answer_output],
        show_progress="full",
    )

    clear_btn.click(
        fn=lambda: (None, "", "", ""),
        inputs=[],
        outputs=[image_input, prompt_input, reasoning_output, answer_output],
    )

    # Footer
    gr.Markdown("""
    ---
    Powered by [Step-3](https://www.stepfun.com/)
    """)

# Launch the app
if __name__ == "__main__":
    demo.launch()
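# Local usage sketch (assumed workflow, not stated in the original script):
#   export STEP_API_KEY="sk-..."   # your StepFun API key
#   python app.py                  # the filename "app.py" is an assumption; open the URL Gradio prints
# On Hugging Face Spaces, set STEP_API_KEY as a secret in the Space settings instead.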