|
import sys |
|
import os |
|
import re |
|
import json |
|
import base64 |
|
from io import BytesIO |
|
from PIL import Image |
|
import argparse |
|
|
|
from shared_vis_python_exe import PythonExecutor |
|
from openai import OpenAI |
|
from typing import Optional, Union |
|
import gradio as gr |
|
import markdown |
|
|
|
def encode_image(image):
    """Encode an image as base64 and report its pixel dimensions.

    Args:
        image: Either a filesystem path (``str``) to an image file, or a
            PIL.Image-style object exposing ``save(fp, format=...)`` and
            ``size``.

    Returns:
        A dict with the keys:
            - 'base64': base64 text of the image bytes. For a path the
              file's bytes are encoded unchanged; for an image object the
              image is first serialized as PNG.
            - 'width': image width in pixels.
            - 'height': image height in pixels.
            - 'resolution': the string "<width>x<height>".
    """
    if isinstance(image, str):
        # The file is forwarded as-is, so read its raw bytes directly.
        with open(image, "rb") as image_file:
            raw_bytes = image_file.read()
        # PIL is needed only for the size; the context manager closes the
        # underlying file handle instead of leaving it open.
        with Image.open(image) as img_obj:
            width, height = img_obj.size
    else:
        buffered = BytesIO()
        image.save(buffered, format='PNG')
        raw_bytes = buffered.getvalue()
        width, height = image.size

    return {
        'base64': base64.b64encode(raw_bytes).decode('utf-8'),
        'width': width,
        'height': height,
        'resolution': f"{width}x{height}",
    }
|
|
|
def excute_codes(codes, messages, executor: PythonExecutor):
    """Run every non-empty snippet in ``codes`` through ``executor``.

    Args:
        codes: Iterable of code strings; entries equal to ``""`` are skipped.
        messages: Forwarded unchanged to ``executor.batch_apply``.
        executor: Object providing ``batch_apply(codes, messages)``.

    Returns:
        A ``(batch_results, no_code_idx)`` tuple: whatever ``batch_apply``
        returned for the non-empty snippets, and the positions in ``codes``
        that held an empty string.
    """
    indexed = list(enumerate(codes))
    no_code_idx = [position for position, snippet in indexed if snippet == ""]
    runnable = [snippet for _, snippet in indexed if not snippet == ""]
    return executor.batch_apply(runnable, messages), no_code_idx
|
|
|
def process_prompt_init(question, image, prompt_template, prompt_type):
    """Build the opening user turn: the tagged image followed by the prompt.

    Args:
        question: Text substituted for ``query`` in the prompt template.
        image: Image path or image object, forwarded to ``encode_image``.
        prompt_template: Mapping from prompt type to a ``str.format``
            template; it is formatted with ``query``, ``width`` and
            ``height`` keyword arguments.
        prompt_type: Key selecting the template to use.

    Returns:
        A one-element list holding a ``user`` message whose content is the
        image wrapped in ``<image_clue_0>`` tags followed by the prompt text.
    """
    template = prompt_template[prompt_type]
    encoded = encode_image(image)

    # NOTE(review): the data URL is labelled image/jpeg regardless of the
    # actual encoding of the bytes -- confirm the endpoint tolerates this.
    image_part = {
        "type": "image_url",
        "image_url": {"url": f"data:image/jpeg;base64,{encoded['base64']}"},
    }
    prompt_text = template.format(
        query=question,
        width=str(encoded['width']),
        height=str(encoded['height']),
    )

    user_content = [
        {"type": "text", "text": "<image_clue_0>"},
        image_part,
        {"type": "text", "text": "</image_clue_0>\n\n"},
        {"type": "text", "text": prompt_text},
    ]
    return [{"role": "user", "content": user_content}]
