File size: 9,293 Bytes
3566f32 5e03773 8d4ff93 5e03773 8d4ff93 5e03773 8d4ff93 3566f32 5e03773 3566f32 8d4ff93 3566f32 5e03773 8d4ff93 5776f9f 5e03773 5776f9f 5e03773 5776f9f bbce544 5e03773 3566f32 5e03773 3566f32 5e03773 8d4ff93 f1e54f5 d202667 f1e54f5 d202667 5e03773 bbce544 3566f32 bbce544 3566f32 5e03773 bbce544 8d4ff93 3566f32 5e03773 8d4ff93 5e03773 8d4ff93 3566f32 8d4ff93 3566f32 8d4ff93 3566f32 5e03773 3566f32 5e03773 3566f32 bbce544 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# routes/api/whatsapp_webhook.py
from fastapi import APIRouter, Request, HTTPException, status
from fastapi.responses import JSONResponse
import logging
import json
import os
from typing import Tuple, Optional
# Redis for idempotency
import redis
# Classifier
from components.LLMs.Classifier import ZeroShotClassifier
# User storage (as you referenced it)
from storage.users import ensure_user_from_webhook
# Handlers
from components.handlers.whatsapp_handlers import (
handle_headlines,
handle_preferences,
handle_greeting,
handle_help,
handle_unsubscribe,
handle_small_talk,
handle_chat_question,
)
# Configure root logging at import time: INFO level, timestamped records.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Router on which the webhook endpoints in this module are registered.
router = APIRouter()
# Intent classifier, instantiated once at import and reused by the POST handler.
classifier = ZeroShotClassifier()
# ---- Redis client (idempotency) ----
# Connection URL is read from UPSTASH_REDIS_URL; defaults to a local Redis instance.
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
# Module-level client used by _mark_if_new to record handled message ids.
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# Expiry applied to each "wa:handled:<id>" marker key, in seconds.
IDEMPOTENCY_TTL_SECONDS = 10 * 60 # 10 minutes
def _mark_if_new(message_id: str) -> bool:
"""
Returns True if message_id is new (we should process),
False if we've already seen it (skip).
"""
try:
return bool(
redis_client.set(
f"wa:handled:{message_id}",
"1",
nx=True, # only set if not exists
ex=IDEMPOTENCY_TTL_SECONDS, # expire automatically
)
)
except Exception as e:
# Fail open if Redis hiccups (process message)
logging.warning(f"Idempotency Redis error: {e}")
return True
def _get_incoming_message_id(payload: dict) -> Optional[str]:
"""
Returns the WhatsApp message id (wamid...) if present, else None.
Only for real user 'messages' events (ignores statuses/billing).
"""
try:
entries = payload.get("entry", [])
for entry in entries:
changes = entry.get("changes", [])
for change in changes:
if change.get("field") != "messages":
continue
value = change.get("value", {}) or {}
msgs = value.get("messages", []) or []
if not msgs:
continue
# Only look at the first message
msg = msgs[0]
return msg.get("id")
except Exception as e:
logging.debug(f"_get_incoming_message_id error: {e}")
return None
def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optional[str]]:
"""
Extracts (from_number, message_text) from a WhatsApp webhook payload.
Returns (None, None) if not a user message (e.g., statuses, billing-event).
"""
try:
entries = payload.get("entry", [])
for entry in entries:
changes = entry.get("changes", [])
for change in changes:
field = change.get("field")
# Only process message-type changes
if field != "messages":
continue
value = change.get("value", {}) or {}
# Ignore updates without 'messages'
messages_list = value.get("messages", [])
if not messages_list:
# Likely a status/billing event
continue
# Only look at the first message in the list
msg = messages_list[0]
from_number = msg.get("from")
mtype = msg.get("type")
# 1) Plain text
if mtype == "text":
text_body = (msg.get("text", {}) or {}).get("body")
if from_number and text_body:
return from_number, text_body
# 2) Template reply button (older/simple schema)
if mtype == "button":
b = msg.get("button", {}) or {}
intent = b.get("payload") or b.get("text")
if from_number and intent:
return from_number, intent
# 3) Newer interactive replies (buttons or list)
if mtype == "interactive":
i = msg.get("interactive", {}) or {}
if "button_reply" in i:
intent = i["button_reply"].get("id") or i["button_reply"].get("title")
if from_number and intent:
return from_number, intent
if "list_reply" in i:
intent = i["list_reply"].get("id") or i["list_reply"].get("title")
if from_number and intent:
return from_number, intent
# No usable user message found
return None, None
except Exception as e:
# Always return a tuple on error
logging.exception(f"_extract_from_number_and_text error: {e}")
return None, None
@router.post("/message-received")
async def whatsapp_webhook_receiver(request: Request):
    """
    Receives an inbound WhatsApp webhook POST and routes it to an intent handler.

    Steps:
    - Decode and parse the JSON body (400 when it is not valid UTF-8 or JSON)
    - Idempotency check on the message id; duplicates are acknowledged and skipped
    - Extract from_number/message_text; payloads without a user message are ignored
    - Upsert the user profile (best effort: failures are logged, processing continues)
    - Classify the text and dispatch to the matching handler; if the classifier
      raises, fall back to a keyword check (headlines) or the help handler

    Returns the selected handler's return value, or a JSONResponse for the
    invalid-body (400), ignored (200) and unexpected-error (500) cases.
    """
    try:
        # Read + parse body. A body that is not valid UTF-8 is a client error,
        # so it is answered with 400 rather than falling through to the 500 path.
        try:
            body_str = (await request.body()).decode("utf-8")
        except UnicodeDecodeError:
            logging.error("❌ Failed to decode webhook body as UTF-8")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})
        # NOTE(review): this logs the full payload (sender number and message text) at INFO.
        logging.info(f"Raw webhook body received: {body_str}")
        try:
            incoming_message = json.loads(body_str)
        except json.JSONDecodeError:
            logging.error("❌ Failed to decode webhook body as JSON")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})

        # Idempotency: skip duplicates early.
        # NOTE(review): the id is marked before processing, so a redelivery after a
        # failed attempt is treated as a duplicate -- confirm this is intended.
        msg_id = _get_incoming_message_id(incoming_message)
        if msg_id and not _mark_if_new(msg_id):
            logging.info(f"Duplicate inbound message {msg_id} ignored.")
            return JSONResponse(status_code=200, content={"status": "ignored", "message": "Duplicate message"})

        # Extract sender + message
        from_number, message_text = _extract_from_number_and_text(incoming_message)
        if not from_number or not message_text:
            logging.info("Ignoring non-message webhook or missing sender/text.")
            return JSONResponse(status_code=200, content={"status": "ignored", "message": "No user message"})
        logging.info(f"Message from {from_number}: {message_text}")

        # Upsert user (profile + last_seen + last_msg_id); never blocks the reply.
        try:
            ensure_user_from_webhook(incoming_message, from_number, msg_id)
        except Exception as e:
            logging.warning(f"ensure_user_from_webhook failed (continuing): {e}")

        # Legacy fast-path (still supported); curly apostrophes are normalised first.
        normalized = message_text.lower().strip().replace("’", "'")
        if normalized == "view today's headlines":
            return handle_headlines(from_number)

        # Intent classification
        try:
            label, meta = classifier.classify(message_text)
            logging.info(f"Intent classified: {label} | meta={meta}")
        except Exception as e:
            logging.exception(f"Classifier error: {e}")
            # Keyword fallback when the classifier is unavailable.
            if any(k in normalized for k in ["headline", "digest", "news", "today"]):
                return handle_headlines(from_number)
            return handle_help(from_number)

        # Routing: chat questions also need the message text; every other
        # intent handler takes only the sender number.
        if label == "chat_question":
            return handle_chat_question(from_number, message_text)
        intent_handlers = {
            "headlines_request": handle_headlines,
            "preferences_update": handle_preferences,
            "greeting": handle_greeting,
            "help": handle_help,
            "unsubscribe": handle_unsubscribe,
            "small_talk": handle_small_talk,
        }
        # Unknown (or non-string) labels fall back to the help handler.
        handler = intent_handlers.get(label, handle_help) if isinstance(label, str) else handle_help
        return handler(from_number)
    except Exception as e:
        logging.error(f"Error processing webhook: {e}", exc_info=True)
        # Details stay in the log; the exception text is not echoed to the caller.
        return JSONResponse(status_code=500, content={"status": "error", "message": "Internal server error"})
@router.get("/message-received")
async def whatsapp_webhook_verify(request: Request):
    """
    Webhook verification endpoint (GET with 'hub.mode' and 'hub.challenge').

    When hub.mode is "subscribe" and a non-empty hub.challenge is present,
    the challenge is echoed back unchanged as a plain-text 200 response.
    The value is not converted to int or JSON-encoded, because either would
    alter it (e.g. "007" -> 7, or a non-numeric value gaining quotes).

    Raises:
    HTTPException (403) when the mode is not "subscribe" or the challenge
    is missing.
    """
    # Function-scope import so this block does not depend on a change to the
    # module's import section.
    from fastapi.responses import PlainTextResponse

    mode = request.query_params.get("hub.mode")
    challenge = request.query_params.get("hub.challenge")
    if mode != "subscribe" or not challenge:
        logging.warning(f"Webhook verification failed. Mode: {mode}, Challenge: {challenge}")
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Verification failed")

    logging.info(f"Webhook verification successful. Challenge: {challenge}")
    # NOTE(review): 'hub.verify_token' is not read or compared here, so any caller
    # can pass verification -- confirm whether a shared token should be enforced.
    return PlainTextResponse(content=challenge, status_code=200)
|