ragV98 committed on
Commit
5e03773
·
1 Parent(s): 5908e3c

idempotency integration

Browse files
Files changed (2) hide show
  1. routes/api/whatsapp_webhook.py +98 -7
  2. storage/users.py +173 -0
routes/api/whatsapp_webhook.py CHANGED
@@ -3,11 +3,18 @@ from fastapi import APIRouter, Request, HTTPException, status
3
  from fastapi.responses import JSONResponse
4
  import logging
5
  import json
 
6
  from typing import Tuple, Optional
7
 
 
 
 
8
  # Classifier
9
  from components.LLMs.Classifier import ZeroShotClassifier
10
 
 
 
 
11
  # Handlers
12
  from components.handlers.whatsapp_handlers import (
13
  handle_headlines,
@@ -20,9 +27,60 @@ from components.handlers.whatsapp_handlers import (
20
  )
21
 
22
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
23
  router = APIRouter()
24
  classifier = ZeroShotClassifier()
25
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optional[str]]:
27
  """
28
  Extracts (from_number, message_text) from a WhatsApp webhook payload.
@@ -39,14 +97,10 @@ def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optiona
39
  continue
40
 
41
  value = change.get("value", {}) or {}
42
- # Ignore pure status/billing updates that don't have 'messages'
43
  messages_list = value.get("messages", [])
44
  if not messages_list:
45
- # Helpful debug so you can see why it was ignored
46
- statuses = value.get("statuses", [])
47
- if statuses:
48
- # it's a status webhook, not a user message
49
- pass
50
  continue
51
 
52
  # Only look at the first message in the list
@@ -87,9 +141,19 @@ def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optiona
87
  logging.exception(f"_extract_from_number_and_text error: {e}")
88
  return None, None
89
 
 
90
  @router.post("/message-received")
91
  async def whatsapp_webhook_receiver(request: Request):
 
 
 
 
 
 
 
 
92
  try:
 
93
  body_str = (await request.body()).decode("utf-8")
94
  logging.info(f"Raw webhook body received: {body_str}")
95
 
@@ -99,6 +163,15 @@ async def whatsapp_webhook_receiver(request: Request):
99
  logging.error("❌ Failed to decode webhook body as JSON")
100
  return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})
101
 
 
 
 
 
 
 
 
 
 
102
  from_number, message_text = _extract_from_number_and_text(incoming_message)
103
  if not from_number or not message_text:
104
  logging.info("Ignoring non-message webhook or missing sender/text.")
@@ -106,10 +179,18 @@ async def whatsapp_webhook_receiver(request: Request):
106
 
107
  logging.info(f"Message from {from_number}: {message_text}")
108
 
 
 
 
 
 
 
 
109
  normalized = message_text.lower().strip().replace("’", "'")
110
  if normalized == "view today's headlines":
111
  return handle_headlines(from_number)
112
 
 
113
  try:
114
  label, meta = classifier.classify(message_text)
115
  logging.info(f"Intent classified: {label} | meta={meta}")
@@ -119,6 +200,7 @@ async def whatsapp_webhook_receiver(request: Request):
119
  return handle_headlines(from_number)
120
  return handle_help(from_number)
121
 
 
122
  if label == "headlines_request":
123
  return handle_headlines(from_number)
124
  elif label == "preferences_update":
@@ -143,12 +225,21 @@ async def whatsapp_webhook_receiver(request: Request):
143
 
144
  @router.get("/message-received")
145
  async def whatsapp_webhook_verify(request: Request):
 
 
 
146
  mode = request.query_params.get("hub.mode")
147
  challenge = request.query_params.get("hub.challenge")
148
 
149
  if mode == "subscribe" and challenge:
150
  logging.info(f"Webhook verification successful. Challenge: {challenge}")
151
- return JSONResponse(status_code=200, content=int(challenge))
 
 
 
 
 
 
152
  else:
153
  logging.warning(f"Webhook verification failed. Mode: {mode}, Challenge: {challenge}")
154
  raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Verification failed")
 
3
  from fastapi.responses import JSONResponse
4
  import logging
5
  import json
6
+ import os
7
  from typing import Tuple, Optional
8
 
9
+ # Redis for idempotency
10
+ import redis
11
+
12
  # Classifier
13
  from components.LLMs.Classifier import ZeroShotClassifier
14
 
15
+ # User storage
16
+ from storage.users import ensure_user_from_webhook
17
+
18
  # Handlers
19
  from components.handlers.whatsapp_handlers import (
20
  handle_headlines,
 
27
  )
28
 
29
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
30
+
31
  router = APIRouter()
32
  classifier = ZeroShotClassifier()
33
 
34
+ # ---- Redis client (idempotency) ----
35
+ REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
36
+ redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
37
+ IDEMPOTENCY_TTL_SECONDS = 10 * 60 # 10 minutes
38
+
39
+
40
def _mark_if_new(message_id: str) -> bool:
    """
    Claim a WhatsApp message id in Redis using set-if-absent.

    Writes the key "wa:handled:<message_id>" with NX (only when the key does
    not exist yet) and an expiry of IDEMPOTENCY_TTL_SECONDS.

    Returns:
        True  - the key was newly written, or Redis raised (fail open so the
                message is still processed).
        False - the key already existed, i.e. this id was seen before.
    """
    dedupe_key = f"wa:handled:{message_id}"
    try:
        was_set = redis_client.set(
            dedupe_key,
            "1",
            nx=True,
            ex=IDEMPOTENCY_TTL_SECONDS,
        )
    except Exception as e:
        # Fail open: a Redis problem must not cause inbound messages to be dropped.
        logging.warning(f"Idempotency Redis error: {e}")
        return True
    # With nx=True the client returns a falsy value when the key already existed.
    return bool(was_set)
58
+
59
+
60
+ def _get_incoming_message_id(payload: dict) -> Optional[str]:
61
+ """
62
+ Returns the WhatsApp message id (wamid...) if present, else None.
63
+ Only for real user 'messages' events (ignores statuses/billing).
64
+ """
65
+ try:
66
+ entries = payload.get("entry", [])
67
+ for entry in entries:
68
+ changes = entry.get("changes", [])
69
+ for change in changes:
70
+ if change.get("field") != "messages":
71
+ continue
72
+ value = change.get("value", {}) or {}
73
+ msgs = value.get("messages", []) or []
74
+ if not msgs:
75
+ continue
76
+ # Only look at the first message
77
+ msg = msgs[0]
78
+ return msg.get("id")
79
+ except Exception as e:
80
+ logging.debug(f"_get_incoming_message_id error: {e}")
81
+ return None
82
+
83
+
84
  def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optional[str]]:
85
  """
