# Spaces: Build error (appears to be page-status residue from where this file was copied; not part of the program)
import asyncio
import logging
import os
from datetime import datetime
from typing import Any, Dict

import httpx
from dotenv import load_dotenv

# Load environment variables (python-dotenv reads them from a .env file)
load_dotenv()

# Configure logging: INFO and above, with timestamp, logger name and level
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# WhatsApp API endpoint and access token, both read from the environment
WHATSAPP_API_URL = os.getenv("WHATSAPP_API_URL")
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")

# Only warn when a setting is missing or empty; the module still loads
if not (WHATSAPP_API_URL and ACCESS_TOKEN):
    logger.warning("Environment variables for WHATSAPP_API_URL or ACCESS_TOKEN are not set!")
# Helper function to send a reply
async def send_reply(to: str, body: str) -> Dict[str, Any]:
    """Send a text message through the configured WhatsApp API endpoint.

    Posts a JSON payload (``messaging_product="whatsapp"``, ``type="text"``)
    to ``WHATSAPP_API_URL`` with ``ACCESS_TOKEN`` as a bearer token.

    Args:
        to: Value placed in the payload's ``to`` field (the recipient).
        body: Text placed in the payload's ``text.body`` field.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        Exception: If the API responds with any status code other than 200.
            The message carries the status code and the error body.
    """
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Content-Type": "application/json"
    }
    data = {
        "messaging_product": "whatsapp",
        "to": to,
        "type": "text",
        "text": {
            "body": body
        }
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(WHATSAPP_API_URL, json=data, headers=headers)
        if response.status_code != 200:
            # An error body is not guaranteed to be JSON (e.g. an HTML or
            # plain-text gateway error). Fall back to the raw text so the
            # real status code is reported instead of a JSON decode error.
            try:
                error_detail = response.json()
            except ValueError:
                error_detail = response.text
            logger.error("Failed to send reply: %s", error_detail)
            raise Exception(f"Failed to send reply with status code {response.status_code}: {error_detail}")
        return response.json()
# Helper function to generate a reply based on message content
async def generate_reply(sender: str, content: str, timestamp: int) -> str:
    """Pick a canned reply from the message text and its receive time.

    The timestamp is converted first, then the rules are tried in order:
    a message containing "hello" (any case), a message containing "test"
    (any case), a message received before 12:00, and finally a generic
    greeting.

    Args:
        sender: Name or identifier interpolated into the reply text.
        content: Incoming message text.
        timestamp: Receive time, treated as milliseconds since the epoch.

    Returns:
        The reply text. If anything raises while building it, the error is
        logged and an apology message is returned instead.
    """
    try:
        # Milliseconds -> seconds; fromtimestamp() yields the host's local time
        moment = datetime.fromtimestamp(int(timestamp) / 1000)
        text = content.lower()

        if "hello" in text:
            return f"Hi {sender}, how can I assist you today?"
        if "test" in text:
            return f"Hi {sender}, this is a reply to your test message."
        if moment.hour < 12:
            return f"Good morning, {sender}! How can I help you?"
        return f"Hello {sender}, I hope you're having a great day!"
    except Exception as exc:
        logger.error(f"Error generating reply: {str(exc)}", exc_info=True)
        return f"Sorry {sender}, I couldn't process your message. Please try again."
# Process message with retry logic
async def process_message_with_retry(
    sender_id: str,
    content: str,
    timestamp: int
) -> Dict[str, Any]:
    """Generate a reply and send it, retrying the whole step on failure.

    Up to three attempts are made. After a failed attempt the coroutine
    sleeps before trying again, starting at 2 seconds and doubling each
    time (2s, then 4s).

    Args:
        sender_id: Passed to ``generate_reply`` as the sender and to
            ``send_reply`` as the recipient.
        content: Incoming message text.
        timestamp: Receive time forwarded to ``generate_reply``.

    Returns:
        A dict with ``"status": "success"``, the generated ``"reply"`` and
        the ``"response"`` returned by ``send_reply``.

    Raises:
        Exception: When every attempt fails; chained from the last error.
    """
    max_attempts = 3
    backoff = 2  # Seconds to wait before the next attempt
    attempt_no = 1

    while True:
        try:
            reply_text = await generate_reply(sender_id, content, timestamp)
            api_response = await send_reply(sender_id, reply_text)
        except Exception as exc:
            logger.error(f"Attempt {attempt_no} failed: {str(exc)}", exc_info=True)
            if attempt_no >= max_attempts:
                raise Exception(f"All {max_attempts} attempts failed.") from exc
            await asyncio.sleep(backoff)
            backoff *= 2  # Exponential backoff
            attempt_no += 1
        else:
            return {"status": "success", "reply": reply_text, "response": api_response}
# Example usage
# asyncio.run(process_message_with_retry("1234567890", "hello", 1700424056000))