Spaces:
Running
Running
# app.py | |
import os | |
import io | |
import base64 | |
from typing import List, Dict, Any, Optional | |
import httpx | |
import gradio as gr | |
from PIL import Image | |
# ========= 基本配置(可用环境变量覆写)========= | |
STEPFUN_ENDPOINT = os.getenv("STEPFUN_ENDPOINT", "https://api.stepfun.com/v1") | |
MODEL_NAME = os.getenv("STEPFUN_MODEL", "step-3") # 可改为 step-r1-v-mini | |
REQUEST_TIMEOUT = float(os.getenv("REQUEST_TIMEOUT", "60")) | |
MAX_CHARS = int(os.getenv("MAX_CHARS", "20000")) # 返回文本最大展示长度 | |
# =========================================== | |
def _get_api_key() -> Optional[str]: | |
""" | |
优先读 OPENAI_API_KEY(OpenAI 兼容习惯),否则读 STEPFUN_KEY。 | |
在 HF Space → Settings → Variables and secrets 添加其中一个即可。 | |
""" | |
return os.getenv("OPENAI_API_KEY") or os.getenv("STEPFUN_KEY") | |
def _pil_to_data_url(img: Image.Image, fmt: str = "PNG") -> str: | |
""" | |
PIL.Image -> data:image/...;base64,... 字符串(适配 OpenAI 兼容的 image_url) | |
""" | |
buf = io.BytesIO() | |
img.save(buf, format=fmt) | |
b64 = base64.b64encode(buf.getvalue()).decode("utf-8") | |
mime = "image/png" if fmt.upper() == "PNG" else "image/jpeg" | |
return f"data:{mime};base64,{b64}" | |
def _truncate(text: str, limit: int = MAX_CHARS) -> str: | |
""" | |
软截断,避免一次性把超长内容写给前端导致传输异常。 | |
""" | |
if text is None: | |
return "" | |
if len(text) <= limit: | |
return text | |
return text[:limit] + "\n\n[输出过长,已截断]" | |
def _post_chat(messages: List[Dict[str, Any]], temperature: float = 0.7, | |
max_tokens: Optional[int] = None) -> str: | |
""" | |
直接请求 StepFun 的 /v1/chat/completions(OpenAI 兼容)。 | |
返回纯字符串,不抛异常,交由上层统一处理。 | |
""" | |
api_key = _get_api_key() | |
if not api_key: | |
# 不 raise,让 UI 只显示字符串,避免 Uvicorn/h11 生成异常页 | |
return ("[配置错误] 未检测到 API Key。\n" | |
"请到 Space 的 Settings → Variables and secrets 添加:\n" | |
" OPENAI_API_KEY=你的 StepFun API Key (或使用 STEPFUN_KEY)") | |
url = f"{STEPFUN_ENDPOINT.rstrip('/')}/chat/completions" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json", | |
} | |
payload: Dict[str, Any] = { | |
"model": MODEL_NAME, | |
"messages": messages, | |
"temperature": temperature, | |
} | |
if max_tokens is not None: | |
payload["max_tokens"] = max_tokens | |
try: | |
with httpx.Client(timeout=REQUEST_TIMEOUT) as client: | |
resp = client.post(url, headers=headers, json=payload) | |
resp.raise_for_status() | |
data = resp.json() | |
# 标准 OpenAI 兼容返回 | |
content = data["choices"][0]["message"]["content"] | |
return _truncate(str(content)) | |
except httpx.HTTPStatusError as e: | |
body = e.response.text if e.response is not None else repr(e) | |
code = getattr(e.response, "status_code", "?") | |
return _truncate(f"[HTTP {code}] 接口错误:\n{body}") | |
except Exception as e: | |
# 网络/解析/其他错误 | |
return _truncate(f"[调用失败] {repr(e)}") | |
def chat_with_step3(image: Optional[Image.Image], question: str, temperature: float) -> str: | |
""" | |
Gradio 回调:接收图片与问题文本,返回字符串。 | |
任何异常都“吃掉”,只返回文本,防止框架层渲染异常页。 | |
""" | |
try: | |
# 输入兜底 | |
if image is None and not question.strip(): | |
return "请上传一张图片,或至少输入一个问题。" | |
content: List[Dict[str, Any]] = [] | |
if image is not None: | |
data_url = _pil_to_data_url(image, fmt="PNG") | |
content.append({"type": "image_url", "image_url": {"url": data_url}}) | |
if question.strip(): | |
content.append({"type": "text", "text": question.strip()}) | |
else: | |
content.append({"type": "text", "text": "请描述这张图片。"}) | |
messages = [{"role": "user", "content": content}] | |
return _post_chat(messages, temperature=temperature) | |
except Exception as e: | |
# 再兜一层底,避免任何未捕获异常冒泡 | |
return _truncate(f"[运行时错误] {repr(e)}") | |
# ================== Gradio UI ================== | |
with gr.Blocks(title="Step3 (StepFun API Demo)", analytics_enabled=False) as demo: | |
gr.Markdown( | |
""" | |
# Step3 · 图文对话演示(StepFun OpenAI 兼容接口) | |
- 在 **Settings → Variables and secrets** 添加 `OPENAI_API_KEY`(或 `STEPFUN_KEY`)后即可使用 | |
- 后端通过 `https://api.stepfun.com/v1/chat/completions`,不依赖 `openai` SDK | |
""" | |
) | |
with gr.Row(): | |
image = gr.Image(type="pil", label="上传图片(可选)") | |
question = gr.Textbox(label="问题", placeholder="例如:帮我看看这是什么菜,怎么做?") | |
temperature = gr.Slider(0.0, 1.5, value=0.7, step=0.1, label="Temperature") | |
submit = gr.Button("提交", variant="primary") | |
output = gr.Textbox(label="模型回答", lines=12) | |
submit.click(fn=chat_with_step3, inputs=[image, question, temperature], outputs=[output]) | |
gr.Markdown( | |
""" | |
**小贴士:** | |
- 如见到 `[配置错误] 未检测到 API Key`,请检查 Space 的 Secrets | |
- 如需改模型:设置环境变量 `STEPFUN_MODEL`,或在代码顶部修改默认值 | |
- 如输出非常长,会自动做软截断避免传输异常 | |
""" | |
) | |
if __name__ == "__main__": | |
# 兼容不同 gradio 版本的 queue() 参数签名 | |
try: | |
demo.queue(concurrency_count=2, max_size=32).launch(show_error=False, quiet=True) | |
except TypeError: | |
# 老签名:只接受并发(位置参数) | |
try: | |
demo.queue(2).launch(show_error=False, quiet=True) | |
except TypeError: | |
# 最保守:不传参数 | |
demo.queue().launch(show_error=False, quiet=True) | |