import gradio as gr
import time
import base64
from openai import OpenAI
import os
from io import BytesIO
from PIL import Image
import re

# Configuration
BASE_URL = "https://api.stepfun.com/v1"
STEP_API_KEY = os.environ.get("STEP_API_KEY", "")

# Tags the model wraps chain-of-thought in.
# NOTE(review): the original tag literals were lost in transit; "<think>" is
# assumed from common reasoning-model conventions — TODO confirm against the
# actual step-3 stream output.
THINK_OPEN = "<think>"
THINK_CLOSE = "</think>"

# Matches the "🖼️ [N Image(s)] " prefix this app prepends to user turns that
# carried images, so it can be stripped before re-sending history as text.
_IMAGE_PREFIX_RE = re.compile(r'^🖼️ \[\d+ Images?\]\s*')


def image_to_base64(image):
    """Convert a PIL image or an existing image file path to a base64 string.

    PIL images are re-encoded as PNG (so data URLs built from this output
    must use the image/png media type). Returns None for unusable input.
    """
    if image is None:
        return None
    if isinstance(image, Image.Image):
        buffered = BytesIO()
        image.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode('utf-8')
    elif isinstance(image, str) and os.path.exists(image):
        with open(image, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')
    return None


def process_message(message, history, images, system_prompt, temperature, max_tokens, top_p):
    """Handle one user turn and stream the assistant's reply.

    Generator: yields the updated ``history`` (list of [user, assistant]
    pairs) after each streamed chunk so Gradio can refresh the Chatbot.
    Supports optional multi-image input; images are inlined as base64 data
    URLs. Chain-of-thought wrapped in THINK_OPEN/THINK_CLOSE is split out
    and rendered separately from the final answer.
    """
    print(f"[DEBUG] Processing message: {message[:100] if message else 'None'}...")
    print(f"[DEBUG] Has images: {images is not None and len(images) > 0 if images else False}")

    # Nothing to do: neither text nor images were provided.
    if not message and not images:
        print("[DEBUG] No message or images provided, skipping")
        yield history
        return

    if not STEP_API_KEY:
        print("[DEBUG] No API key configured")
        error_msg = "❌ API key not configured. Please add STEP_API_KEY in Settings."
        if images and message:
            display_msg = f"[{len(images)} Images] {message}"
        elif images:
            display_msg = f"[{len(images)} Images]"
        else:
            display_msg = message
        history.append([display_msg, error_msg])
        yield history
        return

    # Encode each uploaded image to base64; abort the turn on any failure.
    image_contents = []
    if images:
        for img_path in images:
            try:
                # gr.File may hand us tempfile objects (with .name) or paths.
                img_file = img_path.name if hasattr(img_path, 'name') else img_path
                with Image.open(img_file) as img:
                    image_content = image_to_base64(img)
                if image_content:
                    image_contents.append(image_content)
                    print(f"[DEBUG] Image {len(image_contents)} processed successfully")
            except Exception as e:
                print(f"[DEBUG] Failed to process image: {e}")
                history.append([message or f"[{len(images)} Images]",
                                f"❌ Failed to process image: {str(e)}"])
                yield history
                return

    # Build the user-visible message (image count badge + text).
    text_content = message or ""
    if image_contents and message:
        display_message = f"🖼️ [{len(image_contents)} Image{'s' if len(image_contents) > 1 else ''}] {message}"
    elif image_contents:
        display_message = f"🖼️ [{len(image_contents)} Image{'s' if len(image_contents) > 1 else ''}]"
    else:
        display_message = message

    # Show the user turn immediately with an empty assistant slot.
    history.append([display_message, ""])
    yield history

    # Assemble the OpenAI-style message list.
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})

    # Replay prior turns as plain text; drop the image badge and skip
    # error replies so failures don't pollute the context.
    for h in history[:-1]:
        if h[0]:
            user_text = _IMAGE_PREFIX_RE.sub('', h[0]).strip()
            if user_text:
                messages.append({"role": "user", "content": user_text})
        if h[1] and not h[1].startswith("❌"):
            messages.append({"role": "assistant", "content": h[1]})

    # Current turn: multimodal content when images are present.
    if image_contents:
        current_content = []
        for img_base64 in image_contents:
            current_content.append({
                "type": "image_url",
                # image_to_base64 always emits PNG, so label it as such.
                "image_url": {"url": f"data:image/png;base64,{img_base64}", "detail": "high"}
            })
        if text_content:
            current_content.append({"type": "text", "text": text_content})
        messages.append({"role": "user", "content": current_content})
    else:
        messages.append({"role": "user", "content": text_content})

    print(f"[DEBUG] Sending {len(messages)} messages to API")
    print(f"[DEBUG] Last message: {messages[-1]}")

    try:
        # Workaround: stale proxy env vars break the OpenAI client in some
        # hosting environments, so drop them for this process.
        proxy_vars = ['HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy',
                      'ALL_PROXY', 'all_proxy', 'NO_PROXY', 'no_proxy']
        for var in proxy_vars:
            if var in os.environ:
                del os.environ[var]
                print(f"[DEBUG] Removed {var} from environment")

        try:
            # Method 1: plain client construction.
            client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL)
            print("[DEBUG] Client created successfully (method 1)")
        except TypeError as e:
            if 'proxies' in str(e):
                # Method 2: some openai/httpx version combinations choke on
                # proxy kwargs; build an httpx client that ignores the env.
                print(f"[DEBUG] Method 1 failed with proxy error, trying method 2")
                import httpx
                http_client = httpx.Client(trust_env=False)
                client = OpenAI(
                    api_key=STEP_API_KEY,
                    base_url=BASE_URL,
                    http_client=http_client
                )
                print("[DEBUG] Client created successfully (method 2)")
            else:
                raise e

        print("[DEBUG] Calling API...")
        response = client.chat.completions.create(
            model="step-3",
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            top_p=top_p,
            stream=True
        )
        print("[DEBUG] API call successful, processing stream...")

        # Stream loop: split chain-of-thought from the final answer on the fly.
        full_response = ""
        chunk_count = 0
        in_reasoning = False
        reasoning_content = ""
        final_content = ""

        for chunk in response:
            chunk_count += 1
            if chunk.choices and len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if hasattr(delta, 'content') and delta.content:
                    content = delta.content
                    full_response += content

                    # Route this chunk's text into reasoning vs. answer,
                    # handling a tag boundary that lands inside the chunk.
                    if THINK_OPEN in content:
                        in_reasoning = True
                        parts = content.split(THINK_OPEN)
                        final_content += parts[0]
                        if len(parts) > 1:
                            reasoning_content += parts[1]
                    elif THINK_CLOSE in content:
                        parts = content.split(THINK_CLOSE)
                        if parts[0]:
                            reasoning_content += parts[0]
                        in_reasoning = False
                        if len(parts) > 1:
                            final_content += parts[1]
                    elif in_reasoning:
                        reasoning_content += content
                    else:
                        final_content += content

                    # Live display: show CoT and answer sections when present.
                    if reasoning_content and final_content:
                        display_text = f"💭 **Chain of Thought:**\n\n{reasoning_content}\n\n---\n\n📝 **Answer:**\n\n{final_content}"
                    elif reasoning_content:
                        display_text = f"💭 **Chain of Thought:**\n\n{reasoning_content}\n\n---\n\n📝 **Answer:**\n\n*Generating...*"
                    else:
                        display_text = full_response

                    history[-1][1] = display_text
                    if chunk_count % 5 == 0:
                        print(f"[DEBUG] Received {chunk_count} chunks, {len(full_response)} chars")
                    yield history

        print(f"[DEBUG] Stream complete. Total chunks: {chunk_count}, Total chars: {len(full_response)}")

        # Final formatting pass (trims trailing whitespace in the answer).
        if reasoning_content:
            final_display = f"💭 **Chain of Thought:**\n\n{reasoning_content}\n\n---\n\n📝 **Answer:**\n\n{final_content.strip()}"
            history[-1][1] = final_display
            yield history

        if not full_response:
            print("[DEBUG] No response content received")
            history[-1][1] = "⚠️ No response received from API"
            yield history

    except Exception as e:
        print(f"[DEBUG] API error: {e}")
        import traceback
        traceback.print_exc()
        history[-1][1] = f"❌ Error: {str(e)}"
        yield history