|
|
|
def update_messages_with_excu_content(messages, images_result, text_result, error_result, image_clue_idx):
    """Append interpreter output to the last (assistant) message.

    Args:
        messages: Conversation so far; the final entry's ``content`` list is
            extended with the interpreter block. The input list and its
            message dicts are not modified.
        images_result: List of base64 image strings, or ``None``. Only the
            entries from position ``image_clue_idx - 1`` onward are emitted.
        text_result: Text output, interpolated into the interpreter block.
        error_result: Error report, or ``None`` on success. When set, only
            the error text is emitted (no text/image sections).
        image_clue_idx: Number used for the next ``<image_clue_N>`` tag.

    Returns:
        ``(new_messages, image_clue_idx)`` where ``new_messages`` is a new
        list whose last entry is a rebuilt assistant message, and the index
        has been advanced once per emitted image.
    """
    closing_part = {"type": "text", "text": "</interpreter>\n"}

    if error_result is not None:
        interpreter_parts = [
            {"type": "text", "text": f"<interpreter>{error_result}"},
            closing_part,
        ]
    else:
        interpreter_parts = [
            {"type": "text", "text": f"<interpreter>\nText Result:\n{text_result}\nImage Result:\n"},
        ]
        if images_result is None:
            interpreter_parts.append({"type": "text", "text": "None"})
        else:
            # The slice is taken once, using the index as it was on entry.
            for encoded_image in images_result[image_clue_idx - 1:]:
                interpreter_parts.extend([
                    {"type": "text", "text": f"<image_clue_{image_clue_idx}>"},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_image}"}},
                    {"type": "text", "text": f"</image_clue_{image_clue_idx}>"},
                ])
                image_clue_idx += 1
        interpreter_parts.append(closing_part)

    new_messages = list(messages[:-1])
    # List concatenation builds a fresh content list for the last message.
    merged_content = messages[-1]['content'] + interpreter_parts
    new_messages.append({"role": "assistant", "content": merged_content})
    return new_messages, image_clue_idx
|
|
|
|
|
|
|
def update_messages_with_code(messages, generated_content):
    """Append an assistant turn holding the generated text plus ``</code>``.

    The closing tag and a newline are appended to ``generated_content``
    before it is stored.

    Args:
        messages: Conversation list; modified in place.
        generated_content: Model output to store.

    Returns:
        The same ``messages`` list, now one entry longer.
    """
    code_text = f"{generated_content}</code>\n"
    messages.append({
        "role": "assistant",
        "content": [{"type": "text", "text": code_text}],
    })
    return messages
|
|
|
def update_messages_with_text(messages, generated_content):
    """Append an assistant turn holding the generated text as-is.

    Args:
        messages: Conversation list; modified in place.
        generated_content: Model output; it is string-formatted, so a
            non-string value such as ``None`` is stored as its text form.

    Returns:
        The same ``messages`` list, now one entry longer.
    """
    text_part = {"type": "text", "text": f"{generated_content}"}
    messages.append({"role": "assistant", "content": [text_part]})
    return messages
|
|
|
def call_chatgpt_api(model_name, messages, client, max_tokens=10000, stop=None, temperature=0.6):
    """Request one chat completion and classify why generation stopped.

    Args:
        model_name: Model identifier passed to the API.
        messages: Chat messages to send.
        client: Object exposing ``chat.completions.create(...)``.
        max_tokens: Completion token limit.
        stop: Optional list of stop strings.
        temperature: Sampling temperature (``top_p`` is fixed at 1.0).

    Returns:
        ``(response_text, stop_reason)``. ``stop_reason`` is the first stop
        string found in the text, otherwise the API's ``finish_reason``; it
        is then forced to ``"</code>"`` whenever the text contains
        ``"<code>"``. Any exception is printed and yields ``(None, None)``.
    """
    try:
        completion = client.chat.completions.create(
            model=model_name,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=1.0,
            stop=stop,
        )
        first_choice = completion.choices[0]
        response_text = first_choice.message.content

        # First stop string present in the text, if any.
        matched_stop = next((s for s in stop if s in response_text), None) if stop else None
        if matched_stop is not None:
            stop_reason = matched_stop
        else:
            stop_reason = first_choice.finish_reason

        # An opened <code> tag is treated as a pending code block.
        if "<code>" in response_text:
            stop_reason = "</code>"

        return response_text, stop_reason

    except Exception as e:
        print(f"API Error: {str(e)}")
        return None, None
