#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import io
import json
import zipfile
import mimetypes
from typing import Any, Dict, List, Optional, Tuple

import requests
import gradio as gr
from openai import OpenAI

# --------------------- Configuration ---------------------
NV_API_KEY = os.environ.get("NV_API_KEY")
NV_BASE_URL = os.environ.get("NV_BASE_URL", "https://integrate.api.nvidia.com/v1")
NV_VLM_URL = os.environ.get("NV_VLM_URL", "https://ai.api.nvidia.com/v1/vlm/microsoft/florence-2")
NVCF_ASSETS_URL = "https://api.nvcf.nvidia.com/v2/nvcf/assets"

if not NV_API_KEY:
    raise RuntimeError("NV_API_KEY is not set. In an HF Space: Settings → Secrets → NV_API_KEY")

llm = OpenAI(base_url=NV_BASE_URL, api_key=NV_API_KEY)
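# `llm` reuses the OpenAI SDK against NVIDIA's OpenAI-compatible /v1 endpoint;
# only base_url and api_key differ from stock OpenAI usage.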

# --------------------- Florence utils ---------------------
def _guess_mime(path: str) -> str:
    return mimetypes.guess_type(path)[0] or "image/jpeg"
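
# NVCF asset upload is a two-step handshake: POST /v2/nvcf/assets returns a
# presigned uploadUrl plus an assetId; the raw image bytes are then PUT
# directly to that URL. The asset_id is later referenced from the VLM prompt.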
def nvcf_upload_asset(image_path: str, description: str = "Chat image") -> str:
    auth = requests.post(
        NVCF_ASSETS_URL,
        headers={
            "Authorization": f"Bearer {NV_API_KEY}",
            "Content-Type": "application/json",
            "accept": "application/json",
        },
        json={"contentType": _guess_mime(image_path), "description": description},
        timeout=30,
    )
    auth.raise_for_status()
    up_url = auth.json()["uploadUrl"]
    asset_id = str(auth.json()["assetId"])
    with open(image_path, "rb") as f:
        put = requests.put(
            up_url,
            data=f,
            headers={
                "x-amz-meta-nvcf-asset-description": description,
                "content-type": _guess_mime(image_path),
            },
            timeout=300,
        )
    put.raise_for_status()
    return asset_id
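
# Florence-2 expects a single content string: a task token (e.g. <CAPTION>),
# optional free text, and an <img> tag whose src references the uploaded
# NVCF asset instead of inline image data.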
def _vlm_content(task_token: str, asset_id: str, text_prompt: Optional[str] = None) -> str:
    parts = [task_token]
    if text_prompt and text_prompt.strip():
        parts.append(text_prompt.strip())
    parts.append(f'<img src="data:image/jpeg;asset_id,{asset_id}" />')
    return "".join(parts)
PRIORITY_TEXT_KEYS = [
    "more_detailed_caption", "detailed_caption", "caption",
    "generated_text", "text", "ocr", "description",
]

def _deep_text_candidates(obj: Any) -> List[str]:
    out = []

    def walk(o):
        if isinstance(o, dict):
            for k in PRIORITY_TEXT_KEYS:
                if k in o and isinstance(o[k], str) and o[k].strip():
                    out.append(o[k].strip())
            for v in o.values():
                walk(v)
        elif isinstance(o, list):
            for it in o:
                walk(it)
        elif isinstance(o, str):
            if o.strip():
                out.append(o.strip())

    walk(obj)
    return out
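
# The Florence NIM may answer with plain JSON or with a ZIP archive
# (content-type application/zip or octet-stream, body starting with b"PK")
# that bundles JSON/TXT result files. The two helpers below handle both
# shapes: one renders a human-readable dump for the debug panel, the other
# extracts the first usable piece of text.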
def _debug_dump_from_response(resp: requests.Response) -> str:
    lines = []
    data = resp.content
    ct = (resp.headers.get("content-type") or "").lower()
    lines.append("=== Florence HTTP Response ===")
    lines.append(f"status: {resp.status_code}")
    lines.append(f"content-type: {ct}")
    lines.append(f"bytes: {len(data)}")
    if "application/json" in ct and not data.startswith(b"PK"):
        try:
            raw = resp.text
        except Exception:
            raw = data.decode("utf-8", errors="ignore")
        lines.append("--- RAW JSON ---")
        lines.append(raw)
        return "\n".join(lines)
    if data.startswith(b"PK") or "zip" in ct or "octet-stream" in ct:
        lines.append("--- ZIP CONTENTS ---")
        try:
            with zipfile.ZipFile(io.BytesIO(data), "r") as z:
                for name in z.namelist():
                    lines.append(f"* {name}")
                for name in z.namelist():
                    low = name.lower()
                    if low.endswith(".json") or low.endswith(".txt"):
                        try:
                            with z.open(name) as f:
                                raw = f.read().decode("utf-8", errors="ignore")
                            lines.append(f"\n--- FILE: {name} ---\n{raw}")
                        except Exception as e:
                            lines.append(f"\n--- FILE: {name} --- [read error: {e}]")
        except Exception as e:
            lines.append(f"[zip parse error: {e}]")
        return "\n".join(lines)
    try:
        txt = data.decode("utf-8", errors="ignore")
    except Exception:
        txt = "[binary body]"
    lines.append("--- RAW BODY ---")
    lines.append(txt)
    return "\n".join(lines)

def _parse_vlm_text(resp: requests.Response) -> str:
    data = resp.content
    ct = (resp.headers.get("content-type") or "").lower()
    if "application/json" in ct and not data.startswith(b"PK"):
        try:
            obj = resp.json()
            cands = _deep_text_candidates(obj)
            return cands[0] if cands else ""
        except Exception:
            return ""
    if data.startswith(b"PK") or "zip" in ct or "octet-stream" in ct:
        try:
            with zipfile.ZipFile(io.BytesIO(data), "r") as z:
                for name in z.namelist():
                    if not name.lower().endswith(".json"):
                        continue
                    try:
                        with z.open(name) as f:
                            obj = json.loads(f.read().decode("utf-8", errors="ignore"))
                        cands = _deep_text_candidates(obj)
                        if cands:
                            return cands[0]
                    except Exception:
                        pass
                for name in z.namelist():
                    if name.lower().endswith(".txt"):
                        try:
                            with z.open(name) as f:
                                txt = f.read().decode("utf-8", errors="ignore").strip()
                            if txt:
                                return txt
                        except Exception:
                            pass
        except Exception:
            return ""
    try:
        return data.decode("utf-8", errors="ignore").strip()
    except Exception:
        return ""
def _call_florence(task_token: str, asset_id: str, text_prompt: Optional[str] = None) -> Tuple[str, str]:
    content = _vlm_content(task_token, asset_id, text_prompt)
    payload = {"messages": [{"role": "user", "content": content}]}
    headers = {
        "Authorization": f"Bearer {NV_API_KEY}",
        "Accept": "application/zip, application/json, */*",
        "Content-Type": "application/json",
        "NVCF-INPUT-ASSET-REFERENCES": asset_id,
        "NVCF-FUNCTION-ASSET-IDS": asset_id,
    }
    resp = requests.post(NV_VLM_URL, headers=headers, json=payload, timeout=300)
    # requests.post either raises or returns a Response, so no None check is needed.
    raw_dump = _debug_dump_from_response(resp)
    if not resp.ok:
        return f"[VLM HTTP {resp.status_code}]", raw_dump
    text = _parse_vlm_text(resp)
    return text, raw_dump

def _is_good(text: str) -> bool:
    # At least a few characters, and not the known placeholder answer
    # (the Russian phrase means "result images").
    return isinstance(text, str) and len(text.strip()) >= 3 and "изображений-результатов" not in text.lower()
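
# Walk down a ladder of task tokens, from most to least detailed, until one
# attempt yields usable text; the raw HTTP dump of every attempt is kept for
# the debug panel.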
def get_caption_with_debug(image_path: str) -> Tuple[str, str, str]:
    asset_id = nvcf_upload_asset(image_path)
    attempts = [
        ("<MORE_DETAILED_CAPTION>", None),
        ("<DETAILED_CAPTION>", None),
        ("<CAPTION>", None),
        ("<OCR>", None),
    ]
    debug_parts = []
    for token, prompt in attempts:
        text, raw_dump = _call_florence(token, asset_id, prompt)
        debug_parts.append(f"=== Attempt {token} ===\n{raw_dump}\n")
        if _is_good(text):
            return text, asset_id, "\n".join(debug_parts)
    return "", asset_id, "\n".join(debug_parts)

# --------------------- LLM streaming utils ---------------------
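# Stream chunks may arrive as SDK objects (chunk.choices[0].delta.content) or
# as plain dicts, and some models emit reasoning_content before content; this
# helper tolerates all of those shapes.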
def _extract_text_from_stream_chunk(chunk: Any) -> str:
    try:
        if hasattr(chunk, "choices"):
            choices = getattr(chunk, "choices")
            if choices:
                c0 = choices[0]
                delta = getattr(c0, "delta", None)
                if delta is not None:
                    txt = getattr(delta, "reasoning_content", None) or getattr(delta, "content", None)
                    if txt:
                        return str(txt)
                text_attr = getattr(c0, "text", None)
                if text_attr:
                    return str(text_attr)
        if isinstance(chunk, dict):
            choices = chunk.get("choices") or []
            if choices:
                delta = choices[0].get("delta") or {}
                return str(delta.get("content") or delta.get("reasoning_content") or choices[0].get("text") or "")
    except Exception:
        pass
    return ""

# --------------------- Chat logic ---------------------
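# `respond` is a generator wired to both msg.submit and send.click below:
# every yield pushes (cleared input, updated history, caption/asset state,
# raw Florence dump) back to the UI, which is what makes streaming visible.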
def respond(
    message: Dict[str, Any],
    chat_history: List[Dict[str, str]],
    last_caption: str,
    last_asset_id: str,
    last_debug: str
):
    text = (message or {}).get("text", "") if isinstance(message, dict) else str(message or "")
    files = (message or {}).get("files", []) if isinstance(message, dict) else []

    def first_image_path(files) -> Optional[str]:
        for f in files:
            if isinstance(f, dict) and f.get("path"):
                mt = f.get("mime_type") or _guess_mime(f["path"])
                if mt.startswith("image/"):
                    return f["path"]
            elif isinstance(f, str):
                if _guess_mime(f).startswith("image/"):
                    return f
        return None

    img_path = first_image_path(files)
    parts = []
    if text and text.strip():
        parts.append(text.strip())
    if img_path:
        parts.append("🖼️ [image]")
    user_visible = "\n".join(parts) if parts else "🖐️"

    chat_history = chat_history or []
    chat_history.append({"role": "user", "content": user_visible})
    chat_history.append({"role": "assistant", "content": ""})
    yield {"text": "", "files": []}, chat_history, last_caption, last_asset_id, (last_debug or "")

    caption = last_caption or ""
    asset_id = last_asset_id or ""
    debug_raw = last_debug or ""
    if img_path:
        chat_history[-1]["content"] = "🔎 Processing the image with Florence…"
        yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
        try:
            caption, asset_id, debug_raw = get_caption_with_debug(img_path)
        except Exception as e:
            caption, debug_raw = "", f"[Florence error] {e}"

    if caption:
        system_prompt = (
            "You are a helpful multimodal assistant. "
            "Use the provided 'More Detailed Caption' as visual context. "
            "If something is not visible or uncertain, say so.\n\n"
            "Image Caption START >>>\n"
            f"{caption}\n"
            "<<< Image Caption END."
        )
    else:
        system_prompt = (
            "You are a helpful assistant. "
            "If the user refers to an image but no caption is available, ask them to reattach the image."
        )
    user_text_for_llm = text or ("Describe the attached image." if caption else "Hi")

    assistant_accum = ""
    try:
        stream = llm.chat.completions.create(
            model="openai/gpt-oss-120b",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_text_for_llm}
            ],
            temperature=0.7,
            top_p=1.0,
            max_tokens=768,
            stream=True,
        )
        for chunk in stream:
            piece = _extract_text_from_stream_chunk(chunk)
            if not piece:
                continue
            assistant_accum += piece
            chat_history[-1]["content"] = assistant_accum
            yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
    except Exception:
        # Streaming failed; retry once without streaming before giving up.
        try:
            resp = llm.chat.completions.create(
                model="openai/gpt-oss-120b",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_text_for_llm}
                ],
                temperature=0.7,
                top_p=1.0,
                max_tokens=768,
                stream=False,
            )
            final_text = ""
            if hasattr(resp, "choices"):
                try:
                    final_text = getattr(resp.choices[0].message, "content", "") or getattr(resp.choices[0], "text", "") or ""
                except Exception:
                    final_text = str(resp)
            elif isinstance(resp, dict):
                choices = resp.get("choices", [])
                if choices:
                    m = choices[0].get("message") or choices[0]
                    final_text = m.get("content") or m.get("text") or str(m)
                else:
                    final_text = str(resp)
            else:
                final_text = str(resp)
            chat_history[-1]["content"] = final_text
            yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
        except Exception as e2:
            chat_history[-1]["content"] = f"[LLM error: {e2}]"
            yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")

# --------------------- Interface ---------------------
messenger_css = """ | |
:root { | |
--radius-xl: 16px; | |
} | |
.gradio-container { max-width: 780px !important; margin: auto; } | |
#title { text-align: center; padding: 8px 0 10px; font-size: 18px; } | |
#chat-wrap { border: 1px solid rgba(0,0,0,0.07); border-radius: var(--radius-xl); overflow: hidden; } | |
#chat { height: 520px; } | |
#bottom-bar { position: sticky; bottom: 0; background: var(--body-background-fill); border-top: 1px solid rgba(0,0,0,0.06); padding: 8px; display: flex; gap: 8px; align-items: center; } | |
#send { min-width: 42px; max-width: 42px; height: 42px; border-radius: 999px; } | |
#msg .mm-wrap { border: 1px solid rgba(0,0,0,0.08); border-radius: 999px; } | |
#raw-wrap .wrap>label { font-weight: 600; } | |
""" | |

theme = gr.themes.Soft(
    primary_hue="cyan",
    neutral_hue="slate",
).set(
    button_large_radius="999px",
    button_small_radius="999px",
    block_radius="16px",
)

with gr.Blocks(theme=theme, css=messenger_css, analytics_enabled=False) as demo:
    gr.Markdown("<div id='title'>✨ Visual chat: Florence → GPT-OSS</div>")
    caption_state = gr.State(value="")
    asset_state = gr.State(value="")
    debug_state = gr.State(value="")
    with gr.Group(elem_id="chat-wrap"):
        chatbot = gr.Chatbot(label="", height=520, elem_id="chat", type="messages")
        with gr.Row(elem_id="bottom-bar"):
            msg = gr.MultimodalTextbox(
                show_label=False,
                placeholder="Type a message… (use the icon on the left to attach an image)",
                elem_id="msg",
            )
            send = gr.Button("➤", variant="primary", elem_id="send")
    with gr.Accordion("Raw Florence output", open=True, elem_id="raw-wrap"):
        raw_out = gr.Textbox(
            label="",
            value="",
            lines=14,
            show_copy_button=True
        )

    msg.submit(
        respond,
        inputs=[msg, chatbot, caption_state, asset_state, debug_state],
        outputs=[msg, chatbot, caption_state, asset_state, raw_out]
    )
    send.click(
        respond,
        inputs=[msg, chatbot, caption_state, asset_state, debug_state],
        outputs=[msg, chatbot, caption_state, asset_state, raw_out]
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), share=False)