86
  Extracts (from_number, message_text) from a WhatsApp webhook payload.
 
97
  continue
98
 
99
  value = change.get("value", {}) or {}
100
+ # Ignore updates without 'messages'
101
  messages_list = value.get("messages", [])
102
  if not messages_list:
103
+ # Likely a status/billing event
 
 
 
 
104
  continue
105
 
106
  # Only look at the first message in the list
 
141
  logging.exception(f"_extract_from_number_and_text error: {e}")
142
  return None, None
143
 
144
+
145
  @router.post("/message-received")
146
  async def whatsapp_webhook_receiver(request: Request):
147
+ """
148
+ Receives incoming messages from WhatsApp/Gupshup webhook.
149
+ - Parses JSON
150
+ - Idempotency check on wamid
151
+ - Extracts from_number/message_text
152
+ - Upserts user profile
153
+ - Classifies and routes to handlers
154
+ """
155
  try:
156
+ # Read + parse body
157
  body_str = (await request.body()).decode("utf-8")
158
  logging.info(f"Raw webhook body received: {body_str}")
159
 
 
163
  logging.error("❌ Failed to decode webhook body as JSON")
164
  return JSONResponse(status_code=400, content={"error": "Invalid JSON format"})
165
 
166
+ # Idempotency: skip duplicates early
167
+ msg_id = _get_incoming_message_id(incoming_message)
168
+ if msg_id:
169
+ is_new = _mark_if_new(msg_id)
170
+ if not is_new:
171
+ logging.info(f"Duplicate inbound message {msg_id} ignored.")
172
+ return JSONResponse(status_code=200, content={"status": "ignored", "message": "Duplicate message"})
173
+
174
+ # Extract sender + message
175
  from_number, message_text = _extract_from_number_and_text(incoming_message)
