fastAPIv2 / routes /api /whatsapp_webhook.py
ragV98's picture
idempotency integration
5e03773
# routes/api/whatsapp_webhook.py
from fastapi import APIRouter, Request, HTTPException, status
from fastapi.responses import JSONResponse
import logging
import json
import os
from typing import Tuple, Optional
# Redis for idempotency
import redis
# Classifier
from components.LLMs.Classifier import ZeroShotClassifier
# User storage (as you referenced it)
from storage.users import ensure_user_from_webhook
# Handlers
from components.handlers.whatsapp_handlers import (
handle_headlines,
handle_preferences,
handle_greeting,
handle_help,
handle_unsubscribe,
handle_small_talk,
handle_chat_question,
)
# Configure root logging (INFO level, timestamped format) at import time.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Router that the webhook endpoints in this module register themselves on.
router = APIRouter()
# Single classifier instance, created at import and reused by the POST handler.
classifier = ZeroShotClassifier()
# ---- Redis client (idempotency) ----
# Connection URL comes from UPSTASH_REDIS_URL, falling back to a local Redis.
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
# decode_responses=True makes the client hand back str values instead of bytes.
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# Expiry applied to each "wa:handled:<id>" key written by _mark_if_new.
IDEMPOTENCY_TTL_SECONDS = 10 * 60 # 10 minutes
def _mark_if_new(message_id: str) -> bool:
"""
Returns True if message_id is new (we should process),
False if we've already seen it (skip).
"""
try:
return bool(
redis_client.set(
f"wa:handled:{message_id}",
"1",
nx=True, # only set if not exists
ex=IDEMPOTENCY_TTL_SECONDS, # expire automatically
)
)
except Exception as e:
# Fail open if Redis hiccups (process message)
logging.warning(f"Idempotency Redis error: {e}")
return True
def _get_incoming_message_id(payload: dict) -> Optional[str]:
"""
Returns the WhatsApp message id (wamid...) if present, else None.
Only for real user 'messages' events (ignores statuses/billing).
"""
try:
entries = payload.get("entry", [])
for entry in entries:
changes = entry.get("changes", [])
for change in changes:
if change.get("field") != "messages":
continue
value = change.get("value", {}) or {}
msgs = value.get("messages", []) or []
if not msgs:
continue
# Only look at the first message
msg = msgs[0]
return msg.get("id")
except Exception as e:
logging.debug(f"_get_incoming_message_id error: {e}")
return None
def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optional[str]]:
"""
Extracts (from_number, message_text) from a WhatsApp webhook payload.
Returns (None, None) if not a user message (e.g., statuses, billing-event).
"""
try:
entries = payload.get("entry", [])
for entry in entries:
changes = entry.get("changes", [])
for change in changes:
field = change.get("field")
# Only process message-type changes
if field != "messages":
continue
value = change.get("value", {}) or {}
# Ignore updates without 'messages'
messages_list = value.get("messages", [])
if not messages_list:
# Likely a status/billing event
continue
# Only look at the first message in the list
msg = messages_list[0]
from_number = msg.get("from")
mtype = msg.get("type")
# 1) Plain text
if mtype == "text":
text_body = (msg.get("text", {}) or {}).get("body")
if from_number and text_body:
return from_number, text_body
# 2) Template reply button (older/simple schema)
if mtype == "button":
b = msg.get("button", {}) or {}
intent = b.get("payload") or b.get("text")
if from_number and intent:
return from_number, intent
# 3) Newer interactive replies (buttons or list)
if mtype == "interactive":
i = msg.get("interactive", {}) or {}
if "button_reply" in i:
intent = i["button_reply"].get("id") or i["button_reply"].get("title")
if from_number and intent:
return from_number, intent
if "list_reply" in i:
intent = i["list_reply"].get("id") or i["list_reply"].get("title")
if from_number and intent:
return from_number, intent
# No usable user message found
return None, None
except Exception as e:
# Always return a tuple on error
logging.exception(f"_extract_from_number_and_text error: {e}")
return None, None
@router.post("/message-received")
async def whatsapp_webhook_receiver(request: Request):
    """
    Receive an inbound WhatsApp/Gupshup webhook event and route it to a handler.

    Steps:
    - Read the body and parse it as UTF-8 JSON (400 on undecodable or malformed input)
    - Idempotency check on the message id (duplicates get a 200 "ignored")
    - Extract from_number/message_text (non-message events get a 200 "ignored")
    - Upsert the user profile (best effort; failures are logged and skipped)
    - Classify the text and call the matching handler, whose return value is
      returned as the response

    Returns:
        The selected handler's return value, or a JSONResponse with status
        200 (ignored), 400 (bad body) or 500 (unexpected error). The 500 body
        is deliberately generic; details go to the log only.

    NOTE(review): the idempotency marker is written before the message is
    processed. If processing then fails with a 500 and the provider retries
    the same message id, the retry is skipped as a duplicate until the marker
    expires - confirm this at-most-once behaviour is intended.
    """
    try:
        # Read + parse body. Bytes that are not valid UTF-8 are a client error,
        # so they get the same 400 as malformed JSON rather than a 500.
        raw_body = await request.body()
        try:
            body_str = raw_body.decode("utf-8")
        except UnicodeDecodeError:
            logging.error("❌ Failed to decode webhook body as UTF-8")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})
        logging.info(f"Raw webhook body received: {body_str}")
        try:
            incoming_message = json.loads(body_str)
        except json.JSONDecodeError:
            logging.error("❌ Failed to decode webhook body as JSON")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})

        # Idempotency: skip duplicates early
        msg_id = _get_incoming_message_id(incoming_message)
        if msg_id and not _mark_if_new(msg_id):
            logging.info(f"Duplicate inbound message {msg_id} ignored.")
            return JSONResponse(
                status_code=200,
                content={"status": "ignored", "message": "Duplicate message"},
            )

        # Extract sender + message
        from_number, message_text = _extract_from_number_and_text(incoming_message)
        if not from_number or not message_text:
            logging.info("Ignoring non-message webhook or missing sender/text.")
            return JSONResponse(
                status_code=200,
                content={"status": "ignored", "message": "No user message"},
            )
        logging.info(f"Message from {from_number}: {message_text}")

        # Upsert user (profile + last_seen + last_msg_id); never blocks routing
        try:
            ensure_user_from_webhook(incoming_message, from_number, msg_id)
        except Exception as e:
            logging.warning(f"ensure_user_from_webhook failed (continuing): {e}")

        # Legacy fast-path (still supported); curly apostrophes are normalized
        # so both spellings of the button label match.
        normalized = message_text.lower().strip().replace("’", "'")
        if normalized == "view today's headlines":
            return handle_headlines(from_number)

        # Intent classification, with a keyword fallback if the classifier fails
        try:
            label, meta = classifier.classify(message_text)
            logging.info(f"Intent classified: {label} | meta={meta}")
        except Exception as e:
            logging.exception(f"Classifier error: {e}")
            if any(k in normalized for k in ["headline", "digest", "news", "today"]):
                return handle_headlines(from_number)
            return handle_help(from_number)

        # Routing. chat_question is the only handler that needs the text itself.
        if label == "chat_question":
            return handle_chat_question(from_number, message_text)
        handlers_by_label = {
            "headlines_request": handle_headlines,
            "preferences_update": handle_preferences,
            "greeting": handle_greeting,
            "help": handle_help,
            "unsubscribe": handle_unsubscribe,
            "small_talk": handle_small_talk,
        }
        # Unknown labels fall back to the help handler.
        handler = handlers_by_label.get(label, handle_help)
        return handler(from_number)
    except Exception as e:
        logging.error(f"Error processing webhook: {e}", exc_info=True)
        # Do not echo exception text to the caller; it can expose internals.
        return JSONResponse(
            status_code=500,
            content={"status": "error", "message": "Internal server error"},
        )
