ragV98 commited on
Commit
7a411f2
·
1 Parent(s): a2c9de2

onboarding integration

Browse files
components/gateways/headlines_to_wa.py CHANGED
@@ -4,7 +4,7 @@ import logging
4
  import os
5
  import re
6
  import time
7
- from typing import Dict, Optional
8
 
9
  import redis
10
  import requests
@@ -233,8 +233,8 @@ def _post_to_gupshup(payload: Dict[str, object], action: str) -> Dict[str, objec
233
  # -----------------------------
234
  # Business logic
235
  # -----------------------------
236
- def fetch_cached_headlines() -> str:
237
- """Fetch and format the daily headlines from Redis into a WhatsApp-friendly message."""
238
  try:
239
  raw = redis_client.get("daily_news_feed_cache")
240
  if not raw:
@@ -252,7 +252,27 @@ def fetch_cached_headlines() -> str:
252
  message_parts.append(f"Nuse Daily - {formatted_date}")
253
  message_parts.append("")
254
 
255
- for topic_key, display_info in TOPIC_DISPLAY_MAP.items():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
256
  stories_for_topic = data.get(topic_key)
257
  if not stories_for_topic:
258
  continue
@@ -285,28 +305,12 @@ def send_to_whatsapp(message_text: str, destination_number: str) -> Dict[str, ob
285
  return _post_to_gupshup(payload, "standard text WhatsApp message")
286
 
287
 
288
- def send_seen_receipt(
289
- destination_number: str,
290
- message_id: Optional[str],
291
- provider: str = "gupshup",
292
- ) -> Dict[str, object]:
293
- """
294
- Mark an inbound message as read.
295
-
296
- provider:
297
- - "meta_cloud": message originated via Meta WhatsApp Cloud (your Cloud webhook & phone number)
298
- - "gupshup": message originated via Gupshup (default). In this case we SKIP the Cloud call.
299
-
300
- Returns structured result with 'status' in {"success","failed","skipped"}.
301
- """
302
  if not message_id:
303
  logging.debug("Skipping seen receipt for %s due to missing message id", destination_number)
304
  return {"status": "skipped", "reason": "missing_message_id"}
305
 
306
- if (provider or "").lower() != "meta_cloud":
307
- logging.debug("Skipping seen: provider is %s (not meta_cloud)", provider)
308
- return {"status": "skipped", "reason": "non_meta_origin"}
309
-
310
  logging.info(
311
  "Seen receipt intent (Meta Cloud): msg_id=%s | phone_id=%s | token_prefix=%s",
312
  message_id,
@@ -329,6 +333,7 @@ def send_seen_receipt(
329
  "messaging_product": "whatsapp",
330
  "status": "read",
331
  "message_id": message_id,
 
332
  }
333
 
334
  try:
 
4
  import os
5
  import re
6
  import time
7
+ from typing import Dict, List, Optional
8
 
9
  import redis
10
  import requests
 
233
  # -----------------------------
234
  # Business logic
235
  # -----------------------------
236
+ def fetch_cached_headlines(selected_topics: Optional[List[str]] = None) -> str:
237
+ """Fetch and format the daily headlines into a WhatsApp-friendly message."""
238
  try:
239
  raw = redis_client.get("daily_news_feed_cache")
240
  if not raw:
 
252
  message_parts.append(f"Nuse Daily - {formatted_date}")
253
  message_parts.append("")
254
 
255
+ normalized_topics: List[str] = []
256
+ if selected_topics:
257
+ seen = set()
258
+ for topic in selected_topics:
259
+ slug = (topic or "").strip().lower()
260
+ if slug in TOPIC_DISPLAY_MAP and slug not in seen:
261
+ normalized_topics.append(slug)
262
+ seen.add(slug)
263
+
264
+ topics_to_render = normalized_topics or list(TOPIC_DISPLAY_MAP.keys())
265
+
266
+ if selected_topics and not normalized_topics:
267
+ message_parts.append(
268
+ "(I couldn’t match those preferences yet, so here’s the full digest.)"
269
+ )
270
+ message_parts.append("")
271
+
272
+ for topic_key in topics_to_render:
273
+ display_info = TOPIC_DISPLAY_MAP.get(topic_key)
274
+ if not display_info:
275
+ continue
276
  stories_for_topic = data.get(topic_key)
277
  if not stories_for_topic:
278
  continue
 
305
  return _post_to_gupshup(payload, "standard text WhatsApp message")
306
 
307
 
308
+ def send_seen_receipt(destination_number: str, message_id: Optional[str]) -> Dict[str, object]:
309
+ """Mark an inbound message as read via the WhatsApp Cloud API."""
 
 
 
 
 
 
 
 
 
 
 
 
310
  if not message_id:
311
  logging.debug("Skipping seen receipt for %s due to missing message id", destination_number)
312
  return {"status": "skipped", "reason": "missing_message_id"}
313
 
 
 
 
 
314
  logging.info(
315
  "Seen receipt intent (Meta Cloud): msg_id=%s | phone_id=%s | token_prefix=%s",
316
  message_id,
 
333
  "messaging_product": "whatsapp",
334
  "status": "read",
335
  "message_id": message_id,
336
+ "typing_indicator": {"type": "text"},
337
  }
338
 
339
  try:
components/handlers/wa/__init__.py CHANGED
@@ -7,6 +7,11 @@ from .shortcuts import (
7
  handle_small_talk,
8
  handle_unsubscribe,
9
  )
 
 
 
 
 
10
  from .chat import handle_chat_question
11
  from .research import handle_general_research
12
 
@@ -20,4 +25,7 @@ __all__ = [
20
  "handle_small_talk",
21
  "handle_chat_question",
22
  "handle_general_research",
 
 
 
23
  ]
 
7
  handle_small_talk,
8
  handle_unsubscribe,
9
  )
10
+ from .onboarding import (
11
+ initiate_onboarding,
12
+ handle_onboarding_preferences_text,
13
+ handle_onboarding_preferences_flow_submission,
14
+ )
15
  from .chat import handle_chat_question
16
  from .research import handle_general_research
17
 
 
25
  "handle_small_talk",
26
  "handle_chat_question",
27
  "handle_general_research",
28
+ "initiate_onboarding",
29
+ "handle_onboarding_preferences_text",
30
+ "handle_onboarding_preferences_flow_submission",
31
  ]
components/handlers/wa/onboarding.py ADDED
@@ -0,0 +1,258 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import os
3
+ import re
4
+ from typing import List, Optional, Dict, Any
5
+
6
+ import requests
7
+ from fastapi.responses import JSONResponse
8
+
9
+ from components.gateways.headlines_to_wa import TOPIC_DISPLAY_MAP, fetch_cached_headlines
10
+ from storage.users import set_onboarding_stage, set_preferences
11
+ from .common import safe_send
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+ # ----------------------------------
16
+ # Configuration for WhatsApp Flows
17
+ # ----------------------------------
18
+ META_GRAPH_VERSION = os.environ.get("META_GRAPH_VERSION", "v23.0").strip()
19
+ META_PHONE_NUMBER_ID = (os.environ.get("WHATSAPP_CLOUD_PHONE_NUMBER_ID") or os.environ.get("META_PHONE_NUMBER_ID") or "").strip()
20
+ META_ACCESS_TOKEN = (os.environ.get("WHATSAPP_CLOUD_TOKEN") or os.environ.get("META_ACCESS_TOKEN") or "").strip()
21
+ TOPICS_FLOW_ID = (os.environ.get("TOPICS_FLOW_ID") or "").strip()
22
+ TOPICS_FLOW_MODE = (os.environ.get("TOPICS_FLOW_MODE") or "").strip() # e.g., "draft" while testing
23
+
24
+ # ----------------------------------
25
+ # Topics config
26
+ # ----------------------------------
27
+ _TOPICS_ORDER = ["india", "world", "tech", "finance", "sports"]
28
+ _TOPIC_SYNONYMS = {
29
+ "india": ["india"],
30
+ "world": ["world", "global"],
31
+ "tech": ["tech", "technology"],
32
+ "finance": [
33
+ "finance",
34
+ "business",
35
+ "markets",
36
+ "business and markets",
37
+ "business markets",
38
+ "business & markets",
39
+ "business&markets",
40
+ ],
41
+ "sports": ["sports", "sport"],
42
+ }
43
+
44
+ # ----------------------------------
45
+ # Copy
46
+ # ----------------------------------
47
+ _WELCOME_MESSAGE_GENERIC = "Hi there! I’m Nuse — your personal guide to the news you care about."
48
+ _PREFERENCES_PROMPT = (
49
+ "To get you started, pick the topics you’d like to follow:\n"
50
+ "• India\n• World\n• Technology\n• Business & Markets\n• Sports\n\n"
51
+ "Reply with one or more (comma or space separated)."
52
+ )
53
+ _INVALID_TOPICS_MESSAGE = (
54
+ "I didn’t recognise those topics. Please choose from India, World, Technology, Business & Markets, Sports."
55
+ )
56
+
57
+
58
+ # ----------------------------------
59
+ # Name retrieval (best-effort)
60
+ # ----------------------------------
61
+ def _get_display_name(wa_id: str) -> Optional[str]:
62
+ """
63
+ Try to fetch a friendly display name for the user.
64
+ We attempt common helper functions if your storage layer exposes them.
65
+ Return None if unavailable.
66
+ """
67
+ try:
68
+ # Prefer a dedicated accessor if your storage layer provides it
69
+ from storage.users import get_profile # type: ignore
70
+ profile = get_profile(wa_id) or {}
71
+ name = (profile.get("name") or profile.get("first_name") or profile.get("display_name") or "").strip()
72
+ return name or None
73
+ except Exception:
74
+ pass
75
+
76
+ try:
77
+ # Some projects expose a direct helper
78
+ from storage.users import get_name # type: ignore
79
+ name = (get_name(wa_id) or "").strip()
80
+ return name or None
81
+ except Exception:
82
+ pass
83
+
84
+ # Nothing found
85
+ return None
86
+
87
+
88
+ # ----------------------------------
89
+ # WhatsApp Flow sending (Checkbox Group)
90
+ # ----------------------------------
91
+ def _flows_available() -> bool:
92
+ return bool(META_PHONE_NUMBER_ID and META_ACCESS_TOKEN and TOPICS_FLOW_ID)
93
+
94
+
95
+ def _send_topics_flow(wa_id: str) -> Dict[str, Any]:
96
+ """
97
+ Send a WhatsApp Flow (interactive: flow) that contains a Checkbox Group
98
+ for multi-select topics. Returns a dict with status: success/failed.
99
+ """
100
+ if not _flows_available():
101
+ return {"status": "failed", "reason": "flows_unavailable"}
102
+
103
+ url = f"https://graph.facebook.com/{META_GRAPH_VERSION}/{META_PHONE_NUMBER_ID}/messages"
104
+ headers = {
105
+ "Authorization": f"Bearer {META_ACCESS_TOKEN}",
106
+ "Content-Type": "application/json",
107
+ }
108
+
109
+ # NOTE:
110
+ # - The Flow you create in WhatsApp Manager should have a Checkbox Group with options
111
+ # whose IDs match our topic slugs (india, world, tech, finance, sports).
112
+ # - The "parameters" below assume a basic Flow. You may need to align keys (flow_message_version, etc.)
113
+ # with your Flow definition and any data_exchange/init endpoints you configure.
114
+ interactive = {
115
+ "type": "flow",
116
+ "header": {"type": "text", "text": "Pick your news topics"},
117
+ "body": {"text": "Choose one or more topics and submit."},
118
+ "action": {
119
+ "name": "flow",
120
+ "parameters": {
121
+ "flow_id": TOPICS_FLOW_ID,
122
+ # In testing, many teams use "draft" mode; remove/omit for production
123
+ **({"mode": TOPICS_FLOW_MODE} if TOPICS_FLOW_MODE else {}),
124
+ # If you use data_exchange/INIT, you can add tokens or context here as needed.
125
+ "flow_message_version": "3"
126
+ },
127
+ },
128
+ }
129
+
130
+ payload = {
131
+ "messaging_product": "whatsapp",
132
+ "recipient_type": "individual",
133
+ "to": wa_id,
134
+ "type": "interactive",
135
+ "interactive": interactive,
136
+ }
137
+
138
+ try:
139
+ resp = requests.post(url, json=payload, headers=headers, timeout=10)
140
+ if resp.status_code >= 400:
141
+ logger.error("Flow send failed: %s | %s", resp.status_code, resp.text)
142
+ return {"status": "failed", "code": resp.status_code, "error": resp.text}
143
+ return {"status": "success", "details": resp.json()}
144
+ except requests.RequestException as e:
145
+ logger.error("Flow send error: %s", e)
146
+ return {"status": "failed", "error": str(e)}
147
+
148
+
149
+ # ----------------------------------
150
+ # Free-text parsing fallback
151
+ # ----------------------------------
152
+ def _normalise_preferences(message_text: str) -> List[str]:
153
+ cleaned = (message_text or "").lower().replace("&", " and ")
154
+ cleaned = re.sub(r"[^a-z\s,]", " ", cleaned)
155
+ cleaned = re.sub(r"\s+", " ", cleaned).strip()
156
+ if not cleaned:
157
+ return []
158
+
159
+ if " all " in f" {cleaned} ":
160
+ return list(_TOPICS_ORDER)
161
+
162
+ padded = f" {cleaned} "
163
+ selections: List[str] = []
164
+ for slug in _TOPICS_ORDER:
165
+ if slug in selections:
166
+ continue
167
+ for term in _TOPIC_SYNONYMS.get(slug, []):
168
+ term_norm = re.sub(r"\s+", " ", term.replace("&", " and ").lower()).strip()
169
+ if not term_norm:
170
+ continue
171
+ if f" {term_norm} " in padded:
172
+ selections.append(slug)
173
+ break
174
+ return selections
175
+
176
+
177
+ # ----------------------------------
178
+ # Public API
179
+ # ----------------------------------
180
+ def initiate_onboarding(wa_id: str) -> JSONResponse:
181
+ # Personalized welcome
182
+ name = _get_display_name(wa_id)
183
+ if name:
184
+ welcome = f"Hi {name}! I’m Nuse — your personal guide to the news you care about."
185
+ else:
186
+ welcome = _WELCOME_MESSAGE_GENERIC
187
+
188
+ safe_send(welcome, wa_id)
189
+
190
+ # Try sending the Flow (multi-select). If unavailable, fall back to text prompt.
191
+ flow_result = _send_topics_flow(wa_id)
192
+ if flow_result.get("status") == "success":
193
+ set_onboarding_stage(wa_id, "awaiting_preferences_flow")
194
+ return JSONResponse(status_code=200, content={"status": "onboarding_started", "mode": "flow"})
195
+ else:
196
+ logger.info("Flows unavailable or send failed (%s); falling back to text prompt.", flow_result.get("reason") or flow_result.get("error"))
197
+ safe_send(_PREFERENCES_PROMPT, wa_id)
198
+ set_onboarding_stage(wa_id, "awaiting_preferences_text")
199
+ return JSONResponse(status_code=200, content={"status": "onboarding_started", "mode": "text"})
200
+
201
+
202
+ def handle_onboarding_preferences_text(wa_id: str, message_text: str) -> JSONResponse:
203
+ """
204
+ Fallback path: user replied with free text listing topics.
205
+ """
206
+ set_onboarding_stage(wa_id, "awaiting_preferences_text")
207
+ topics = _normalise_preferences(message_text)
208
+ if not topics:
209
+ safe_send(_INVALID_TOPICS_MESSAGE, wa_id)
210
+ safe_send(_PREFERENCES_PROMPT, wa_id)
211
+ return JSONResponse(status_code=200, content={"status": "awaiting_preferences_text"})
212
+
213
+ set_preferences(wa_id, topics=topics)
214
+ set_onboarding_stage(wa_id, "onboarded")
215
+
216
+ display_labels = [TOPIC_DISPLAY_MAP[slug]["name"] for slug in _TOPICS_ORDER if slug in topics]
217
+ confirmation = "Great! I’ll keep you posted on " + ", ".join(display_labels) + "."
218
+ safe_send(confirmation, wa_id)
219
+
220
+ # Your fetch_cached_headlines function in your codebase seems to support filtering
221
+ digest = fetch_cached_headlines(selected_topics=topics)
222
+ safe_send(digest, wa_id)
223
+
224
+ return JSONResponse(status_code=200, content={"status": "preferences_captured", "topics": topics})
225
+
226
+
227
+ def handle_onboarding_preferences_flow_submission(wa_id: str, selected_topic_ids: List[str]) -> JSONResponse:
228
+ """
229
+ Call this from your webhook handler when you receive the Flow submission payload.
230
+ `selected_topic_ids` should contain the IDs of the checked boxes in your Flow.
231
+ Make sure those IDs match our topic slugs: india, world, tech, finance, sports.
232
+ """
233
+ # Normalize & validate against the allow-list
234
+ topics = [t for t in _TOPICS_ORDER if t in set(selected_topic_ids)]
235
+ if not topics:
236
+ # If user submitted nothing, nudge with the flow again
237
+ safe_send("Looks like you didn’t select any topics. Please choose at least one.", wa_id)
238
+ res = _send_topics_flow(wa_id)
239
+ if res.get("status") != "success":
240
+ # Fall back to text prompt if flow fails here
241
+ safe_send(_PREFERENCES_PROMPT, wa_id)
242
+ set_onboarding_stage(wa_id, "awaiting_preferences_text")
243
+ else:
244
+ set_onboarding_stage(wa_id, "awaiting_preferences_flow")
245
+ return JSONResponse(status_code=200, content={"status": "awaiting_preferences_flow"})
246
+
247
+ # Persist and confirm
248
+ set_preferences(wa_id, topics=topics)
249
+ set_onboarding_stage(wa_id, "onboarded")
250
+
251
+ display_labels = [TOPIC_DISPLAY_MAP[slug]["name"] for slug in _TOPICS_ORDER if slug in topics]
252
+ confirmation = "Great! I’ll keep you posted on " + ", ".join(display_labels) + "."
253
+ safe_send(confirmation, wa_id)
254
+
255
+ digest = fetch_cached_headlines(selected_topics=topics)
256
+ safe_send(digest, wa_id)
257
+
258
+ return JSONResponse(status_code=200, content={"status": "preferences_captured", "topics": topics})
components/handlers/wa/shortcuts.py CHANGED
@@ -2,6 +2,7 @@ import logging
2
  from fastapi.responses import JSONResponse
3
 
4
  from components.gateways.headlines_to_wa import fetch_cached_headlines
 
5
 
6
  from .common import safe_send
7
 
@@ -10,7 +11,11 @@ logger = logging.getLogger(__name__)
10
 
11
 
12
  def handle_headlines(from_number: str) -> JSONResponse:
13
- full_message_text = fetch_cached_headlines()
 
 
 
 
14
 
15
  if full_message_text.startswith("❌") or full_message_text.startswith("⚠️"):
16
  logger.error("Failed to fetch digest for %s: %s", from_number, full_message_text)
@@ -38,8 +43,8 @@ def handle_headlines(from_number: str) -> JSONResponse:
38
  def handle_preferences(from_number: str) -> JSONResponse:
39
  msg = (
40
  "Let’s tune your feed. Reply with topics you like:\n"
41
- "• worldindiafinancesportsentertainment\n\n"
42
- "You can send multiple, e.g.: india, finance"
43
  )
44
  safe_send(msg, to=from_number)
45
  return JSONResponse(status_code=200, content={"status": "success", "message": "Preferences prompt sent"})
 
2
  from fastapi.responses import JSONResponse
3
 
4
  from components.gateways.headlines_to_wa import fetch_cached_headlines
5
+ from storage.users import get_user
6
 
7
  from .common import safe_send
8
 
 
11
 
12
 
13
  def handle_headlines(from_number: str) -> JSONResponse:
14
+ user = get_user(from_number) or {}
15
+ prefs = user.get("preferences", {}) if isinstance(user, dict) else {}
16
+ preferred_topics = prefs.get("topics") if isinstance(prefs, dict) else None
17
+
18
+ full_message_text = fetch_cached_headlines(selected_topics=preferred_topics)
19
 
20
  if full_message_text.startswith("❌") or full_message_text.startswith("⚠️"):
21
  logger.error("Failed to fetch digest for %s: %s", from_number, full_message_text)
 
43
  def handle_preferences(from_number: str) -> JSONResponse:
44
  msg = (
45
  "Let’s tune your feed. Reply with topics you like:\n"
46
+ "• IndiaWorldTechnologyBusiness & Markets Sports\n\n"
47
+ "You can send multiple, e.g.: India, Technology"
48
  )
49
  safe_send(msg, to=from_number)
50
  return JSONResponse(status_code=200, content={"status": "success", "message": "Preferences prompt sent"})
routes/api/whatsapp_webhook.py CHANGED
@@ -26,6 +26,9 @@ from components.handlers.wa import (
26
  handle_small_talk,
27
  handle_chat_question,
28
  handle_general_research,
 
 
 
29
  )
30
 
31
  try: # Optional memory integration
@@ -93,6 +96,7 @@ def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optiona
93
  Extracts (from_number, message_text) from a WhatsApp webhook payload.
94
  Returns (None, None) if not a user message (e.g., statuses, billing-event).
95
  """
 
96
  try:
97
  entries = payload.get("entry", [])
98
  for entry in entries:
@@ -113,6 +117,8 @@ def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optiona
113
  # Only look at the first message in the list
114
  msg = messages_list[0]
115
  from_number = msg.get("from")
 
 
116
  mtype = msg.get("type")
117
 
118
  # 1) Plain text
@@ -141,12 +147,70 @@ def _extract_from_number_and_text(payload: dict) -> Tuple[Optional[str], Optiona
141
  return from_number, intent
142
 
143
  # No usable user message found
144
- return None, None
145
 
146
  except Exception as e:
147
  # Always return a tuple on error
148
  logging.exception(f"_extract_from_number_and_text error: {e}")
149
- return None, None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
150
 
151
 
152
  def _extract_message_timestamp(payload: dict) -> Optional[float]:
@@ -206,9 +270,15 @@ async def whatsapp_webhook_receiver(request: Request):
206
  logging.info(f"Duplicate inbound message {msg_id} ignored.")
207
  return JSONResponse(status_code=200, content={"status": "ignored", "message": "Duplicate message"})
208
 
209
- # Extract sender + message
210
  from_number, message_text = _extract_from_number_and_text(incoming_message)
211
- if not from_number or not message_text:
 
 
 
 
 
 
212
  logging.info("Ignoring non-message webhook or missing sender/text.")
213
  return JSONResponse(status_code=200, content={"status": "ignored", "message": "No user message"})
214
 
@@ -223,10 +293,39 @@ async def whatsapp_webhook_receiver(request: Request):
223
  message_ts = _extract_message_timestamp(incoming_message)
224
 
225
  # Upsert user (profile + last_seen + last_msg_id)
 
 
226
  try:
227
- ensure_user_from_webhook(incoming_message, from_number, msg_id)
228
  except Exception as e:
229
  logging.warning(f"ensure_user_from_webhook failed (continuing): {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
230
 
231
  if _store_chat_turn:
232
  try:
 
26
  handle_small_talk,
27
  handle_chat_question,
28
  handle_general_research,
29
+ initiate_onboarding,
30
+ handle_onboarding_preferences_text,
31
+ handle_onboarding_preferences_flow_submission,
32
  )
33
 
34
  try: # Optional memory integration
 
96
  Extracts (from_number, message_text) from a WhatsApp webhook payload.
97
  Returns (None, None) if not a user message (e.g., statuses, billing-event).
98
  """
99
+ last_sender: Optional[str] = None
100
  try:
101
  entries = payload.get("entry", [])
102
  for entry in entries:
 
117
  # Only look at the first message in the list
118
  msg = messages_list[0]
119
  from_number = msg.get("from")
120
+ if from_number:
121
+ last_sender = from_number
122
  mtype = msg.get("type")
123
 
124
  # 1) Plain text
 
147
  return from_number, intent
148
 
149
  # No usable user message found
150
+ return last_sender, None
151
 
152
  except Exception as e:
153
  # Always return a tuple on error
154
  logging.exception(f"_extract_from_number_and_text error: {e}")
155
+ return last_sender, None
156
+
157
+
158
+ def _extract_flow_topic_ids(payload: dict) -> Optional[list]:
159
+ try:
160
+ entries = payload.get("entry", [])
161
+ for entry in entries:
162
+ changes = entry.get("changes", [])
163
+ for change in changes:
164
+ if change.get("field") != "messages":
165
+ continue
166
+ value = change.get("value", {}) or {}
167
+ messages_list = value.get("messages", []) or []
168
+ if not messages_list:
169
+ continue
170
+ msg = messages_list[0]
171
+ if msg.get("type") != "interactive":
172
+ continue
173
+ interactive = msg.get("interactive", {}) or {}
174
+ if interactive.get("type") != "flow":
175
+ continue
176
+
177
+ flow_payload = (
178
+ interactive.get("flow")
179
+ or interactive.get("flow_response")
180
+ or interactive.get("flow_reply")
181
+ or {}
182
+ )
183
+
184
+ selected: list = []
185
+
186
+ def _collect(node):
187
+ if isinstance(node, dict):
188
+ node_type = str(node.get("type", "")).lower()
189
+ if node_type in {"checkbox", "checkboxes", "checkbox_group"}:
190
+ for option in node.get("options", []) or []:
191
+ if not isinstance(option, dict):
192
+ continue
193
+ if option.get("selected") or option.get("checked"):
194
+ val = (
195
+ option.get("id")
196
+ or option.get("value")
197
+ or option.get("title")
198
+ )
199
+ if val:
200
+ selected.append(str(val).strip().lower())
201
+ for value in node.values():
202
+ _collect(value)
203
+ elif isinstance(node, list):
204
+ for item in node:
205
+ _collect(item)
206
+
207
+ _collect(flow_payload)
208
+
209
+ if selected:
210
+ return selected
211
+ except Exception as exc: # pragma: no cover - defensive path
212
+ logging.debug("_extract_flow_topic_ids error: %s", exc)
213
+ return None
214
 
215
 
216
  def _extract_message_timestamp(payload: dict) -> Optional[float]:
 
270
  logging.info(f"Duplicate inbound message {msg_id} ignored.")
271
  return JSONResponse(status_code=200, content={"status": "ignored", "message": "Duplicate message"})
272
 
273
+ # Extract sender + message/flow payload
274
  from_number, message_text = _extract_from_number_and_text(incoming_message)
275
+ flow_topic_ids = _extract_flow_topic_ids(incoming_message) or []
276
+
277
+ if not from_number:
278
+ logging.info("Ignoring webhook without sender id.")
279
+ return JSONResponse(status_code=200, content={"status": "ignored", "message": "No sender"})
280
+
281
+ if not message_text and not flow_topic_ids:
282
  logging.info("Ignoring non-message webhook or missing sender/text.")
283
  return JSONResponse(status_code=200, content={"status": "ignored", "message": "No user message"})
284
 
 
293
  message_ts = _extract_message_timestamp(incoming_message)
294
 
295
  # Upsert user (profile + last_seen + last_msg_id)
296
+ user: Optional[dict] = None
297
+ user_created = False
298
  try:
299
+ user, user_created = ensure_user_from_webhook(incoming_message, from_number, msg_id)
300
  except Exception as e:
301
  logging.warning(f"ensure_user_from_webhook failed (continuing): {e}")
302
+ user = None
303
+ user_created = False
304
+
305
+ onboarding_stage = (user or {}).get("onboarding_stage") if isinstance(user, dict) else None
306
+
307
+ if user_created:
308
+ if typing_started:
309
+ send_typing_indicator(from_number, status="paused")
310
+ typing_started = False
311
+ return initiate_onboarding(from_number)
312
+
313
+ if onboarding_stage == "awaiting_preferences_flow":
314
+ if flow_topic_ids:
315
+ if typing_started:
316
+ send_typing_indicator(from_number, status="paused")
317
+ typing_started = False
318
+ return handle_onboarding_preferences_flow_submission(from_number, flow_topic_ids)
319
+ if typing_started:
320
+ send_typing_indicator(from_number, status="paused")
321
+ typing_started = False
322
+ return handle_onboarding_preferences_text(from_number, message_text)
323
+
324
+ if onboarding_stage == "awaiting_preferences_text":
325
+ if typing_started:
326
+ send_typing_indicator(from_number, status="paused")
327
+ typing_started = False
328
+ return handle_onboarding_preferences_text(from_number, message_text)
329
 
330
  if _store_chat_turn:
331
  try:
storage/users.py CHANGED
@@ -3,7 +3,7 @@ import os
3
  import time
4
  import json
5
  import logging
6
- from typing import Any, Dict, List, Optional
7
 
8
  import redis
9
 
@@ -129,6 +129,21 @@ def set_preferences(wa_id: str, topics: Optional[List[str]] = None,
129
  logging.exception(f"set_preferences error for {wa_id}: {e}")
130
  return {"wa_id": wa_id, "error": str(e)}
131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  # ------------- Webhook helpers -------------
133
 
134
  def _extract_contact_name(payload: Dict[str, Any]) -> Optional[str]:
@@ -153,12 +168,19 @@ def _extract_contact_name(payload: Dict[str, Any]) -> Optional[str]:
153
  return None
154
 
155
 
156
- def ensure_user_from_webhook(payload: Dict[str, Any], from_number: str, message_id: Optional[str]) -> Dict[str, Any]:
 
 
 
 
157
  """
158
  Ensure the user exists/updated using webhook payload signals.
159
  - from_number: WhatsApp 'from' (wa_id / E.164)
160
  - message_id: wamid... (for last message stamping)
161
  """
 
 
 
162
  display_name = _extract_contact_name(payload)
163
  user = upsert_user(
164
  wa_id=from_number,
@@ -170,4 +192,6 @@ def ensure_user_from_webhook(payload: Dict[str, Any], from_number: str, message_
170
  set_last_seen(from_number)
171
  if message_id:
172
  record_inbound_message(from_number, message_id)
173
- return user
 
 
 
3
  import time
4
  import json
5
  import logging
6
+ from typing import Any, Dict, List, Optional, Tuple
7
 
8
  import redis
9
 
 
129
  logging.exception(f"set_preferences error for {wa_id}: {e}")
130
  return {"wa_id": wa_id, "error": str(e)}
131
 
132
+
133
+ def set_onboarding_stage(wa_id: str, stage: str) -> None:
134
+ try:
135
+ redis_client.hset(_user_key(wa_id), mapping={"onboarding_stage": stage})
136
+ except Exception as e:
137
+ logging.exception(f"set_onboarding_stage error for {wa_id}: {e}")
138
+
139
+
140
+ def get_onboarding_stage(wa_id: str) -> Optional[str]:
141
+ try:
142
+ return redis_client.hget(_user_key(wa_id), "onboarding_stage")
143
+ except Exception as e:
144
+ logging.exception(f"get_onboarding_stage error for {wa_id}: {e}")
145
+ return None
146
+
147
  # ------------- Webhook helpers -------------
148
 
149
  def _extract_contact_name(payload: Dict[str, Any]) -> Optional[str]:
 
168
  return None
169
 
170
 
171
+ def ensure_user_from_webhook(
172
+ payload: Dict[str, Any],
173
+ from_number: str,
174
+ message_id: Optional[str],
175
+ ) -> Tuple[Dict[str, Any], bool]:
176
  """
177
  Ensure the user exists/updated using webhook payload signals.
178
  - from_number: WhatsApp 'from' (wa_id / E.164)
179
  - message_id: wamid... (for last message stamping)
180
  """
181
+ existing = get_user(from_number)
182
+ created = existing is None
183
+
184
  display_name = _extract_contact_name(payload)
185
  user = upsert_user(
186
  wa_id=from_number,
 
192
  set_last_seen(from_number)
193
  if message_id:
194
  record_inbound_message(from_number, message_id)
195
+ if created:
196
+ user = get_user(from_number) or user
197
+ return user, created