# GeminiBalance / app / handler / message_converter.py
# CatPtain's picture
# Upload 77 files
# 76b9762 verified
import base64
import binascii
import json
import re
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional

import requests

from app.core.constants import (
    AUDIO_FORMAT_TO_MIMETYPE,
    DATA_URL_PATTERN,
    IMAGE_URL_PATTERN,
    MAX_AUDIO_SIZE_BYTES,
    MAX_VIDEO_SIZE_BYTES,
    SUPPORTED_AUDIO_FORMATS,
    SUPPORTED_ROLES,
    SUPPORTED_VIDEO_FORMATS,
    VIDEO_FORMAT_TO_MIMETYPE,
)
from app.log.logger import get_message_converter_logger
# Module-level logger for this file, obtained from the project's logging helper.
logger = get_message_converter_logger()
class MessageConverter(ABC):
    """Abstract base class for message converters."""

    @abstractmethod
    def convert(
        self, messages: List[Dict[str, Any]]
    ) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
        """Convert a list of message dicts.

        Subclasses must override this method. Per the annotation, the result
        is a pair made of a list of dicts and an optional dict. The base
        implementation does nothing and returns ``None``.
        """
def _get_mime_type_and_data(base64_string):
"""
从 base64 字符串中提取 MIME 类型和数据。
参数:
base64_string (str): 可能包含 MIME 类型信息的 base64 字符串
返回:
tuple: (mime_type, encoded_data)
"""
# 检查字符串是否以 "data:" 格式开始
if base64_string.startswith("data:"):
# 提取 MIME 类型和数据
pattern = DATA_URL_PATTERN
match = re.match(pattern, base64_string)
if match:
mime_type = (
"image/jpeg" if match.group(1) == "image/jpg" else match.group(1)
)
encoded_data = match.group(2)
return mime_type, encoded_data
# 如果不是预期格式,假定它只是数据部分
return None, base64_string
def _convert_image(image_url: str) -> Dict[str, Any]:
    """Build an ``inline_data`` part from an image reference.

    Args:
        image_url: Either a ``data:image...`` URL that carries the image
            inline, or a URL whose content is downloaded and base64-encoded.

    Returns:
        A dict of the form
        ``{"inline_data": {"mime_type": ..., "data": ...}}``.

    Raises:
        ValueError: If ``image_url`` starts with ``data:image`` but cannot be
            split into a MIME type and payload.
        Exception: Whatever ``_convert_image_to_base64`` raises when the
            download fails.
    """
    if image_url.startswith("data:image"):
        mime_type, encoded_data = _get_mime_type_and_data(image_url)
        if mime_type is None:
            # The helper returns (None, <whole input>) when the data URL does
            # not match the expected pattern. Fail here instead of emitting a
            # part with no MIME type and the raw URL as its payload.
            raise ValueError("Malformed image data URL")
        return {"inline_data": {"mime_type": mime_type, "data": encoded_data}}

    encoded_data = _convert_image_to_base64(image_url)
    # NOTE(review): downloaded images are always labelled "image/png",
    # whatever the server actually returned - confirm this is acceptable.
    return {"inline_data": {"mime_type": "image/png", "data": encoded_data}}
def _convert_image_to_base64(url: str, timeout: float = 30.0) -> str:
    """Download an image and return its content as a base64 string.

    Args:
        url: Image URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled server.

    Returns:
        str: Base64-encoded image data.

    Raises:
        Exception: If the response status code is not 200.
        requests.RequestException: On connection errors or timeouts.
    """
    # NOTE(review): callers in this file pass URLs taken from message
    # content; consider restricting which hosts may be fetched.
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"Failed to fetch image: {response.status_code}")
    # Encode the raw image bytes as base64 text.
    return base64.b64encode(response.content).decode("utf-8")
def _process_text_with_image(text: str) -> List[Dict[str, Any]]:
    """Turn text that may reference an image URL into a list of parts.

    Only the first match of ``IMAGE_URL_PATTERN`` is considered. When the
    referenced image is downloaded successfully, the result contains just the
    image part; the surrounding text is not included.

    Args:
        text: Text that may contain an image URL.

    Returns:
        List[Dict[str, Any]]: A single-element list holding either an
        ``inline_data`` image part or a ``{"text": text}`` part.
    """
    img_url_match = re.search(IMAGE_URL_PATTERN, text)
    if not img_url_match:
        # No image URL: treat the input as plain text.
        return [{"text": text}]

    # The URL is the second capture group of the pattern.
    img_url = img_url_match.group(2)
    try:
        base64_data = _convert_image_to_base64(img_url)
    except Exception as e:
        # Best-effort: fall back to plain text, but record why the image was
        # dropped instead of swallowing the failure silently.
        logger.warning(
            f"Failed to fetch image {img_url}; falling back to text: {e}"
        )
        return [{"text": text}]
    return [{"inline_data": {"mimeType": "image/png", "data": base64_data}}]