|
|
|
def _unpack_execution_result(exe_result):
    """Split one executor result into ``(text_result, images_result, error_result)``."""
    if exe_result is None:
        # No result to report: placeholder text, no images, and no error.
        return "None", None, None

    output, report = exe_result
    if report != "Done":
        # Any report other than "Done" is surfaced as the error text.
        return None, None, report

    try:
        text_result = output['text']
    except (KeyError, IndexError, TypeError):
        text_result = None
        print("text result is none.")
    try:
        images_result = output['images']
    except (KeyError, IndexError, TypeError):
        images_result = None
        print("image result is none.")
    return text_result, images_result, None


def evaluate_single_data(model_name, data, client, executor, prompt_template, prompt_type):
    """Run the generate -> execute -> feed-back loop for one question.

    The model is called with ``"</code>"`` as stop string. While a call
    reports ``"</code>"`` as its stop reason, the text after the last
    ```` ```python ```` fence is executed, the interpreter output is appended
    to the assistant turn, and the model is called again. The first response
    with any other stop reason is stored as the final assistant message.

    Args:
        model_name: Model identifier passed to the API.
        data: Dict with ``"question"`` and ``"image"`` entries.
        client: API client handed to ``call_chatgpt_api``.
        executor: Code executor handed to ``excute_codes``.
        prompt_template: Mapping of prompt types to templates.
        prompt_type: Key selecting the template.

    Returns:
        The full message list, including every intermediate assistant turn.
    """
    messages = process_prompt_init(data["question"], data['image'], prompt_template, prompt_type)

    response_text, pred_stop_reason = call_chatgpt_api(
        model_name,
        messages,
        client,
        max_tokens=10000,
        stop=["</code>"]
    )

    code_execution_count = 0
    image_clue_idx = 1

    while pred_stop_reason == "</code>":
        messages = update_messages_with_code(messages, response_text)
        code_to_execute = response_text.split("```python")[-1].split("```")[0].strip()

        # An empty snippet is skipped by excute_codes, so the result list may
        # have no entry; treat that like "no result" instead of indexing
        # blindly. (Presumably batch_apply returns one item per snippet.)
        batch_results, _ = excute_codes([code_to_execute], messages, executor)
        exe_result = batch_results[0] if batch_results else None

        # error_result is always assigned here, so it can neither be unbound
        # nor carried over from a previous iteration.
        text_result, images_result, error_result = _unpack_execution_result(exe_result)

        messages, image_clue_idx = update_messages_with_excu_content(
            messages, images_result, text_result, error_result, image_clue_idx
        )

        code_execution_count += 1
        print(f"Code Execution Count: {code_execution_count}")

        response_text, pred_stop_reason = call_chatgpt_api(
            model_name,
            messages,
            client,
            max_tokens=10000,
            stop=["</code>"]
        )

    messages = update_messages_with_text(messages, response_text)
    print("GPT-4.1 finish.")

    return messages
