# pipecat/azure_openai.py
import logging
import os

import httpx

from pipecat.frames.frames import LLMResponseFrame, TextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

logger = logging.getLogger(__name__)


class AzureOpenAILLMService(FrameProcessor):
    """Pipecat frame processor that sends TextFrames to an Azure OpenAI
    chat completions deployment and pushes the model's reply back into the pipeline."""

    def __init__(
        self,
        preprompt: str = "",
        endpoint: str = "https://designcntrl-azure-openai-server.openai.azure.com/openai/deployments/ottopilot/chat/completions?api-version=2023-03-15-preview",
    ):
        super().__init__()
        # The API key is read from the 'azure_openai' environment variable.
        self.api_key = os.environ.get("azure_openai")
        if not self.api_key:
            logger.error("Missing Azure OpenAI API key: azure_openai")
            raise ValueError("Azure OpenAI API key not found in environment variable 'azure_openai'")
        self.preprompt = preprompt
        self.endpoint = endpoint
        self.client = httpx.AsyncClient()

    async def process_frame(self, frame, direction: FrameDirection):
        # Only handle text frames travelling upstream; everything else is
        # passed through untouched (see the else branch below).
        if isinstance(frame, TextFrame) and direction == FrameDirection.UPSTREAM:
            try:
                # Build the chat payload: optional system preprompt plus the
                # user's text from the incoming frame.
                messages = []
                if self.preprompt:
                    messages.append({"role": "system", "content": self.preprompt})
                messages.append({"role": "user", "content": frame.text})

                headers = {
                    "Content-Type": "application/json",
                    "api-key": self.api_key,
                }
                data = {
                    "messages": messages,
                    "temperature": 0.5,
                    "max_tokens": 4000,
                    "top_p": 1,
                    "frequency_penalty": 0,
                    "presence_penalty": 0,
                }

                response = await self.client.post(self.endpoint, headers=headers, json=data, timeout=30)
                response.raise_for_status()
                result = response.json()

                if "choices" in result and len(result["choices"]) > 0:
                    content = result["choices"][0]["message"]["content"]
                    # Rough heuristic: if the reply is at least as long as the
                    # max_tokens budget (measured in characters), assume it was
                    # truncated and flag that a continuation may be needed.
                    continue_flag = len(content) >= 4000
                    await self.push_frame(LLMResponseFrame(content=content, continue_flag=continue_flag))
                else:
                    logger.error("No valid content in API response")
                    await self.push_frame(TextFrame("Error: No valid response from LLM"))
            except httpx.HTTPStatusError as e:
                logger.error(f"API error: {e}")
                await self.push_frame(TextFrame(f"Error: API request failed - {str(e)}"))
            except Exception as e:
                logger.error(f"Unexpected error: {e}", exc_info=True)
                await self.push_frame(TextFrame(f"Error: Unexpected error - {str(e)}"))
        else:
            # Forward all other frames unchanged, preserving their direction.
            await self.push_frame(frame, direction)
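
    # For reference, the parsing above assumes the standard (abridged) shape of
    # an Azure OpenAI chat completions response:
    #
    #   {
    #     "choices": [
    #       {"message": {"role": "assistant", "content": "..."}, "finish_reason": "stop"}
    #     ],
    #     "usage": {"prompt_tokens": ..., "completion_tokens": ..., "total_tokens": ...}
    #   }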

    async def stop(self):
        # Close the shared HTTP client when the service shuts down.
        await self.client.aclose()
        logger.info("AzureOpenAILLMService stopped")
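

# --- Usage sketch (illustrative only, not part of the service) ---
# Assumes the key has been exported beforehand, e.g.:
#   export azure_openai="<your-azure-openai-key>"
#
# llm = AzureOpenAILLMService(
#     preprompt="You are a concise voice assistant.",
#     endpoint=(
#         "https://<resource>.openai.azure.com/openai/deployments/<deployment>"
#         "/chat/completions?api-version=2023-03-15-preview"
#     ),
# )
#
# The processor would then be placed in a Pipecat pipeline alongside the
# transport, STT, and TTS processors; call `await llm.stop()` on shutdown to
# release the httpx client.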