#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import io
import json
import zipfile
import mimetypes
from typing import Any, Dict, List, Optional, Tuple
import requests
import gradio as gr
from openai import OpenAI
# --------------------- Configuration ---------------------
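# Required: NV_API_KEY (NVIDIA API key).
# Optional: NV_BASE_URL (OpenAI-compatible LLM endpoint) and NV_VLM_URL
# (Florence-2 VLM endpoint); both fall back to the defaults below.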
NV_API_KEY = os.environ.get("NV_API_KEY")
NV_BASE_URL = os.environ.get("NV_BASE_URL", "https://integrate.api.nvidia.com/v1")
NV_VLM_URL = os.environ.get("NV_VLM_URL", "https://ai.api.nvidia.com/v1/vlm/microsoft/florence-2")
NVCF_ASSETS_URL = "https://api.nvcf.nvidia.com/v2/nvcf/assets"
if not NV_API_KEY:
raise RuntimeError("NV_API_KEY не задан. В HF Space: Settings → Secrets → NV_API_KEY")
llm = OpenAI(base_url=NV_BASE_URL, api_key=NV_API_KEY)
# --------------------- Florence utils ---------------------
def _guess_mime(path: str) -> str:
return mimetypes.guess_type(path)[0] or "image/jpeg"
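# NVCF assets are uploaded in two steps: a POST to the assets endpoint
# registers the asset and returns a pre-signed upload URL plus an asset ID,
# then a PUT sends the raw image bytes to that URL.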
def nvcf_upload_asset(image_path: str, description: str = "Chat image") -> str:
auth = requests.post(
NVCF_ASSETS_URL,
headers={
"Authorization": f"Bearer {NV_API_KEY}",
"Content-Type": "application/json",
"accept": "application/json",
},
json={"contentType": _guess_mime(image_path), "description": description},
timeout=30,
)
auth.raise_for_status()
up_url = auth.json()["uploadUrl"]
asset_id = str(auth.json()["assetId"])
with open(image_path, "rb") as f:
put = requests.put(
up_url,
data=f,
headers={
"x-amz-meta-nvcf-asset-description": description,
"content-type": _guess_mime(image_path),
},
timeout=300,
)
put.raise_for_status()
return asset_id
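# A Florence-2 prompt is a task token (e.g. <CAPTION>), an optional text
# prompt, and an <img> tag that references the uploaded NVCF asset by ID.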
def _vlm_content(task_token: str, asset_id: str, text_prompt: Optional[str] = None) -> str:
parts = [task_token]
if text_prompt and text_prompt.strip():
parts.append(text_prompt.strip())
parts.append(f'<img src="data:image/jpeg;asset_id,{asset_id}" />')
return "".join(parts)
PRIORITY_TEXT_KEYS = [
"more_detailed_caption", "detailed_caption", "caption",
"generated_text", "text", "ocr", "description",
]
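# Recursively collect candidate text fields from an arbitrarily nested
# response object, checking the priority keys above before generic strings.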
def _deep_text_candidates(obj: Any) -> List[str]:
out = []
def walk(o):
if isinstance(o, dict):
for k in PRIORITY_TEXT_KEYS:
if k in o and isinstance(o[k], str) and o[k].strip():
out.append(o[k].strip())
for v in o.values():
walk(v)
elif isinstance(o, list):
for it in o:
walk(it)
elif isinstance(o, str):
if o.strip():
out.append(o.strip())
walk(obj)
return out
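# Render the raw Florence response (JSON body, ZIP archive listing plus any
# embedded .json/.txt files, or plain bytes) as text for the debug panel.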
def _debug_dump_from_response(resp: requests.Response) -> str:
lines = []
data = resp.content
ct = (resp.headers.get("content-type") or "").lower()
lines.append("=== Florence HTTP Response ===")
lines.append(f"status: {resp.status_code}")
lines.append(f"content-type: {ct}")
lines.append(f"bytes: {len(data)}")
if "application/json" in ct and not data.startswith(b"PK"):
try:
raw = resp.text
except Exception:
raw = data.decode("utf-8", errors="ignore")
lines.append("--- RAW JSON ---")
lines.append(raw)
return "\n".join(lines)
if data.startswith(b"PK") or "zip" in ct or "octet-stream" in ct:
lines.append("--- ZIP CONTENTS ---")
try:
with zipfile.ZipFile(io.BytesIO(data), "r") as z:
for name in z.namelist():
lines.append(f"* {name}")
for name in z.namelist():
low = name.lower()
if low.endswith(".json") or low.endswith(".txt"):
try:
with z.open(name) as f:
raw = f.read().decode("utf-8", errors="ignore")
lines.append(f"\n--- FILE: {name} ---\n{raw}")
except Exception as e:
lines.append(f"\n--- FILE: {name} --- [read error: {e}]")
except Exception as e:
lines.append(f"[zip parse error: {e}]")
return "\n".join(lines)
try:
txt = data.decode("utf-8", errors="ignore")
except Exception:
txt = "[binary body]"
lines.append("--- RAW BODY ---")
lines.append(txt)
return "\n".join(lines)
def _parse_vlm_text(resp: requests.Response) -> str:
data = resp.content
ct = (resp.headers.get("content-type") or "").lower()
if "application/json" in ct and not data.startswith(b"PK"):
try:
obj = resp.json()
cands = _deep_text_candidates(obj)
return cands[0] if cands else ""
except Exception:
return ""
if data.startswith(b"PK") or "zip" in ct or "octet-stream" in ct:
try:
with zipfile.ZipFile(io.BytesIO(data), "r") as z:
for name in z.namelist():
if not name.lower().endswith(".json"):
continue
try:
with z.open(name) as f:
obj = json.loads(f.read().decode("utf-8", errors="ignore"))
cands = _deep_text_candidates(obj)
if cands:
return cands[0]
except Exception:
pass
for name in z.namelist():
if name.lower().endswith(".txt"):
try:
with z.open(name) as f:
txt = f.read().decode("utf-8", errors="ignore").strip()
if txt:
return txt
except Exception:
pass
except Exception:
return ""
try:
return data.decode("utf-8", errors="ignore").strip()
except Exception:
return ""
def _call_florence(task_token: str, asset_id: str, text_prompt: Optional[str] = None) -> Tuple[str, str]:
content = _vlm_content(task_token, asset_id, text_prompt)
payload = {"messages": [{"role": "user", "content": content}]}
headers = {
"Authorization": f"Bearer {NV_API_KEY}",
"Accept": "application/zip, application/json, */*",
"Content-Type": "application/json",
"NVCF-INPUT-ASSET-REFERENCES": asset_id,
"NVCF-FUNCTION-ASSET-IDS": asset_id,
}
resp = requests.post(NV_VLM_URL, headers=headers, json=payload, timeout=300)
    raw_dump = _debug_dump_from_response(resp)  # requests.post raises on failure, so resp is always set here
if not resp.ok:
return f"[VLM HTTP {resp.status_code}]", raw_dump
text = _parse_vlm_text(resp)
return text, raw_dump
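# Accept a caption only if it is a non-trivial string; the substring check
# filters out a known placeholder phrase rather than a real caption.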
def _is_good(text: str) -> bool:
return isinstance(text, str) and len(text.strip()) >= 3 and "изображений-результатов" not in text.lower()
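# Upload the image once, then try progressively simpler task tokens until one
# yields usable text; the concatenated raw dumps feed the debug panel.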
def get_caption_with_debug(image_path: str) -> Tuple[str, str, str]:
asset_id = nvcf_upload_asset(image_path)
attempts = [
("<MORE_DETAILED_CAPTION>", None),
("<DETAILED_CAPTION>", None),
("<CAPTION>", None),
("<OCR>", None),
]
debug_parts = []
for token, prompt in attempts:
text, raw_dump = _call_florence(token, asset_id, prompt)
debug_parts.append(f"=== Attempt {token} ===\n{raw_dump}\n")
if _is_good(text):
return text, asset_id, "\n".join(debug_parts)
return "", asset_id, "\n".join(debug_parts)
# --------------------- LLM streaming utils ---------------------
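# Pull text out of a streaming chunk whether the SDK returns typed objects or
# plain dicts; some models stream reasoning_content before content.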
def _extract_text_from_stream_chunk(chunk: Any) -> str:
try:
if hasattr(chunk, "choices"):
choices = getattr(chunk, "choices")
if choices:
c0 = choices[0]
delta = getattr(c0, "delta", None)
if delta is not None:
txt = getattr(delta, "reasoning_content", None) or getattr(delta, "content", None)
if txt:
return str(txt)
text_attr = getattr(c0, "text", None)
if text_attr:
return str(text_attr)
if isinstance(chunk, dict):
choices = chunk.get("choices") or []
if choices:
delta = choices[0].get("delta") or {}
return str(delta.get("content") or delta.get("reasoning_content") or choices[0].get("text") or "")
except Exception:
pass
return ""
# --------------------- Chat logic ---------------------
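# respond() is a generator: each yield pushes (cleared input box, updated chat
# history, caption state, asset state, raw debug text) back to the UI.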
def respond(
message: Dict[str, Any],
chat_history: List[Dict[str, str]],
last_caption: str,
last_asset_id: str,
last_debug: str
):
text = (message or {}).get("text", "") if isinstance(message, dict) else str(message or "")
files = (message or {}).get("files", []) if isinstance(message, dict) else []
def first_image_path(files) -> Optional[str]:
for f in files:
if isinstance(f, dict) and f.get("path"):
mt = f.get("mime_type") or _guess_mime(f["path"])
if mt.startswith("image/"):
return f["path"]
elif isinstance(f, str):
if _guess_mime(f).startswith("image/"):
return f
return None
img_path = first_image_path(files)
parts = []
if text and text.strip():
parts.append(text.strip())
if img_path:
parts.append("🖼️ [изображение]")
user_visible = "\n".join(parts) if parts else "🖐️"
chat_history = chat_history or []
chat_history.append({"role": "user", "content": user_visible})
chat_history.append({"role": "assistant", "content": ""})
yield {"text": "", "files": []}, chat_history, last_caption, last_asset_id, (last_debug or "")
caption = last_caption or ""
asset_id = last_asset_id or ""
debug_raw = last_debug or ""
if img_path:
chat_history[-1]["content"] = "🔎 Обрабатываю изображение во Florence…"
yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
try:
caption, asset_id, debug_raw = get_caption_with_debug(img_path)
except Exception as e:
caption, debug_raw = "", f"[Florence error] {e}"
if caption:
system_prompt = (
"You are a helpful multimodal assistant. "
"Use the provided 'More Detailed Caption' as visual context. "
"If something is not visible or uncertain, say so.\n\n"
"Image Caption START >>>\n"
f"{caption}\n"
"<<< Image Caption END."
)
else:
system_prompt = (
"You are a helpful assistant. "
"If the user refers to an image but no caption is available, ask them to reattach the image."
)
user_text_for_llm = text or ("Describe the attached image." if caption else "Hi")
assistant_accum = ""
try:
stream = llm.chat.completions.create(
model="openai/gpt-oss-120b",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_text_for_llm}
],
temperature=0.7,
top_p=1.0,
max_tokens=768,
stream=True,
)
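        # Stream tokens into the last assistant message, re-yielding the full
        # history so the UI updates incrementally.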
for chunk in stream:
piece = _extract_text_from_stream_chunk(chunk)
if not piece:
continue
assistant_accum += piece
chat_history[-1]["content"] = assistant_accum
yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
except Exception:
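        # Fallback: retry without streaming if the streaming call failed.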
try:
resp = llm.chat.completions.create(
model="openai/gpt-oss-120b",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_text_for_llm}
],
temperature=0.7,
top_p=1.0,
max_tokens=768,
stream=False,
)
final_text = ""
if hasattr(resp, "choices"):
try:
final_text = getattr(resp.choices[0].message, "content", "") or getattr(resp.choices[0], "text", "") or ""
except Exception:
final_text = str(resp)
elif isinstance(resp, dict):
choices = resp.get("choices", [])
if choices:
m = choices[0].get("message") or choices[0]
final_text = m.get("content") or m.get("text") or str(m)
else:
final_text = str(resp)
else:
final_text = str(resp)
chat_history[-1]["content"] = final_text
yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
except Exception as e2:
chat_history[-1]["content"] = f"[Ошибка LLM: {e2}]"
yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
# --------------------- Interface ---------------------
messenger_css = """
:root {
--radius-xl: 16px;
}
.gradio-container { max-width: 780px !important; margin: auto; }
#title { text-align: center; padding: 8px 0 10px; font-size: 18px; }
#chat-wrap { border: 1px solid rgba(0,0,0,0.07); border-radius: var(--radius-xl); overflow: hidden; }
#chat { height: 520px; }
#bottom-bar { position: sticky; bottom: 0; background: var(--body-background-fill); border-top: 1px solid rgba(0,0,0,0.06); padding: 8px; display: flex; gap: 8px; align-items: center; }
#send { min-width: 42px; max-width: 42px; height: 42px; border-radius: 999px; }
#msg .mm-wrap { border: 1px solid rgba(0,0,0,0.08); border-radius: 999px; }
#raw-wrap .wrap>label { font-weight: 600; }
"""
theme = gr.themes.Soft(
primary_hue="cyan",
neutral_hue="slate",
).set(
button_large_radius="999px",
button_small_radius="999px",
block_radius="16px",
)
with gr.Blocks(theme=theme, css=messenger_css, analytics_enabled=False) as demo:
gr.Markdown("✨ <div id='title'>Визуальный чат: Florence → GPT‑OSS</div>")
caption_state = gr.State(value="")
asset_state = gr.State(value="")
debug_state = gr.State(value="")
with gr.Group(elem_id="chat-wrap"):
chatbot = gr.Chatbot(label="", height=520, elem_id="chat", type="messages")
with gr.Row(elem_id="bottom-bar"):
msg = gr.MultimodalTextbox(
show_label=False,
placeholder="Напишите сообщение… (иконка слева — добавить изображение)",
elem_id="msg",
)
send = gr.Button("➤", variant="primary", elem_id="send")
with gr.Accordion("Raw Florence output", open=True, elem_id="raw-wrap"):
raw_out = gr.Textbox(
label="",
value="",
lines=14,
show_copy_button=True
)
msg.submit(
respond,
inputs=[msg, chatbot, caption_state, asset_state, debug_state],
outputs=[msg, chatbot, caption_state, asset_state, raw_out]
)
send.click(
respond,
inputs=[msg, chatbot, caption_state, asset_state, debug_state],
outputs=[msg, chatbot, caption_state, asset_state, raw_out]
)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), share=False)