# Hugging Face Space status at capture time: Running on Zero (ZeroGPU).
import gradio as gr | |
import json, os, re, traceback, contextlib | |
from typing import Any, List, Dict | |
import spaces | |
import torch | |
from PIL import Image, ImageDraw | |
import requests | |
from transformers import AutoModelForImageTextToText, AutoProcessor | |
from transformers.models.qwen2_vl.image_processing_qwen2_vl import smart_resize | |
# --- Configuration ---
# Hugging Face Hub repository id of the checkpoint; used both for loading the
# model/processor and for the model link rendered in the UI.
MODEL_ID = "Hcompany/Holo1-3B"
# ---------------- Device / DType helpers ---------------- | |
def pick_device() -> str:
    """Choose the torch device string to run inference on.

    The ``FORCE_DEVICE`` environment variable takes precedence when, after
    lower-casing and stripping whitespace, it is one of ``cpu``, ``cuda`` or
    ``mps`` (intended for local testing). Otherwise CUDA is preferred, then
    the MPS backend, then CPU.

    On HF Spaces (ZeroGPU), CUDA is only available inside @spaces.GPU calls,
    so the result depends on where this function is called from.
    """
    override = os.getenv("FORCE_DEVICE", "").lower().strip()
    if override in ("cpu", "cuda", "mps"):
        return override

    # Older torch builds have no `torch.backends.mps` attribute at all.
    mps_backend = getattr(torch.backends, "mps", None)
    if torch.cuda.is_available():
        return "cuda"
    if mps_backend and mps_backend.is_available():
        return "mps"
    return "cpu"
def pick_dtype(device: str) -> torch.dtype:
    """Return the floating-point dtype to use on ``device``.

    * ``"mps"``  -> ``torch.float16``
    * ``"cuda"`` -> ``torch.bfloat16`` when the current GPU reports a compute
      capability major version of 8 or higher (Ampere+), else ``torch.float16``
    * anything else (CPU) -> ``torch.float32``
    """
    if device == "mps":
        return torch.float16
    if device != "cuda":
        return torch.float32  # CPU

    capability_major = torch.cuda.get_device_capability()[0]
    if capability_major >= 8:
        return torch.bfloat16  # Ampere+ -> bf16
    return torch.float16
def move_to_device(batch, device: str):
    """Move ``batch`` (or every movable value of a dict batch) to ``device``.

    A plain ``dict`` yields a new dict in which each value exposing a ``.to``
    method is replaced by ``value.to(device, non_blocking=True)``; other
    values are kept as they are. A non-dict object exposing ``.to`` is moved
    as a whole. Anything else is returned unchanged.
    """
    if isinstance(batch, dict):
        moved = {}
        for key, value in batch.items():
            moved[key] = value.to(device, non_blocking=True) if hasattr(value, "to") else value
        return moved

    if not hasattr(batch, "to"):
        return batch
    return batch.to(device, non_blocking=True)
# --- Chat/template helpers --- | |
def apply_chat_template_compat(processor, messages: List[Dict[str, Any]]) -> str:
    """Render chat ``messages`` into a single prompt string.

    Uses ``processor.apply_chat_template`` when it exists, otherwise
    ``processor.tokenizer.apply_chat_template``; both are called with
    ``tokenize=False, add_generation_prompt=True``.

    When neither is available, falls back to joining the text of all messages
    with newlines. In that fallback a message's ``content`` may be:

    * a list of parts, of which only dict parts with ``type == "text"`` are used,
    * a plain string, which is used verbatim,
    * missing or ``None``, which contributes nothing.
    """
    for owner in (processor, getattr(processor, "tokenizer", None)):
        if owner is not None and hasattr(owner, "apply_chat_template"):
            return owner.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )

    texts = []
    for message in messages:
        content = message.get("content") or []
        if isinstance(content, str):
            # Iterating a string would yield characters and silently drop the text.
            texts.append(content)
            continue
        for part in content:
            if isinstance(part, dict) and part.get("type") == "text":
                texts.append(part.get("text", ""))
    return "\n".join(texts)
def batch_decode_compat(processor, token_id_batches, **kw):
    """Decode batches of token ids with whichever decoder is available.

    ``processor.tokenizer.batch_decode`` is preferred; ``processor.batch_decode``
    is used when the tokenizer is absent or has no such method. Extra keyword
    arguments are forwarded to the chosen ``batch_decode``.

    Raises:
        AttributeError: if neither object provides ``batch_decode``.
    """
    tokenizer = getattr(processor, "tokenizer", None)
    decoder_owner = next(
        (
            candidate
            for candidate in (tokenizer, processor)
            if candidate is not None and hasattr(candidate, "batch_decode")
        ),
        None,
    )
    if decoder_owner is None:
        raise AttributeError("No batch_decode available on processor or tokenizer.")
    return decoder_owner.batch_decode(token_id_batches, **kw)
