|
|
|
from fastapi import APIRouter, Request, HTTPException, status |
|
from fastapi.responses import JSONResponse |
|
import logging |
|
import json |
|
import os |
|
from typing import Tuple, Optional |
|
|
|
|
|
import redis |
|
|
|
|
|
from components.LLMs.Classifier import ZeroShotClassifier |
|
|
|
|
|
from storage.users import ensure_user_from_webhook |
|
|
|
|
|
from components.handlers.whatsapp_handlers import ( |
|
handle_headlines, |
|
handle_preferences, |
|
handle_greeting, |
|
handle_help, |
|
handle_unsubscribe, |
|
handle_small_talk, |
|
handle_chat_question, |
|
) |
|
|
|
# Root logger configuration applied at import time: INFO level, timestamped lines.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Router that the webhook endpoints below are registered on.
router = APIRouter()

# Single module-wide intent classifier instance, created once at import.
classifier = ZeroShotClassifier()

# Redis connection used for inbound-message de-duplication. The URL comes from
# the UPSTASH_REDIS_URL environment variable, falling back to a local instance.
REDIS_URL = os.getenv("UPSTASH_REDIS_URL", "redis://localhost:6379")
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)

# Lifetime (in seconds) of a "message already handled" marker: ten minutes.
IDEMPOTENCY_TTL_SECONDS = 10 * 60
|
|
|
|
|
def _mark_if_new(message_id: str) -> bool:
    """
    Claim *message_id* for processing via a Redis set-if-absent.

    Writes the key ``wa:handled:<message_id>`` with ``nx=True`` and an expiry
    of ``IDEMPOTENCY_TTL_SECONDS``.

    Returns:
        True if the write succeeded (first time this id is seen, so the
        caller should process the message), False if the write reported a
        falsy result (id already recorded, so the caller should skip it).
        If the Redis call raises for any reason, a warning is logged and
        True is returned so the message is still processed (fail open).
    """
    try:
        dedupe_key = f"wa:handled:{message_id}"
        claimed = redis_client.set(
            dedupe_key,
            "1",
            nx=True,
            ex=IDEMPOTENCY_TTL_SECONDS,
        )
    except Exception as e:
        # Best effort: a Redis outage must not block message handling.
        logging.warning(f"Idempotency Redis error: {e}")
        return True
    return bool(claimed)
