"""Gradio chat front-end for the StepFun Step-3 model.

The app streams completions from the StepFun OpenAI-compatible endpoint,
separates the model's reasoning block from its final answer, and shows both
in the chat window and in two side panels.
"""

import base64
import mimetypes
import os
import re
import time
import traceback
from io import BytesIO

import gradio as gr
from openai import OpenAI
from PIL import Image

# Configuration
BASE_URL = "https://api.stepfun.com/v1"
# The API key is read from the environment.
STEP_API_KEY = os.environ.get("STEP_API_KEY", "")

# Reasoning emitted by the model is expected inside <reasoning>...</reasoning>.
_REASONING_OPEN = "<reasoning>"
_REASONING_BLOCK = re.compile(r"<reasoning>(.*?)</reasoning>", re.DOTALL)


def image_to_base64(image):
    """Return the base64 text of *image*, or None if it cannot be encoded.

    Args:
        image: A PIL image (re-encoded as PNG), a path to an existing file
            (encoded byte-for-byte), or None.

    Returns:
        The base64-encoded content as a ``str``, or None for unsupported input.
    """
    if image is None:
        return None
    if isinstance(image, Image.Image):
        buffered = BytesIO()
        image.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode("utf-8")
    if isinstance(image, str) and os.path.exists(image):
        # A file path: send the file's bytes unchanged.
        with open(image, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")
    return None


def _image_mime_type(image):
    """Return the MIME type matching what image_to_base64 produces for *image*."""
    if isinstance(image, str):
        guessed, _ = mimetypes.guess_type(image)
        if guessed and guessed.startswith("image/"):
            return guessed
        return "image/jpeg"
    # PIL images are always re-encoded as PNG by image_to_base64.
    return "image/png"


def extract_cot_and_answer(text):
    """Split a model response into (reasoning, answer).

    A complete ``<reasoning>...</reasoning>`` block yields its stripped content
    as the reasoning and the remaining text (all such blocks removed, stripped)
    as the answer. While streaming, the block may be opened but not yet closed;
    then everything after the opening tag is the reasoning so far and the text
    before it is the answer so far. Without any tag the whole text is returned
    unchanged as the answer.

    Args:
        text: The response text accumulated so far.

    Returns:
        A ``(cot, answer)`` tuple of strings.
    """
    match = _REASONING_BLOCK.search(text)
    if match:
        cot = match.group(1).strip()
        answer = _REASONING_BLOCK.sub("", text).strip()
        return cot, answer

    before, tag, partial = text.partition(_REASONING_OPEN)
    if tag:
        # Reasoning is still being streamed: the closing tag has not arrived.
        return partial.strip(), before.strip()

    return "", text


def _format_reply(cot, answer):
    """Render the chat-bubble text for the current reasoning/answer state."""
    if cot and answer:
        return (
            f"💭 **Reasoning Process:**\n\n{cot}\n\n---\n\n"
            f"📝 **Answer:**\n\n{answer}"
        )
    if cot:
        # Reasoning only; the answer has not started yet.
        return (
            f"💭 **Reasoning Process:**\n\n{cot}\n\n---\n\n"
            f"📝 **Answer:**\n\n*Generating...*"
        )
    return answer


def _build_api_messages(previous_turns, message, image_content, mime_type):
    """Build the chat-completions message list from history plus the new turn."""
    messages = []

    # Earlier turns are replayed as text only (images are not re-sent).
    for user_turn, assistant_turn in previous_turns:
        if user_turn:
            # Drop the "[Image uploaded]" display marker.
            user_text = user_turn.replace("[Image uploaded] ", "").replace(
                "[Image uploaded]", ""
            )
            if user_text:
                messages.append({"role": "user", "content": user_text})
        # Error replies (prefixed with the cross mark) are not sent back.
        if assistant_turn and not assistant_turn.startswith("❌"):
            assistant_text = assistant_turn
            if "**Reasoning Process:**" in assistant_text:
                # Strip the display formatting added by _format_reply.
                assistant_text = re.sub(r"\*\*.*?\*\*", "", assistant_text)
                assistant_text = (
                    assistant_text.replace("💭", "")
                    .replace("📝", "")
                    .replace("---", "")
                    .strip()
                )
            messages.append({"role": "assistant", "content": assistant_text})

    if image_content:
        current_content = [
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:{mime_type};base64,{image_content}",
                    "detail": "high",
                },
            }
        ]
        if message:
            current_content.append({"type": "text", "text": message})
        messages.append({"role": "user", "content": current_content})
    elif message:
        messages.append({"role": "user", "content": message})

    return messages