# --- Gradio UI -------------------------------------------------------------

css = """
.compact-image .wrap { font-size: 12px !important; }
.compact-image .upload-container { min-height: 80px !important; }
.compact-image .wrap .or { display: none !important; }
"""

with gr.Blocks(title="Step-3", theme=gr.themes.Soft(), css=css) as demo:
    gr.Markdown("""
# StepFun Logo Step-3

Welcome to Step-3, an advanced multimodal AI assistant by StepFun.
""")

    with gr.Row():
        with gr.Column(scale=3):
            # Main chat surface.
            chatbot = gr.Chatbot(
                height=600,
                show_label=False,
                elem_id="chatbot",
                bubble_full_width=False,
                avatar_images=None,
                render_markdown=True
            )
            # Input row: text + multi-file image upload + send button.
            with gr.Row():
                with gr.Column(scale=8):
                    msg = gr.Textbox(
                        label="Message",
                        placeholder="Type your message here...",
                        lines=2,
                        max_lines=10,
                        show_label=False,
                        elem_id="message-textbox"
                    )
                with gr.Column(scale=2):
                    image_input = gr.File(
                        label="Images",
                        file_count="multiple",
                        file_types=[".png", ".jpg", ".jpeg", ".gif", ".webp"],
                        interactive=True,
                        show_label=True
                    )
                with gr.Column(scale=1, min_width=100):
                    submit_btn = gr.Button("Send", variant="primary")
            # Conversation controls.
            with gr.Row():
                clear_btn = gr.Button("🗑️ Clear", scale=1)
                undo_btn = gr.Button("↩️ Undo", scale=1)
                retry_btn = gr.Button("🔄 Retry", scale=1)

        with gr.Column(scale=1):
            # Generation settings panel.
            with gr.Accordion("⚙️ Settings", open=True):
                system_prompt = gr.Textbox(
                    label="System Prompt",
                    placeholder="You are a helpful assistant...",
                    lines=3,
                    value="You are Step-3, a helpful AI assistant created by StepFun."
                )
                temperature = gr.Slider(
                    minimum=0, maximum=2, value=0.7, step=0.1,
                    label="Temperature"
                )
                max_tokens = gr.Slider(
                    minimum=1, maximum=4096, value=2048, step=1,
                    label="Max Tokens"
                )
                top_p = gr.Slider(
                    minimum=0, maximum=1, value=0.95, step=0.01,
                    label="Top P"
                )

    # --- Event handlers ---

    def user_submit(message, history, images):
        """Stash the submitted message/images in State and clear the inputs.

        Returns updates for: msg textbox, chatbot, file input, saved_msg,
        saved_imgs. When nothing was entered, leaves the inputs untouched.
        """
        print(f"[DEBUG] user_submit called with message: {message[:50] if message else 'None'}...")
        print(f"[DEBUG] user_submit called with images: {len(images) if images else 0} files")
        if message or images:
            return gr.update(value=""), history, gr.update(value=None), message, images
        return gr.update(value=message), history, gr.update(value=images), message, images

    def bot_response(history, saved_message, saved_images, system_prompt, temperature, max_tokens, top_p):
        """Drive process_message as a generator, re-yielding each update."""
        print(f"[DEBUG] bot_response called with saved_message: {saved_message[:50] if saved_message else 'None'}...")
        print(f"[DEBUG] bot_response called with saved_images: {len(saved_images) if saved_images else 0} files")
        if saved_message or saved_images:
            for updated_history in process_message(saved_message, history, saved_images,
                                                   system_prompt, temperature, max_tokens, top_p):
                yield updated_history
        else:
            yield history

    def undo_last(history):
        """Drop the most recent [user, assistant] pair, if any."""
        if history:
            return history[:-1]
        return history

    def retry_last(history):
        """Pop the last user turn and return it so it can be re-sent."""
        if history and history[-1][0]:
            last_message = history[-1][0]
            new_history = history[:-1]
            return new_history, last_message
        return history, ""

    # Hidden state carrying the pending message/images between the two
    # steps of each submit chain (clear inputs first, then stream reply).
    saved_msg = gr.State("")
    saved_imgs = gr.State([])

    # Submit via Enter key.
    msg.submit(
        user_submit,
        [msg, chatbot, image_input],
        [msg, chatbot, image_input, saved_msg, saved_imgs],
        queue=False
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_imgs, system_prompt, temperature, max_tokens, top_p],
        chatbot
    )

    # Submit via Send button.
    submit_btn.click(
        user_submit,
        [msg, chatbot, image_input],
        [msg, chatbot, image_input, saved_msg, saved_imgs],
        queue=False
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_imgs, system_prompt, temperature, max_tokens, top_p],
        chatbot
    )

    # Clear conversation, message box, and file input.
    clear_btn.click(
        lambda: ([], "", None),
        None,
        [chatbot, msg, image_input]
    )

    # Undo the last exchange.
    undo_btn.click(
        undo_last,
        chatbot,
        chatbot
    )

    # Retry: re-send the last user message (note: re-sends text only; any
    # images from that turn are not replayed).
    retry_btn.click(
        retry_last,
        chatbot,
        [chatbot, saved_msg]
    ).then(
        bot_response,
        [chatbot, saved_msg, saved_imgs, system_prompt, temperature, max_tokens, top_p],
        chatbot
    )

    # Footer.
    gr.Markdown("""
---

Powered by StepFun | Model: Step-3 | GitHub
""")

# Entry point.
if __name__ == "__main__":
    print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}")
    print(f"[DEBUG] Base URL: {BASE_URL}")
    demo.queue(max_size=20)
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=False,
        show_error=True
    )