File size: 9,293 Bytes
3566f32
 
 
 
 
5e03773
8d4ff93
 
5e03773
 
 
8d4ff93
 
 
5e03773
 
 
8d4ff93
 
 
 
 
 
 
 
 
 
3566f32
 
5e03773
3566f32
8d4ff93
3566f32
5e03773
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8d4ff93
5776f9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e03773
5776f9f
 
5e03773
5776f9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bbce544
5e03773
3566f32
 
5e03773
 
 
 
 
 
 
 
3566f32
5e03773
8d4ff93
f1e54f5
d202667
 
f1e54f5
 
 
 
d202667
5e03773
 
 
 
 
 
 
 
 
bbce544
3566f32
bbce544
 
3566f32
 
 
5e03773
 
 
 
 
 
 
bbce544
 
8d4ff93
3566f32
5e03773
8d4ff93
 
 
 
 
 
 
 
 
5e03773
8d4ff93
 
 
 
 
 
 
 
 
 
 
 
 
 
3566f32
8d4ff93
3566f32
 
 
 
 
8d4ff93
3566f32
 
5e03773
 
 
3566f32
 
 
 
 
5e03773
 
 
 
 
 
 
3566f32
 
bbce544
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# routes/api/whatsapp_webhook.py
import hmac
import json
import logging
import os
from typing import Tuple, Optional

from fastapi import APIRouter, Request, HTTPException, status
from fastapi.responses import JSONResponse, PlainTextResponse

# Redis for idempotency
import redis

# Classifier
from components.LLMs.Classifier import ZeroShotClassifier

# User storage (as you referenced it)
from storage.users import ensure_user_from_webhook

# Handlers
from components.handlers.whatsapp_handlers import (
    handle_headlines,
    handle_preferences,
    handle_greeting,
    handle_help,
    handle_unsubscribe,
    handle_small_talk,
    handle_chat_question,
)

# Root logging configuration: INFO level, "timestamp - LEVEL - message" lines.
# NOTE(review): calling basicConfig from a route module configures the root
# logger for the whole process at import time; consider moving it to app startup.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Router that the webhook endpoints below are registered on
# (presumably included into the FastAPI app elsewhere — confirm).
router = APIRouter()
# Intent classifier, constructed once at import time and shared by every request.
classifier = ZeroShotClassifier()

# ---- Redis client (idempotency) ----
# Connection URL from UPSTASH_REDIS_URL, falling back to a local Redis.
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
# decode_responses=True makes the client return str values instead of bytes.
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# How long a handled message id is remembered for duplicate suppression.
IDEMPOTENCY_TTL_SECONDS = 10 * 60  # 10 minutes


def _mark_if_new(message_id: str) -> bool:
    """
    Claim ``message_id`` in Redis (SET NX with a TTL of IDEMPOTENCY_TTL_SECONDS).

    Returns:
        True when the key did not exist yet, meaning this is the first sighting
        and the message should be processed. False when the key was already
        present, meaning this is a duplicate delivery that should be skipped.

    Any Redis error is logged and treated as "new" (fail open), so a Redis
    hiccup never causes a user message to be dropped.
    """
    try:
        claimed = redis_client.set(
            f"wa:handled:{message_id}",
            "1",
            ex=IDEMPOTENCY_TTL_SECONDS,  # key expires on its own
            nx=True,  # write only if the key is absent
        )
    except Exception as e:
        logging.warning(f"Idempotency Redis error: {e}")
        return True
    return bool(claimed)


def _get_incoming_message_id(payload: dict) -> Optional[str]:
    """
    Returns the WhatsApp message id (wamid...) if present, else None.
    Only for real user 'messages' events (ignores statuses/billing).
    """
    try:
        entries = payload.get("entry", [])
        for entry in entries:
            changes = entry.get("changes", [])
            for change in changes:
                if change.get("field") != "messages":
                    continue
                value = change.get("value", {}) or {}
                msgs = value.get("messages", []) or []
                if not msgs:
                    continue
                # Only look at the first message
                msg = msgs[0]
                return msg.get("id")
    except Exception as e:
        logging.debug(f"_get_incoming_message_id error: {e}")
    return None