176
  if not from_number or not message_text:
177
  logging.info("Ignoring non-message webhook or missing sender/text.")
 
179
 
180
  logging.info(f"Message from {from_number}: {message_text}")
181
 
182
+ # Upsert user (profile + last_seen + last_msg_id)
183
+ try:
184
+ ensure_user_from_webhook(incoming_message, from_number, msg_id)
185
+ except Exception as e:
186
+ logging.warning(f"ensure_user_from_webhook failed (continuing): {e}")
187
+
188
+ # Legacy fast-path (still supported)
189
  normalized = message_text.lower().strip().replace("’", "'")
190
  if normalized == "view today's headlines":
191
  return handle_headlines(from_number)
192
 
193
+ # Intent classification
194
  try:
195
  label, meta = classifier.classify(message_text)
196
  logging.info(f"Intent classified: {label} | meta={meta}")
 
200
  return handle_headlines(from_number)
201
  return handle_help(from_number)
202
 
203
+ # Routing
204
  if label == "headlines_request":
205
  return handle_headlines(from_number)
206
  elif label == "preferences_update":
 
225
 
226
@router.get("/message-received")
async def whatsapp_webhook_verify(request: Request):
    """
    Webhook verification endpoint (GET with 'hub.mode' and 'hub.challenge').

    When hub.mode is "subscribe" and a non-empty hub.challenge is supplied,
    the challenge is echoed back verbatim as a plain-text 200 response.
    Anything else raises HTTP 403.

    The challenge is deliberately NOT passed through int()/JSON encoding:
    JSON-encoding a non-numeric challenge wraps it in quotes, and int()
    rewrites values such as "007", so the caller would not get back the exact
    value it sent.
    """
    # Function-scope import so this handler does not depend on edits to the
    # module's import block.
    from fastapi.responses import PlainTextResponse

    mode = request.query_params.get("hub.mode")
    challenge = request.query_params.get("hub.challenge")

    # NOTE(review): 'hub.verify_token' is not checked here, so any caller can
    # pass verification - confirm whether a shared token should be enforced.
    if mode == "subscribe" and challenge:
        logging.info(f"Webhook verification successful. Challenge: {challenge}")
        return PlainTextResponse(status_code=200, content=challenge)

    logging.warning(f"Webhook verification failed. Mode: {mode}, Challenge: {challenge}")
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Verification failed")
storage/users.py ADDED
@@ -0,0 +1,173 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # storage/users.py
2
+ import os
3
+ import time
4
+ import json
5
+ import logging
6
+ from typing import Any, Dict, List, Optional
7
+
8
+ import redis
9
+
10
+ # Reuse your Upstash Redis
11
+ REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
12
+ redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
13
+
14
+ # Key helpers
15
+ def _user_key(wa_id: str) -> str:
16
+ return f"user:{wa_id}"
17
+
18
+ def _prefs_key(wa_id: str) -> str:
19
+ return f"user:{wa_id}:prefs"
20
+
21
+ # ------------- Core CRUD -------------
22
+
23
def get_user(wa_id: str) -> Optional[Dict[str, Any]]:
    """
    Load a user's profile hash together with their preferences.

    Returns:
        The hash fields as a dict, with the timestamp fields that are present
        ("created_at", "updated_at", "last_seen", "last_msg_ts") converted to
        float and a "preferences" entry added (the stored JSON blob, or
        {"topics": [], "regions": [], "time": None} when none is stored).
        None when the hash is empty/missing or when any error occurs (the
        error is logged with its traceback).
    """
    try:
        record = redis_client.hgetall(_user_key(wa_id))
        if not record:
            return None

        # Timestamps are written as strings by the setters in this module;
        # hand them back to callers as floats.
        for ts_field in ("created_at", "updated_at", "last_seen", "last_msg_ts"):
            if ts_field in record:
                record[ts_field] = float(record[ts_field])

        raw_prefs = redis_client.get(_prefs_key(wa_id))
        if raw_prefs:
            record["preferences"] = json.loads(raw_prefs)
        else:
            record["preferences"] = {"topics": [], "regions": [], "time": None}
        return record
    except Exception as e:
        logging.exception(f"get_user error for {wa_id}: {e}")
        return None