def get_image_proc_params(processor) -> Dict[str, int]:
    """Collect the resize parameters from ``processor.image_processor``.

    Returns a dict with the keys ``patch_size``, ``merge_size``,
    ``min_pixels`` and ``max_pixels``. A built-in default is used whenever the
    image processor is missing, lacks the attribute, or has the attribute set
    to ``None`` (so callers never receive ``None`` for a numeric parameter).
    """
    defaults = {
        "patch_size": 14,
        "merge_size": 1,
        "min_pixels": 256 * 256,
        "max_pixels": 1280 * 1280,
    }
    image_processor = getattr(processor, "image_processor", None)

    params = {}
    for name, fallback in defaults.items():
        value = getattr(image_processor, name, None)
        # An attribute that exists but is None is treated the same as a missing one.
        params[name] = fallback if value is None else value
    return params
def trim_generated(generated_ids, inputs):
    """Strip the prompt tokens from the front of each generated sequence.

    ``input_ids`` is read from ``inputs`` as an attribute first and, failing
    that, as a dict key. Each generated sequence is then sliced to drop as many
    leading tokens as its matching prompt has. When no ``input_ids`` can be
    found the generated sequences are returned untrimmed, as a list.
    """
    prompt_ids = getattr(inputs, "input_ids", None)
    if prompt_ids is None and isinstance(inputs, dict):
        prompt_ids = inputs.get("input_ids", None)

    if prompt_ids is None:
        return list(generated_ids)

    trimmed = []
    for prompt_seq, full_seq in zip(prompt_ids, generated_ids):
        trimmed.append(full_seq[len(prompt_seq):])
    return trimmed
# --- Load model/processor ON CPU at import time (required for ZeroGPU) ---
# The four module-level names below are read by the inference function and by
# the UI construction further down; they keep their "not loaded" values when
# anything in the try block fails.
print(f"Loading model and processor for {MODEL_ID} on CPU startup (ZeroGPU safe)...")
model = None
processor = None
model_loaded = False
load_error_message = ""

try:
    model = AutoModelForImageTextToText.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.float32,  # CPU-safe dtype at import
        trust_remote_code=True,
    )
    processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)
    model.eval()
except Exception as exc:
    # Keep the app importable: the UI shows this message instead of crashing.
    load_error_message = (
        f"Error loading model/processor: {exc}\n"
        "This might be due to network/model ID/library versions.\n"
        "Check the full traceback in the logs."
    )
    print(load_error_message)
    traceback.print_exc()
else:
    model_loaded = True
    print("Model and processor loaded on CPU.")
# --- Prompt builder --- | |
def get_localization_prompt(pil_image: Image.Image, instruction: str) -> List[dict]:
    """Build the single-turn chat message asking the model for a click position.

    The returned list holds one ``user`` message whose content is the image
    followed by a text part made of the fixed guidelines, a newline, and the
    caller's ``instruction``.
    """
    guidelines: str = (
        "Localize an element on the GUI image according to my instructions and "
        "output a click position as Click(x, y) with x num pixels from the left edge "
        "and y num pixels from the top edge."
    )
    image_part = {"type": "image", "image": pil_image}
    text_part = {"type": "text", "text": f"{guidelines}\n{instruction}"}
    return [{"role": "user", "content": [image_part, text_part]}]