def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optional[str]]:
    """
    Extracts (from_number, message_text) from a WhatsApp webhook payload.
    Returns (None, None) if not a user message (e.g., statuses, billing-event).
    """
    try:
        entries = payload.get("entry", [])
        for entry in entries:
            changes = entry.get("changes", [])
            for change in changes:
                field = change.get("field")
                # Only process message-type changes
                if field != "messages":
                    continue

                value = change.get("value", {}) or {}
                # Ignore updates without 'messages'
                messages_list = value.get("messages", [])
                if not messages_list:
                    # Likely a status/billing event
                    continue

                # Only look at the first message in the list
                msg = messages_list[0]
                from_number = msg.get("from")
                mtype = msg.get("type")

                # 1) Plain text
                if mtype == "text":
                    text_body = (msg.get("text", {}) or {}).get("body")
                    if from_number and text_body:
                        return from_number, text_body

                # 2) Template reply button (older/simple schema)
                if mtype == "button":
                    b = msg.get("button", {}) or {}
                    intent = b.get("payload") or b.get("text")
                    if from_number and intent:
                        return from_number, intent

                # 3) Newer interactive replies (buttons or list)
                if mtype == "interactive":
                    i = msg.get("interactive", {}) or {}
                    if "button_reply" in i:
                        intent = i["button_reply"].get("id") or i["button_reply"].get("title")
                        if from_number and intent:
                            return from_number, intent
                    if "list_reply" in i:
                        intent = i["list_reply"].get("id") or i["list_reply"].get("title")
                        if from_number and intent:
                            return from_number, intent

        # No usable user message found
        return None, None

    except Exception as e:
        # Always return a tuple on error
        logging.exception(f"_extract_from_number_and_text error: {e}")
        return None, None


def _release_message_id(message_id: str) -> None:
    """
    Drop the idempotency claim for ``message_id`` so a provider retry is processed.

    Used only when processing failed after the claim was taken. Without it, the
    500 we return would trigger a retry that is then discarded as a duplicate
    for the whole idempotency TTL. This is best-effort: Redis errors are logged
    and swallowed.
    """
    try:
        # Key format must match the one written by _mark_if_new.
        redis_client.delete(f"wa:handled:{message_id}")
    except Exception as e:
        logging.warning(f"Idempotency Redis release error: {e}")


def _route_intent(label, from_number: str, message_text: str):
    """Dispatch a classifier label to its handler; unknown labels fall back to help."""
    if label == "chat_question":
        # The only handler that needs the original message text.
        return handle_chat_question(from_number, message_text)
    handlers = {
        "headlines_request": handle_headlines,
        "preferences_update": handle_preferences,
        "greeting": handle_greeting,
        "help": handle_help,
        "unsubscribe": handle_unsubscribe,
        "small_talk": handle_small_talk,
    }
    return handlers.get(label, handle_help)(from_number)


