import httpx import logging import os from pipecat.frames.frames import TextFrame, LLMResponseFrame from pipecat.processors.frame_processor import FrameProcessor, FrameDirection logger = logging.getLogger(__name__) class AzureOpenAILLMService(FrameProcessor): def __init__(self, preprompt: str = "", endpoint: str = "https://designcntrl-azure-openai-server.openai.azure.com/openai/deployments/ottopilot/chat/completions?api-version=2023-03-15-preview"): super().__init__() self.api_key = os.environ.get("azure_openai") if not self.api_key: logger.error("Missing Azure OpenAI API key: azure_openai") raise ValueError("Azure OpenAI API key not found in environment variable 'azure_openai'") self.preprompt = preprompt self.endpoint = endpoint self.client = httpx.AsyncClient() async def process_frame(self, frame, direction: FrameDirection): if isinstance(frame, TextFrame) and direction == FrameDirection.UPSTREAM: try: messages = [] if self.preprompt: messages.append({"role": "system", "content": self.preprompt}) messages.append({"role": "user", "content": frame.text}) headers = { "Content-Type": "application/json", "api-key": self.api_key } data = { "messages": messages, "temperature": 0.5, "max_tokens": 4000, "top_p": 1, "frequency_penalty": 0, "presence_penalty": 0 } response = await self.client.post(self.endpoint, headers=headers, json=data, timeout=30) response.raise_for_status() result = response.json() if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"] continue_flag = len(content) >= 4000 await self.push_frame(LLMResponseFrame(content=content, continue_flag=continue_flag)) else: logger.error("No valid content in API response") await self.push_frame(TextFrame("Error: No valid response from LLM")) except httpx.HTTPStatusError as e: logger.error(f"API error: {e}") await self.push_frame(TextFrame(f"Error: API request failed - {str(e)}")) except Exception as e: logger.error(f"Unexpected error: {e}", exc_info=True) await self.push_frame(TextFrame(f"Error: Unexpected error - {str(e)}")) else: await self.push_frame(frame, direction) async def stop(self): await self.client.aclose() logger.info("AzureOpenAILLMService stopped")