Spaces:
Running
Running
import gradio as gr | |
import json | |
import os | |
from typing import Any, List, Dict | |
import spaces | |
from PIL import Image, ImageDraw | |
import requests | |
from transformers import AutoModelForImageTextToText, AutoProcessor | |
from transformers.models.qwen2_vl.image_processing_qwen2_vl import smart_resize | |
import torch | |
import re | |
import traceback | |
# --- Configuration --- | |
MODEL_ID = "Hcompany/Holo1-3B" | |
# --- Helpers (robust across different transformers versions) --- | |
def pick_device() -> str: | |
# Force CPU per request | |
return "cpu" | |
def apply_chat_template_compat(processor, messages: List[Dict[str, Any]]) -> str: | |
""" | |
Works whether apply_chat_template lives on the processor or tokenizer, | |
or not at all (falls back to naive text join of 'text' contents). | |
""" | |
tok = getattr(processor, "tokenizer", None) | |
if hasattr(processor, "apply_chat_template"): | |
return processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
if tok is not None and hasattr(tok, "apply_chat_template"): | |
return tok.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
# Fallback: concatenate visible text segments | |
texts = [] | |
for m in messages: | |
for c in m.get("content", []): | |
if isinstance(c, dict) and c.get("type") == "text": | |
texts.append(c.get("text", "")) | |
return "\n".join(texts) | |
def batch_decode_compat(processor, token_id_batches, **kw): | |
tok = getattr(processor, "tokenizer", None) | |
if tok is not None and hasattr(tok, "batch_decode"): | |
return tok.batch_decode(token_id_batches, **kw) | |
if hasattr(processor, "batch_decode"): | |
return processor.batch_decode(token_id_batches, **kw) | |
raise AttributeError("No batch_decode available on processor or tokenizer.") | |
def get_image_proc_params(processor) -> Dict[str, int]: | |
""" | |
Safely access image processor params with defaults that work for Qwen2-VL family. | |
""" | |
ip = getattr(processor, "image_processor", None) | |
return { | |
"patch_size": getattr(ip, "patch_size", 14), | |
"merge_size": getattr(ip, "merge_size", 1), | |
"min_pixels": getattr(ip, "min_pixels", 256 * 256), | |
"max_pixels": getattr(ip, "max_pixels", 1280 * 1280), | |
} | |
def trim_generated(generated_ids, inputs): | |
""" | |
Trim prompt tokens from generated tokens when input_ids exist. | |
""" | |
in_ids = getattr(inputs, "input_ids", None) | |
if in_ids is None and isinstance(inputs, dict): | |
in_ids = inputs.get("input_ids", None) | |
if in_ids is None: | |
return [out_ids for out_ids in generated_ids] | |
return [out_ids[len(in_seq):] for in_seq, out_ids in zip(in_ids, generated_ids)] | |
# --- Model and Processor Loading (Load once) --- | |
print(f"Loading model and processor for {MODEL_ID} (CPU only)...") | |
model = None | |
processor = None | |
model_loaded = False | |
load_error_message = "" | |
try: | |
# CPU-friendly dtype; bf16 on CPU is spotty, so prefer float32 | |
model = AutoModelForImageTextToText.from_pretrained( | |
MODEL_ID, | |
torch_dtype=torch.float32, | |
trust_remote_code=True | |
).to(pick_device()) | |
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True) | |
model_loaded = True | |
print("Model and processor loaded successfully.") | |
except Exception as e: | |
load_error_message = ( | |
f"Error loading model/processor: {e}\n" | |
"This might be due to network issues, an incorrect model ID, or incompatible library versions.\n" | |
"Check the full traceback in the Space logs." | |
) | |
print(load_error_message) | |
traceback.print_exc() | |
# --- Prompt builder --- | |
def get_localization_prompt(pil_image: Image.Image, instruction: str) -> List[dict]: | |
guidelines: str = ( | |
"Localize an element on the GUI image according to my instructions and " | |
"output a click position as Click(x, y) with x num pixels from the left edge " | |
"and y num pixels from the top edge." | |
) | |
return [ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "image", "image": pil_image}, | |
{"type": "text", "text": f"{guidelines}\n{instruction}"} | |
], | |
} | |
] | |
# --- Inference (CPU) --- | |
def run_inference_localization( | |
messages_for_template: List[dict[str, Any]], | |
pil_image_for_processing: Image.Image | |
) -> str: | |
""" | |
CPU inference; robust to processor/tokenizer differences and logs full traceback on failure. | |
""" | |
try: | |
model.to(pick_device()) | |
# 1) Build prompt text via robust helper | |
text_prompt = apply_chat_template_compat(processor, messages_for_template) | |
# 2) Prepare inputs (text + image) | |
inputs = processor( | |
text=[text_prompt], | |
images=[pil_image_for_processing], | |
padding=True, | |
return_tensors="pt", | |
) | |
# Move tensor inputs to the same device as model (CPU) | |
if isinstance(inputs, dict): | |
for k, v in list(inputs.items()): | |
if hasattr(v, "to"): | |
inputs[k] = v.to(model.device) | |
# 3) Generate (deterministic) | |
generated_ids = model.generate( | |
**inputs, | |
max_new_tokens=128, | |
do_sample=False, | |
) | |
# 4) Trim prompt tokens if possible | |
generated_ids_trimmed = trim_generated(generated_ids, inputs) | |
# 5) Decode via robust helper | |
decoded_output = batch_decode_compat( | |
processor, | |
generated_ids_trimmed, | |
skip_special_tokens=True, | |
clean_up_tokenization_spaces=False | |
) | |
return decoded_output[0] if decoded_output else "" | |
except Exception as e: | |
print(f"Error during model inference: {e}") | |
traceback.print_exc() | |
raise | |
# --- Gradio processing function --- | |
def predict_click_location(input_pil_image: Image.Image, instruction: str): | |
if not model_loaded or not processor or not model: | |
return f"Model not loaded. Error: {load_error_message}", None | |
if not input_pil_image: | |
return "No image provided. Please upload an image.", None | |
if not instruction or instruction.strip() == "": | |
return "No instruction provided. Please type an instruction.", input_pil_image.copy().convert("RGB") | |
# 1) Resize according to image processor params (safe defaults if missing) | |
try: | |
ip = get_image_proc_params(processor) | |
resized_height, resized_width = smart_resize( | |
input_pil_image.height, | |
input_pil_image.width, | |
factor=ip["patch_size"] * ip["merge_size"], | |
min_pixels=ip["min_pixels"], | |
max_pixels=ip["max_pixels"], | |
) | |
resized_image = input_pil_image.resize( | |
size=(resized_width, resized_height), | |
resample=Image.Resampling.LANCZOS | |
) | |
except Exception as e: | |
print(f"Error resizing image: {e}") | |
traceback.print_exc() | |
return f"Error resizing image: {e}", input_pil_image.copy().convert("RGB") | |
# 2) Build messages with image + instruction | |
messages = get_localization_prompt(resized_image, instruction) | |
# 3) Run inference | |
try: | |
coordinates_str = run_inference_localization(messages, resized_image) | |
except Exception as e: | |
return f"Error during model inference: {e}", resized_image.copy().convert("RGB") | |
# 4) Parse coordinates and draw marker | |
output_image_with_click = resized_image.copy().convert("RGB") | |
match = re.search(r"Click\((\d+),\s*(\d+)\)", coordinates_str) | |
if match: | |
try: | |
x = int(match.group(1)) | |
y = int(match.group(2)) | |
draw = ImageDraw.Draw(output_image_with_click) | |
radius = max(5, min(resized_width // 100, resized_height // 100, 15)) | |
bbox = (x - radius, y - radius, x + radius, y + radius) | |
draw.ellipse(bbox, outline="red", width=max(2, radius // 4)) | |
print(f"Predicted and drawn click at: ({x}, {y}) on resized image ({resized_width}x{resized_height})") | |
except Exception as e: | |
print(f"Error drawing on image: {e}") | |
traceback.print_exc() | |
else: | |
print(f"Could not parse 'Click(x, y)' from model output: {coordinates_str}") | |
return coordinates_str, output_image_with_click | |
# --- Load Example Data --- | |
example_image = None | |
example_instruction = "Select July 14th as the check-out date" | |
try: | |
example_image_url = "https://huggingface.co/Hcompany/Holo1-7B/resolve/main/calendar_example.jpg" | |
example_image = Image.open(requests.get(example_image_url, stream=True).raw) | |
except Exception as e: | |
print(f"Could not load example image from URL: {e}") | |
traceback.print_exc() | |
try: | |
example_image = Image.new("RGB", (200, 150), color="lightgray") | |
draw = ImageDraw.Draw(example_image) | |
draw.text((10, 10), "Example image\nfailed to load", fill="black") | |
except Exception: | |
pass | |
# --- Gradio UI --- | |
title = "Holo1-7B: Action VLM Localization Demo (CPU)" | |
article = f""" | |
<p style='text-align: center'> | |
Model: <a href='https://huggingface.co/{MODEL_ID}' target='_blank'>{MODEL_ID}</a> by HCompany | | |
Paper: <a href='https://cdn.prod.website-files.com/67e2dbd9acff0c50d4c8a80c/683ec8095b353e8b38317f80_h_tech_report_v1.pdf' target='_blank'>HCompany Tech Report</a> | | |
Blog: <a href='https://www.hcompany.ai/surfer-h' target='_blank'>Surfer-H Blog Post</a> | |
</p> | |
""" | |
if not model_loaded: | |
with gr.Blocks() as demo: | |
gr.Markdown(f"# <center>⚠️ Error: Model Failed to Load ⚠️</center>") | |
gr.Markdown(f"<center>{load_error_message}</center>") | |
gr.Markdown("<center>See Space logs for the full traceback.</center>") | |
else: | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
gr.Markdown(f"<h1 style='text-align: center;'>{title}</h1>") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
input_image_component = gr.Image(type="pil", label="Input UI Image", height=400) | |
instruction_component = gr.Textbox( | |
label="Instruction", | |
placeholder="e.g., Click the 'Login' button", | |
info="Type the action you want the model to localize on the image." | |
) | |
submit_button = gr.Button("Localize Click", variant="primary") | |
with gr.Column(scale=1): | |
output_coords_component = gr.Textbox( | |
label="Predicted Coordinates (Format: Click(x, y))", | |
interactive=False | |
) | |
output_image_component = gr.Image( | |
type="pil", | |
label="Image with Predicted Click Point", | |
height=400, | |
interactive=False | |
) | |
if example_image: | |
gr.Examples( | |
examples=[[example_image, example_instruction]], | |
inputs=[input_image_component, instruction_component], | |
outputs=[output_coords_component, output_image_component], | |
fn=predict_click_location, | |
cache_examples="lazy", | |
) | |
gr.Markdown(article) | |
submit_button.click( | |
fn=predict_click_location, | |
inputs=[input_image_component, instruction_component], | |
outputs=[output_coords_component, output_image_component] | |
) | |
if __name__ == "__main__": | |
# CPU Spaces can be slow; keep debug True for logs | |
demo.launch(debug=True) | |