""" | |
This file contains the transformation logic for the Gemini realtime API. | |
""" | |
import json | |
import os | |
import uuid | |
from typing import Any, Dict, List, Optional, Union, cast | |
from litellm import verbose_logger | |
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj | |
from litellm.llms.base_llm.realtime.transformation import BaseRealtimeConfig | |
from litellm.llms.vertex_ai.gemini.vertex_and_google_ai_studio_gemini import ( | |
VertexGeminiConfig, | |
) | |
from litellm.responses.litellm_completion_transformation.transformation import ( | |
LiteLLMCompletionResponsesConfig, | |
) | |
from litellm.types.llms.gemini import ( | |
AutomaticActivityDetection, | |
BidiGenerateContentRealtimeInput, | |
BidiGenerateContentRealtimeInputConfig, | |
BidiGenerateContentServerContent, | |
BidiGenerateContentServerMessage, | |
BidiGenerateContentSetup, | |
) | |
from litellm.types.llms.openai import ( | |
OpenAIRealtimeContentPartDone, | |
OpenAIRealtimeConversationItemCreated, | |
OpenAIRealtimeDoneEvent, | |
OpenAIRealtimeEvents, | |
OpenAIRealtimeEventTypes, | |
OpenAIRealtimeOutputItemDone, | |
OpenAIRealtimeResponseAudioDone, | |
OpenAIRealtimeResponseContentPartAdded, | |
OpenAIRealtimeResponseDelta, | |
OpenAIRealtimeResponseDoneObject, | |
OpenAIRealtimeResponseTextDone, | |
OpenAIRealtimeStreamResponseBaseObject, | |
OpenAIRealtimeStreamResponseOutputItemAdded, | |
OpenAIRealtimeStreamSession, | |
OpenAIRealtimeStreamSessionEvents, | |
OpenAIRealtimeTurnDetection, | |
) | |
from litellm.types.llms.vertex_ai import ( | |
GeminiResponseModalities, | |
HttpxBlobType, | |
HttpxContentType, | |
) | |
from litellm.types.realtime import ( | |
ALL_DELTA_TYPES, | |
RealtimeModalityResponseTransformOutput, | |
RealtimeResponseTransformInput, | |
RealtimeResponseTypedDict, | |
) | |
from litellm.utils import get_empty_usage | |
from ..common_utils import encode_unserializable_types | |
MAP_GEMINI_FIELD_TO_OPENAI_EVENT: Dict[str, OpenAIRealtimeEventTypes] = { | |
"setupComplete": OpenAIRealtimeEventTypes.SESSION_CREATED, | |
"serverContent.generationComplete": OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE, | |
"serverContent.turnComplete": OpenAIRealtimeEventTypes.RESPONSE_DONE, | |
"serverContent.interrupted": OpenAIRealtimeEventTypes.RESPONSE_DONE, | |
} | |
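# Keys may be dotted paths into the raw Gemini message, resolved via
# GeminiRealtimeConfig.get_nested_value below. Illustrative sketch, assuming a
# hypothetical payload:
#   {"serverContent": {"turnComplete": true}}
#   -> "serverContent.turnComplete" matches, so the message maps to
#      OpenAIRealtimeEventTypes.RESPONSE_DONE.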


class GeminiRealtimeConfig(BaseRealtimeConfig):
    def validate_environment(
        self, headers: dict, model: str, api_key: Optional[str] = None
    ) -> dict:
        return headers

    def get_complete_url(
        self, api_base: Optional[str], model: str, api_key: Optional[str] = None
    ) -> str:
        """
        Example output:
        "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent?key=..."
        """
        if api_base is None:
            api_base = "wss://generativelanguage.googleapis.com"
        if api_key is None:
            api_key = os.environ.get("GEMINI_API_KEY")
        if api_key is None:
            raise ValueError("api_key is required for Gemini API calls")
        api_base = api_base.replace("https://", "wss://")
        api_base = api_base.replace("http://", "ws://")
        return f"{api_base}/ws/google.ai.generativelanguage.v1beta.GenerativeService.BidiGenerateContent?key={api_key}"

    def map_model_turn_event(
        self, model_turn: HttpxContentType
    ) -> OpenAIRealtimeEventTypes:
        """
        Map the model turn event to the OpenAI realtime events.

        Returns either:
        - response.text.delta - model_turn: {"parts": [{"text": "..."}]}
        - response.audio.delta - model_turn: {"parts": [{"inlineData": {"mimeType": "audio/pcm", "data": "..."}}]}

        Assumes parts is a single-element list.
        """
        if "parts" in model_turn:
            parts = model_turn["parts"]
            if len(parts) != 1:
                verbose_logger.warning(
                    f"Realtime: Expected 1 part, got {len(parts)} for Gemini model turn event."
                )
            part = parts[0]
            if "text" in part:
                return OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
            elif "inlineData" in part:
                return OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
            else:
                raise ValueError(f"Unexpected part type: {part}")
        raise ValueError(f"Unexpected model turn event, no 'parts' key: {model_turn}")

    def map_generation_complete_event(
        self, delta_type: Optional[ALL_DELTA_TYPES]
    ) -> OpenAIRealtimeEventTypes:
        if delta_type == "text":
            return OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
        elif delta_type == "audio":
            return OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
        else:
            raise ValueError(f"Unexpected delta type: {delta_type}")

    def get_audio_mime_type(self, input_audio_format: str = "pcm16") -> str:
        mime_types = {
            "pcm16": "audio/pcm",
            "g711_ulaw": "audio/pcmu",
            "g711_alaw": "audio/pcma",
        }
        return mime_types.get(input_audio_format, "application/octet-stream")
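
    # Illustrative lookup: get_audio_mime_type("pcm16") -> "audio/pcm";
    # unknown formats fall back to "application/octet-stream".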

    def map_automatic_turn_detection(
        self, value: OpenAIRealtimeTurnDetection
    ) -> AutomaticActivityDetection:
        automatic_activity_detection = AutomaticActivityDetection()
        if "create_response" in value and isinstance(value["create_response"], bool):
            automatic_activity_detection["disabled"] = not value["create_response"]
        else:
            automatic_activity_detection["disabled"] = True
        if "prefix_padding_ms" in value and isinstance(value["prefix_padding_ms"], int):
            automatic_activity_detection["prefixPaddingMs"] = value["prefix_padding_ms"]
        if "silence_duration_ms" in value and isinstance(
            value["silence_duration_ms"], int
        ):
            automatic_activity_detection["silenceDurationMs"] = value[
                "silence_duration_ms"
            ]
        return automatic_activity_detection
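
    # Illustrative transformation (hypothetical OpenAI turn_detection config):
    #   {"type": "server_vad", "create_response": True,
    #    "prefix_padding_ms": 300, "silence_duration_ms": 500}
    #   -> {"disabled": False, "prefixPaddingMs": 300, "silenceDurationMs": 500}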

    def get_supported_openai_params(self, model: str) -> List[str]:
        return [
            "instructions",
            "temperature",
            "max_response_output_tokens",
            "modalities",
            "tools",
            "input_audio_transcription",
            "turn_detection",
        ]

    def map_openai_params(
        self, optional_params: dict, non_default_params: dict
    ) -> dict:
        if "generationConfig" not in optional_params:
            optional_params["generationConfig"] = {}
        for key, value in non_default_params.items():
            if key == "instructions":
                optional_params["systemInstruction"] = HttpxContentType(
                    role="user", parts=[{"text": value}]
                )
            elif key == "temperature":
                optional_params["generationConfig"]["temperature"] = value
            elif key == "max_response_output_tokens":
                optional_params["generationConfig"]["maxOutputTokens"] = value
            elif key == "modalities":
                optional_params["generationConfig"]["responseModalities"] = [
                    modality.upper() for modality in cast(List[str], value)
                ]
            elif key == "tools":
                # VertexGeminiConfig is already imported at module scope; map the
                # OpenAI tool definitions once instead of calling _map_function twice.
                vertex_gemini_config = VertexGeminiConfig()
                optional_params["generationConfig"][
                    "tools"
                ] = vertex_gemini_config._map_function(value)
            elif key == "input_audio_transcription" and value is not None:
                optional_params["inputAudioTranscription"] = {}
            elif key == "turn_detection":
                value_typed = cast(OpenAIRealtimeTurnDetection, value)
                transformed_audio_activity_config = self.map_automatic_turn_detection(
                    value_typed
                )
                if (
                    len(transformed_audio_activity_config) > 0
                ):  # if the config is not empty, add it to the optional params
                    optional_params[
                        "realtimeInputConfig"
                    ] = BidiGenerateContentRealtimeInputConfig(
                        automaticActivityDetection=transformed_audio_activity_config
                    )
        if len(optional_params["generationConfig"]) == 0:
            optional_params.pop("generationConfig")
        return optional_params
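
    # Illustrative mapping (hypothetical OpenAI session params):
    #   map_openai_params({}, {"temperature": 0.7, "modalities": ["text"]})
    #   -> {"generationConfig": {"temperature": 0.7, "responseModalities": ["TEXT"]}}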

    def transform_realtime_request(
        self,
        message: str,
        model: str,
        session_configuration_request: Optional[str] = None,
    ) -> List[str]:
        realtime_input_dict: BidiGenerateContentRealtimeInput = {}
        try:
            json_message = json.loads(message)
        except json.JSONDecodeError:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Invalid JSON message: {message_str}")

        ## HANDLE SESSION UPDATE ##
        messages: List[str] = []
        if "type" in json_message and json_message["type"] == "session.update":
            client_session_configuration_request = self.map_openai_params(
                optional_params={}, non_default_params=json_message["session"]
            )
            client_session_configuration_request["model"] = f"models/{model}"
            messages.append(
                json.dumps(
                    {
                        "setup": client_session_configuration_request,
                    }
                )
            )
        # elif session_configuration_request is None:
        #     default_session_configuration_request = self.session_configuration_request(model)
        #     messages.append(default_session_configuration_request)

        ## HANDLE INPUT AUDIO BUFFER ##
        if (
            "type" in json_message
            and json_message["type"] == "input_audio_buffer.append"
        ):
            realtime_input_dict["audio"] = HttpxBlobType(
                mimeType=self.get_audio_mime_type(), data=json_message["audio"]
            )
        else:
            realtime_input_dict["text"] = message
        if len(realtime_input_dict) != 1:
            raise ValueError(
                f"Only one argument can be set, got {len(realtime_input_dict)}:"
                f" {list(realtime_input_dict.keys())}"
            )
        realtime_input_dict = cast(
            BidiGenerateContentRealtimeInput,
            encode_unserializable_types(cast(Dict[str, object], realtime_input_dict)),
        )
        messages.append(json.dumps({"realtime_input": realtime_input_dict}))
        return messages
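
    # Illustrative transformation (hypothetical client message):
    #   '{"type": "input_audio_buffer.append", "audio": "<base64>"}'
    #   -> ['{"realtime_input": {"audio": {"mimeType": "audio/pcm", "data": "<base64>"}}}']
    # Any other message type is forwarded verbatim as realtime text input.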

    def transform_session_created_event(
        self,
        model: str,
        logging_session_id: str,
        session_configuration_request: Optional[str] = None,
    ) -> OpenAIRealtimeStreamSessionEvents:
        if session_configuration_request:
            session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
                session_configuration_request
            ).get("setup", {})
        else:
            session_configuration_request_dict = {}
        _model = session_configuration_request_dict.get("model") or model
        generation_config = (
            session_configuration_request_dict.get("generationConfig", {}) or {}
        )
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]
        _system_instruction = session_configuration_request_dict.get(
            "systemInstruction"
        )
        session = OpenAIRealtimeStreamSession(
            id=logging_session_id,
            modalities=_modalities,
        )
        if _system_instruction is not None and isinstance(_system_instruction, str):
            session["instructions"] = _system_instruction
        if _model is not None and isinstance(_model, str):
            # remove the "models/" prefix (str.strip("models/") would strip any of
            # those characters from both ends, not the prefix) - keep it consistent
            # with how openai returns the model name
            if _model.startswith("models/"):
                _model = _model[len("models/") :]
            session["model"] = _model
        return OpenAIRealtimeStreamSessionEvents(
            type="session.created",
            session=session,
            event_id=str(uuid.uuid4()),
        )
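
    # Illustrative output (hypothetical setup request):
    #   '{"setup": {"model": "models/gemini-2.0-flash-exp"}}'
    #   -> {"type": "session.created", "event_id": "<uuid>",
    #       "session": {"id": "<trace id>", "modalities": ["text"],
    #                   "model": "gemini-2.0-flash-exp"}}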

    def _is_new_content_delta(
        self,
        previous_messages: Optional[List[OpenAIRealtimeEvents]] = None,
    ) -> bool:
        if previous_messages is None or len(previous_messages) == 0:
            return True
        if "type" in previous_messages[-1] and previous_messages[-1]["type"].endswith(
            "delta"
        ):
            return False
        return True

    def return_new_content_delta_events(
        self,
        response_id: str,
        output_item_id: str,
        conversation_id: str,
        delta_type: ALL_DELTA_TYPES,
        session_configuration_request: Optional[str] = None,
    ) -> List[OpenAIRealtimeEvents]:
        if session_configuration_request is None:
            raise ValueError(
                "session_configuration_request is required for Gemini API calls"
            )
        session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
            session_configuration_request
        ).get("setup", {})
        generation_config = session_configuration_request_dict.get(
            "generationConfig", {}
        )
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]
        _temperature = generation_config.get("temperature")
        _max_output_tokens = generation_config.get("maxOutputTokens")
        response_items: List[OpenAIRealtimeEvents] = []
        ## - return response.created
        response_created = OpenAIRealtimeStreamResponseBaseObject(
            type="response.created",
            event_id="event_{}".format(uuid.uuid4()),
            response={
                "object": "realtime.response",
                "id": response_id,
                "status": "in_progress",
                "output": [],
                "conversation_id": conversation_id,
                "modalities": _modalities,
                "temperature": _temperature,
                "max_output_tokens": _max_output_tokens,
            },
        )
        response_items.append(response_created)
        ## - return response.output_item.added (adds 'item_id', same for all subsequent events)
        response_output_item_added = OpenAIRealtimeStreamResponseOutputItemAdded(
            type="response.output_item.added",
            response_id=response_id,
            output_index=0,
            item={
                "id": output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "in_progress",
                "role": "assistant",
                "content": [],
            },
        )
        response_items.append(response_output_item_added)
        ## - return conversation.item.created
        conversation_item_created = OpenAIRealtimeConversationItemCreated(
            type="conversation.item.created",
            event_id="event_{}".format(uuid.uuid4()),
            item={
                "id": output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "in_progress",
                "role": "assistant",
                "content": [],
            },
        )
        response_items.append(conversation_item_created)
        ## - return response.content_part.added
        response_content_part_added = OpenAIRealtimeResponseContentPartAdded(
            type="response.content_part.added",
            content_index=0,
            output_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=output_item_id,
            part={
                "type": "text",
                "text": "",
            }
            if delta_type == "text"
            else {
                "type": "audio",
                "transcript": "",
            },
            response_id=response_id,
        )
        response_items.append(response_content_part_added)
        return response_items

    def transform_content_delta_events(
        self,
        message: BidiGenerateContentServerContent,
        output_item_id: str,
        response_id: str,
        delta_type: ALL_DELTA_TYPES,
    ) -> OpenAIRealtimeResponseDelta:
        delta = ""
        try:
            if "modelTurn" in message and "parts" in message["modelTurn"]:
                for part in message["modelTurn"]["parts"]:
                    if "text" in part:
                        delta += part["text"]
                    elif "inlineData" in part:
                        delta += part["inlineData"]["data"]
        except Exception as e:
            raise ValueError(
                f"Error transforming content delta events: {e}, got message: {message}"
            )
        return OpenAIRealtimeResponseDelta(
            type="response.text.delta"
            if delta_type == "text"
            else "response.audio.delta",
            content_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=output_item_id,
            output_index=0,
            response_id=response_id,
            delta=delta,
        )

    def transform_content_done_event(
        self,
        delta_chunks: Optional[List[OpenAIRealtimeResponseDelta]],
        current_output_item_id: Optional[str],
        current_response_id: Optional[str],
        delta_type: ALL_DELTA_TYPES,
    ) -> Union[OpenAIRealtimeResponseTextDone, OpenAIRealtimeResponseAudioDone]:
        if delta_chunks:
            delta = "".join([delta_chunk["delta"] for delta_chunk in delta_chunks])
        else:
            delta = ""
        if current_output_item_id is None or current_response_id is None:
            raise ValueError(
                "current_output_item_id and current_response_id cannot be None for a 'done' event."
            )
        if delta_type == "text":
            return OpenAIRealtimeResponseTextDone(
                type="response.text.done",
                content_index=0,
                event_id="event_{}".format(uuid.uuid4()),
                item_id=current_output_item_id,
                output_index=0,
                response_id=current_response_id,
                text=delta,
            )
        elif delta_type == "audio":
            return OpenAIRealtimeResponseAudioDone(
                type="response.audio.done",
                content_index=0,
                event_id="event_{}".format(uuid.uuid4()),
                item_id=current_output_item_id,
                output_index=0,
                response_id=current_response_id,
            )
        else:
            # without this branch the function would implicitly return None,
            # violating the declared return type
            raise ValueError(f"Unexpected delta type: {delta_type}")

    def return_additional_content_done_events(
        self,
        current_output_item_id: Optional[str],
        current_response_id: Optional[str],
        delta_done_event: Union[
            OpenAIRealtimeResponseTextDone, OpenAIRealtimeResponseAudioDone
        ],
        delta_type: ALL_DELTA_TYPES,
    ) -> List[OpenAIRealtimeEvents]:
        """
        - return response.content_part.done
        - return response.output_item.done
        """
        if current_output_item_id is None or current_response_id is None:
            raise ValueError(
                "current_output_item_id and current_response_id cannot be None for a 'done' event."
            )
        returned_items: List[OpenAIRealtimeEvents] = []
        delta_done_event_text = cast(Optional[str], delta_done_event.get("text"))
        # response.content_part.done
        response_content_part_done = OpenAIRealtimeContentPartDone(
            type="response.content_part.done",
            content_index=0,
            event_id="event_{}".format(uuid.uuid4()),
            item_id=current_output_item_id,
            output_index=0,
            part={"type": "text", "text": delta_done_event_text}
            if delta_done_event_text and delta_type == "text"
            else {
                "type": "audio",
                "transcript": "",  # gemini doesn't return transcript for audio
            },
            response_id=current_response_id,
        )
        returned_items.append(response_content_part_done)
        # response.output_item.done
        response_output_item_done = OpenAIRealtimeOutputItemDone(
            type="response.output_item.done",
            event_id="event_{}".format(uuid.uuid4()),
            output_index=0,
            response_id=current_response_id,
            item={
                "id": current_output_item_id,
                "object": "realtime.item",
                "type": "message",
                "status": "completed",
                "role": "assistant",
                "content": [
                    {"type": "text", "text": delta_done_event_text}
                    if delta_done_event_text and delta_type == "text"
                    else {
                        "type": "audio",
                        "transcript": "",
                    }
                ],
            },
        )
        returned_items.append(response_output_item_done)
        return returned_items

    @staticmethod
    def get_nested_value(obj: dict, path: str) -> Any:
        keys = path.split(".")
        current = obj
        for key in keys:
            if isinstance(current, dict) and key in current:
                current = current[key]
            else:
                return None
        return current
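
    # Illustrative lookup (hypothetical message):
    #   get_nested_value({"serverContent": {"turnComplete": True}}, "serverContent.turnComplete")
    #   -> True; missing keys return None instead of raising.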

    def update_current_delta_chunks(
        self,
        transformed_message: Union[OpenAIRealtimeEvents, List[OpenAIRealtimeEvents]],
        current_delta_chunks: Optional[List[OpenAIRealtimeResponseDelta]],
    ) -> Optional[List[OpenAIRealtimeResponseDelta]]:
        try:
            if isinstance(transformed_message, list):
                current_delta_chunks = []
                any_delta_chunk = False
                for event in transformed_message:
                    if event["type"] == "response.text.delta":
                        current_delta_chunks.append(
                            cast(OpenAIRealtimeResponseDelta, event)
                        )
                        any_delta_chunk = True
                if not any_delta_chunk:
                    current_delta_chunks = (
                        None  # reset current_delta_chunks if no delta chunks
                    )
            else:
                if (
                    transformed_message["type"] == "response.text.delta"
                ):  # ONLY ACCUMULATE TEXT DELTA CHUNKS - AUDIO WILL CAUSE SERVER MEMORY ISSUES
                    if current_delta_chunks is None:
                        current_delta_chunks = []
                    current_delta_chunks.append(
                        cast(OpenAIRealtimeResponseDelta, transformed_message)
                    )
                else:
                    current_delta_chunks = None
            return current_delta_chunks
        except Exception as e:
            raise ValueError(
                f"Error updating current delta chunks: {e}, got transformed_message: {transformed_message}"
            )

    def update_current_item_chunks(
        self,
        transformed_message: Union[OpenAIRealtimeEvents, List[OpenAIRealtimeEvents]],
        current_item_chunks: Optional[List[OpenAIRealtimeOutputItemDone]],
    ) -> Optional[List[OpenAIRealtimeOutputItemDone]]:
        try:
            if isinstance(transformed_message, list):
                current_item_chunks = []
                any_item_chunk = False
                for event in transformed_message:
                    if event["type"] == "response.output_item.done":
                        current_item_chunks.append(
                            cast(OpenAIRealtimeOutputItemDone, event)
                        )
                        any_item_chunk = True
                if not any_item_chunk:
                    current_item_chunks = (
                        None  # reset current_item_chunks if no item chunks
                    )
            else:
                if transformed_message["type"] == "response.output_item.done":
                    if current_item_chunks is None:
                        current_item_chunks = []
                    current_item_chunks.append(
                        cast(OpenAIRealtimeOutputItemDone, transformed_message)
                    )
                else:
                    current_item_chunks = None
            return current_item_chunks
        except Exception as e:
            raise ValueError(
                f"Error updating current item chunks: {e}, got transformed_message: {transformed_message}"
            )

    def transform_response_done_event(
        self,
        message: BidiGenerateContentServerMessage,
        current_response_id: Optional[str],
        current_conversation_id: Optional[str],
        output_items: Optional[List[OpenAIRealtimeOutputItemDone]],
        session_configuration_request: Optional[str] = None,
    ) -> OpenAIRealtimeDoneEvent:
        if current_conversation_id is None or current_response_id is None:
            raise ValueError(
                f"current_conversation_id and current_response_id must both be set for a 'done' event. Got=current_conversation_id: {current_conversation_id}, current_response_id: {current_response_id}"
            )
        if session_configuration_request:
            session_configuration_request_dict: BidiGenerateContentSetup = json.loads(
                session_configuration_request
            ).get("setup", {})
        else:
            session_configuration_request_dict = {}
        generation_config = session_configuration_request_dict.get(
            "generationConfig", {}
        )
        temperature = generation_config.get("temperature")
        max_output_tokens = generation_config.get(
            "maxOutputTokens"  # stored in camelCase by map_openai_params
        )
        gemini_modalities = generation_config.get("responseModalities", ["TEXT"])
        _modalities = [
            modality.lower() for modality in cast(List[str], gemini_modalities)
        ]
        if "usageMetadata" in message:
            _chat_completion_usage = VertexGeminiConfig._calculate_usage(
                completion_response=message,
            )
        else:
            _chat_completion_usage = get_empty_usage()
        responses_api_usage = LiteLLMCompletionResponsesConfig._transform_chat_completion_usage_to_responses_usage(
            _chat_completion_usage,
        )
        response_done_event = OpenAIRealtimeDoneEvent(
            type="response.done",
            event_id="event_{}".format(uuid.uuid4()),
            response=OpenAIRealtimeResponseDoneObject(
                object="realtime.response",
                id=current_response_id,
                status="completed",
                output=[output_item["item"] for output_item in output_items]
                if output_items
                else [],
                conversation_id=current_conversation_id,
                modalities=_modalities,
                usage=responses_api_usage.model_dump(),
            ),
        )
        if temperature is not None:
            response_done_event["response"]["temperature"] = temperature
        if max_output_tokens is not None:
            response_done_event["response"]["max_output_tokens"] = max_output_tokens
        return response_done_event

    def handle_openai_modality_event(
        self,
        openai_event: OpenAIRealtimeEventTypes,
        json_message: dict,
        realtime_response_transform_input: RealtimeResponseTransformInput,
        delta_type: ALL_DELTA_TYPES,
    ) -> RealtimeModalityResponseTransformOutput:
        current_output_item_id = realtime_response_transform_input[
            "current_output_item_id"
        ]
        current_response_id = realtime_response_transform_input["current_response_id"]
        current_conversation_id = realtime_response_transform_input[
            "current_conversation_id"
        ]
        current_delta_chunks = realtime_response_transform_input["current_delta_chunks"]
        session_configuration_request = realtime_response_transform_input[
            "session_configuration_request"
        ]
        returned_message: List[OpenAIRealtimeEvents] = []
        if (
            openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
            or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
        ):
            current_response_id = current_response_id or "resp_{}".format(uuid.uuid4())
            if not current_output_item_id:
                # send the list of standard 'new' content.delta events
                current_output_item_id = "item_{}".format(uuid.uuid4())
                current_conversation_id = current_conversation_id or "conv_{}".format(
                    uuid.uuid4()
                )
                returned_message = self.return_new_content_delta_events(
                    session_configuration_request=session_configuration_request,
                    response_id=current_response_id,
                    output_item_id=current_output_item_id,
                    conversation_id=current_conversation_id,
                    delta_type=delta_type,
                )
            # append the content.delta event for this chunk
            transformed_message = self.transform_content_delta_events(
                BidiGenerateContentServerContent(**json_message["serverContent"]),
                current_output_item_id,
                current_response_id,
                delta_type=delta_type,
            )
            returned_message.append(transformed_message)
        elif (
            openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
            or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
        ):
            transformed_content_done_event = self.transform_content_done_event(
                current_output_item_id=current_output_item_id,
                current_response_id=current_response_id,
                delta_chunks=current_delta_chunks,
                delta_type=delta_type,
            )
            returned_message = [transformed_content_done_event]
            additional_items = self.return_additional_content_done_events(
                current_output_item_id=current_output_item_id,
                current_response_id=current_response_id,
                delta_done_event=transformed_content_done_event,
                delta_type=delta_type,
            )
            returned_message.extend(additional_items)
        return {
            "returned_message": returned_message,
            "current_output_item_id": current_output_item_id,
            "current_response_id": current_response_id,
            "current_conversation_id": current_conversation_id,
            "current_delta_chunks": current_delta_chunks,
            "current_delta_type": delta_type,
        }

    def map_openai_event(
        self,
        key: str,
        value: dict,
        current_delta_type: Optional[ALL_DELTA_TYPES],
        json_message: dict,
    ) -> OpenAIRealtimeEventTypes:
        model_turn_event = value.get("modelTurn")
        generation_complete_event = value.get("generationComplete")
        openai_event: Optional[OpenAIRealtimeEventTypes] = None
        if model_turn_event:  # check if model turn event
            openai_event = self.map_model_turn_event(model_turn_event)
        elif generation_complete_event:
            openai_event = self.map_generation_complete_event(
                delta_type=current_delta_type
            )
        else:
            # Check if this key or any nested key matches our mapping. Use a
            # separate loop variable so an unmatched message leaves openai_event
            # as None instead of the last mapping value.
            for map_key, mapped_event in MAP_GEMINI_FIELD_TO_OPENAI_EVENT.items():
                if map_key == key or (
                    "." in map_key
                    and GeminiRealtimeConfig.get_nested_value(json_message, map_key)
                    is not None
                ):
                    openai_event = mapped_event
                    break
        if openai_event is None:
            raise ValueError(f"Unknown openai event: {key}, value: {value}")
        return openai_event

    def transform_realtime_response(
        self,
        message: Union[str, bytes],
        model: str,
        logging_obj: LiteLLMLoggingObj,
        realtime_response_transform_input: RealtimeResponseTransformInput,
    ) -> RealtimeResponseTypedDict:
        """
        Keep this stateless - leave the state management (e.g. tracking current_output_item_id, current_response_id, current_conversation_id, current_delta_chunks) to the caller.
        """
        try:
            json_message = json.loads(message)
        except json.JSONDecodeError:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Invalid JSON message: {message_str}")
        logging_session_id = logging_obj.litellm_trace_id
        current_output_item_id = realtime_response_transform_input[
            "current_output_item_id"
        ]
        current_response_id = realtime_response_transform_input["current_response_id"]
        current_conversation_id = realtime_response_transform_input[
            "current_conversation_id"
        ]
        current_delta_chunks = realtime_response_transform_input["current_delta_chunks"]
        session_configuration_request = realtime_response_transform_input[
            "session_configuration_request"
        ]
        current_item_chunks = realtime_response_transform_input["current_item_chunks"]
        current_delta_type: Optional[
            ALL_DELTA_TYPES
        ] = realtime_response_transform_input["current_delta_type"]
        returned_message: List[OpenAIRealtimeEvents] = []
        for key, value in json_message.items():
            # Check if this key or any nested key matches our mapping
            openai_event = self.map_openai_event(
                key=key,
                value=value,
                current_delta_type=current_delta_type,
                json_message=json_message,
            )
            if openai_event == OpenAIRealtimeEventTypes.SESSION_CREATED:
                transformed_message = self.transform_session_created_event(
                    model,
                    logging_session_id,
                    realtime_response_transform_input["session_configuration_request"],
                )
                session_configuration_request = json.dumps(transformed_message)
                returned_message.append(transformed_message)
            elif openai_event == OpenAIRealtimeEventTypes.RESPONSE_DONE:
                transformed_response_done_event = self.transform_response_done_event(
                    message=BidiGenerateContentServerMessage(**json_message),  # type: ignore
                    current_response_id=current_response_id,
                    current_conversation_id=current_conversation_id,
                    session_configuration_request=session_configuration_request,
                    output_items=None,
                )
                returned_message.append(transformed_response_done_event)
            elif (
                openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DELTA
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_TEXT_DONE
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DELTA
                or openai_event == OpenAIRealtimeEventTypes.RESPONSE_AUDIO_DONE
            ):
                _returned_message = self.handle_openai_modality_event(
                    openai_event,
                    json_message,
                    realtime_response_transform_input,
                    delta_type="text" if "text" in openai_event.value else "audio",
                )
                returned_message.extend(_returned_message["returned_message"])
                current_output_item_id = _returned_message["current_output_item_id"]
                current_response_id = _returned_message["current_response_id"]
                current_conversation_id = _returned_message["current_conversation_id"]
                current_delta_chunks = _returned_message["current_delta_chunks"]
                current_delta_type = _returned_message["current_delta_type"]
            else:
                raise ValueError(f"Unknown openai event: {openai_event}")
        if len(returned_message) == 0:
            if isinstance(message, bytes):
                message_str = message.decode("utf-8", errors="replace")
            else:
                message_str = str(message)
            raise ValueError(f"Unknown message type: {message_str}")
        current_delta_chunks = self.update_current_delta_chunks(
            transformed_message=returned_message,
            current_delta_chunks=current_delta_chunks,
        )
        current_item_chunks = self.update_current_item_chunks(
            transformed_message=returned_message,
            current_item_chunks=current_item_chunks,
        )
        return {
            "response": returned_message,
            "current_output_item_id": current_output_item_id,
            "current_response_id": current_response_id,
            "current_delta_chunks": current_delta_chunks,
            "current_conversation_id": current_conversation_id,
            "current_item_chunks": current_item_chunks,
            "current_delta_type": current_delta_type,
            "session_configuration_request": session_configuration_request,
        }

    def requires_session_configuration(self) -> bool:
        return True

    def session_configuration_request(self, model: str) -> str:
        """
        ```
        {
          "model": string,
          "generationConfig": {
            "candidateCount": integer,
            "maxOutputTokens": integer,
            "temperature": number,
            "topP": number,
            "topK": integer,
            "presencePenalty": number,
            "frequencyPenalty": number,
            "responseModalities": [string],
            "speechConfig": object,
            "mediaResolution": object
          },
          "systemInstruction": string,
          "tools": [object]
        }
        ```
        """
        response_modalities: List[GeminiResponseModalities] = ["AUDIO"]
        output_audio_transcription = False
        # if "audio" in model:  ## UNCOMMENT THIS WHEN AUDIO IS SUPPORTED
        #     output_audio_transcription = True
        setup_config: BidiGenerateContentSetup = {
            "model": f"models/{model}",
            "generationConfig": {"responseModalities": response_modalities},
        }
        if output_audio_transcription:
            setup_config["outputAudioTranscription"] = {}
        return json.dumps(
            {
                "setup": setup_config,
            }
        )
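
    # Illustrative default setup payload (hypothetical model name):
    #   session_configuration_request("gemini-2.0-flash-exp")
    #   -> '{"setup": {"model": "models/gemini-2.0-flash-exp",
    #                  "generationConfig": {"responseModalities": ["AUDIO"]}}}'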