class OpenAIMessageConverter(MessageConverter):
    """Converter for OpenAI-format messages.

    ``convert`` turns a list of OpenAI-style message dicts into a list of
    ``{"role": ..., "parts": [...]}`` dicts plus an optional system
    instruction assembled from the text parts of ``system`` messages.
    """

    def _validate_media_data(
        self, format: str, data: str, supported_formats: List[str], max_size: int
    ) -> str:
        """Validate the format and size of base64 media data.

        Args:
            format: Media format name; compared case-insensitively.
            data: Base64-encoded media payload.
            supported_formats: Accepted format names.
            max_size: Maximum decoded size in bytes.

        Returns:
            The original ``data`` string, unchanged, once it passed validation.

        Raises:
            ValueError: If the format is unsupported, the payload is not valid
                base64, or the decoded payload exceeds ``max_size``.
        """
        if format.lower() not in supported_formats:
            logger.error(
                f"Unsupported media format: {format}. Supported: {supported_formats}"
            )
            raise ValueError(f"Unsupported media format: {format}")

        # Only the decode step is guarded, so the size error below is logged
        # once instead of being re-logged by a catch-all handler.
        try:
            decoded_data = base64.b64decode(data, validate=True)
        except binascii.Error as e:
            logger.error(f"Invalid Base64 data provided: {e}")
            raise ValueError("Invalid Base64 data") from e
        except Exception as e:
            logger.error(f"Error validating media data: {e}")
            raise

        if len(decoded_data) > max_size:
            logger.error(
                f"Media data size ({len(decoded_data)} bytes) exceeds limit ({max_size} bytes)."
            )
            raise ValueError(
                f"Media data size exceeds limit of {max_size // 1024 // 1024}MB"
            )
        return data

    def _build_media_part(
        self,
        kind: str,
        media_info: Dict[str, Any],
        supported_formats: List[str],
        max_size: int,
        mime_types: Dict[str, str],
    ) -> Optional[Dict[str, Any]]:
        """Build the part for an ``input_audio`` / ``input_video`` item.

        Args:
            kind: ``"audio"`` or ``"video"``; only used in log and error text.
            media_info: The item's payload dict with ``data`` and ``format``.
            supported_formats: Accepted format names for this media kind.
            max_size: Maximum decoded size in bytes.
            mime_types: Mapping from format name to MIME type.

        Returns:
            An ``inline_data`` part on success, a ``text`` part describing
            the error when processing fails, or ``None`` when ``data`` or
            ``format`` is missing and the item should be skipped.
        """
        media_data = media_info.get("data")
        # Tolerate an explicit None / non-string format instead of crashing.
        media_format = str(media_info.get("format") or "").lower()
        if not media_data or not media_format:
            logger.warning(f"Skipping {kind} part due to missing data or format.")
            return None

        try:
            validated_data = self._validate_media_data(
                media_format, media_data, supported_formats, max_size
            )
            mime_type = mime_types.get(media_format)
            if not mime_type:
                # Should not happen if format validation passed, but double-check.
                logger.error(
                    f"Could not find MIME type for supported format: {media_format}"
                )
                raise ValueError(
                    f"Internal error: MIME type mapping missing for {media_format}"
                )
            part = {
                "inline_data": {
                    "mimeType": mime_type,
                    "data": validated_data,  # The validated base64 data.
                }
            }
            logger.debug(f"Successfully added {kind} part (format: {media_format})")
            return part
        except ValueError as e:
            logger.error(f"Skipping {kind} part due to validation error: {e}")
            return {"text": f"[Error processing {kind}: {e}]"}
        except Exception:
            logger.exception(f"Unexpected error processing {kind} part.")
            return {"text": f"[Unexpected error processing {kind}]"}

    def _convert_content_item(self, content_item: Any) -> Optional[Dict[str, Any]]:
        """Convert one entry of a structured ``content`` list into a part.

        Returns:
            The converted part, or ``None`` when the entry is skipped (not a
            dict, unsupported type, or missing data).
        """
        if not isinstance(content_item, dict):
            logger.warning(
                f"Skipping unexpected content item format: {type(content_item)}"
            )
            return None

        content_type = content_item.get("type")
        if content_type == "text" and content_item.get("text"):
            return {"text": content_item["text"]}

        if content_type == "image_url" and content_item.get("image_url", {}).get(
            "url"
        ):
            url = content_item["image_url"]["url"]
            try:
                return _convert_image(url)
            except Exception as e:
                # Keep the conversation usable: replace the image with a note.
                logger.error(f"Failed to convert image URL {url}: {e}")
                return {"text": f"[Error processing image: {url}]"}

        if content_type == "input_audio" and content_item.get("input_audio"):
            return self._build_media_part(
                "audio",
                content_item["input_audio"],
                SUPPORTED_AUDIO_FORMATS,
                MAX_AUDIO_SIZE_BYTES,
                AUDIO_FORMAT_TO_MIMETYPE,
            )

        if content_type == "input_video" and content_item.get("input_video"):
            return self._build_media_part(
                "video",
                content_item["input_video"],
                SUPPORTED_VIDEO_FORMATS,
                MAX_VIDEO_SIZE_BYTES,
                VIDEO_FORMAT_TO_MIMETYPE,
            )

        # Log unrecognized but present types.
        if content_type:
            logger.warning(
                f"Unsupported content type or missing data in structured content: {content_type}"
            )
        return None

    @staticmethod
    def _build_function_call(tool_call: Dict[str, Any]) -> Dict[str, Any]:
        """Build a ``functionCall`` payload from an OpenAI tool call.

        The JSON string under ``function.arguments`` is parsed into ``args``.
        A new dict is returned and the caller's message is left untouched, so
        converting the same messages again yields the same result.
        """
        source = tool_call.get("function", {})
        raw_arguments = source.get("arguments", "{}")
        if isinstance(raw_arguments, dict):
            # Already decoded; use as-is.
            args = raw_arguments
        else:
            try:
                args = json.loads(raw_arguments)
            except (json.JSONDecodeError, TypeError):
                logger.warning(
                    f"Failed to decode tool call arguments: {raw_arguments}"
                )
                args = {}

        function_call = {
            key: value for key, value in source.items() if key != "arguments"
        }
        function_call["args"] = args
        return function_call

    def convert(
        self, messages: List[Dict[str, Any]]
    ) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
        """Convert OpenAI-style messages into role/parts messages.

        Args:
            messages: Message dicts with ``role`` and either ``content`` (a
                string or a list of typed items) or ``tool_calls``.

        Returns:
            ``(converted_messages, system_instruction)``. The second element
            is ``None`` when no system text was found; otherwise it is
            ``{"role": "system", "parts": [...]}``.
        """
        converted_messages = []
        system_instruction_parts = []
        last_index = len(messages) - 1

        for idx, msg in enumerate(messages):
            role = msg.get("role", "")
            content = msg.get("content")
            parts = []

            if isinstance(content, list):
                for content_item in content:
                    part = self._convert_content_item(content_item)
                    if part is not None:
                        parts.append(part)
            elif isinstance(content, str) and content:
                parts.extend(_process_text_with_image(content))
            elif isinstance(msg.get("tool_calls"), list):
                parts.extend(
                    {"functionCall": self._build_function_call(tool_call)}
                    for tool_call in msg["tool_calls"]
                )

            if role not in SUPPORTED_ROLES:
                # "tool" messages, and an unknown role on the last message,
                # are treated as user messages; any other unknown role
                # becomes "model".
                if role == "tool" or idx == last_index:
                    role = "user"
                else:
                    role = "model"

            if not parts:
                continue

            if role == "system":
                text_only_parts = [p for p in parts if "text" in p]
                if len(text_only_parts) != len(parts):
                    logger.warning(
                        "Non-text parts found in system message; discarding them."
                    )
                if text_only_parts:
                    system_instruction_parts.extend(text_only_parts)
            else:
                converted_messages.append({"role": role, "parts": parts})

        system_instruction = (
            None
            if not system_instruction_parts
            else {
                "role": "system",
                "parts": system_instruction_parts,
            }
        )
        return converted_messages, system_instruction