|
|
|
def process_message(messages):
    """Render a chat transcript as one HTML string.

    Each message becomes a coloured box headed by its role; text parts are
    converted from Markdown to HTML and image parts become ``<img>`` tags.
    Parts of any other type are ignored.

    Args:
        messages: List of dicts with ``'role'`` and ``'content'``. ``content``
            is normally a list of ``{"type": "text", "text": ...}`` or
            ``{"type": "image_url", "image_url": {"url": ...}}`` parts; a
            plain string is treated as a single text part.

    Returns:
        The HTML markup for the whole conversation.
    """
    box_style = 'padding: 10px; margin: 10px 0; border-radius: 10px; color: black;'
    pieces = ['<div style="color: black;">']

    for message_item in messages:
        role = message_item['role']
        content = message_item['content']

        if role in ("user", "human"):
            background, label = '#f0f0f0', 'User'
        elif role == "assistant":
            background, label = '#e6f7ff', 'Assistant'
        else:
            background, label = '#f9f9f9', role.capitalize()
        pieces.append(
            f'<div style="background-color: {background}; {box_style}"><strong>{label}:</strong><br>'
        )

        # Chat messages may carry a bare string instead of a list of parts.
        if isinstance(content, str):
            content = [{"type": "text", "text": content}]

        for content_item in content:
            content_type = content_item['type']

            if content_type == "text":
                html_text = markdown.markdown(
                    content_item['text'], extensions=['fenced_code', 'codehilite']
                )
                pieces.append(f'<div style="color: black;">{html_text}</div>')
            elif content_type == "image_url":
                # data: URIs and ordinary URLs are embedded the same way.
                image_src = content_item['image_url']['url']
                pieces.append(f'<img src="{image_src}" style="max-width: 100%; margin: 10px 0;">')

        pieces.append('</div>')

    pieces.append('</div>')
    return "".join(pieces)
