chat/app/services/message.py
ariansyahdedy's picture
Add reply function
bcd422d
raw
history blame
3.28 kB
import os
import httpx
from dotenv import load_dotenv
from typing import Dict, Any
from datetime import datetime
import logging
import asyncio
# Load environment variables before any of them are read below.
load_dotenv()

# Configure logging: INFO level, with timestamp, logger name and level
# in front of every message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# WhatsApp API URL and access token, both taken from the environment.
WHATSAPP_API_URL = os.getenv("WHATSAPP_API_URL")
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")

# Only warn (do not fail) when either value is missing or empty.
if not (WHATSAPP_API_URL and ACCESS_TOKEN):
    logger.warning("Environment variables for WHATSAPP_API_URL or ACCESS_TOKEN are not set!")
# Helper function to send a reply
async def send_reply(to: str, body: str) -> Dict[str, Any]:
    """Send a WhatsApp text message and return the API's JSON response.

    POSTs a ``type: text`` payload to ``WHATSAPP_API_URL``, authenticating
    with ``ACCESS_TOKEN`` as a bearer token.

    Args:
        to: Recipient identifier, placed in the payload's ``to`` field.
        body: Message text, placed in the payload's ``text.body`` field.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        Exception: If the response status code is not 200. The message
            contains the status code and the error body.
    """
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Content-Type": "application/json",
    }
    data = {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "text",
        "text": {
            "body": body,
        },
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(WHATSAPP_API_URL, json=data, headers=headers)

    if response.status_code != 200:
        # An error body is not guaranteed to be JSON (for example an HTML
        # error page). Fall back to the raw text so the real HTTP failure
        # is reported instead of a JSON decoding error.
        try:
            error_detail = response.json()
        except ValueError:
            error_detail = response.text
        logger.error("Failed to send reply: %s", error_detail)
        raise Exception(f"Failed to send reply with status code {response.status_code}: {error_detail}")

    return response.json()
# Helper function to generate a reply based on message content
async def generate_reply(sender: str, content: str, timestamp: int) -> str:
    """Build the reply text for an incoming message.

    Replies are chosen in this order:

    1. ``content`` contains "hello" (case-insensitive): an offer to assist.
    2. ``content`` contains "test" (case-insensitive): a test acknowledgement.
    3. Otherwise a greeting chosen by the hour of ``timestamp``
       (before 12:00: "Good morning"; otherwise a generic greeting).

    Args:
        sender: Name or identifier interpolated into the reply.
        content: Text of the incoming message.
        timestamp: Time the message was received, interpreted as
            milliseconds since the Unix epoch.
            NOTE(review): WhatsApp webhooks appear to deliver timestamps in
            seconds; confirm that the caller converts to milliseconds.

    Returns:
        The reply text. If anything goes wrong, the error is logged and a
        generic apology is returned instead of raising.
    """
    try:
        text = content.lower()

        # Keyword replies do not depend on the time of day, so they are
        # resolved before the timestamp is parsed. A malformed timestamp
        # therefore cannot break them.
        if "hello" in text:
            return f"Hi {sender}, how can I assist you today?"
        if "test" in text:
            return f"Hi {sender}, this is a reply to your test message."

        # fromtimestamp() without a tz argument yields local time, so the
        # morning cut-off follows the server's time zone.
        received_time = datetime.fromtimestamp(int(timestamp) / 1000)
        if received_time.hour < 12:
            return f"Good morning, {sender}! How can I help you?"
        return f"Hello {sender}, I hope you're having a great day!"
    except Exception as e:
        logger.error("Error generating reply: %s", e, exc_info=True)
        return f"Sorry {sender}, I couldn't process your message. Please try again."
# Process message with retry logic
async def process_message_with_retry(
    sender_id: str,
    content: str,
    timestamp: int,
    *,
    retries: int = 3,
    delay: float = 2,
) -> Dict[str, Any]:
    """Generate a reply and send it, retrying failed sends with backoff.

    Args:
        sender_id: Identifier of the message sender; also used as the
            recipient of the reply.
        content: Text of the incoming message.
        timestamp: Timestamp of the incoming message, passed through to
            ``generate_reply``.
        retries: Total number of send attempts (must be at least 1).
        delay: Seconds to wait after the first failed attempt; doubled
            after every further failure.

    Returns:
        A dict with ``"status"`` set to ``"success"``, the generated
        ``"reply"`` text and the ``"response"`` returned by ``send_reply``.

    Raises:
        ValueError: If ``retries`` is less than 1.
        Exception: If every attempt fails; chained from the last error.
    """
    if retries < 1:
        # With no attempts the loop below would end silently and return None.
        raise ValueError("retries must be at least 1")

    # generate_reply catches its own errors and depends only on these
    # arguments, so a single call serves every send attempt.
    generated_reply = await generate_reply(sender_id, content, timestamp)

    for attempt in range(1, retries + 1):
        try:
            response = await send_reply(sender_id, generated_reply)
        except Exception as e:
            logger.error("Attempt %d failed: %s", attempt, e, exc_info=True)
            if attempt == retries:
                raise Exception(f"All {retries} attempts failed.") from e
            await asyncio.sleep(delay)
            delay *= 2  # Exponential backoff
        else:
            return {"status": "success", "reply": generated_reply, "response": response}
# Example usage
# asyncio.run(process_message_with_retry("1234567890", "hello", 1700424056000))