"""FastAPI webhook endpoints: message intake (POST) and subscription verification (GET)."""

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from fastapi.responses import Response
from fastapi.exceptions import HTTPException
from typing import Any, Dict, Iterator
from app.services.message import generate_reply, send_reply, process_message_with_retry
import hmac
import logging
import os
from datetime import datetime
import time

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI()

# Token expected in the `hub.verify_token` query parameter of GET /webhook.
# Set WEBHOOK_VERIFY_TOKEN to the token configured in Facebook Business
# Manager. The fallback "test" preserves the previous hard-coded value and
# should not be relied on outside local development.
VERIFY_TOKEN = os.environ.get("WEBHOOK_VERIFY_TOKEN") or "test"


class MessageCache:
    """In-memory record of processed message IDs with age-based expiry.

    Each ID is stored with the wall-clock time it was added. Entries at
    least ``max_age_hours`` old are dropped whenever the cache is read or
    written.
    """

    def __init__(self, max_age_hours: int = 24):
        """Create an empty cache whose entries expire after *max_age_hours*."""
        # Maps message ID -> time.time() value at insertion.
        self.messages: Dict[str, float] = {}
        self.max_age_seconds = max_age_hours * 3600

    def add(self, message_id: str) -> None:
        """Record *message_id* as seen now, purging expired entries first."""
        self.cleanup()
        self.messages[message_id] = time.time()

    def exists(self, message_id: str) -> bool:
        """Return True if *message_id* is cached and has not expired."""
        self.cleanup()
        return message_id in self.messages

    def cleanup(self) -> None:
        """Drop every entry whose age has reached ``max_age_seconds``."""
        current_time = time.time()
        self.messages = {
            msg_id: timestamp
            for msg_id, timestamp in self.messages.items()
            if current_time - timestamp < self.max_age_seconds
        }


message_cache = MessageCache()


def _iter_messages(payload: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Yield every message dict found under entry[*].changes[*].value.messages.

    Missing keys and explicit JSON nulls are treated as empty, so an empty
    or partial payload yields nothing instead of raising.
    """
    # `entry` is a list; walk all of it rather than only the first element.
    for entry in payload.get("entry") or []:
        for change in entry.get("changes") or []:
            value = change.get("value") or {}
            yield from value.get("messages") or []


@app.post("/webhook")
async def webhook(request: Request):
    """Process the messages contained in an incoming webhook payload.

    Each message not already present in ``message_cache`` is passed to
    ``process_message_with_retry``. A failure on one message is logged and
    recorded in the response without aborting the remaining messages.

    Returns:
        A 200 JSON response with the request ID, the processed and error
        counts, and the per-message results.

    Raises:
        HTTPException: 400 if the body is not valid JSON or is not a JSON
            object.
    """
    request_id = f"req_{int(time.time()*1000)}"
    logger.info("Processing webhook request %s", request_id)

    # Parse incoming request; json.JSONDecodeError is a ValueError subclass.
    try:
        payload = await request.json()
    except ValueError as exc:
        logger.warning("Webhook request %s has an invalid JSON body", request_id)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid JSON payload",
        ) from exc
    if not isinstance(payload, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid JSON payload",
        )

    processed_count = 0
    error_count = 0
    results = []

    for message in _iter_messages(payload):
        message_id = message.get("id")
        timestamp = message.get("timestamp")
        content = message.get("text", {}).get("body")
        sender_id = message.get("from")

        # Only deduplicate messages that carry an ID; otherwise every
        # ID-less message after the first would be dropped as a "duplicate"
        # of the None key.
        if message_id is not None and message_cache.exists(message_id):
            logger.info("Duplicate message detected: %s", message_id)
            continue

        try:
            # Process message with retry logic
            result = await process_message_with_retry(
                sender_id,
                content,
                timestamp
            )
        except Exception as e:
            # Per-message boundary: record the failure and keep going.
            error_count += 1
            logger.exception("Failed to process message %s: %s", message_id, e)
            results.append({
                "status": "error",
                "message_id": message_id,
                "error": str(e)
            })
        else:
            # Cache only after success so a failed message can be retried
            # when it is delivered again.
            if message_id is not None:
                message_cache.add(message_id)
            processed_count += 1
            results.append(result)

    # Return detailed response
    response_data = {
        "request_id": request_id,
        "processed": processed_count,
        "errors": error_count,
        "results": results
    }

    logger.info(
        "Webhook processing completed - Processed: %s, Errors: %s",
        processed_count,
        error_count,
    )

    return JSONResponse(
        content=response_data,
        status_code=status.HTTP_200_OK
    )


@app.get("/webhook")
async def verify_webhook(request: Request):
    """Answer the webhook subscription verification request.

    Echoes ``hub.challenge`` as plain text when ``hub.mode`` is
    ``subscribe`` and ``hub.verify_token`` equals ``VERIFY_TOKEN``.

    Raises:
        HTTPException: 403 when the mode or the token does not match.
    """
    params = request.query_params
    mode = params.get('hub.mode')
    token = params.get('hub.verify_token')
    challenge = params.get('hub.challenge')

    # Compare as bytes: compare_digest rejects non-ASCII str arguments, and
    # the constant-time comparison avoids leaking the token through timing.
    token_ok = token is not None and hmac.compare_digest(
        token.encode("utf-8"), VERIFY_TOKEN.encode("utf-8")
    )

    if mode == 'subscribe' and token_ok:
        # Return the challenge as plain text
        return Response(content=challenge, media_type="text/plain")
    raise HTTPException(status_code=403, detail="Verification failed")