def call_step_api_stream(message, history, image=None):
    """Stream a Step-3 reply for *message* (and optional *image*).

    This is a generator used as a Gradio event handler. *history* is a list of
    ``[user_text, assistant_text]`` pairs; it is extended in place with the new
    turn and the last entry is updated as chunks arrive.

    Args:
        message: The user's text, possibly empty.
        history: The chatbot history (list of two-item lists), or None.
        image: A PIL image, a file path, or None.

    Yields:
        ``(history, cot, answer)`` tuples for the chatbot and the two side
        panels. Failures are reported in the chat history rather than raised.
    """
    print(f"[DEBUG] Starting API call - Message: {message}, Has Image: {image is not None}")

    if history is None:
        history = []

    if not message and not image:
        print("[DEBUG] No message or image provided")
        yield history, "", ""
        return

    if not STEP_API_KEY:
        print("[DEBUG] API key not configured")
        error_msg = "❌ API key not configured. Please add STEP_API_KEY in Settings."
        history.append([message or "[Image]", error_msg])
        yield history, "", ""
        return

    print(f"[DEBUG] API Key exists: {bool(STEP_API_KEY)}")

    # Prepare the displayed text and the encoded image.
    display_message = message or ""
    image_content = None
    if image:
        try:
            image_content = image_to_base64(image)
            if image_content:
                display_message = (
                    f"[Image uploaded] {message}" if message else "[Image uploaded]"
                )
                print("[DEBUG] Image processed successfully")
        except Exception as e:
            print(f"[DEBUG] Failed to process image: {e}")

    if not message and not image_content:
        # The image could not be read and there is no text: nothing to send.
        history.append(["[Image]", "❌ Failed to read the uploaded image."])
        yield history, "", ""
        return

    # Show the user's turn immediately, with an empty reply to be filled in.
    history.append([display_message, ""])
    yield history, "", ""

    messages = _build_api_messages(
        history[:-1], message, image_content, _image_mime_type(image)
    )
    print(f"[DEBUG] Messages count: {len(messages)}")

    try:
        client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL)
        print("[DEBUG] Client created successfully")
    except Exception as e:
        print(f"[DEBUG] Client initialization failed: {e}")
        history[-1][1] = f"❌ Client initialization failed: {str(e)}"
        yield history, "", ""
        return

    try:
        print("[DEBUG] Calling API...")
        response = client.chat.completions.create(
            model="step-3",
            messages=messages,
            temperature=0.7,
            max_tokens=2000,
            stream=True,
        )
        print("[DEBUG] API call successful, processing stream...")

        full_response = ""
        current_cot = ""
        current_answer = ""
        chunk_count = 0

        for chunk in response:
            chunk_count += 1
            if not chunk.choices:
                continue
            content = getattr(chunk.choices[0].delta, "content", None)
            if not content:
                continue

            full_response += content
            # Re-split the accumulated text so both panels track the stream.
            current_cot, current_answer = extract_cot_and_answer(full_response)
            history[-1][1] = _format_reply(current_cot, current_answer)

            # Push a UI update every 5 chunks to limit update frequency.
            if chunk_count % 5 == 0:
                print(f"[DEBUG] Processed {chunk_count} chunks")
                yield history, current_cot, current_answer

        if not full_response:
            print("[DEBUG] No response content received")
            history[-1][1] = "⚠️ No response received from API"
            yield history, "", ""
        else:
            print(f"[DEBUG] Final response length: {len(full_response)} chars")
            # Final update so chunks after the last throttled yield are shown.
            yield history, current_cot, current_answer

    except Exception as e:
        print(f"[DEBUG] API request failed: {e}")
        traceback.print_exc()
        history[-1][1] = f"❌ API request failed: {str(e)}"
        yield history, "", ""


def clear_all():
    """Return reset values for chatbot, image, message box, and both panels."""
    return [], None, "", "", ""


# Build the Gradio interface.
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 Step-3

    Hello, I am Step-3!
    """)

    with gr.Row():
        with gr.Column(scale=2):
            # Conversation view
            chatbot = gr.Chatbot(
                height=500,
                show_label=False,
                elem_id="chatbot",
                bubble_full_width=False,
            )

            with gr.Row():
                with gr.Column(scale=6):
                    # Text input
                    msg = gr.Textbox(
                        placeholder="Type your message here...",
                        show_label=False,
                        lines=2,
                        max_lines=4,
                        container=False,
                        elem_id="msg",
                    )
                with gr.Column(scale=2):
                    # Image upload
                    image_input = gr.Image(
                        label="Upload Image",
                        type="filepath",
                        height=80,
                        scale=1,
                    )
                with gr.Column(scale=1):
                    send_btn = gr.Button("Send", variant="primary", scale=1)
                    clear_btn = gr.Button("Clear", scale=1)

        with gr.Column(scale=1):
            # Chain-of-thought panel
            gr.Markdown("### 💭 Chain of Thought")
            cot_display = gr.Textbox(
                label="Reasoning Process",
                lines=10,
                max_lines=15,
                show_label=False,
                interactive=False,
                show_copy_button=True,
            )

            gr.Markdown("### 📝 Final Answer")
            answer_display = gr.Textbox(
                label="Answer",
                lines=10,
                max_lines=15,
                show_label=False,
                interactive=False,
                show_copy_button=True,
            )

    # The input widgets are cleared as soon as the user submits, so the
    # submitted values are parked in State for the chained streaming step;
    # reading msg/image_input there would only see the cleared values.
    pending_msg = gr.State("")
    pending_image = gr.State(None)

    def on_submit(message, history, image):
        """Clear the inputs and stash the submitted message and image."""
        if message or image:
            return "", history, None, message, image
        return message, history, image, message, image

    submit_inputs = [msg, chatbot, image_input]
    submit_outputs = [msg, chatbot, image_input, pending_msg, pending_image]
    stream_inputs = [pending_msg, chatbot, pending_image]
    stream_outputs = [chatbot, cot_display, answer_display]

    # Enter in the textbox and the Send button share the same pipeline.
    for trigger in (msg.submit, send_btn.click):
        trigger(
            on_submit,
            submit_inputs,
            submit_outputs,
            queue=False,
        ).then(
            call_step_api_stream,
            stream_inputs,
            stream_outputs,
        )

    clear_btn.click(
        clear_all,
        None,
        [chatbot, image_input, msg, cot_display, answer_display],
    )

    # Footer
    gr.Markdown("""
    ---
    StepFun Logo
    Powered by StepFun
    """)

# Launch the app.
if __name__ == "__main__":
    print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}")
    print(f"[DEBUG] Base URL: {BASE_URL}")
    demo.queue(max_size=10)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=False,
    )