|
|
|
def o3_chat(model_name, api_key, base_url, question, image):
    """Gradio callback: answer ``question`` about ``image`` and return HTML.

    Args:
        model_name: Model identifier chosen in the UI.
        api_key: API key used to build the client.
        base_url: API base URL used to build the client.
        question: The user's question.
        image: The uploaded image, or ``None`` when nothing was uploaded.

    Returns:
        HTML for the whole conversation, or a short HTML notice when no
        image was provided.
    """
    print("done!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

    # Without an image the prompt cannot be built; answer with a notice
    # instead of failing inside the image encoder.
    if image is None:
        return '<div style="color: black;">Please upload an image before submitting.</div>'

    client = OpenAI(api_key=api_key, base_url=base_url)
    executor = PythonExecutor()

    # Close the template file deterministically rather than leaking the handle.
    with open("./prompt_template_vis.json", "r", encoding="utf-8") as template_file:
        prompt_template = json.load(template_file)
    prompt_type = 'vistool_with_img_info'

    data = {
        "question": question,
        "image": image,
    }

    messages = evaluate_single_data(model_name, data, client, executor, prompt_template, prompt_type)
    return process_message(messages)
|
|
|
|
|
def create_demo():
    """Build the Gradio Blocks UI for the image question-answering demo.

    The layout has a model dropdown, API key and base URL fields, an image
    upload, a question box and a submit button wired to ``o3_chat``, followed
    by an HTML output area, clickable examples and usage tips.

    The API key field is pre-filled from the ``OPENAI_API_KEY`` environment
    variable (empty when unset) so that no credential is stored in source.
    The base URL field is pre-filled from ``OPENAI_BASE_URL`` when set.

    Returns:
        The constructed ``gr.Blocks`` object (not yet launched).
    """
    with gr.Blocks(title="GPT-4.1 with Python Interpreter", css="div.prose * {color: black !important;}") as demo:
        gr.Markdown("# GPT-4.1 with Python Interpreter")
        gr.Markdown("please do not share to others")
        gr.Markdown("Upload an image and ask a question to get a response with code execution capabilities.")

        with gr.Row():
            with gr.Column(scale=1):
                model_name = gr.Dropdown(
                    label="Model Selection",
                    choices=["gpt-4.1", "gpt-4o", "o4-mini", "gemini-2.5-pro-preview-05-06", "claude-3-7-sonnet-latest", "claude-3-7-sonnet-thinking"],
                    value="gpt-4.1"
                )
                # Never ship a real key as the default; take it from the
                # environment or let the user type it in.
                api_key = gr.Textbox(
                    label="OpenAI API Key",
                    type="password",
                    value=os.environ.get("OPENAI_API_KEY", ""),
                )
                base_url = gr.Textbox(
                    label="Base URL (optional)",
                    value=os.environ.get("OPENAI_BASE_URL", "https://api.claudeshop.top/v1"),
                )
                image_input = gr.Image(label="Upload Image", type="pil")
                question = gr.Textbox(label="Question", placeholder="Ask a question about the image...")
                submit_btn = gr.Button("Submit")

        with gr.Row():
            output = gr.HTML(label="Response")

        submit_btn.click(
            fn=o3_chat,
            inputs=[model_name, api_key, base_url, question, image_input],
            outputs=[output]
        )

        # Each example is [image path, question], matching the inputs below.
        examples = [
            [
                "./examples/1.png",
                "From the information on that advertising board, what is the type of this shop?\nA. The shop is a yoga studio.\nB. The shop is a cafe.\nC. The shop is a seven-eleven.\nD. The shop is a milk tea shop.",
            ],
            [
                "./examples/2.png",
                "What is the diagnosis for the abnormality seen in this image?\nA. Pulmonary embolism.\nB. Tuberculosis.\nC. COVID-19 infection.\nD. Influenza.",
            ],
            [
                "./examples/3.png",
                "What is the color of the liquid contained in the glass on the table?\nA. The color of the liquid contained in the glass on the table is green.\nB. The color of the liquid contained in the glass on the table is transparent.\nC. The color of the liquid contained in the glass on the table is white.\nD. The color of the liquid contained in the glass on the table is orange.",
            ],
            [
                "./examples/4.png",
                "Is the dog on the left or right side of the bicycle?\nA. The dog is on the right side of the bicycle.\nB. The dog is on the left side of the bicycle.",
            ],
            [
                "./examples/5.png",
                "Is the kid with black shirt on the left or right side of the kid with blue shirt?\nA. The kid with black shirt is on the left side of the kid with blue shirt.\nB. The kid with black shirt is on the right side of the kid with blue shirt.",
            ],
            [
                "./examples/6.png",
                "What can be observed in this image?\nA. Nerve entrapment.\nB. Musculoskeletal abnormality.\nC. Arteriovenous anomaly.\nD. Renal cyst.",
            ],
            [
                "./examples/7.png",
                "What is the specific stage of cancer depicted in the image? A)Stage Ib, B)Stage IIIb, C)Stage IIc, D)Stage IIIa",
            ],
            [
                "./examples/8.png",
                "A gymnast jotted down the number of cartwheels she did each day. What is the mode of the numbers?",
            ],
            [
                "./examples/9.png",
                "Does Virginia have the highest value in the USA ?",
            ],
            [
                "./examples/10.png",
                "AB is the diameter of ⊙O, PA is tangent to ⊙O at point A, and PO intersects ⊙O at point C; connect BC, if ∠P = 40.0, then ∠B is equal to ()",
            ],
            [
                "./examples/11.png",
                "How many single-color paths go from C to A?",
            ],
            [
                "./examples/12.png",
                "There is a numerical converter, the principle of which is shown in the following diagram: When the input x=16, the output y equals.",
            ],
            [
                "./examples/13.png",
                # Raw string: in a normal literal "\a" is the BEL character,
                # which would corrupt "\angle"; "\c" and "\q" are invalid escapes.
                r"As shown in Figure 1, it is a right-angled triangular paper piece, $$ \angle A=30^{ \circ }$$, $$BC=\quantity{4}{cm}$$, it is folded so that point $$C$$ lands on point $$C'$$ on the hypotenuse, with the fold line being $$BD$$, as shown in Figure 2. Then, Figure 2 is folded along $$DE$$, so that point $$A$$ lands on point $$A'$$ on the extension of $$DC'$$, as shown in Figure 3. The length of the fold line $$DE$$ is ___.",
            ],
        ]

        gr.Examples(
            examples,
            [image_input, question],
            label="Click any example to try it out!"
        )

        gr.Markdown("""
        ### Tips
        1. Click the 'log' botton top left to check the output log.
        2. It may take 2~5 min.
        """)

    return demo
|
|
|
|
|
# Script entry point: build the UI and start serving it. Nothing runs when
# the module is merely imported.
if __name__ == "__main__":
    demo = create_demo()
    demo.launch()
|
|
|
|
|
|