@router.post("/message-received")
async def whatsapp_webhook_receiver(request: Request):
    """
    Receive incoming messages from the WhatsApp/Gupshup webhook.

    Processing steps:
    - Parses the body as UTF-8 JSON (400 on invalid encoding or JSON).
    - Deduplicates on the message id (wamid) via Redis; duplicates return 200 "ignored".
    - Extracts from_number/message_text; non-user events return 200 "ignored".
    - Upserts the user profile (failures are logged and ignored).
    - Classifies the intent and routes to the matching handler, returning its result.

    On an unexpected error it returns 500 with a generic message. In that case
    the idempotency claim is released so that the provider's retry of the same
    message is processed instead of being dropped as a duplicate.
    NOTE(review): if a handler raised after partially acting (e.g. after sending
    a reply), the retry may repeat that action.
    """
    msg_id: Optional[str] = None
    claimed = False
    try:
        raw_body = await request.body()
        # Lossy decode for logging only, so malformed bytes cannot break the log call.
        logging.info(f"Raw webhook body received: {raw_body.decode('utf-8', errors='replace')}")

        try:
            incoming_message = json.loads(raw_body.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Non-UTF-8 bytes are a malformed request too, not a server error.
            logging.error("❌ Failed to decode webhook body as JSON")
            return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})

        # Idempotency: skip duplicates early
        msg_id = _get_incoming_message_id(incoming_message)
        if msg_id:
            if not _mark_if_new(msg_id):
                logging.info(f"Duplicate inbound message {msg_id} ignored.")
                return JSONResponse(status_code=200, content={"status": "ignored", "message": "Duplicate message"})
            claimed = True

        # Extract sender + message
        from_number, message_text = _extract_from_number_and_text(incoming_message)
        if not from_number or not message_text:
            logging.info("Ignoring non-message webhook or missing sender/text.")
            return JSONResponse(status_code=200, content={"status": "ignored", "message": "No user message"})

        logging.info(f"Message from {from_number}: {message_text}")

        # Upsert user (profile + last_seen + last_msg_id); best-effort.
        try:
            ensure_user_from_webhook(incoming_message, from_number, msg_id)
        except Exception as e:
            logging.warning(f"ensure_user_from_webhook failed (continuing): {e}")

        # Legacy fast-path (still supported). Curly apostrophes are normalized
        # so "today’s" matches as well.
        normalized = message_text.lower().strip().replace("’", "'")
        if normalized == "view today's headlines":
            return handle_headlines(from_number)

        # Intent classification; on failure, fall back to a keyword heuristic.
        try:
            label, meta = classifier.classify(message_text)
            logging.info(f"Intent classified: {label} | meta={meta}")
        except Exception as e:
            logging.exception(f"Classifier error: {e}")
            if any(k in normalized for k in ["headline", "digest", "news", "today"]):
                return handle_headlines(from_number)
            return handle_help(from_number)

        return _route_intent(label, from_number, message_text)

    except Exception as e:
        logging.error(f"Error processing webhook: {e}", exc_info=True)
        if claimed and msg_id:
            _release_message_id(msg_id)
        # Do not echo internal exception details to the external caller.
        return JSONResponse(status_code=500, content={"status": "error", "message": "Internal server error"})


@router.get("/message-received")
async def whatsapp_webhook_verify(request: Request):
    """
    Answer the webhook verification handshake (GET with ``hub.*`` query params).

    Verification succeeds when ``hub.mode == "subscribe"`` and ``hub.challenge``
    is non-empty. In that case the challenge is echoed back verbatim as plain
    text. If the ``WHATSAPP_VERIFY_TOKEN`` environment variable is set (and
    non-empty), ``hub.verify_token`` must also match it. This check is opt-in,
    so deployments without the variable behave as before.

    Raises:
        HTTPException: 403 when verification fails.
    """
    params = request.query_params
    mode = params.get("hub.mode")
    challenge = params.get("hub.challenge")
    supplied_token = params.get("hub.verify_token")
    # An empty variable is treated as "not configured".
    expected_token = os.environ.get("WHATSAPP_VERIFY_TOKEN") or None

    # Constant-time comparison so the token cannot be guessed via timing.
    token_ok = expected_token is None or (
        supplied_token is not None
        and hmac.compare_digest(supplied_token.encode("utf-8"), expected_token.encode("utf-8"))
    )

    if mode == "subscribe" and challenge and token_ok:
        logging.info(f"Webhook verification successful. Challenge: {challenge}")
        # Echo the challenge exactly as received. Round-tripping through int()
        # would alter values like "0123", and a JSON body would wrap
        # non-numeric challenges in quotes.
        return PlainTextResponse(content=challenge, status_code=200)

    logging.warning(f"Webhook verification failed. Mode: {mode}, Challenge: {challenge}")
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Verification failed")