# --- Inference core (device passed in; AMP used when suitable) --- | |
def run_inference_localization(
    messages_for_template: List[dict[str, Any]],
    pil_image_for_processing: Image.Image,
    device: str,
    dtype: torch.dtype,
) -> str:
    """Run one greedy generation pass and return the decoded model answer.

    Args:
        messages_for_template: chat messages rendered into the text prompt.
        pil_image_for_processing: image handed to the processor with the prompt.
        device: device string the processed inputs are moved to.
        dtype: autocast dtype used on CUDA (MPS always autocasts to float16,
            any other device runs without autocast).

    Returns:
        The first decoded sequence with the prompt tokens removed, or ``""``
        when decoding produced nothing.
    """
    prompt_text = apply_chat_template_compat(processor, messages_for_template)
    model_inputs = move_to_device(
        processor(
            text=[prompt_text],
            images=[pil_image_for_processing],
            padding=True,
            return_tensors="pt",
        ),
        device,
    )

    # AMP contexts
    if device in ("cuda", "mps"):
        autocast_dtype = dtype if device == "cuda" else torch.float16
        amp_ctx = torch.autocast(device_type=device, dtype=autocast_dtype)
    else:
        amp_ctx = contextlib.nullcontext()

    with amp_ctx:
        output_ids = model.generate(
            **model_inputs,
            max_new_tokens=128,
            do_sample=False,
        )

    # Drop the echoed prompt so only newly generated tokens are decoded.
    new_token_ids = trim_generated(output_ids, model_inputs)
    decoded = batch_decode_compat(
        processor,
        new_token_ids,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    if not decoded:
        return ""
    return decoded[0]
# --- Gradio processing function (ZeroGPU-visible) ---
def _runtime_info(device: str, dtype: torch.dtype) -> str:
    """Format the device/dtype line shown in the "Runtime Info" box."""
    return f"device: {device} | dtype: {str(dtype).replace('torch.', '')}"


# The function Gradio calls must carry @spaces.GPU so that Spaces detects a GPU
# entry point; `duration` is the GPU time budget per call, in seconds.
# NOTE(review): 120 s is a reviewer-chosen value — confirm against the Space's needs.
@spaces.GPU(duration=120)
def predict_click_location(input_pil_image: Image.Image, instruction: str):
    """Localize ``instruction`` on ``input_pil_image`` and mark the predicted click.

    Args:
        input_pil_image: the uploaded UI screenshot (``None`` when nothing was uploaded).
        instruction: natural-language description of the element to click.

    Returns:
        A 3-tuple ``(text, image, runtime_info)``:

        * ``text`` - the raw model output (expected to contain ``Click(x, y)``)
          or an error message,
        * ``image`` - an RGB copy of the resized image with a red circle at the
          parsed coordinates; on early errors the unmarked input/resized image
          or ``None``,
        * ``runtime_info`` - ``"device: ... | dtype: ..."`` status string.

    Coordinates are relative to the resized image, which is also the image
    that is returned.
    """
    not_run_info = "device: n/a | dtype: n/a"
    if not model_loaded or processor is None or model is None:
        return f"Model not loaded. Error: {load_error_message}", None, not_run_info
    if input_pil_image is None:
        return "No image provided. Please upload an image.", None, not_run_info
    if not instruction or not instruction.strip():
        return (
            "No instruction provided. Please type an instruction.",
            input_pil_image.copy().convert("RGB"),
            not_run_info,
        )

    # Decide device/dtype *inside* the GPU-decorated call
    device = pick_device()
    dtype = pick_dtype(device)
    runtime_info = _runtime_info(device, dtype)

    # Optional perf knobs for CUDA
    if device == "cuda":
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.set_float32_matmul_precision("high")

    # If needed, move model now that GPU is available
    try:
        first_param = next(model.parameters())
        cur_dev = first_param.device.type
        cur_dtype = first_param.dtype
    except StopIteration:
        cur_dev, cur_dtype = "cpu", torch.float32
    if cur_dev != device or cur_dtype != dtype:
        model.to(device=device, dtype=dtype)
        model.eval()

    # 1) Resize according to image processor params (safe defaults if missing)
    try:
        ip = get_image_proc_params(processor)
        resized_height, resized_width = smart_resize(
            input_pil_image.height,
            input_pil_image.width,
            factor=ip["patch_size"] * ip["merge_size"],
            min_pixels=ip["min_pixels"],
            max_pixels=ip["max_pixels"],
        )
        resized_image = input_pil_image.resize(
            size=(resized_width, resized_height),
            resample=Image.Resampling.LANCZOS,
        )
    except Exception as e:
        traceback.print_exc()
        return f"Error resizing image: {e}", input_pil_image.copy().convert("RGB"), runtime_info

    # 2) Build messages with image + instruction
    messages = get_localization_prompt(resized_image, instruction)

    # 3) Run inference
    try:
        coordinates_str = run_inference_localization(messages, resized_image, device, dtype)
    except Exception as e:
        traceback.print_exc()
        return f"Error during model inference: {e}", resized_image.copy().convert("RGB"), runtime_info

    # 4) Parse coordinates and draw marker
    output_image_with_click = resized_image.copy().convert("RGB")
    match = re.search(r"Click\((\d+),\s*(\d+)\)", coordinates_str)
    if match:
        try:
            x = int(match.group(1))
            y = int(match.group(2))
            draw = ImageDraw.Draw(output_image_with_click)
            # Marker radius scales with the image, clamped to the range 5..15 px.
            radius = max(5, min(resized_width // 100, resized_height // 100, 15))
            bbox = (x - radius, y - radius, x + radius, y + radius)
            draw.ellipse(bbox, outline="red", width=max(2, radius // 4))
            print(f"Predicted and drawn click at: ({x}, {y}) on resized image ({resized_width}x{resized_height})")
        except Exception as e:
            # Drawing is best-effort: the text result is still returned.
            print(f"Error drawing on image: {e}")
            traceback.print_exc()
    else:
        print(f"Could not parse 'Click(x, y)' from model output: {coordinates_str}")

    return coordinates_str, output_image_with_click, runtime_info
# --- Load Example Data ---
def _load_example_image(url: str, timeout: float = 10.0):
    """Return the example screenshot, a placeholder image, or ``None``.

    The image is downloaded with a request timeout and an HTTP status check and
    fully decoded from memory, so no network stream stays open afterwards. If
    that fails, a small gray placeholder with an explanatory caption is built
    instead. ``None`` is returned only when even the placeholder cannot be
    created.
    """
    import io  # local import: only this helper needs it

    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        image = Image.open(io.BytesIO(response.content))
        image.load()  # force decoding now so a corrupt download fails here
        return image
    except Exception as e:
        print(f"Could not load example image from URL: {e}")
        traceback.print_exc()

    try:
        placeholder = Image.new("RGB", (200, 150), color="lightgray")
        ImageDraw.Draw(placeholder).text((10, 10), "Example image\nfailed to load", fill="black")
        return placeholder
    except Exception as e:
        # Best-effort: the UI simply omits the examples section without an image.
        print(f"Could not build placeholder example image: {e}")
        return None


example_instruction = "Enter the server address readyforquantum.com to check its security"
example_image_url = "https://readyforquantum.com/img/screentest.jpg"
example_image = _load_example_image(example_image_url)
# --- Gradio UI --- | |
# Page heading rendered at the top of the demo.
title = "Holo1-3B: Holo1 Localization Demo (ZeroGPU-ready)"

# HTML blurb shown under the heading: links to the model card, the tech report
# and the blog post, plus a note on when the GPU is requested.
article = (
    "\n"
    "<p style='text-align: center'>\n"
    f"Model: <a href='https://huggingface.co/{MODEL_ID}' target='_blank'>{MODEL_ID}</a> by HCompany |\n"
    "Paper: <a href='https://cdn.prod.website-files.com/67e2dbd9acff0c50d4c8a80c/683ec8095b353e8b38317f80_h_tech_report_v1.pdf' target='_blank'>HCompany Tech Report</a> |\n"
    "Blog: <a href='https://www.hcompany.ai/surfer-h' target='_blank'>Surfer-H Blog Post</a><br/>\n"
    "<small>GPU (if available) is requested only during inference via @spaces.GPU.</small>\n"
    "</p>\n"
)
# Build the Gradio app. `demo` is always defined: either an error page (model
# failed to load) or the full localization UI.
if not model_loaded:
    with gr.Blocks() as demo:
        gr.Markdown("# <center>⚠️ Error: Model Failed to Load ⚠️</center>")
        gr.Markdown(f"<center>{load_error_message}</center>")
        gr.Markdown("<center>See logs for the full traceback.</center>")
else:
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown(f"<h1 style='text-align: center;'>{title}</h1>")
        gr.Markdown(article)

        with gr.Row():
            # Left column: user inputs.
            with gr.Column(scale=1):
                input_image_component = gr.Image(type="pil", label="Input UI Image", height=400)
                instruction_component = gr.Textbox(
                    label="Instruction",
                    placeholder="e.g., Click the 'Login' button",
                    info="Type the action you want the model to localize on the image.",
                )
                submit_button = gr.Button("Localize Click", variant="primary")

            # Right column: prediction results.
            with gr.Column(scale=1):
                output_coords_component = gr.Textbox(
                    label="Predicted Coordinates (Format: Click(x, y))",
                    interactive=False,
                )
                output_image_component = gr.Image(
                    type="pil",
                    label="Image with Predicted Click Point",
                    height=400,
                    interactive=False,
                )
                runtime_info = gr.Textbox(
                    label="Runtime Info",
                    value="device: n/a | dtype: n/a",
                    interactive=False,
                )

        # The examples widget and the button share the same wiring.
        prediction_inputs = [input_image_component, instruction_component]
        prediction_outputs = [output_coords_component, output_image_component, runtime_info]

        if example_image is not None:
            gr.Examples(
                examples=[[example_image, example_instruction]],
                inputs=prediction_inputs,
                outputs=prediction_outputs,
                fn=predict_click_location,
                cache_examples="lazy",
            )

        submit_button.click(
            fn=predict_click_location,
            inputs=prediction_inputs,
            outputs=prediction_outputs,
        )
if __name__ == "__main__":
    # Launch with plain arguments only: do NOT pass 'concurrency_count' or
    # ZeroGPU-specific launch args here.
    demo.launch(debug=True)