44
+
45
+
46
def upsert_user(
    wa_id: str,
    display_name: Optional[str] = None,
    phone_e164: Optional[str] = None,
    source: str = "whatsapp",
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Create or update a user profile hash.

    Always writes "wa_id", "source" and "updated_at". "display_name",
    "phone_e164" and "extra_json" are written only when the corresponding
    argument is provided, so omitted fields keep their stored value. Note
    that "source" has a default and is therefore overwritten on every call.

    "created_at" and "consent" ('opt_in'|'opt_out'|'unknown', initially
    "unknown") are written with HSETNX, i.e. only when the field is absent.
    This replaces an EXISTS-then-HSET sequence, which was racy and also never
    initialised these fields on a hash that had first been created by another
    writer of the same key (e.g. a last_seen / last_msg / updated_at stamp).

    Returns:
        The user as read back by get_user() (or {} if that read fails), or
        {"wa_id": wa_id, "error": "<message>"} when the write fails.
    """
    now = time.time()
    key = _user_key(wa_id)
    try:
        fields: Dict[str, Any] = {
            "wa_id": wa_id,
            "source": source,
            "updated_at": now,
        }
        if display_name is not None:
            fields["display_name"] = display_name
        if phone_e164 is not None:
            fields["phone_e164"] = phone_e164
        if extra:
            # store a small JSON for misc signals (e.g., locale); capped at 4096 chars
            fields["extra_json"] = json.dumps(extra)[:4096]

        # The context manager resets the pipeline even if execute() raises.
        with redis_client.pipeline() as p:
            p.hset(key, mapping={k: str(v) for k, v in fields.items()})
            # Field-level "set if absent": first-write-wins for creation metadata.
            p.hsetnx(key, "created_at", str(now))
            p.hsetnx(key, "consent", "unknown")
            p.execute()

        return get_user(wa_id) or {}
    except Exception as e:
        logging.exception(f"upsert_user error for {wa_id}: {e}")
        return {"wa_id": wa_id, "error": str(e)}
85
+
86
+
87
def set_last_seen(wa_id: str, ts: Optional[float] = None) -> None:
    """
    Stamp the user's hash with a "last_seen" time.

    Uses `ts` when given, otherwise the current time.time(). Errors are
    logged with their traceback and not re-raised.
    """
    try:
        seen_at = time.time() if ts is None else ts
        redis_client.hset(_user_key(wa_id), mapping={"last_seen": str(seen_at)})
    except Exception as e:
        logging.exception(f"set_last_seen error for {wa_id}: {e}")
94
+
95
+
96
def record_inbound_message(wa_id: str, message_id: str, ts: Optional[float] = None) -> None:
    """
    Record the most recent inbound message on the user's hash.

    Writes "last_msg_id" (the given message_id) and "last_msg_ts" (`ts` when
    given, otherwise the current time.time()). Errors are logged with their
    traceback and not re-raised.
    """
    try:
        received_at = time.time() if ts is None else ts
        stamp = {
            "last_msg_id": message_id,
            "last_msg_ts": str(received_at),
        }
        redis_client.hset(_user_key(wa_id), mapping=stamp)
    except Exception as e:
        logging.exception(f"record_inbound_message error for {wa_id}: {e}")
104
+
105
+
106
def set_preferences(wa_id: str, topics: Optional[List[str]] = None,
                    regions: Optional[List[str]] = None, time_pref: Optional[str] = None) -> Dict[str, Any]:
    """
    Merge the supplied preferences into the user's stored JSON blob.

    Only arguments that are not None are applied:
    - topics: stripped and lower-cased, blanks dropped, e.g. ["india", "finance"]
    - regions: stripped and upper-cased, blanks dropped, e.g. ["IN", "EU"]
    - time_pref: stored as given under "time", e.g. "09:00"

    After saving the blob, "updated_at" on the user hash is set to the
    current time.

    Returns:
        The user dict as read before the write, with "preferences" replaced
        by the merged value; or {"wa_id": wa_id, "error": "<message>"} when
        anything fails.
    """
    try:
        user = get_user(wa_id) or {}
        prefs = user.get("preferences", {}) or {}

        if topics is not None:
            cleaned_topics = []
            for topic in topics:
                if topic.strip():
                    cleaned_topics.append(topic.strip().lower())
            prefs["topics"] = cleaned_topics

        if regions is not None:
            cleaned_regions = []
            for region in regions:
                if region.strip():
                    cleaned_regions.append(region.strip().upper())
            prefs["regions"] = cleaned_regions

        if time_pref is not None:
            prefs["time"] = time_pref

        redis_client.set(_prefs_key(wa_id), json.dumps(prefs))

        # Touch updated_at
        touched_at = str(time.time())
        redis_client.hset(_user_key(wa_id), mapping={"updated_at": touched_at})

        user["preferences"] = prefs
        return user
    except Exception as e:
        logging.exception(f"set_preferences error for {wa_id}: {e}")
        return {"wa_id": wa_id, "error": str(e)}
131
+
132
+ # ------------- Webhook helpers -------------
133
+
134
+ def _extract_contact_name(payload: Dict[str, Any]) -> Optional[str]:
135
+ """
136
+ Try multiple shapes:
137
+ value.contacts[0].profile.name
138
+ """
139
+ try:
140
+ for entry in payload.get("entry", []):
141
+ for change in entry.get("changes", []):
142
+ if change.get("field") != "messages":
143
+ continue
144
+ val = change.get("value", {}) or {}
145
+ contacts = val.get("contacts", []) or []
146
+ if contacts:
147
+ prof = contacts[0].get("profile", {}) or {}
148
+ name = prof.get("name")
149
+ if name:
150
+ return name
151
+ except Exception:
152
+ pass
153
+ return None
154
+
155
+
156
def ensure_user_from_webhook(payload: Dict[str, Any], from_number: str, message_id: Optional[str]) -> Dict[str, Any]:
    """
    Create or refresh the user record for the sender of a webhook event.

    - payload: the webhook body; used only to look up the contact's name
    - from_number: WhatsApp 'from' value, used both as wa_id and phone_e164
    - message_id: wamid... of the inbound message; when truthy it is recorded
      as the user's last message

    Also stamps "last_seen" with the current time. Returns whatever
    upsert_user() returned (taken before the last_seen / last message stamps
    are written).
    """
    profile = upsert_user(
        wa_id=from_number,
        display_name=_extract_contact_name(payload),
        phone_e164=from_number,
        source="whatsapp",
        extra=None,
    )
    set_last_seen(from_number)
    if not message_id:
        return profile
    record_inbound_message(from_number, message_id)
    return profile