|
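"""Gradio demo in which GPT-4.1 answers questions about an uploaded image and may
write Python code that a local PythonExecutor runs; the execution results (text
and images) are fed back into the conversation until a final answer is produced."""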
import json
import base64
from io import BytesIO

import gradio as gr
import markdown
from openai import OpenAI

from vis_python_exe import PythonExecutor
|
|
def encode_image(image):
    """
    Convert a PIL.Image object or an image file path into a base64-encoded string.

    Args:
        image: a PIL.Image object or a path to an image file.

    Returns:
        The base64-encoded string.
    """
    if isinstance(image, str):
        # The argument is a file path: read and encode the raw bytes.
        with open(image, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")
    else:
        # The argument is a PIL image: serialize it to PNG in memory first.
        buffered = BytesIO()
        image.save(buffered, format="PNG")
        return base64.b64encode(buffered.getvalue()).decode("utf-8")
|
|
def execute_codes(codes, messages, executor: PythonExecutor):
    """Run the non-empty snippets through the executor; remember which indices were empty."""
    no_code_idx = []
    codes_use = []
    for i, code in enumerate(codes):
        if code == "":
            no_code_idx.append(i)
        else:
            codes_use.append(code)
    batch_results = executor.batch_apply(codes_use, messages)
    return batch_results, no_code_idx
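
# NOTE (assumption): PythonExecutor.batch_apply is taken to return one
# (output, report) pair per snippet, where `output` is a dict that may hold
# a 'text' key and an 'images' key (base64-encoded figures); that is how
# evaluate_single_data below unpacks the results.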
|
|
|
def process_prompt_init(question, image, prompt_template, prompt_type):
    prompt_prefix = prompt_template[prompt_type]
    image_base64 = encode_image(image)

    # Wrap the uploaded image in <image_clue_0> tags so the model can cite it.
    # encode_image serializes PIL images as PNG, so declare image/png here.
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "<image_clue_0>"},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
                {"type": "text", "text": "</image_clue_0>\n\n"},
                {"type": "text", "text": prompt_prefix.format(query=question)},
            ],
        }
    ]

    return messages
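
# NOTE (assumption): prompt_template_vis.json is taken to map prompt types
# (e.g. "vistool") to a prefix string containing a {query} placeholder.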
|
|
|
def update_messages_with_execution_content(messages, images_result, text_result, image_clue_idx):
    """Append the interpreter's text/image output to the last assistant message."""
    new_messages = messages[:-1]
    assistant_content = messages[-1]["content"]

    interpreter_prefix = [
        {"type": "text", "text": f"<interpreter>\nText Result:\n{text_result}\nImage Result:\n"}
    ]
    if images_result is not None:
        # Each returned image gets its own numbered <image_clue_i> block.
        image_content = []
        for image_base64_item in images_result:
            image_content += [
                {"type": "text", "text": f"<image_clue_{image_clue_idx}>"},
                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64_item}"}},
                {"type": "text", "text": f"</image_clue_{image_clue_idx}>"},
            ]
            image_clue_idx += 1
    else:
        image_content = [{"type": "text", "text": "None"}]
    interpreter_postfix = [{"type": "text", "text": "</interpreter>\n"}]

    assistant_content = assistant_content + interpreter_prefix + image_content + interpreter_postfix
    new_messages.append({"role": "assistant", "content": assistant_content})
    return new_messages, image_clue_idx
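
# After this update, the last assistant message reads, in order:
#   <model code/text></code>
#   <interpreter>
#   Text Result: ...
#   Image Result: <image_clue_i>[image]</image_clue_i> ...
#   </interpreter>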
|
|
def update_messages_with_code(messages, generated_content):
    # Generation stopped at the "</code>" stop string, so restore the closing tag.
    message_item = {
        "role": "assistant",
        "content": [{"type": "text", "text": f"{generated_content}</code>\n"}],
    }
    messages.append(message_item)
    return messages


def update_messages_with_text(messages, generated_content):
    message_item = {
        "role": "assistant",
        "content": [{"type": "text", "text": generated_content}],
    }
    messages.append(message_item)
    return messages
|
|
|
def call_chatgpt_api(messages, client, max_tokens=10000, stop=None, temperature=1.0):
    """Call the chat completions API and report why generation stopped."""
    try:
        response = client.chat.completions.create(
            model="gpt-4.1",
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=1.0,
            stop=stop,
        )

        response_text = response.choices[0].message.content

        # Determine the stop reason: prefer an explicit stop string found in the
        # output, otherwise fall back to the API's finish_reason.
        stop_reason = None
        if stop and any(s in response_text for s in stop):
            for s in stop:
                if s in response_text:
                    stop_reason = s
                    break
        else:
            stop_reason = response.choices[0].finish_reason

        # If the model opened a code block, treat it as stopped at "</code>"
        # so the caller executes the code.
        if "<code>" in response_text:
            stop_reason = "</code>"

        return response_text, stop_reason

    except Exception as e:
        print(f"API Error: {e}")
        return None, None
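
# Note: the OpenAI API does not include the matched stop sequence in the
# returned text, which is why update_messages_with_code re-appends "</code>".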
|
|
|
def evaluate_single_data(data, client, executor, prompt_template, prompt_type):
    messages = process_prompt_init(data["question"], data["image"], prompt_template, prompt_type)

    response_text, pred_stop_reason = call_chatgpt_api(
        messages,
        client,
        max_tokens=10000,
        stop=["</code>"],
    )

    code_execution_count = 0
    image_clue_idx = 1

    while True:
        if pred_stop_reason == "</code>":
            # The model produced code: record it, execute it, and feed the
            # interpreter output back into the conversation.
            messages = update_messages_with_code(messages, response_text)
            code_to_execute = response_text.split("```python")[-1].split("```")[0].strip()

            # batch_apply returns one result per snippet; take the first.
            exe_result = execute_codes([code_to_execute], messages, executor)[0][0]
            if exe_result is None:
                text_result = "None"
                images_result = None
            else:
                output, report = exe_result
                try:
                    text_result = output["text"]
                except (TypeError, KeyError):
                    text_result = None
                    print("Text result is None.")
                try:
                    images_result = output["images"]
                except (TypeError, KeyError):
                    images_result = None
                    print("Image result is None.")

            messages, image_clue_idx = update_messages_with_execution_content(
                messages, images_result, text_result, image_clue_idx
            )

            code_execution_count += 1
            print(f"Code Execution Count: {code_execution_count}")

            response_text, pred_stop_reason = call_chatgpt_api(
                messages,
                client,
                max_tokens=10000,
                stop=["</code>"],
            )
        else:
            # No more code to run: record the final text answer and stop.
            messages = update_messages_with_text(messages, response_text)
            print("GPT-4.1 finished.")
            break

    return messages
|
|
|
def process_message(messages):
    """Render the message list as styled HTML for the Gradio output panel."""
    html_output = '<div style="color: black;">'

    for message_item in messages:
        role = message_item["role"]
        content = message_item["content"]

        # One chat bubble per message, colored by role.
        if role in ("user", "human"):
            html_output += '<div style="background-color: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>User:</strong><br>'
        elif role == "assistant":
            html_output += '<div style="background-color: #e6f7ff; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>Assistant:</strong><br>'
        else:
            html_output += f'<div style="background-color: #f9f9f9; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>{role.capitalize()}:</strong><br>'

        for content_item in content:
            content_type = content_item["type"]

            if content_type == "text":
                # Convert Markdown (including fenced code blocks) to HTML.
                md_text = content_item["text"]
                html_text = markdown.markdown(md_text, extensions=["fenced_code", "codehilite"])
                html_output += f'<div style="color: black;">{html_text}</div>'
            elif content_type == "image_url":
                # Both data URIs and plain URLs render via an <img> tag.
                content_value = content_item["image_url"]["url"]
                html_output += f'<img src="{content_value}" style="max-width: 100%; margin: 10px 0;">'

        html_output += "</div>"

    html_output += "</div>"
    return html_output
|
|
|
def o3_chat(api_key, base_url, question, image):
    client = OpenAI(api_key=api_key, base_url=base_url)
    executor = PythonExecutor()

    with open("./prompt_template_vis.json", "r", encoding="utf-8") as f:
        prompt_template = json.load(f)
    prompt_type = "vistool"

    data = {
        "question": question,
        "image": image,
    }

    messages = evaluate_single_data(data, client, executor, prompt_template, prompt_type)
    return process_message(messages)
|
|
def create_demo():
    with gr.Blocks(title="GPT-4.1 with Python Interpreter", css="div.prose * {color: black !important;}") as demo:
        gr.Markdown("# GPT-4.1 with Python Interpreter")
        gr.Markdown("Upload an image and ask a question to get a response with code execution capabilities.")

        with gr.Row():
            with gr.Column(scale=1):
                # Read the key from the user at runtime; never hard-code a real API key.
                api_key = gr.Textbox(label="OpenAI API Key", type="password", placeholder="sk-...")
                base_url = gr.Textbox(label="Base URL (optional)", value="https://api.claudeshop.top/v1")
                image_input = gr.Image(label="Upload Image", type="pil")
                question = gr.Textbox(label="Question", placeholder="Ask a question about the image...")
                submit_btn = gr.Button("Submit")

        with gr.Row():
            output = gr.HTML(label="Response")

        submit_btn.click(
            fn=o3_chat,
            inputs=[api_key, base_url, question, image_input],
            outputs=[output],
        )

        examples = [
            [
                "./examples/1.png",
                "From the information on that advertising board, what is the type of this shop?",
            ],
            [
                "./examples/2.png",
                "What is the diagnosis for the abnormality seen in this image?",
            ],
        ]

        # Clicking an example only fills the inputs; the user then presses Submit.
        # Auto-running or caching examples here would call the API with a stale or
        # empty key, because Textbox.value is the initial value, not the current one.
        gr.Examples(
            examples=examples,
            inputs=[image_input, question],
            label="Click any example to try it out!",
        )

        gr.Markdown("""
        ### Tips
        1. For best results, ask specific questions
        2. The system can execute Python code - ask for code implementations
        3. Try uploading different types of images (photos, charts, diagrams)
        4. You can ask follow-up questions about previous responses
        """)

    return demo
|
|
if __name__ == "__main__":
    demo = create_demo()
    demo.launch()
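
# To expose the demo over a temporary public link, Gradio also supports:
#   demo.launch(share=True)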