image / app.py
mgbam's picture
Add application file
2f9ea03
raw
history blame
12.6 kB
import sys
sys.path.append('./LLAUS')
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig
import torch
from llava import LlavaLlamaForCausalLM
from llava.conversation import conv_templates
from llava.utils import disable_torch_init
from transformers import CLIPVisionModel, CLIPImageProcessor, StoppingCriteria
from PIL import Image
from torch.cuda.amp import autocast
import gradio as gr
import spaces
from peft import prepare_model_for_int8_training, LoraConfig, get_peft_model
import os
from transformers import AutoProcessor, AutoModel
import torch.nn.functional as F
#---------------------------------
#++++++++ Model ++++++++++
#---------------------------------
DEFAULT_IMAGE_TOKEN = "<image>"
DEFAULT_IMAGE_PATCH_TOKEN = "<im_patch>"
DEFAULT_IM_START_TOKEN = "<im_start>"
DEFAULT_IM_END_TOKEN = "<im_end>"
def patch_config(config_path):
"""Applies necessary patches to the model config."""
patch_dict = {
"use_mm_proj": True,
"mm_vision_tower": "openai/clip-vit-large-patch14",
"mm_hidden_size": 1024
}
cfg = AutoConfig.from_pretrained(config_path)
if not hasattr(cfg, "mm_vision_tower"):
print(f'`mm_vision_tower` not found in `{config_path}`, applying patch and save to disk.')
for k, v in patch_dict.items():
setattr(cfg, k, v)
cfg.save_pretrained(config_path)
def load_llava_model():
"""Loads and initializes the LLaVA model."""
model_name = "Baron-GG/LLaVA-Med" # Change this to your model if you uploaded a new one
disable_torch_init()
tokenizer = AutoTokenizer.from_pretrained(model_name)
patch_config(model_name)
model = LlavaLlamaForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16).cuda()
model.model.requires_grad_(False)
image_processor = CLIPImageProcessor.from_pretrained(model.config.mm_vision_tower, torch_dtype=torch.float16)
model.config.use_cache = False
model.config.tune_mm_mlp_adapter = False
model.config.freeze_mm_mlp_adapter = False
model.config.mm_use_im_start_end = True
mm_use_im_start_end = getattr(model.config, "mm_use_im_start_end", False)
tokenizer.add_tokens([DEFAULT_IMAGE_PATCH_TOKEN], special_tokens=True)
if mm_use_im_start_end:
tokenizer.add_tokens([DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN], special_tokens=True)
vision_tower = model.model.vision_tower[0]
vision_tower.to(device='cuda', dtype=torch.float16)
vision_config = vision_tower.config
vision_config.im_patch_token = tokenizer.convert_tokens_to_ids([DEFAULT_IMAGE_PATCH_TOKEN])[0]
vision_config.use_im_start_end = mm_use_im_start_end
if mm_use_im_start_end:
vision_config.im_start_token, vision_config.im_end_token = tokenizer.convert_tokens_to_ids([DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN])
image_token_len = (vision_config.image_size // vision_config.patch_size) ** 2
model = prepare_model_for_int8_training(model)
lora_config = LoraConfig(
r=64,
lora_alpha=16,
target_modules=["q_proj", "v_proj","k_proj","o_proj"],
lora_dropout=0.05,
bias="none",
task_type="CAUSAL_LM",
)
model = get_peft_model(model, lora_config).cuda()
model.eval()
return model, tokenizer, image_processor, image_token_len, mm_use_im_start_end
def load_biomedclip_model():
"""Loads the BiomedCLIP model and tokenizer."""
biomedclip_model_name = 'microsoft/BiomedCLIP-PubMedBERT_256-vit_base_patch16_224'
processor = AutoProcessor.from_pretrained(biomedclip_model_name)
model = AutoModel.from_pretrained(biomedclip_model_name).cuda().eval()
return model, processor
class KeywordsStoppingCriteria(StoppingCriteria):
"""Custom stopping criteria for generation."""
def __init__(self, keywords, tokenizer, input_ids):
self.keywords = keywords
self.tokenizer = tokenizer
self.start_len = None
self.input_ids = input_ids
def __call__(self, output_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
if self.start_len is None:
self.start_len = self.input_ids.shape[1]
else:
outputs = self.tokenizer.batch_decode(output_ids[:, self.start_len:], skip_special_tokens=True)[0]
for keyword in self.keywords:
if keyword in outputs:
return True
return False
def compute_similarity(image, text, biomedclip_model, biomedclip_processor):
"""Computes similarity scores using BiomedCLIP."""
with torch.no_grad():
inputs = biomedclip_processor(text=text, images=image, return_tensors="pt", padding=True).to(biomedclip_model.device)
outputs = biomedclip_model(**inputs)
image_embeds = outputs.image_embeds
text_embeds = outputs.text_embeds
image_embeds = F.normalize(image_embeds, dim=-1)
text_embeds = F.normalize(text_embeds, dim=-1)
similarity = (text_embeds @ image_embeds.transpose(-1, -2)).squeeze()
return similarity
@torch.no_grad()
def eval_llava_model(llava_model, llava_tokenizer, llava_image_processor, image, question, image_token_len, mm_use_im_start_end, max_new_tokens, temperature):
"""Evaluates the LLaVA model for a given image and question."""
image_list = []
image_tensor = llava_image_processor.preprocess(image, return_tensors='pt')['pixel_values'][0] # 3, 224, 224
image_list.append(image_tensor)
image_idx = 1
if mm_use_im_start_end:
qs = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_PATCH_TOKEN * image_token_len * image_idx + DEFAULT_IM_END_TOKEN + question
else:
qs = DEFAULT_IMAGE_PATCH_TOKEN * image_token_len * image_idx + '\n' + question
conv = conv_templates["simple"].copy()
conv.append_message(conv.roles[0], qs)
prompt = conv.get_prompt()
inputs = llava_tokenizer([prompt])
image_tensor = torch.stack(image_list, dim=0).half().cuda()
input_ids = torch.as_tensor(inputs.input_ids).cuda()
keywords = ['###']
stopping_criteria = KeywordsStoppingCriteria(keywords, llava_tokenizer, input_ids)
with autocast():
output_ids = llava_model.generate(
input_ids=input_ids,
images=image_tensor,
do_sample=True,
temperature=temperature,
max_new_tokens=max_new_tokens,
stopping_criteria=[stopping_criteria]
)
input_token_len = input_ids.shape[1]
n_diff_input_output = (input_ids != output_ids[:, :input_token_len]).sum().item()
if n_diff_input_output > 0:
print(f'[Warning] Sample: {n_diff_input_output} output_ids are not the same as the input_ids')
outputs = llava_tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0]
while True:
cur_len = len(outputs)
outputs = outputs.strip()
for pattern in ['###', 'Assistant:', 'Response:']:
if outputs.startswith(pattern):
outputs = outputs[len(pattern):].strip()
if len(outputs) == cur_len:
break
try:
index = outputs.index(conv.sep)
except ValueError:
outputs += conv.sep
index = outputs.index(conv.sep)
outputs = outputs[:index].strip()
print(outputs)
return outputs
#---------------------------------
#++++++++ Gradio ++++++++++
#---------------------------------
SHARED_UI_WARNING = f'''### [NOTE] It is possible that you are waiting in a lengthy queue.
You can duplicate and use it with a paid private GPU.
<a class="duplicate-button" style="display:inline-block" target="_blank" href="https://huggingface.co/spaces/Vision-CAIR/minigpt4?duplicate=true"><img style="margin-top:0;margin-bottom:0" src="https://huggingface.co/datasets/huggingface/badges/raw/main/duplicate-this-space-xl-dark.svg" alt="Duplicate Space"></a>
Alternatively, you can also use the demo on our [project page](https://minigpt-4.github.io).
'''
def gradio_reset(chat_state, img_list):
"""Resets the chat state and image list."""
if chat_state is not None:
chat_state.messages = []
if img_list is not None:
img_list = []
return None, gr.update(value=None, interactive=True), gr.update(placeholder='Please upload your medical image first', interactive=False), gr.update(value="Upload & Start Analysis", interactive=True), chat_state, img_list
def upload_img(gr_img, text_input, chat_state):
"""Handles image upload."""
if gr_img is None:
return None, None, gr.update(interactive=True), chat_state, None
img_list = [gr_img]
return gr.update(interactive=False), gr.update(interactive=True, placeholder='Type and press Enter'), gr.update(value="Start Analysis", interactive=False), chat_state, img_list
def gradio_ask(user_message, chatbot, chat_state):
"""Handles user input."""
if not user_message:
return gr.update(interactive=True, placeholder='Input should not be empty!'), chatbot, chat_state
chatbot = chatbot + [[user_message, None]]
return '', chatbot, chat_state
@spaces.GPU
def gradio_answer(chatbot, chat_state, img_list, llava_model, llava_tokenizer, llava_image_processor, image_token_len, mm_use_im_start_end, max_new_token, temperature, biomedclip_model, biomedclip_processor):
"""Generates and adds the bot's response to the chatbot using LLaVA"""
if not img_list:
return chatbot, chat_state, img_list
# compute similarity using biomedclip
similarity_score = compute_similarity(img_list[0],chatbot[-1][0], biomedclip_model, biomedclip_processor)
print(f'Similarity Score is: {similarity_score}')
# prepare the input for LLAVA
llava_input_text = f"Based on the image and query provided the similarity score is {similarity_score:.3f}. " + chatbot[-1][0]
llm_message = eval_llava_model(llava_model, llava_tokenizer, llava_image_processor, img_list[0], llava_input_text, image_token_len, mm_use_im_start_end, max_new_token, temperature)
chatbot[-1][1] = llm_message
return chatbot, chat_state, img_list
title = """<h1 align="center">Medical Image Analysis Tool</h1>"""
description = """<h3>Upload medical images, ask questions, and receive analysis.</h3>"""
examples_list=[
["./case1.png", "Analyze the X-ray for any abnormalities."],
["./case2.jpg", "What type of disease may be present?"],
["./case1.png","What is the anatomical structure shown here?"]
]
# Load models and related resources outside of the Gradio block for loading on startup
llava_model, llava_tokenizer, llava_image_processor, image_token_len, mm_use_im_start_end = load_llava_model()
biomedclip_model, biomedclip_processor = load_biomedclip_model()
with gr.Blocks() as demo:
gr.Markdown(title)
# gr.Markdown(SHARED_UI_WARNING)
gr.Markdown(description)
with gr.Row():
with gr.Column(scale=0.5):
image = gr.Image(type="pil", label="Medical Image")
upload_button = gr.Button(value="Upload & Start Analysis", interactive=True, variant="primary")
clear = gr.Button("Restart")
max_new_token = gr.Slider(
minimum=1,
maximum=512,
value=128,
step=1,
interactive=True,
label="Max new tokens"
)
temperature = gr.Slider(
minimum=0.1,
maximum=2.0,
value=0.3,
step=0.1,
interactive=True,
label="Temperature",
)
with gr.Column():
chat_state = gr.State()
img_list = gr.State()
chatbot = gr.Chatbot(label='Medical Analysis')
text_input = gr.Textbox(label='Analysis Query', placeholder='Please upload your medical image first', interactive=False)
gr.Examples(examples=examples_list, inputs=[image, text_input])
upload_button.click(upload_img, [image, text_input, chat_state], [image, text_input, upload_button, chat_state, img_list])
text_input.submit(gradio_ask, [text_input, chatbot, chat_state], [text_input, chatbot, chat_state]).then(
gradio_answer, [chatbot, chat_state, img_list, llava_model, llava_tokenizer, llava_image_processor, image_token_len, mm_use_im_start_end, max_new_token, temperature, biomedclip_model, biomedclip_processor], [chatbot, chat_state, img_list]
)
clear.click(gradio_reset, [chat_state, img_list], [chatbot, image, text_input, upload_button, chat_state, img_list], queue=False)
demo.launch()