Spaces:
Running
Running
File size: 13,971 Bytes
76b9762 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 |
import base64
import json
import re
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
import requests
from app.core.constants import (
AUDIO_FORMAT_TO_MIMETYPE,
DATA_URL_PATTERN,
IMAGE_URL_PATTERN,
MAX_AUDIO_SIZE_BYTES,
MAX_VIDEO_SIZE_BYTES,
SUPPORTED_AUDIO_FORMATS,
SUPPORTED_ROLES,
SUPPORTED_VIDEO_FORMATS,
VIDEO_FORMAT_TO_MIMETYPE,
)
from app.log.logger import get_message_converter_logger
logger = get_message_converter_logger()
class MessageConverter(ABC):
"""消息转换器基类"""
@abstractmethod
def convert(
self, messages: List[Dict[str, Any]]
) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
pass
def _get_mime_type_and_data(base64_string):
"""
从 base64 字符串中提取 MIME 类型和数据。
参数:
base64_string (str): 可能包含 MIME 类型信息的 base64 字符串
返回:
tuple: (mime_type, encoded_data)
"""
# 检查字符串是否以 "data:" 格式开始
if base64_string.startswith("data:"):
# 提取 MIME 类型和数据
pattern = DATA_URL_PATTERN
match = re.match(pattern, base64_string)
if match:
mime_type = (
"image/jpeg" if match.group(1) == "image/jpg" else match.group(1)
)
encoded_data = match.group(2)
return mime_type, encoded_data
# 如果不是预期格式,假定它只是数据部分
return None, base64_string
def _convert_image(image_url: str) -> Dict[str, Any]:
if image_url.startswith("data:image"):
mime_type, encoded_data = _get_mime_type_and_data(image_url)
return {"inline_data": {"mime_type": mime_type, "data": encoded_data}}
else:
encoded_data = _convert_image_to_base64(image_url)
return {"inline_data": {"mime_type": "image/png", "data": encoded_data}}
def _convert_image_to_base64(url: str) -> str:
"""
将图片URL转换为base64编码
Args:
url: 图片URL
Returns:
str: base64编码的图片数据
"""
response = requests.get(url)
if response.status_code == 200:
# 将图片内容转换为base64
img_data = base64.b64encode(response.content).decode("utf-8")
return img_data
else:
raise Exception(f"Failed to fetch image: {response.status_code}")
def _process_text_with_image(text: str) -> List[Dict[str, Any]]:
"""
处理可能包含图片URL的文本,提取图片并转换为base64
Args:
text: 可能包含图片URL的文本
Returns:
List[Dict[str, Any]]: 包含文本和图片的部分列表
"""
parts = []
img_url_match = re.search(IMAGE_URL_PATTERN, text)
if img_url_match:
# 提取URL
img_url = img_url_match.group(2)
# 将URL对应的图片转换为base64
try:
base64_data = _convert_image_to_base64(img_url)
parts.append(
{"inline_data": {"mimeType": "image/png", "data": base64_data}}
)
except Exception:
# 如果转换失败,回退到文本模式
parts.append({"text": text})
else:
# 没有图片URL,作为纯文本处理
parts.append({"text": text})
return parts
class OpenAIMessageConverter(MessageConverter):
"""OpenAI消息格式转换器"""
def _validate_media_data(
self, format: str, data: str, supported_formats: List[str], max_size: int
) -> tuple[Optional[str], Optional[str]]:
"""Validates format and size of Base64 media data."""
if format.lower() not in supported_formats:
logger.error(
f"Unsupported media format: {format}. Supported: {supported_formats}"
)
raise ValueError(f"Unsupported media format: {format}")
try:
decoded_data = base64.b64decode(data, validate=True)
if len(decoded_data) > max_size:
logger.error(
f"Media data size ({len(decoded_data)} bytes) exceeds limit ({max_size} bytes)."
)
raise ValueError(
f"Media data size exceeds limit of {max_size // 1024 // 1024}MB"
)
return data
except base64.binascii.Error as e:
logger.error(f"Invalid Base64 data provided: {e}")
raise ValueError("Invalid Base64 data")
except Exception as e:
logger.error(f"Error validating media data: {e}")
raise
def convert(
self, messages: List[Dict[str, Any]]
) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
converted_messages = []
system_instruction_parts = []
for idx, msg in enumerate(messages):
role = msg.get("role", "")
parts = []
if "content" in msg and isinstance(msg["content"], list):
for content_item in msg["content"]:
if not isinstance(content_item, dict):
logger.warning(
f"Skipping unexpected content item format: {type(content_item)}"
)
continue
content_type = content_item.get("type")
if content_type == "text" and content_item.get("text"):
parts.append({"text": content_item["text"]})
elif content_type == "image_url" and content_item.get(
"image_url", {}
).get("url"):
try:
parts.append(
_convert_image(content_item["image_url"]["url"])
)
except Exception as e:
logger.error(
f"Failed to convert image URL {content_item['image_url']['url']}: {e}"
)
parts.append(
{
"text": f"[Error processing image: {content_item['image_url']['url']}]"
}
)
elif content_type == "input_audio" and content_item.get(
"input_audio"
):
audio_info = content_item["input_audio"]
audio_data = audio_info.get("data")
audio_format = audio_info.get("format", "").lower()
if not audio_data or not audio_format:
logger.warning(
"Skipping audio part due to missing data or format."
)
continue
try:
validated_data = self._validate_media_data(
audio_format,
audio_data,
SUPPORTED_AUDIO_FORMATS,
MAX_AUDIO_SIZE_BYTES,
)
# Get MIME type
mime_type = AUDIO_FORMAT_TO_MIMETYPE.get(audio_format)
if not mime_type:
# Should not happen if format validation passed, but double-check
logger.error(
f"Could not find MIME type for supported format: {audio_format}"
)
raise ValueError(
f"Internal error: MIME type mapping missing for {audio_format}"
)
parts.append(
{
"inline_data": {
"mimeType": mime_type,
"data": validated_data, # Use the validated Base64 data
}
}
)
logger.debug(
f"Successfully added audio part (format: {audio_format})"
)
except ValueError as e:
logger.error(
f"Skipping audio part due to validation error: {e}"
)
parts.append({"text": f"[Error processing audio: {e}]"})
except Exception:
logger.exception("Unexpected error processing audio part.")
parts.append(
{"text": "[Unexpected error processing audio]"}
)
elif content_type == "input_video" and content_item.get(
"input_video"
):
video_info = content_item["input_video"]
video_data = video_info.get("data")
video_format = video_info.get("format", "").lower()
if not video_data or not video_format:
logger.warning(
"Skipping video part due to missing data or format."
)
continue
try:
validated_data = self._validate_media_data(
video_format,
video_data,
SUPPORTED_VIDEO_FORMATS,
MAX_VIDEO_SIZE_BYTES,
)
mime_type = VIDEO_FORMAT_TO_MIMETYPE.get(video_format)
if not mime_type:
raise ValueError(
f"Internal error: MIME type mapping missing for {video_format}"
)
parts.append(
{
"inline_data": {
"mimeType": mime_type,
"data": validated_data,
}
}
)
logger.debug(
f"Successfully added video part (format: {video_format})"
)
except ValueError as e:
logger.error(
f"Skipping video part due to validation error: {e}"
)
parts.append({"text": f"[Error processing video: {e}]"})
except Exception:
logger.exception("Unexpected error processing video part.")
parts.append(
{"text": "[Unexpected error processing video]"}
)
else:
# Log unrecognized but present types
if content_type:
logger.warning(
f"Unsupported content type or missing data in structured content: {content_type}"
)
elif (
"content" in msg and isinstance(msg["content"], str) and msg["content"]
):
parts.extend(_process_text_with_image(msg["content"]))
elif "tool_calls" in msg and isinstance(msg["tool_calls"], list):
# Keep existing tool call processing
for tool_call in msg["tool_calls"]:
function_call = tool_call.get("function", {})
# Sanitize arguments loading
arguments_str = function_call.get("arguments", "{}")
try:
function_call["args"] = json.loads(arguments_str)
except json.JSONDecodeError:
logger.warning(
f"Failed to decode tool call arguments: {arguments_str}"
)
function_call["args"] = {}
if "arguments" in function_call:
if "arguments" in function_call:
del function_call["arguments"]
parts.append({"functionCall": function_call})
if role not in SUPPORTED_ROLES:
if role == "tool":
role = "user"
else:
# 如果是最后一条消息,则认为是用户消息
if idx == len(messages) - 1:
role = "user"
else:
role = "model"
if parts:
if role == "system":
text_only_parts = [p for p in parts if "text" in p]
if len(text_only_parts) != len(parts):
logger.warning(
"Non-text parts found in system message; discarding them."
)
if text_only_parts:
system_instruction_parts.extend(text_only_parts)
else:
converted_messages.append({"role": role, "parts": parts})
system_instruction = (
None
if not system_instruction_parts
else {
"role": "system",
"parts": system_instruction_parts,
}
)
return converted_messages, system_instruction
|