# routes/api/whatsapp_webhook.py
"""WhatsApp webhook routes.

POST /message-received  - receives inbound webhook events, de-duplicates
                          them through Redis, classifies the user's text and
                          dispatches to the matching handler.
GET  /message-received  - webhook verification handshake (hub.challenge).
"""
from fastapi import APIRouter, Request, HTTPException, status
from fastapi.responses import JSONResponse, PlainTextResponse
import logging
import json
import os
from typing import Tuple, Optional

# Redis for idempotency
import redis

# Classifier
from components.LLMs.Classifier import ZeroShotClassifier

# User storage (as you referenced it)
from storage.users import ensure_user_from_webhook

# Handlers
from components.handlers.whatsapp_handlers import (
    handle_headlines,
    handle_preferences,
    handle_greeting,
    handle_help,
    handle_unsubscribe,
    handle_small_talk,
    handle_chat_question,
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

router = APIRouter()
classifier = ZeroShotClassifier()

# ---- Redis client (idempotency) ----
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
IDEMPOTENCY_TTL_SECONDS = 10 * 60  # 10 minutes


def _mark_if_new(message_id: str) -> bool:
    """
    Atomically record ``message_id`` as handled.

    Returns True if message_id is new (we should process), False if we've
    already seen it (skip). The marker expires after
    IDEMPOTENCY_TTL_SECONDS.
    """
    try:
        return bool(
            redis_client.set(
                f"wa:handled:{message_id}",
                "1",
                nx=True,  # only set if not exists
                ex=IDEMPOTENCY_TTL_SECONDS,  # expire automatically
            )
        )
    except Exception as e:
        # Fail open if Redis hiccups (process message): a possible duplicate
        # reply is preferred over silently dropping a user message.
        logging.warning(f"Idempotency Redis error: {e}")
        return True


def _get_incoming_message_id(payload: dict) -> Optional[str]:
    """
    Returns the WhatsApp message id (wamid...) if present, else None.

    Only 'messages' changes that actually carry a message list are
    considered (status/billing updates are skipped), and only the first
    message of the first such change is inspected.
    """
    try:
        for entry in payload.get("entry", []):
            for change in entry.get("changes", []):
                if change.get("field") != "messages":
                    continue
                value = change.get("value", {}) or {}
                msgs = value.get("messages", []) or []
                if not msgs:
                    continue
                # Only look at the first message
                return msgs[0].get("id")
    except Exception as e:
        # Malformed payloads are not an error here; the caller simply skips
        # the idempotency check when no id is available.
        logging.debug(f"_get_incoming_message_id error: {e}")
    return None


def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optional[str]]:
    """
    Extracts (from_number, message_text) from a WhatsApp webhook payload.

    Supported message types: plain ``text``, template ``button`` replies and
    ``interactive`` replies (``button_reply`` / ``list_reply``). For button
    and interactive replies the payload/id is preferred over the visible
    text/title.

    Returns (None, None) if not a user message (e.g., statuses,
    billing-event) or if the payload cannot be parsed.
    """
    try:
        for entry in payload.get("entry", []):
            for change in entry.get("changes", []):
                # Only process message-type changes
                if change.get("field") != "messages":
                    continue

                value = change.get("value", {}) or {}
                messages_list = value.get("messages", [])
                if not messages_list:
                    # Likely a status/billing event
                    continue

                # Only look at the first message in the list
                msg = messages_list[0]
                from_number = msg.get("from")
                mtype = msg.get("type")

                # 1) Plain text
                if mtype == "text":
                    text_body = (msg.get("text", {}) or {}).get("body")
                    if from_number and text_body:
                        return from_number, text_body

                # 2) Template reply button (older/simple schema)
                if mtype == "button":
                    button = msg.get("button", {}) or {}
                    intent = button.get("payload") or button.get("text")
                    if from_number and intent:
                        return from_number, intent

                # 3) Newer interactive replies (buttons or list);
                #    button_reply takes precedence over list_reply.
                if mtype == "interactive":
                    interactive = msg.get("interactive", {}) or {}
                    for reply_key in ("button_reply", "list_reply"):
                        if reply_key not in interactive:
                            continue
                        reply = interactive[reply_key]
                        intent = reply.get("id") or reply.get("title")
                        if from_number and intent:
                            return from_number, intent

        # No usable user message found
        return None, None
    except Exception as e:
        # Always return a tuple on error
        logging.exception(f"_extract_from_number_and_text error: {e}")
        return None, None