|
|
|
|
|
def _get_incoming_message_id(payload: dict) -> Optional[str]: |
|
""" |
|
Returns the WhatsApp message id (wamid...) if present, else None. |
|
Only for real user 'messages' events (ignores statuses/billing). |
|
""" |
|
try: |
|
entries = payload.get("entry", []) |
|
for entry in entries: |
|
changes = entry.get("changes", []) |
|
for change in changes: |
|
if change.get("field") != "messages": |
|
continue |
|
value = change.get("value", {}) or {} |
|
msgs = value.get("messages", []) or [] |
|
if not msgs: |
|
continue |
|
|
|
msg = msgs[0] |
|
return msg.get("id") |
|
except Exception as e: |
|
logging.debug(f"_get_incoming_message_id error: {e}") |
|
return None |
|
|
|
|
|
def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optional[str]]: |
|
""" |
|
Extracts (from_number, message_text) from a WhatsApp webhook payload. |
|
Returns (None, None) if not a user message (e.g., statuses, billing-event). |
|
""" |
|
try: |
|
entries = payload.get("entry", []) |
|
for entry in entries: |
|
changes = entry.get("changes", []) |
|
for change in changes: |
|
field = change.get("field") |
|
|
|
if field != "messages": |
|
continue |
|
|
|
value = change.get("value", {}) or {} |
|
|
|
messages_list = value.get("messages", []) |
|
if not messages_list: |
|
|
|
continue |
|
|
|
|
|
msg = messages_list[0] |
|
from_number = msg.get("from") |
|
mtype = msg.get("type") |
|
|
|
|
|
if mtype == "text": |
|
text_body = (msg.get("text", {}) or {}).get("body") |
|
if from_number and text_body: |
|
return from_number, text_body |
|
|
|
|
|
if mtype == "button": |
|
b = msg.get("button", {}) or {} |
|
intent = b.get("payload") or b.get("text") |
|
if from_number and intent: |
|
return from_number, intent |
|
|
|
|
|
if mtype == "interactive": |
|
i = msg.get("interactive", {}) or {} |
|
if "button_reply" in i: |
|
intent = i["button_reply"].get("id") or i["button_reply"].get("title") |
|
if from_number and intent: |
|
return from_number, intent |
|
if "list_reply" in i: |
|
intent = i["list_reply"].get("id") or i["list_reply"].get("title") |
|
if from_number and intent: |
|
return from_number, intent |
|
|
|
|
|
return None, None |
|
|
|
except Exception as e: |
|
|
|
logging.exception(f"_extract_from_number_and_text error: {e}") |
|
return None, None |
|
|
|
|
|
@router.post("/message-received")
async def whatsapp_webhook_receiver(request: Request):
    """
    Receive an inbound WhatsApp/Gupshup webhook and route it to a handler.

    Steps:
    - Decode and parse the JSON body
    - Idempotency check on the message id (wamid)
    - Extract from_number / message_text
    - Upsert the user profile (best effort)
    - Classify the text and dispatch to the matching handler

    Returns:
        - 400 JSON ``{"error": "Invalid JSON format"}`` when the body is not
          valid UTF-8 or not valid JSON.
        - 200 JSON with ``"status": "ignored"`` for duplicate messages and for
          events that carry no sender/text.
        - Otherwise, whatever the selected handler returns.
        - 500 JSON with a generic error message on any unexpected failure
          (details are logged, not returned to the caller).
    """
    try:
        raw_body = await request.body()

        # Invalid UTF-8 is a client error, not a server error: answer 400
        # instead of letting UnicodeDecodeError fall through to the 500 path.
        try:
            body_str = raw_body.decode("utf-8")
        except UnicodeDecodeError:
            logging.error("❌ Failed to decode webhook body as JSON")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})

        # NOTE(review): this logs the full payload (sender number and message
        # text) at INFO level; consider lowering the level or redacting.
        logging.info(f"Raw webhook body received: {body_str}")

        try:
            incoming_message = json.loads(body_str)
        except json.JSONDecodeError:
            logging.error("❌ Failed to decode webhook body as JSON")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})

        # NOTE(review): the id is claimed *before* processing, so if anything
        # below fails and we answer 500, a later redelivery of the same id is
        # treated as a duplicate and skipped. Confirm this is intended.
        msg_id = _get_incoming_message_id(incoming_message)
        if msg_id and not _mark_if_new(msg_id):
            logging.info(f"Duplicate inbound message {msg_id} ignored.")
            return JSONResponse(status_code=200, content={"status": "ignored", "message": "Duplicate message"})

        from_number, message_text = _extract_from_number_and_text(incoming_message)
        if not from_number or not message_text:
            logging.info("Ignoring non-message webhook or missing sender/text.")
            return JSONResponse(status_code=200, content={"status": "ignored", "message": "No user message"})

        logging.info(f"Message from {from_number}: {message_text}")

        # Profile upsert is best effort: a storage failure must not stop the reply.
        try:
            ensure_user_from_webhook(incoming_message, from_number, msg_id)
        except Exception as e:
            logging.warning(f"ensure_user_from_webhook failed (continuing): {e}")

        # Fast path for the exact quick-reply text; curly apostrophes are
        # normalised so both spellings match.
        normalized = message_text.lower().strip().replace("’", "'")
        if normalized == "view today's headlines":
            return handle_headlines(from_number)

        try:
            label, meta = classifier.classify(message_text)
            logging.info(f"Intent classified: {label} | meta={meta}")
        except Exception as e:
            # Classifier unavailable: fall back to a keyword heuristic.
            logging.exception(f"Classifier error: {e}")
            if any(k in normalized for k in ["headline", "digest", "news", "today"]):
                return handle_headlines(from_number)
            return handle_help(from_number)

        # chat_question is the only intent whose handler also needs the text.
        if label == "chat_question":
            return handle_chat_question(from_number, message_text)

        # Built per call so the handler names are resolved at request time.
        handlers_by_label = {
            "headlines_request": handle_headlines,
            "preferences_update": handle_preferences,
            "greeting": handle_greeting,
            "help": handle_help,
            "unsubscribe": handle_unsubscribe,
            "small_talk": handle_small_talk,
        }
        # Unknown labels fall back to the help handler.
        return handlers_by_label.get(label, handle_help)(from_number)

    except Exception as e:
        logging.error(f"Error processing webhook: {e}", exc_info=True)
        # Do not echo exception text to the caller; the details are in the log.
        return JSONResponse(status_code=500, content={"status": "error", "message": "Internal server error"})
|
|
|
|
|
@router.get("/message-received")
async def whatsapp_webhook_verify(request: Request):
    """
    Webhook verification endpoint (GET with 'hub.mode' and 'hub.challenge').

    When ``hub.mode`` is ``"subscribe"`` and ``hub.challenge`` is non-empty,
    the challenge is echoed back verbatim as a plain-text 200 response.

    Raises:
        HTTPException: 403 when ``hub.mode`` is not ``"subscribe"`` or the
            challenge is missing/empty.
    """
    # Imported locally so this function brings its own dependency into scope.
    from fastapi.responses import PlainTextResponse

    mode = request.query_params.get("hub.mode")
    challenge = request.query_params.get("hub.challenge")

    # NOTE(review): 'hub.verify_token' is not checked here, so any caller that
    # sends hub.mode=subscribe gets its challenge echoed. Consider comparing
    # it against a configured secret before answering.
    if mode != "subscribe" or not challenge:
        logging.warning(f"Webhook verification failed. Mode: {mode}, Challenge: {challenge}")
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Verification failed")

    logging.info(f"Webhook verification successful. Challenge: {challenge}")

    # Echo the challenge exactly as received. Converting through int() can
    # alter it (leading zeros, sign, whitespace), and JSON-encoding a
    # non-numeric challenge would wrap it in quotes.
    return PlainTextResponse(content=challenge, status_code=200)
|
|