PyVision / vis_python_exe.py
stzhao's picture
Update vis_python_exe.py
ecd3503 verified
raw
history blame
29.1 kB
import sys
import os
import re
import json
import base64
from io import BytesIO
from PIL import Image
import argparse
from inference_engine.safe_persis_shared_vis_python_exe import PythonExecutor, ImageRuntime
from openai import OpenAI
import anthropic
def encode_image(image):
"""
Convert a PIL.Image object or image file path to base64-encoded string, and get resolution info.
Args:
image: Can be a PIL.Image object or image file path.
Returns:
dict with keys:
- 'base64': base64-encoded string
- 'width': width in pixels
- 'height': height in pixels
- 'resolution': string "widthxheight"
"""
img_obj = None
if isinstance(image, str):
# Handle file path
img_obj = Image.open(image)
with open(image, "rb") as image_file:
base64_str = base64.b64encode(image_file.read()).decode('utf-8')
else:
# Handle PIL.Image object
img_obj = image
buffered = BytesIO()
image.save(buffered, format='PNG')
base64_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
width, height = img_obj.size
return {
'base64': base64_str,
'width': width,
'height': height
}
def encode_image_with_resize(image):
"""
Convert a PIL.Image object or image file path to base64-encoded string, get resolution info.
If resolution > 1024x1024, resize to half.
Args:
image: Can be a PIL.Image object or image file path
Returns:
dict with keys:
- 'base64': base64-encoded string
- 'width': width in pixels
- 'height': height in pixels
- 'resolution': string "widthxheight"
"""
img_obj = None
if isinstance(image, str):
img_obj = Image.open(image)
else:
img_obj = image
# Resize if larger than 1024x1024
width, height = img_obj.size
if width > 1024 or height > 1024:
new_size = (width // 2, height // 2)
img_obj = img_obj.resize(new_size, Image.LANCZOS)
width, height = img_obj.size
buffered = BytesIO()
img_obj.save(buffered, format='PNG')
base64_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
return {
'base64': base64_str,
'width': width,
'height': height,
'resolution': f"{width}x{height}"
}
def check(evaluator, pred_ans, real_ans):
if len(pred_ans) == 0:
return []
correctness = evaluator.score(pred_ans, real_ans)
return correctness
def execute_codes(codes, messages, executor: PythonExecutor):
no_code_idx = []
codes_use = []
for i, code in enumerate(codes):
if code == "":
no_code_idx.append(i)
else:
codes_use.append(code)
batch_results = executor.batch_apply(codes_use, messages)
return batch_results, no_code_idx
def process_prompt_init(question, image_path_list, prompt_template, prompt_type, api_name):
with open(prompt_template, "r") as fin:
sys = json.load(fin)
prompt_prefix = sys[prompt_type]
image_path = image_path_list[0]
if "<IMAGE_PLACE_HOLDER_0>" in question:
if "no_tool" in prompt_type:
if "claude" in api_name:
img_result = encode_image_with_resize(image_path)
else:
img_result = encode_image(image_path)
image_base64 = img_result['base64']
question_with_options = question
question = prompt_prefix.format(query=question_with_options)
parts = question.split("<IMAGE_PLACE_HOLDER_0>")
content = []
# Add text before image (if any)
if parts[0].strip():
content.append({"type": "text", "text": parts[0].strip()})
# Add image
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}})
# Add text after image (if any)
if len(parts) > 1 and parts[1].strip():
content.append({"type": "text", "text": parts[1].strip()})
messages = [
{
"role": "user",
"content": content
}
]
return messages
else:
if "claude" in api_name:
img_result = encode_image_with_resize(image_path)
else:
img_result = encode_image(image_path)
image_base64 = img_result['base64']
width = img_result['width']
height = img_result['height']
question_with_options = question
question = prompt_prefix.format(query=question_with_options, width=str(width), height=str(height))
# Split question into parts
parts = question.split("<IMAGE_PLACE_HOLDER_0>")
# Build message with image_clue tags
content = []
# Add text before image (if any)
if parts[0].strip():
content.append({"type": "text", "text": parts[0].strip()})
# Add image with tags
content.append({"type": "text", "text": "<image_clue_0>"})
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}})
content.append({"type": "text", "text": "</image_clue_0>\n\n"})
# Add text after image (if any)
if len(parts) > 1 and parts[1].strip():
content.append({"type": "text", "text": parts[1].strip()})
messages = [
{
"role": "user",
"content": content
}
]
return messages
else:
if "no_tool" in prompt_type:
if "claude" in api_name:
img_result = encode_image_with_resize(image_path)
else:
img_result = encode_image(image_path)
image_base64 = img_result['base64']
question_with_options = question
messages = [
{
"role": "user",
"content": [{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}}] + [{"type": "text", "text": prompt_prefix.format(query=question_with_options)}]
}
]
return messages
else:
if "claude" in api_name:
img_result = encode_image_with_resize(image_path)
else:
img_result = encode_image(image_path)
image_base64 = img_result['base64']
width = img_result['width']
height = img_result['height']
question_with_options = question
messages = [
{
"role": "user",
"content": [{"type": "text", "text": "<image_clue_0>"}] + [{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}}] + [{"type": "text", "text": "</image_clue_0>\n\n"}] + [{"type": "text", "text": prompt_prefix.format(query=question_with_options, width=str(width), height=str(height))}]
}
]
return messages
def process_prompt_init_multi_images(question, image_path_list, prompt_template, prompt_type, api_name):
with open(prompt_template, "r") as fin:
sys = json.load(fin)
prompt_prefix = sys[prompt_type]
# Prepare image data
image_data = []
image_information = ""
for i, image_path in enumerate(image_path_list):
if "claude" in api_name:
img_result = encode_image_with_resize(image_path)
else:
img_result = encode_image(image_path)
image_base64 = img_result['base64']
width = img_result['width']
height = img_result['height']
image_data.append({
"index": i,
"base64": image_base64,
"width": width,
"height": height,
"placeholder": f"<IMAGE_PLACE_HOLDER_{i}>"
})
image_information += f"width of image_clue_{i}: {width}, height of image_clue_{i}: {height}\n"
# Format question
formatted_question = prompt_prefix.format(query=question, image_information=image_information)
# Check if placeholder exists
has_placeholders = any(f"<IMAGE_PLACE_HOLDER_{i}>" in formatted_question for i in range(len(image_path_list)))
if has_placeholders:
# Insert images at placeholder positions
if "no_tool" in prompt_type:
content = []
remaining_text = formatted_question
for img_data in image_data:
placeholder = img_data["placeholder"]
if placeholder in remaining_text:
parts = remaining_text.split(placeholder, 1)
if parts[0]:
content.append({"type": "text", "text": parts[0]})
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_data['base64']}"}})
remaining_text = parts[1]
if remaining_text:
content.append({"type": "text", "text": remaining_text})
messages = [{"role": "user", "content": content}]
return messages
else:
content = []
remaining_text = formatted_question
for img_data in image_data:
placeholder = img_data["placeholder"]
if placeholder in remaining_text:
parts = remaining_text.split(placeholder, 1)
if parts[0]:
content.append({"type": "text", "text": parts[0]})
i = img_data["index"]
content.append({"type": "text", "text": f"<image_clue_{i}>"})
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_data['base64']}"}})
content.append({"type": "text", "text": f"</image_clue_{i}>\n\n"})
remaining_text = parts[1]
if remaining_text:
content.append({"type": "text", "text": remaining_text})
messages = [{"role": "user", "content": content}]
return messages
else:
# Handle as usual if no placeholder
if "no_tool" in prompt_type:
content = []
for i, img_data in enumerate(image_data):
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_data['base64']}"}})
content.append({"type": "text", "text": formatted_question})
messages = [{"role": "user", "content": content}]
return messages
else:
content = []
for i, img_data in enumerate(image_data):
content.append({"type": "text", "text": f"<image_clue_{i}>"})
content.append({"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_data['base64']}"}})
content.append({"type": "text", "text": f"</image_clue_{i}>\n\n"})
content.append({"type": "text", "text": formatted_question})
messages = [{"role": "user", "content": content}]
return messages
def update_messages_with_execute_content(image_nums_in_input, messages, images_result, text_result, error_result, image_clue_idx):
if error_result is None:
new_messages = []
image_content = []
for message_item in messages[:-1]:
new_messages.append(message_item)
assistant_message_item = messages[-1]['content']
interpreter_message_text_prefix = [{"type": "text", "text": f"<interpreter>\nText Result:\n{text_result}\nImage Result:\n"}]
if images_result is not None:
print(f"#### image_clue_index: {image_clue_idx},Image_nums_in_input: {image_nums_in_input}, len of images_result: {len(images_result)}")
# for image_base64_item in images_result[image_clue_idx-image_nums_in_input:]:
for image_base64_item in images_result:
interpreter_message_images = [{"type": "text", "text": f"<image_clue_{image_clue_idx}>"}] + [{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64_item}"}}] + [{"type": "text", "text": f"</image_clue_{image_clue_idx}>"}]
image_content += interpreter_message_images
image_clue_idx += 1
else:
image_content = [{"type": "text", "text": "None"}]
interpreter_message_text_profill = [{"type": "text", "text": "</interpreter>\n"}]
interpreter_message_item = interpreter_message_text_prefix + image_content + interpreter_message_text_profill
new_messages.append({"role": "assistant", "content": assistant_message_item})
new_messages.append({"role": "user", "content": interpreter_message_item})
else:
new_messages = []
for message_item in messages[:-1]:
new_messages.append(message_item)
assistant_message_item = messages[-1]['content']
interpreter_message_text_prefix = [{"type": "text", "text": f"<interpreter>{error_result}"}]
interpreter_message_text_profill = [{"type": "text", "text": "</interpreter>\n"}]
interpreter_message_item = interpreter_message_text_prefix + interpreter_message_text_profill
new_messages.append({"role": "assistant", "content": assistant_message_item})
new_messages.append({"role": "user", "content": interpreter_message_item})
return new_messages, image_clue_idx
def update_messages_with_code(messages, generated_content):
message_item = {
"role": "assistant",
"content": [{"type": "text", "text": f"{generated_content}</code>\n"}]
}
messages.append(message_item)
return messages
def update_messages_with_text(messages, generated_content):
message_item = {
"role": "assistant",
"content": [{"type": "text", "text": f"{generated_content}"}]
}
messages.append(message_item)
return messages
def call_chatgpt_api(args, messages, client, max_tokens=10000, stop=None, temperature=0.6):
"""Call ChatGPT API with the given messages"""
try:
client_type = args.client_type
api_name = args.api_name
except:
client_type = args['client_type']
api_name = args['api_name']
if client_type == "openai" or client_type == "azure":
response = client.chat.completions.create(
model=api_name,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=1.0,
stop=stop,
timeout=300
)
response_text = response.choices[0].message.content
elif client_type == "anthropic":
message = client.messages.create(
model=api_name,
max_tokens=max_tokens,
messages=messages,
temperature=temperature,
top_p=1.0,
stop_sequences=stop
)
response_text = message.content[0].text if isinstance(message.content, list) else message.content
elif client_type == "vllm":
response = client.chat.completions.create(
model=api_name,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=1.0,
stop=stop
)
response_text = response.choices[0].message.content
else:
print("Your args.client_type must be one of openai, azure, anthropic and vllm.")
return None, None
# Check if stop sequence is encountered
stop_reason = None
if stop and any(s in response_text for s in stop):
for s in stop:
if s in response_text:
stop_reason = s
break
else:
if client_type in ["openai", "azure", "vllm"]:
stop_reason = response.choices[0].finish_reason
else:
stop_reason = "stop"
if "<code>" in response_text:
stop_reason = "</code>"
return response_text, stop_reason
def evaluate_single_data(args, data, client, executor):
try:
prompt_template = args.prompt_template
prompt = args.prompt
exe_code = args.exe_code
max_tokens = args.max_tokens
temperature = args.temperature
api_name = args.api_name
except:
prompt_template = args['prompt_template']
prompt = args['prompt']
exe_code = args['exe_code']
max_tokens = args['max_tokens']
temperature = args['temperature']
api_name = args['api_name']
image_path_list = data['image_path_list']
if "no_tool" in prompt:
if len(image_path_list) == 1:
messages = process_prompt_init(data["question"], image_path_list, prompt_template, prompt, api_name)
elif len(image_path_list) >= 2:
messages = process_prompt_init_multi_images(data["question"], image_path_list, prompt_template, prompt, api_name)
else:
if len(image_path_list) == 1:
prompt = "vistool_with_img_info_v2"
messages = process_prompt_init(data["question"], image_path_list, prompt_template, prompt, api_name)
elif len(image_path_list) >= 2:
prompt = "vistool_with_img_info_multi_image"
messages = process_prompt_init_multi_images(data["question"], image_path_list, prompt_template, prompt, api_name)
# Generate initial response
response_text, pred_stop_reason = call_chatgpt_api(
args,
messages,
client,
max_tokens=max_tokens,
stop=["</code>"] if exe_code else None,
temperature=temperature
)
# Handle response
final_response = response_text
code_execution_count = 0
image_clue_idx = len(image_path_list)
while True:
# Check if code execution is needed
if exe_code and pred_stop_reason == "</code>":
# Extract code to execute
messages = update_messages_with_code(messages, response_text)
code_to_execute = response_text.split("```python")[-1].split("```")[0].strip()
# Execute code
exe_result = execute_codes([code_to_execute], messages, executor)[0][0]
if exe_result is None:
text_result = "None"
images_result = None
else:
output, report = exe_result
if report == "Done":
error_result = None
try:
text_result = exe_result[0]['text']
except:
text_result = None
print("text result is none.")
try:
images_result = exe_result[0]['images']
except:
images_result = None
print("image result is none.")
else:
error_result = report
text_result = None
images_result = None
messages, new_image_clue_idx = update_messages_with_execute_content(len(image_path_list), messages, images_result, text_result, error_result, image_clue_idx)
image_clue_idx = new_image_clue_idx
code_execution_count += 1
# Generate next response part
response_text, pred_stop_reason = call_chatgpt_api(
args,
messages,
client,
max_tokens=max_tokens,
stop=["</code>"] if exe_code else None,
temperature=temperature
)
else:
final_response = response_text
messages = update_messages_with_text(messages, response_text)
break
return messages, final_response
def evaluate_single_data_multi_images(args, data, client, executor):
try:
prompt_template = args.prompt_template
prompt = args.prompt
exe_code = args.exe_code
max_tokens = args.max_tokens
except:
prompt_template = args['prompt_template']
prompt = args['prompt']
exe_code = args['exe_code']
max_tokens = args['max_tokens']
messages = process_prompt_init_multi_images(data["question"], data['image_path_list'], prompt_template, prompt)
# Generate initial response
response_text, pred_stop_reason = call_chatgpt_api(
args,
messages,
client,
max_tokens=max_tokens,
stop=["</code>"] if exe_code else None
)
# Handle response
final_response = response_text
code_execution_count = 0
image_clue_idx = data['image_nums_in_input']
while True:
# Check if code execution is needed
if exe_code and pred_stop_reason == "</code>":
# Extract code to execute
messages = update_messages_with_code(messages, response_text)
code_to_execute = response_text.split("```python")[-1].split("```")[0].strip()
# Execute code
exe_result = execute_codes([code_to_execute], messages, executor)[0][0]
if exe_result is None:
text_result = "None"
images_result = None
else:
output, report = exe_result
if report == "Done":
error_result = None
try:
text_result = exe_result[0]['text']
except:
text_result = None
print("text result is none.")
try:
images_result = exe_result[0]['images']
except:
images_result = None
print("image result is none.")
else:
error_result = report
text_result = None
images_result = None
messages, new_image_clue_idx = update_messages_with_execute_content(data['image_nums_in_input'], messages, images_result, text_result, error_result, image_clue_idx)
image_clue_idx = new_image_clue_idx
code_execution_count += 1
# Generate next response part
response_text, pred_stop_reason = call_chatgpt_api(
args,
messages,
client,
max_tokens=max_tokens,
stop=["</code>"] if exe_code else None
)
else:
final_response = response_text
messages = update_messages_with_text(messages, response_text)
break
return messages, final_response
def evaluate_single_data_video(args, data, client, executor):
try:
prompt_template = args.prompt_template
prompt = args.prompt
exe_code = args.exe_code
max_tokens = args.max_tokens
except:
prompt_template = args['prompt_template']
prompt = args['prompt']
exe_code = args['exe_code']
max_tokens = args['max_tokens']
messages = process_prompt_init_multi_images(data["question"], data['image_path_list'], prompt_template, prompt)
# Generate initial response
response_text, pred_stop_reason = call_chatgpt_api(
args,
messages,
client,
max_tokens=max_tokens,
stop=["</code>"] if exe_code else None
)
# Handle response
final_response = response_text
code_execution_count = 0
image_clue_idx = data['image_nums_in_input']
while True:
# Check if code execution is needed
if exe_code and pred_stop_reason == "</code>":
# Extract code to execute
messages = update_messages_with_code(messages, response_text)
code_to_execute = response_text.split("```python")[-1].split("```")[0].strip()
# Execute code
exe_result = execute_codes([code_to_execute], messages, executor)[0][0]
if exe_result is None:
text_result = "None"
images_result = None
else:
output, report = exe_result
if report == "Done":
error_result = None
try:
text_result = exe_result[0]['text']
except:
text_result = None
print("text result is none.")
try:
images_result = exe_result[0]['images']
except:
images_result = None
print("image result is none.")
else:
error_result = report
text_result = None
images_result = None
messages, new_image_clue_idx = update_messages_with_execute_content(data['image_nums_in_input'], messages, images_result, text_result, error_result, image_clue_idx)
image_clue_idx = new_image_clue_idx
code_execution_count += 1
# Generate next response part
response_text, pred_stop_reason = call_chatgpt_api(
args,
messages,
client,
max_tokens=max_tokens,
stop=["</code>"] if exe_code else None
)
else:
final_response = response_text
messages = update_messages_with_text(messages, response_text)
break
return messages, final_response
# New wrapper functions for safe execution with cleanup
def evaluate_batch_with_cleanup(args, data_list, client):
"""Wrapper function to ensure proper cleanup of resources when processing multiple items"""
# Initialize executor with process isolation
executor = PythonExecutor(use_process_isolation=True)
try:
results = []
for data in data_list:
try:
result = evaluate_single_data(args, data, client, executor)
results.append(result)
except Exception as e:
print(f"Error processing data item: {str(e)}")
results.append((None, f"Error: {str(e)}"))
# Reset the executor for the next item
executor.reset()
return results
finally:
# Ensure cleanup of persistent worker
del executor
def evaluate_single_with_cleanup(args, data, client):
"""Wrapper function for evaluating a single item with proper cleanup"""
# Initialize executor with process isolation
executor = PythonExecutor(use_process_isolation=True)
try:
result = evaluate_single_data(args, data, client, executor)
return result
finally:
# Ensure cleanup of persistent worker
del executor
def evaluate_multi_images_with_cleanup(args, data_list, client):
"""Wrapper function for multi-image evaluation with proper cleanup"""
# Initialize executor with process isolation
executor = PythonExecutor(use_process_isolation=True)
try:
results = []
for data in data_list:
try:
result = evaluate_single_data_multi_images(args, data, client, executor)
results.append(result)
except Exception as e:
print(f"Error processing multi-image data: {str(e)}")
results.append((None, f"Error: {str(e)}"))
# Reset the executor for the next item
executor.reset()
return results
finally:
# Ensure cleanup of persistent worker
del executor
def evaluate_video_with_cleanup(args, data_list, client):
"""Wrapper function for video evaluation with proper cleanup"""
# Initialize executor with process isolation
executor = PythonExecutor(use_process_isolation=True)
try:
results = []
for data in data_list:
try:
result = evaluate_single_data_video(args, data, client, executor)
results.append(result)
except Exception as e:
print(f"Error processing video data: {str(e)}")
results.append((None, f"Error: {str(e)}"))
# Reset the executor for the next item
executor.reset()
return results
finally:
# Ensure cleanup of persistent worker
del executor