import gradio as gr
import time
import base64
from openai import OpenAI
import os
from io import BytesIO
from PIL import Image
import re

# Configuration
BASE_URL = "https://api.stepfun.com/v1"
STEP_API_KEY = os.environ.get("STEP_API_KEY", "")


def image_to_base64(image):
    """Convert an image to a base64 string."""
    if image is None:
        return None
    if isinstance(image, Image.Image):
        buffered = BytesIO()
        image.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
        return img_str
    elif isinstance(image, str) and os.path.exists(image):
        with open(image, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')
    return None


def process_message(message, history, image, system_prompt, temperature, max_tokens, top_p):
    """Process a message and stream a response; supports an optional image input."""
    print(f"[DEBUG] Processing message: {message[:100] if message else 'None'}...")
    print(f"[DEBUG] Has image: {image is not None}")

    if not message and not image:
        print("[DEBUG] No message or image provided, skipping")
        yield history
        return

    if not STEP_API_KEY:
        print("[DEBUG] No API key configured")
        error_msg = "❌ API key not configured. Please add STEP_API_KEY in Settings."
        display_msg = f"[Image] {message}" if image and message else "[Image]" if image else message
        history.append([display_msg, error_msg])
        yield history
        return

    # Handle the optional image
    image_content = None
    text_content = message or ""

    if image:
        # Convert the image to base64
        try:
            image_content = image_to_base64(image)
            print(f"[DEBUG] Image processed successfully")
        except Exception as e:
            print(f"[DEBUG] Failed to process image: {e}")
            history.append([message or "[Image]", f"❌ Failed to process image: {str(e)}"])
            yield history
            return

    # Build the display message
    if image and message:
        display_message = f"🖼️ [Image] {message}"
    elif image:
        display_message = "🖼️ [Image]"
    else:
        display_message = message

    # Append to history
    history.append([display_message, ""])
    yield history

    # Build the API messages
    messages = []

    # Add the system prompt
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})

    # Add prior turns from history
    for h in history[:-1]:
        if h[0]:
            # User message - strip any image tags
            user_text = re.sub(r'<img[^>]+>', '', h[0]).strip()
            if user_text:
                messages.append({"role": "user", "content": user_text})
        if h[1] and not h[1].startswith("❌"):
            messages.append({"role": "assistant", "content": h[1]})

    # Add the current message
    if image_content:
        current_content = [
            {"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{image_content}", "detail": "high"}}
        ]
        if text_content:
            current_content.append({"type": "text", "text": text_content})
        messages.append({"role": "user", "content": current_content})
    else:
        messages.append({"role": "user", "content": text_content})

    print(f"[DEBUG] Sending {len(messages)} messages to API")
    print(f"[DEBUG] Last message: {messages[-1]}")

    # Create the client and call the API
    try:
        # Clear any proxy environment variables that could interfere with the client
        proxy_vars = ['HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy',
                      'ALL_PROXY', 'all_proxy', 'NO_PROXY', 'no_proxy']
        for var in proxy_vars:
            if var in os.environ:
                del os.environ[var]
                print(f"[DEBUG] Removed {var} from environment")

        # Try to create the client
        try:
            # Method 1: create directly
            client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL)
            print("[DEBUG] Client created successfully (method 1)")
        except TypeError as e:
            if 'proxies' in str(e):
                print(f"[DEBUG] Method 1 failed with proxy error, trying method 2")
                # Method 2: use an httpx client that ignores environment proxies
                import httpx
                http_client = httpx.Client(trust_env=False)
                client = OpenAI(
                    api_key=STEP_API_KEY,
                    base_url=BASE_URL,
                    http_client=http_client
                )
                print("[DEBUG] Client created successfully (method 2)")
            else:
                raise e

        print("[DEBUG] Calling API...")
        response = client.chat.completions.create(
            model="step-3",
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stream=True
        )
        print("[DEBUG] API call successful, processing stream...")

        # Stream the output
        full_response = ""
        chunk_count = 0
        in_reasoning = False
        reasoning_content = ""
        final_content = ""

        for chunk in response:
            chunk_count += 1
            if chunk.choices and len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if hasattr(delta, 'content') and delta.content:
                    content = delta.content
                    full_response += content

                    # Detect <think> tags
                    if '<think>' in content:
                        in_reasoning = True
                        # Split the content
                        parts = content.split('<think>')
                        final_content += parts[0]
                        if len(parts) > 1:
                            reasoning_content += parts[1]
                    elif '</think>' in content:
                        # End of the reasoning section
                        parts = content.split('</think>')
                        if parts[0]:
                            reasoning_content += parts[0]
                        in_reasoning = False
                        if len(parts) > 1:
                            final_content += parts[1]
                    elif in_reasoning:
                        # Inside the reasoning tags
                        reasoning_content += content
                    else:
                        # Outside the reasoning tags
                        final_content += content

                    # Update the display in real time
                    if reasoning_content and final_content:
                        # Both reasoning and final answer
                        display_text = f"💭 **Chain of Thought:**\n\n{reasoning_content}\n\n---\n\n📝 **Answer:**\n\n{final_content}"
                    elif reasoning_content:
                        # Reasoning only
                        display_text = f"💭 **Chain of Thought:**\n\n{reasoning_content}\n\n---\n\n📝 **Answer:**\n\n*Generating...*"
                    else:
                        # Answer only, or an ordinary reply
                        display_text = full_response

                    history[-1][1] = display_text

                    if chunk_count % 5 == 0:
                        print(f"[DEBUG] Received {chunk_count} chunks, {len(full_response)} chars")

                    yield history

        print(f"[DEBUG] Stream complete. Total chunks: {chunk_count}, Total chars: {len(full_response)}")

        # Final formatting
        if reasoning_content:
            # If there is reasoning content, show the formatted version
            final_display = f"💭 **Chain of Thought:**\n\n{reasoning_content}\n\n---\n\n📝 **Answer:**\n\n{final_content.strip()}"
            history[-1][1] = final_display
            yield history

        if not full_response:
            print("[DEBUG] No response content received")
            history[-1][1] = "⚠️ No response received from API"
            yield history

    except Exception as e:
        print(f"[DEBUG] API error: {e}")
        import traceback
        traceback.print_exc()
        history[-1][1] = f"❌ Error: {str(e)}"
        yield history


# Build the Gradio UI
with gr.Blocks(title="Step-3 Chat", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 Step-3 Chat

    Welcome to Step-3, an advanced multimodal AI assistant by StepFun.
    """)

    with gr.Row():
        with gr.Column(scale=3):
            # Chat area
            chatbot = gr.Chatbot(
                height=600,
                show_label=False,
                elem_id="chatbot",
                bubble_full_width=False,
                avatar_images=None,
                render_markdown=True
            )

            # Input area
            with gr.Row():
                with gr.Column(scale=8):
                    msg = gr.Textbox(
                        label="Message",
                        placeholder="Type your message here...",
                        lines=2,
                        max_lines=10,
                        show_label=False,
                        elem_id="message-textbox"
                    )
                with gr.Column(scale=2):
                    image_input = gr.Image(
                        label="Upload Image (Optional)",
                        type="filepath",
                        height=100,
                        interactive=True,
                        show_label=True
                    )
                with gr.Column(scale=1, min_width=100):
                    submit_btn = gr.Button("Send", variant="primary")

            # Bottom buttons
            with gr.Row():
                clear_btn = gr.Button("🗑️ Clear", scale=1)
                undo_btn = gr.Button("↩️ Undo", scale=1)
                retry_btn = gr.Button("🔄 Retry", scale=1)

        with gr.Column(scale=1):
            # Settings panel
            with gr.Accordion("⚙️ Settings", open=True):
                system_prompt = gr.Textbox(
                    label="System Prompt",
                    placeholder="You are a helpful assistant...",
                    lines=3,
                    value="You are Step-3, a helpful AI assistant created by StepFun."
                )
                temperature = gr.Slider(
                    minimum=0,
                    maximum=2,
                    value=0.7,
                    step=0.1,
                    label="Temperature"
                )
                max_tokens = gr.Slider(
                    minimum=1,
                    maximum=4096,
                    value=2048,
                    step=1,
                    label="Max Tokens"
                )
                top_p = gr.Slider(
                    minimum=0,
                    maximum=1,
                    value=0.95,
                    step=0.01,
                    label="Top P"
                )

            # Examples
            with gr.Accordion("📝 Examples", open=False):
                gr.Examples(
                    examples=[
                        ["What is machine learning?"],
                        ["Write a Python function to calculate fibonacci numbers"],
                        ["Explain quantum computing in simple terms"],
                        ["What are the benefits of renewable energy?"],
                        ["How does blockchain technology work?"]
                    ],
                    inputs=msg,
                    label=""
                )

    # Event handlers
    def user_submit(message, history, image):
        """Handle the user submitting a message."""
        print(f"[DEBUG] user_submit called with message: {message[:50] if message else 'None'}...")
        print(f"[DEBUG] user_submit called with image: {image is not None}")
        if message or image:
            # Clear the inputs and stash the message and image for the next step
            return gr.update(value=""), history, gr.update(value=None), message, image
        return gr.update(value=message), history, gr.update(value=image), message, image

    def bot_response(history, saved_message, saved_image, system_prompt, temperature, max_tokens, top_p):
        """Generate the bot response."""
        print(f"[DEBUG] bot_response called with saved_message: {saved_message[:50] if saved_message else 'None'}...")
        print(f"[DEBUG] bot_response called with saved_image: {saved_image is not None}")
        if saved_message or saved_image:
            # Delegate to the streaming generator
            for updated_history in process_message(saved_message, history, saved_image,
                                                   system_prompt, temperature, max_tokens, top_p):
                yield updated_history
        else:
            yield history

    def undo_last(history):
        if history:
            return history[:-1]
        return history

    def retry_last(history):
        if history and history[-1][0]:
            last_message = history[-1][0]
            new_history = history[:-1]
            return new_history, last_message
        return history, ""

    # Hidden state components to carry the message and image between steps
    saved_msg = gr.State("")
    saved_img = gr.State(None)

    # Submit message - Enter key
    msg.submit(
        user_submit,
        [msg, chatbot, image_input],
        [msg, chatbot, image_input, saved_msg, saved_img],
        queue=False
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_img, system_prompt, temperature, max_tokens, top_p],
        chatbot
    )

    # Submit message - Send button
    submit_btn.click(
        user_submit,
        [msg, chatbot, image_input],
        [msg, chatbot, image_input, saved_msg, saved_img],
        queue=False
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_img, system_prompt, temperature, max_tokens, top_p],
        chatbot
    )

    # Clear the conversation
    clear_btn.click(
        lambda: ([], "", None),
        None,
        [chatbot, msg, image_input]
    )

    # Undo the last exchange
    undo_btn.click(
        undo_last,
        chatbot,
        chatbot
    )

    # Retry the last message
    retry_btn.click(
        retry_last,
        chatbot,
        [chatbot, saved_msg]
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_img, system_prompt, temperature, max_tokens, top_p],
        chatbot
    )

    # Footer
    gr.Markdown("""
    ---

Powered by StepFun | Model: Step-3 | GitHub

    """)

# Launch the app
if __name__ == "__main__":
    print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}")
    print(f"[DEBUG] Base URL: {BASE_URL}")
    demo.queue(max_size=20)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=False,
        show_error=True
    )
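
# Example local run (a minimal sketch; the key value and the filename "app.py" are
# assumptions, not part of this repo):
#
#   export STEP_API_KEY="sk-..."   # read via os.environ.get("STEP_API_KEY") above
#   python app.py                  # serves the Gradio UI on http://0.0.0.0:7860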