Spaces:
Build error
Build error
File size: 3,275 Bytes
db6bff5 bcd422d db6bff5 c3cb991 6c03001 c3cb991 bcd422d db6bff5 bcd422d c3cb991 bcd422d db6bff5 bcd422d db6bff5 bcd422d db6bff5 bcd422d db6bff5 bcd422d db6bff5 bcd422d db6bff5 bcd422d c3cb991 bcd422d c3cb991 bcd422d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
import os
import httpx
from dotenv import load_dotenv
from typing import Dict, Any
from datetime import datetime
import logging
import asyncio
# Load environment variables (python-dotenv) before the settings below are read.
load_dotenv()

# Logging: INFO and above, each record stamped with time, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# WhatsApp endpoint and bearer token; each is None when its variable is unset.
WHATSAPP_API_URL = os.getenv("WHATSAPP_API_URL")
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")

# A missing setting is only reported here; it is not fatal at import time.
if not (WHATSAPP_API_URL and ACCESS_TOKEN):
    logger.warning("Environment variables for WHATSAPP_API_URL or ACCESS_TOKEN are not set!")
# Helper function to send a reply
async def send_reply(to: str, body: str) -> Dict[str, Any]:
    """Send a plain-text WhatsApp message and return the API's JSON response.

    Args:
        to: Recipient identifier, placed in the payload's ``"to"`` field.
        body: Text of the message to send.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        Exception: If the API answers with any status code other than 200.
            The message carries the status code and the error body (decoded
            JSON when possible, otherwise the raw response text).
    """
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Content-Type": "application/json"
    }
    data = {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "text",
        "text": {
            "body": body
        }
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(WHATSAPP_API_URL, json=data, headers=headers)
        if response.status_code != 200:
            # An error body is not guaranteed to be JSON (for instance an
            # HTML error page). Fall back to the raw text so the HTTP failure
            # itself is reported rather than a JSON decoding error.
            try:
                error_detail = response.json()
            except ValueError:
                error_detail = response.text
            logger.error(f"Failed to send reply: {error_detail}")
            raise Exception(f"Failed to send reply with status code {response.status_code}: {error_detail}")
        return response.json()
# Helper function to generate a reply based on message content
async def generate_reply(sender: str, content: str, timestamp: int) -> str:
    """Build the reply text for an incoming message.

    Keyword matches on the message text take priority ("hello" first, then
    "test"). Only when neither matches is the message time consulted to pick
    between a morning greeting and a generic one.

    Args:
        sender: Identifier interpolated into the reply text.
        content: Text of the incoming message; matched case-insensitively.
        timestamp: Message time, treated as milliseconds since the Unix
            epoch. Any value accepted by ``int()`` works.

    Returns:
        The reply text. If building it fails for any reason, the error is
        logged and a generic apology is returned instead of raising.
    """
    try:
        text = content.lower()
        if "hello" in text:
            return f"Hi {sender}, how can I assist you today?"
        if "test" in text:
            return f"Hi {sender}, this is a reply to your test message."

        # The timestamp only matters for the time-of-day fallback, so it is
        # parsed here rather than up front: a malformed timestamp must not
        # turn a keyword match into the apology reply.
        # NOTE(review): assumes milliseconds, and fromtimestamp() yields the
        # server's local time -- confirm both against the webhook payload.
        received_time = datetime.fromtimestamp(int(timestamp) / 1000)
        if received_time.hour < 12:
            return f"Good morning, {sender}! How can I help you?"
        return f"Hello {sender}, I hope you're having a great day!"
    except Exception as e:
        logger.error(f"Error generating reply: {str(e)}", exc_info=True)
        return f"Sorry {sender}, I couldn't process your message. Please try again."
# Process message with retry logic
async def process_message_with_retry(
    sender_id: str,
    content: str,
    timestamp: int,
    *,
    retries: int = 3,
    initial_delay: float = 2,
) -> Dict[str, Any]:
    """Generate a reply and send it, retrying with exponential backoff.

    Each attempt builds the reply with ``generate_reply`` and delivers it
    with ``send_reply``. After a failed attempt the coroutine sleeps, then
    tries again; the sleep doubles after every failure.

    Args:
        sender_id: Sender of the incoming message; also the reply recipient.
        content: Text of the incoming message.
        timestamp: Message timestamp, passed through to ``generate_reply``.
        retries: Total number of attempts (must be at least 1).
        initial_delay: Seconds to wait after the first failed attempt.

    Returns:
        A dict with ``"status"`` set to ``"success"``, the generated
        ``"reply"`` text and the API ``"response"``.

    Raises:
        ValueError: If ``retries`` is less than 1.
        Exception: If every attempt fails; chained to the last error.
    """
    if retries < 1:
        # With zero attempts the loop below would fall through and return
        # None, silently violating the declared return type.
        raise ValueError("retries must be at least 1")

    delay = initial_delay
    for attempt in range(retries):
        try:
            generated_reply = await generate_reply(sender_id, content, timestamp)
            response = await send_reply(sender_id, generated_reply)
            return {"status": "success", "reply": generated_reply, "response": response}
        except Exception as e:
            logger.error(f"Attempt {attempt + 1} failed: {str(e)}", exc_info=True)
            if attempt < retries - 1:
                await asyncio.sleep(delay)
                delay *= 2  # Exponential backoff
            else:
                # Last attempt: surface the failure, keeping the cause chained.
                raise Exception(f"All {retries} attempts failed.") from e
# Example usage
# asyncio.run(process_message_with_retry("1234567890", "hello", 1700424056000))
|