Spaces:
Build error
Build error
| import os | |
| import httpx | |
| from dotenv import load_dotenv | |
| from typing import Dict, Any, Optional, List | |
| from datetime import datetime | |
| import logging | |
| import asyncio | |
| from openai import AsyncOpenAI | |
| import json | |
| import google.generativeai as genai | |
| import PIL.Image | |
| from typing import List, Dict, Any, Optional | |
| from app.utils.load_env import ACCESS_TOKEN, WHATSAPP_API_URL, GEMINI_API, OPENAI_API | |
| from app.utils.system_prompt import system_prompt | |
| from app.services.search_engine import google_search | |
| # Load environment variables | |
| load_dotenv() | |
| # Define function specifications for Gemini | |
| function_declarations = [ | |
| { | |
| "name": "google_search", | |
| "description": "Perform a Google search and retrieve search results", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "query": { | |
| "type": "string", | |
| "description": "The search query to perform" | |
| }, | |
| "num_results": { | |
| "type": "string", | |
| "description": "Number of search results to retrieve (1-10)", | |
| "default": "3" | |
| } | |
| }, | |
| "required": ["query"] | |
| } | |
| } | |
| ] | |
| genai.configure(api_key=GEMINI_API) | |
| client = AsyncOpenAI(api_key = OPENAI_API) | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Validate environment variables | |
| if not WHATSAPP_API_URL or not ACCESS_TOKEN: | |
| logger.warning("Environment variables for WHATSAPP_API_URL or ACCESS_TOKEN are not set!") | |
| # Helper function to send a reply | |
| async def send_reply(to: str, body: str, whatsapp_token: str, whatsapp_url:str) -> Dict[str, Any]: | |
| headers = { | |
| "Authorization": f"Bearer {whatsapp_token}", | |
| "Content-Type": "application/json" | |
| } | |
| data = { | |
| "messaging_product": "whatsapp", | |
| "to": to, | |
| "type": "text", | |
| "text": { | |
| "body": body | |
| } | |
| } | |
| async with httpx.AsyncClient() as client: | |
| response = await client.post(whatsapp_url, json=data, headers=headers) | |
| if response.status_code != 200: | |
| error_detail = response.json() | |
| logger.error(f"Failed to send reply: {error_detail}") | |
| raise Exception(f"Failed to send reply with status code {response.status_code}: {error_detail}") | |
| return response.json() | |
| # Helper function to generate a reply based on message content | |
| async def generate_reply(sender: str, content: str, timestamp: int) -> str: | |
| try: | |
| received_time = datetime.fromtimestamp(int(timestamp) / 1000) # Assuming timestamp is in milliseconds | |
| if "hello" in content.lower(): | |
| return f"Hi {sender}, how can I assist you today?" | |
| elif "test" in content.lower(): | |
| return f"Hi {sender}, this is a reply to your test message." | |
| elif received_time.hour < 12: | |
| return f"Good morning, {sender}! How can I help you?" | |
| else: | |
| return f"Hello {sender}, I hope you're having a great day!" | |
| except Exception as e: | |
| logger.error(f"Error generating reply: {str(e)}", exc_info=True) | |
| return f"Sorry {sender}, I couldn't process your message. Please try again." | |
| async def process_message_with_llm( | |
| sender_id: str, | |
| content: str, | |
| history: List[Dict[str, str]], | |
| rag_system: Any, | |
| whatsapp_token: str, | |
| whatsapp_url:str, | |
| image_file_path: Optional[str] = None, | |
| doc_path: Optional[str] = None, | |
| video_file_path: Optional[str] = None, | |
| ) -> str: | |
| """Process message with retry logic.""" | |
| try: | |
| logger.info(f"Processing message for sender: {sender_id}") | |
| generated_reply = await generate_response_from_gemini( | |
| sender=sender_id, | |
| content=content, | |
| history=history, | |
| rag_system=rag_system, | |
| image_file_path=image_file_path, | |
| doc_path=doc_path, | |
| video_file_path=video_file_path | |
| ) | |
| logger.info(f"Generated reply: {generated_reply}") | |
| response = await send_reply(sender_id, generated_reply, whatsapp_token, whatsapp_url) | |
| # return generated_reply | |
| return generated_reply | |
| except Exception as e: | |
| logger.error(f"Error in process_message_with_retry: {str(e)}", exc_info=True) | |
| return "Sorry, I couldn't generate a response at this time." | |
| async def generate_response_from_gemini( | |
| sender: str, | |
| content: str, | |
| history: List[Dict[str, str]], | |
| rag_system: Any = None, | |
| image_file_path: Optional[str] = None, | |
| doc_path: Optional[str] = None, | |
| video_file_path: Optional[str] = None, | |
| ) -> str: | |
| try: | |
| logger.info(f"Generating response for sender: {sender}") | |
| # Initialize the model | |
| model = genai.GenerativeModel("gemini-1.5-pro-002", system_instruction= system_prompt) | |
| # Start chat with history | |
| chat = model.start_chat(history=history) | |
| if rag_system: | |
| keywords = await rag_system.extract_keywords_async(content) | |
| # Implement RAG: Retrieve relevant documents | |
| retrieved_docs = await rag_system.adv_query(content, keywords=keywords, top_k=1) | |
| if retrieved_docs: | |
| logger.info(f"Retrieved {len(retrieved_docs)} documents for context.") | |
| # Format the retrieved documents as a context string | |
| context = "\n\n".join([f"Content: {doc['text']}" for doc in retrieved_docs]) | |
| # Option 1: Append to history as a system message | |
| history.append({"role": "system", "content": f"Relevant documents:\n{context}"}) | |
| # Reinitialize chat with updated history | |
| chat = model.start_chat(history=history) | |
| # Process image | |
| if image_file_path: | |
| logger.info(f"Processing image at {image_file_path}") | |
| image_data = PIL.Image.open(image_file_path) | |
| response = await chat.send_message_async(image_data) | |
| return response.text | |
| # Process document | |
| if doc_path: | |
| logger.info(f"Processing document at {doc_path}") | |
| doc_data = genai.upload_file(doc_path) | |
| response = await chat.send_message_async(doc_data) | |
| return response.text | |
| # Process video (if supported) | |
| if video_file_path: | |
| logger.info(f"Processing video at {video_file_path}") | |
| video_data = genai.upload_file(video_file_path) | |
| response = await chat.send_message_async(video_data) | |
| return response.text | |
| # Implement video processing logic here | |
| pass # Placeholder for video processing logic | |
| # Send the user's message | |
| response = await chat.send_message_async(content) | |
| # response = await handle_function_call(response) | |
| return response.text | |
| except Exception as e: | |
| logger.error("Error in generate_response_from_gemini:", exc_info=True) | |
| return "Sorry, I couldn't generate a response at this time." | |
| async def handle_function_call(chat): | |
| """ | |
| Handle function calls from the Gemini API. | |
| Args: | |
| chat (ChatSession): The current chat session. | |
| Returns: | |
| The response after resolving function calls. | |
| """ | |
| # Continue the conversation and handle any function calls | |
| while True: | |
| response = chat.send_message_async(chat.history[-1]) | |
| # Check if there are any function calls to handle | |
| if response.candidates[0].content.parts[0].function_call: | |
| function_call = response.candidates[0].content.parts[0].function_call | |
| function_name = function_call.name | |
| function_args = json.loads(function_call.args) | |
| # Dispatch to the appropriate function | |
| if function_name == "google_search": | |
| # Handle async function call | |
| result = await google_search( | |
| query=function_args['query'], | |
| num_results=function_args.get('num_results', '3') | |
| ) | |
| # Send the function result back to continue the conversation | |
| response = chat.send_message_async( | |
| part={ | |
| "function_response": { | |
| "name": function_name, | |
| "response": result | |
| } | |
| } | |
| ) | |
| else: | |
| # No more function calls, return the final response | |
| return response | |
| # Process message with retry logic | |
| # async def process_message_with_retry( | |
| # sender_id: str, | |
| # content: str, | |
| # history: List[str], | |
| # timestamp: Optional[int] = None, | |
| # media: Optional[Dict[str, Any]] = None, | |
| # image_file_path: Optional[str] = None, | |
| # doc_path: Optional[str] = None, | |
| # ) -> Dict[str, Any]: | |
| # """Process message with retry logic""" | |
| # retries = 1 | |
| # delay = 0.1 # Initial delay in seconds | |
| # # for attempt in range(retries): | |
| # try: | |
| # logger.info(f"Sending message to the Gemini model...") | |
| # generated_reply = await generate_response_from_gemini(sender = sender_id, content=content, history = history, timestamp = timestamp, image_file_path = image_file_path, media=media, doc_path = doc_path) | |
| # logger.info(f"Reply generated: {generated_reply}") | |
| # response = await send_reply(sender_id, generated_reply) | |
| # return generated_reply | |
| # return {"status": "success", "reply": generated_reply, "response": response} | |
| # except Exception as e: | |
| # logger.error(f"Error generating reply: {str(e)}", exc_info=True) | |
| # return {"status": "error", "reply": "Sorry, I couldn't generate a response at this time."} | |
| # logger.error(f"Attempt {attempt + 1} failed: {str(e)}", exc_info=True) | |
| # if attempt < retries - 1: | |
| # await asyncio.sleep(delay) | |
| # delay *= 2 # Exponential backoff | |
| # else: | |
| # raise Exception(f"All {retries} attempts failed.") from e | |
| # Example usage | |
| # asyncio.run(process_message_with_retry("1234567890", "hello", 1700424056000)) | |
| # async def generate_response_from_gemini(sender: str, content: str, timestamp: str, history: List[Dict[str, str]], media: Optional[Dict[str, Any]] = None, image_file_path: Optional[str] = None, doc_path: Optional[str] = None) -> str: | |
| # try: | |
| # print(f"Sender: {sender}") | |
| # print(f"Content: {content}") | |
| # print(f"Timestamp: {timestamp}") | |
| # print(f"History: {history}") | |
| # print(f"Media: {media}") | |
| # # Initialize the model | |
| # model = genai.GenerativeModel("gemini-1.5-pro-002") | |
| # # Define the chat history | |
| # chat = model.start_chat( | |
| # history=history | |
| # ) | |
| # logger.info(f"file_path: {image_file_path}") | |
| # if image_file_path: # Should be bytes or a file-like object | |
| # prompt = "Describe the following image:" | |
| # image_data = PIL.Image.open(image_file_path) | |
| # print("Sending image to the Gemini model...") | |
| # response = await chat.send_message_async(image_data) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # if doc_path: | |
| # doc_data = genai.upload_file(doc_path) | |
| # print("Sending document to the Gemini model...") | |
| # response = await chat.send_message_async(doc_data) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # # Send the user's message | |
| # print("Sending message to the Gemini model...") | |
| # response = await chat.send_message_async(content) | |
| # print(f"Model response: {response.text}") | |
| # return response.text | |
| # except Exception as e: | |
| # print("Error generating reply from Gemini:", e) | |
| # return "Sorry, I couldn't generate a response at this time." | |
| async def generate_response_from_chatgpt(sender: str, content: str, timestamp: str, history: str) -> str: | |
| """ | |
| Generate a reply using OpenAI's ChatGPT API. | |
| """ | |
| try: | |
| # # Initialize chat history if not provided | |
| # chat_history = chat_history or [] | |
| # # Append the current user message to the chat history | |
| # chat_history.append({"role": "user", "content": f"From {sender} at {timestamp}: {content}"}) | |
| messages = [ | |
| {"role": "system", "content": "You're an investor, a serial founder, and you've sold many startups. You understand nothing but business."}, | |
| {"role": "system", "content": f"Message History: {history}"}, | |
| {"role": "user", "content": f"From {sender} at {timestamp}: {content}"} | |
| ] | |
| print(f"Messages: {messages}") | |
| response = await client.chat.completions.create( | |
| model="gpt-3.5-turbo", | |
| messages=messages, | |
| max_tokens=200, | |
| temperature=0.5 | |
| ) | |
| chatgpt_response = response.choices[0].message.content.strip() | |
| # Append the assistant's response to the chat history | |
| # chat_history.append({"role": "assistant", "content": chatgpt_response}) | |
| return chatgpt_response | |
| except Exception as e: | |
| print("Error generating reply:", e) | |
| return "Sorry, I couldn't generate a response at this time." | |
| # async def generate_response_from_chatgpt( | |
| # sender: str, | |
| # content: str, | |
| # timestamp: str, | |
| # image: Optional[bytes] = None, | |
| # file: Optional[bytes] = None, | |
| # file_name: Optional[str] = None, | |
| # chat_history: Optional[List[Dict[str, str]]] = None, | |
| # ) -> Dict[str, Any]: | |
| # """ | |
| # Generate a reply using OpenAI's GPT-4 API, including support for images, files, and maintaining chat history. | |
| # """ | |
| # try: | |
| # # Initialize chat history if not provided | |
| # chat_history = chat_history or [] | |
| # # Append the current user message to the chat history | |
| # chat_history.append({"role": "user", "content": f"From {sender} at {timestamp}: {content}"}) | |
| # # Prepare files for the request | |
| # files = [] | |
| # if image: | |
| # files.append({"name": "image.png", "type": "image/png", "content": image}) | |
| # if file: | |
| # files.append({"name": file_name or "file.txt", "type": "application/octet-stream", "content": file}) | |
| # logger.debug(f"Chat History Before Response: {chat_history}") | |
| # # Send the request to the GPT-4 API | |
| # response = await client.chat.completions.create( | |
| # model="gpt-4-vision", # Ensure this is the correct model for multimodal support | |
| # messages=chat_history, | |
| # files=files if files else None, # Include files if present | |
| # max_tokens=200, | |
| # temperature=0.5, | |
| # ) | |
| # # Parse the assistant's response | |
| # chatgpt_response = response.choices[0].message.content.strip() | |
| # # Append the assistant's response to the chat history | |
| # chat_history.append({"role": "assistant", "content": chatgpt_response}) | |
| # logger.debug(f"Chat History After Response: {chat_history}") | |
| # # Return both the assistant's response and the updated chat history | |
| # return {"response": chatgpt_response, "chat_history": chat_history} | |
| # except Exception as e: | |
| # logger.error("Error generating reply", exc_info=True) | |
| # return {"response": "Sorry, I couldn't generate a response at this time.", "chat_history": chat_history} | |