#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import io
import json
import zipfile
import mimetypes
from typing import Any, Dict, List, Optional, Tuple

import requests
import gradio as gr
from openai import OpenAI

# --------------------- Configuration ---------------------
NV_API_KEY = os.environ.get("NV_API_KEY")
NV_BASE_URL = os.environ.get("NV_BASE_URL", "https://integrate.api.nvidia.com/v1")
NV_VLM_URL = os.environ.get("NV_VLM_URL", "https://ai.api.nvidia.com/v1/vlm/microsoft/florence-2")
NVCF_ASSETS_URL = "https://api.nvcf.nvidia.com/v2/nvcf/assets"

if not NV_API_KEY:
    raise RuntimeError("NV_API_KEY не задан. В HF Space: Settings → Secrets → NV_API_KEY")

llm = OpenAI(base_url=NV_BASE_URL, api_key=NV_API_KEY)


# --------------------- Florence utils ---------------------
def _guess_mime(path: str) -> str:
    return mimetypes.guess_type(path)[0] or "image/jpeg"


def nvcf_upload_asset(image_path: str, description: str = "Chat image") -> str:
    auth = requests.post(
        NVCF_ASSETS_URL,
        headers={
            "Authorization": f"Bearer {NV_API_KEY}",
            "Content-Type": "application/json",
            "accept": "application/json",
        },
        json={"contentType": _guess_mime(image_path), "description": description},
        timeout=30,
    )
    auth.raise_for_status()
    up_url = auth.json()["uploadUrl"]
    asset_id = str(auth.json()["assetId"])
    with open(image_path, "rb") as f:
        put = requests.put(
            up_url,
            data=f,
            headers={
                "x-amz-meta-nvcf-asset-description": description,
                "content-type": _guess_mime(image_path),
            },
            timeout=300,
        )
        put.raise_for_status()
    return asset_id


def _vlm_content(task_token: str, asset_id: str, text_prompt: Optional[str] = None) -> str:
    parts = [task_token]
    if text_prompt and text_prompt.strip():
        parts.append(text_prompt.strip())
    # Inline NVCF asset reference that the Florence endpoint resolves server-side.
    parts.append(f'<img src="data:image/jpeg;asset_id,{asset_id}" />')
    return "".join(parts)


PRIORITY_TEXT_KEYS = [
    "more_detailed_caption",
    "detailed_caption",
    "caption",
    "generated_text",
    "text",
    "ocr",
    "description",
]


def _deep_text_candidates(obj: Any) -> List[str]:
    out = []

    def walk(o):
        if isinstance(o, dict):
            for k in PRIORITY_TEXT_KEYS:
                if k in o and isinstance(o[k], str) and o[k].strip():
                    out.append(o[k].strip())
            for v in o.values():
                walk(v)
        elif isinstance(o, list):
            for it in o:
                walk(it)
        elif isinstance(o, str):
            if o.strip():
                out.append(o.strip())

    walk(obj)
    return out


def _debug_dump_from_response(resp: requests.Response) -> str:
    lines = []
    data = resp.content
    ct = (resp.headers.get("content-type") or "").lower()
    lines.append("=== Florence HTTP Response ===")
    lines.append(f"status: {resp.status_code}")
    lines.append(f"content-type: {ct}")
    lines.append(f"bytes: {len(data)}")
    if "application/json" in ct and not data.startswith(b"PK"):
        try:
            raw = resp.text
        except Exception:
            raw = data.decode("utf-8", errors="ignore")
        lines.append("--- RAW JSON ---")
        lines.append(raw)
        return "\n".join(lines)
    if data.startswith(b"PK") or "zip" in ct or "octet-stream" in ct:
        lines.append("--- ZIP CONTENTS ---")
        try:
            with zipfile.ZipFile(io.BytesIO(data), "r") as z:
                for name in z.namelist():
                    lines.append(f"* {name}")
                for name in z.namelist():
                    low = name.lower()
                    if low.endswith(".json") or low.endswith(".txt"):
                        try:
                            with z.open(name) as f:
                                raw = f.read().decode("utf-8", errors="ignore")
                            lines.append(f"\n--- FILE: {name} ---\n{raw}")
                        except Exception as e:
                            lines.append(f"\n--- FILE: {name} --- [read error: {e}]")
        except Exception as e:
            lines.append(f"[zip parse error: {e}]")
        return "\n".join(lines)
    try:
        txt = data.decode("utf-8", errors="ignore")
    except Exception:
        txt = "[binary body]"
    lines.append("--- RAW BODY ---")
    lines.append(txt)
    return "\n".join(lines)
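
# Florence may answer with plain JSON or with a ZIP archive of result files;
# pull the most useful caption/OCR text out of either shape.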
def _parse_vlm_text(resp: requests.Response) -> str:
    data = resp.content
    ct = (resp.headers.get("content-type") or "").lower()
    if "application/json" in ct and not data.startswith(b"PK"):
        try:
            obj = resp.json()
            cands = _deep_text_candidates(obj)
            return cands[0] if cands else ""
        except Exception:
            return ""
    if data.startswith(b"PK") or "zip" in ct or "octet-stream" in ct:
        try:
            with zipfile.ZipFile(io.BytesIO(data), "r") as z:
                for name in z.namelist():
                    if not name.lower().endswith(".json"):
                        continue
                    try:
                        with z.open(name) as f:
                            obj = json.loads(f.read().decode("utf-8", errors="ignore"))
                        cands = _deep_text_candidates(obj)
                        if cands:
                            return cands[0]
                    except Exception:
                        pass
                for name in z.namelist():
                    if name.lower().endswith(".txt"):
                        try:
                            with z.open(name) as f:
                                txt = f.read().decode("utf-8", errors="ignore").strip()
                            if txt:
                                return txt
                        except Exception:
                            pass
        except Exception:
            return ""
    try:
        return data.decode("utf-8", errors="ignore").strip()
    except Exception:
        return ""


def _call_florence(task_token: str, asset_id: str, text_prompt: Optional[str] = None) -> Tuple[str, str]:
    content = _vlm_content(task_token, asset_id, text_prompt)
    payload = {"messages": [{"role": "user", "content": content}]}
    headers = {
        "Authorization": f"Bearer {NV_API_KEY}",
        "Accept": "application/zip, application/json, */*",
        "Content-Type": "application/json",
        "NVCF-INPUT-ASSET-REFERENCES": asset_id,
        "NVCF-FUNCTION-ASSET-IDS": asset_id,
    }
    resp = requests.post(NV_VLM_URL, headers=headers, json=payload, timeout=300)
    raw_dump = _debug_dump_from_response(resp) if resp is not None else "[no response]"
    if not resp.ok:
        return f"[VLM HTTP {resp.status_code}]", raw_dump
    text = _parse_vlm_text(resp)
    return text, raw_dump


def _is_good(text: str) -> bool:
    return isinstance(text, str) and len(text.strip()) >= 3 and "изображений-результатов" not in text.lower()


def get_caption_with_debug(image_path: str) -> Tuple[str, str, str]:
    asset_id = nvcf_upload_asset(image_path)
    # Florence-2 task tokens, tried from most to least detailed.
    attempts = [
        ("<MORE_DETAILED_CAPTION>", None),
        ("<DETAILED_CAPTION>", None),
        ("<CAPTION>", None),
        ("<OCR>", None),
    ]
    debug_parts = []
    for token, prompt in attempts:
        text, raw_dump = _call_florence(token, asset_id, prompt)
        debug_parts.append(f"=== Attempt {token} ===\n{raw_dump}\n")
        if _is_good(text):
            return text, asset_id, "\n".join(debug_parts)
    return "", asset_id, "\n".join(debug_parts)


# --------------------- LLM streaming utils ---------------------
def _extract_text_from_stream_chunk(chunk: Any) -> str:
    try:
        if hasattr(chunk, "choices"):
            choices = getattr(chunk, "choices")
            if choices:
                c0 = choices[0]
                delta = getattr(c0, "delta", None)
                if delta is not None:
                    txt = getattr(delta, "reasoning_content", None) or getattr(delta, "content", None)
                    if txt:
                        return str(txt)
                text_attr = getattr(c0, "text", None)
                if text_attr:
                    return str(text_attr)
        if isinstance(chunk, dict):
            choices = chunk.get("choices") or []
            if choices:
                delta = choices[0].get("delta") or {}
                return str(delta.get("content") or delta.get("reasoning_content") or choices[0].get("text") or "")
    except Exception:
        pass
    return ""


# --------------------- Chat logic ---------------------
def respond(
    message: Dict[str, Any],
    chat_history: List[Dict[str, str]],
    last_caption: str,
    last_asset_id: str,
    last_debug: str,
):
    text = (message or {}).get("text", "") if isinstance(message, dict) else str(message or "")
    files = (message or {}).get("files", []) if isinstance(message, dict) else []

    def first_image_path(files) -> Optional[str]:
        for f in files:
            if isinstance(f, dict) and f.get("path"):
                mt = f.get("mime_type") or _guess_mime(f["path"])
                if mt.startswith("image/"):
                    return f["path"]
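            # Gradio may also deliver attachments as plain path strings.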
            elif isinstance(f, str):
                if _guess_mime(f).startswith("image/"):
                    return f
        return None

    img_path = first_image_path(files)

    parts = []
    if text and text.strip():
        parts.append(text.strip())
    if img_path:
        parts.append("🖼️ [изображение]")
    user_visible = "\n".join(parts) if parts else "🖐️"

    chat_history = chat_history or []
    chat_history.append({"role": "user", "content": user_visible})
    chat_history.append({"role": "assistant", "content": ""})
    yield {"text": "", "files": []}, chat_history, last_caption, last_asset_id, (last_debug or "")

    caption = last_caption or ""
    asset_id = last_asset_id or ""
    debug_raw = last_debug or ""

    if img_path:
        chat_history[-1]["content"] = "🔎 Обрабатываю изображение во Florence…"
        yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
        try:
            caption, asset_id, debug_raw = get_caption_with_debug(img_path)
        except Exception as e:
            caption, debug_raw = "", f"[Florence error] {e}"

    if caption:
        system_prompt = (
            "You are a helpful multimodal assistant. "
            "Use the provided 'More Detailed Caption' as visual context. "
            "If something is not visible or uncertain, say so.\n\n"
            "Image Caption START >>>\n"
            f"{caption}\n"
            "<<< Image Caption END."
        )
    else:
        system_prompt = (
            "You are a helpful assistant. "
            "If the user refers to an image but no caption is available, ask them to reattach the image."
        )

    user_text_for_llm = text or ("Describe the attached image." if caption else "Hi")

    assistant_accum = ""
    try:
        stream = llm.chat.completions.create(
            model="openai/gpt-oss-120b",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_text_for_llm},
            ],
            temperature=0.7,
            top_p=1.0,
            max_tokens=768,
            stream=True,
        )
        for chunk in stream:
            piece = _extract_text_from_stream_chunk(chunk)
            if not piece:
                continue
            assistant_accum += piece
            chat_history[-1]["content"] = assistant_accum
            yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
    except Exception:
        # Streaming failed; retry once without streaming.
        try:
            resp = llm.chat.completions.create(
                model="openai/gpt-oss-120b",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_text_for_llm},
                ],
                temperature=0.7,
                top_p=1.0,
                max_tokens=768,
                stream=False,
            )
            final_text = ""
            if hasattr(resp, "choices"):
                try:
                    final_text = getattr(resp.choices[0].message, "content", "") or getattr(resp.choices[0], "text", "") or ""
                except Exception:
                    final_text = str(resp)
            elif isinstance(resp, dict):
                choices = resp.get("choices", [])
                if choices:
                    m = choices[0].get("message") or choices[0]
                    final_text = m.get("content") or m.get("text") or str(m)
                else:
                    final_text = str(resp)
            else:
                final_text = str(resp)
            chat_history[-1]["content"] = final_text
            yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")
        except Exception as e2:
            chat_history[-1]["content"] = f"[Ошибка LLM: {e2}]"
            yield {"text": "", "files": []}, chat_history, caption, asset_id, (debug_raw or "")


# --------------------- Interface ---------------------
messenger_css = """
:root { --radius-xl: 16px; }
.gradio-container { max-width: 780px !important; margin: auto; }
#title { text-align: center; padding: 8px 0 10px; font-size: 18px; }
#chat-wrap { border: 1px solid rgba(0,0,0,0.07); border-radius: var(--radius-xl); overflow: hidden; }
#chat { height: 520px; }
#bottom-bar { position: sticky; bottom: 0; background: var(--body-background-fill); border-top: 1px solid rgba(0,0,0,0.06); padding: 8px; display: flex; gap: 8px; align-items: center; }
#send { min-width: 42px; max-width: 42px; height: 42px; border-radius: 999px; }
#msg .mm-wrap { border: 1px solid rgba(0,0,0,0.08); border-radius: 999px; }
#raw-wrap .wrap>label { font-weight: 600; }
"""

theme = gr.themes.Soft(
    primary_hue="cyan",
    neutral_hue="slate",
).set(
    button_large_radius="999px",
    button_small_radius="999px",
    block_radius="16px",
)
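
# Messenger-style layout: one chat card, a sticky input bar, and a raw-debug accordion.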
with gr.Blocks(theme=theme, css=messenger_css, analytics_enabled=False) as demo:
    gr.Markdown("✨ Визуальный чат: Florence → GPT‑OSS", elem_id="title")
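
    # Per-session state: the last Florence caption, its NVCF asset id, and the raw debug dump.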
") caption_state = gr.State(value="") asset_state = gr.State(value="") debug_state = gr.State(value="") with gr.Group(elem_id="chat-wrap"): chatbot = gr.Chatbot(label="", height=520, elem_id="chat", type="messages") with gr.Row(elem_id="bottom-bar"): msg = gr.MultimodalTextbox( show_label=False, placeholder="Напишите сообщение… (иконка слева — добавить изображение)", elem_id="msg", ) send = gr.Button("➤", variant="primary", elem_id="send") with gr.Accordion("Raw Florence output", open=True, elem_id="raw-wrap"): raw_out = gr.Textbox( label="", value="", lines=14, show_copy_button=True ) msg.submit( respond, inputs=[msg, chatbot, caption_state, asset_state, debug_state], outputs=[msg, chatbot, caption_state, asset_state, raw_out] ) send.click( respond, inputs=[msg, chatbot, caption_state, asset_state, debug_state], outputs=[msg, chatbot, caption_state, asset_state, raw_out] ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), share=False)