@router.get("/message-received")
async def whatsapp_webhook_verify(request: Request):
    """
    Webhook verification endpoint (GET with 'hub.mode' and 'hub.challenge').

    When ``hub.mode`` is ``"subscribe"`` and ``hub.challenge`` is non-empty, the
    challenge is echoed back verbatim as a plain-text 200 response. It is not
    converted through ``int()`` or JSON, because that would rewrite values
    such as ``"0123"`` or wrap non-numeric challenges in quotes.

    Raises:
        HTTPException: 403 when the mode is not "subscribe" or the challenge
        is missing.

    NOTE(review): 'hub.verify_token' is not checked here, so any caller that
    sends hub.mode=subscribe passes verification - confirm whether a shared
    token should be enforced.
    """
    # Local import keeps this handler self-contained; the file-level import
    # from fastapi.responses only brings in JSONResponse.
    from fastapi.responses import PlainTextResponse

    mode = request.query_params.get("hub.mode")
    challenge = request.query_params.get("hub.challenge")
    if mode == "subscribe" and challenge:
        logging.info(f"Webhook verification successful. Challenge: {challenge}")
        return PlainTextResponse(content=challenge, status_code=200)

    logging.warning(f"Webhook verification failed. Mode: {mode}, Challenge: {challenge}")
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Verification failed")