@router.post("/message-received")
async def whatsapp_webhook_receiver(request: Request):
    """
    Receives incoming messages from WhatsApp/Gupshup webhook.

    - Parses JSON (400 on an undecodable or non-JSON body)
    - Idempotency check on wamid
    - Extracts from_number/message_text
    - Upserts user profile
    - Classifies and routes to handlers

    Returns whatever the selected handler returns, a 200 "ignored" response
    for duplicates / non-message events, or a generic 500 on unexpected
    failure (details are logged, never sent to the caller).
    """
    # NOTE(review): the Redis client, classifier and handlers are called
    # synchronously inside this coroutine; if any of them block on I/O they
    # will stall the event loop - confirm whether that is acceptable.
    try:
        # Read + parse body
        raw_body = await request.body()
        try:
            body_str = raw_body.decode("utf-8")
        except UnicodeDecodeError:
            # A body that is not valid UTF-8 is a client error, not a 500.
            logging.error("❌ Failed to decode webhook body as JSON")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})

        logging.info(f"Raw webhook body received: {body_str}")

        try:
            incoming_message = json.loads(body_str)
        except json.JSONDecodeError:
            logging.error("❌ Failed to decode webhook body as JSON")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})

        # Idempotency: skip duplicates early
        msg_id = _get_incoming_message_id(incoming_message)
        if msg_id and not _mark_if_new(msg_id):
            logging.info(f"Duplicate inbound message {msg_id} ignored.")
            return JSONResponse(
                status_code=200,
                content={"status": "ignored", "message": "Duplicate message"},
            )

        # Extract sender + message
        from_number, message_text = _extract_from_number_and_text(incoming_message)
        if not from_number or not message_text:
            logging.info("Ignoring non-message webhook or missing sender/text.")
            return JSONResponse(
                status_code=200,
                content={"status": "ignored", "message": "No user message"},
            )

        logging.info(f"Message from {from_number}: {message_text}")

        # Upsert user (profile + last_seen + last_msg_id); best effort only,
        # a storage failure must not prevent the reply.
        try:
            ensure_user_from_webhook(incoming_message, from_number, msg_id)
        except Exception as e:
            logging.warning(f"ensure_user_from_webhook failed (continuing): {e}")

        # Legacy fast-path (still supported); the curly apostrophe is
        # normalised so both spellings of the button text match.
        normalized = message_text.lower().strip().replace("’", "'")
        if normalized == "view today's headlines":
            return handle_headlines(from_number)

        # Intent classification
        try:
            label, meta = classifier.classify(message_text)
            logging.info(f"Intent classified: {label} | meta={meta}")
        except Exception as e:
            # Classifier unavailable: fall back to a keyword heuristic.
            logging.exception(f"Classifier error: {e}")
            if any(k in normalized for k in ["headline", "digest", "news", "today"]):
                return handle_headlines(from_number)
            return handle_help(from_number)

        # Routing
        if label == "headlines_request":
            return handle_headlines(from_number)
        elif label == "preferences_update":
            return handle_preferences(from_number)
        elif label == "greeting":
            return handle_greeting(from_number)
        elif label == "help":
            return handle_help(from_number)
        elif label == "unsubscribe":
            return handle_unsubscribe(from_number)
        elif label == "small_talk":
            return handle_small_talk(from_number)
        elif label == "chat_question":
            return handle_chat_question(from_number, message_text)
        else:
            # Unknown label: show help rather than failing.
            return handle_help(from_number)

    except Exception as e:
        logging.error(f"Error processing webhook: {e}", exc_info=True)
        # Do not echo exception text to the remote caller; it can expose
        # internal details. The full error is in the log line above.
        return JSONResponse(
            status_code=500,
            content={"status": "error", "message": "Internal server error"},
        )


@router.get("/message-received")
async def whatsapp_webhook_verify(request: Request):
    """
    Webhook verification endpoint (GET with 'hub.mode' and 'hub.challenge').

    Echoes the challenge back with a 200 when ``hub.mode`` is "subscribe"
    and a challenge is supplied.

    Raises:
        HTTPException: 403 when the mode is wrong or the challenge is missing.
    """
    # NOTE(review): 'hub.verify_token' is not checked here, so any caller can
    # complete the handshake - confirm whether a token check is required.
    mode = request.query_params.get("hub.mode")
    challenge = request.query_params.get("hub.challenge")

    if mode != "subscribe" or not challenge:
        logging.warning(f"Webhook verification failed. Mode: {mode}, Challenge: {challenge}")
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Verification failed")

    logging.info(f"Webhook verification successful. Challenge: {challenge}")
    # Challenge often needs to be returned as an integer
    try:
        challenge_int = int(challenge)
    except ValueError:
        # Non-numeric challenge: echo it verbatim as plain text. A JSON
        # response would wrap it in quotes, so the caller would not get back
        # the exact value it sent.
        return PlainTextResponse(status_code=200, content=challenge)
    return JSONResponse(status_code=200, content=challenge_int)