# components/store/users.py
"""Redis-backed store for user profiles and preferences.

Each user is a Redis hash at ``user:<wa_id>``; preferences live in a
separate JSON string at ``user:<wa_id>:prefs``. Public functions catch
Redis errors, log them, and return a fallback value instead of raising.
"""
import json
import logging
import os
import time
from typing import Any, Dict, List, Optional

import redis

# Reuse your Upstash Redis
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)

# Hash fields that are written as str(float) and converted back on read.
_TIMESTAMP_FIELDS = ("created_at", "updated_at", "last_seen", "last_msg_ts")

# Upper bound (in characters) for the serialized ``extra`` payload.
_MAX_EXTRA_JSON_LEN = 4096


# Key helpers
def _user_key(wa_id: str) -> str:
    """Return the Redis key of the user's profile hash."""
    return f"user:{wa_id}"


def _prefs_key(wa_id: str) -> str:
    """Return the Redis key of the user's preferences JSON blob."""
    return f"user:{wa_id}:prefs"


def _default_prefs() -> Dict[str, Any]:
    """Return a fresh copy of the preferences used when none are stored."""
    return {"topics": [], "regions": [], "time": None}


def _load_prefs(wa_id: str) -> Dict[str, Any]:
    """Read the stored preferences, falling back to the defaults.

    Redis errors propagate to the caller. A blob that is not valid JSON,
    or does not decode to a dict, is logged and replaced by the defaults.
    """
    raw = redis_client.get(_prefs_key(wa_id))
    if not raw:
        return _default_prefs()
    try:
        loaded = json.loads(raw)
    except ValueError:
        logging.warning("Unreadable preferences blob for %s; using defaults", wa_id)
        return _default_prefs()
    if not isinstance(loaded, dict):
        logging.warning("Preferences for %s are not a JSON object; using defaults", wa_id)
        return _default_prefs()
    return loaded


# ------------- Core CRUD -------------
def get_user(wa_id: str) -> Optional[Dict[str, Any]]:
    """Fetch a user profile together with its preferences.

    Returns:
        The hash fields as a dict, with the timestamp fields converted to
        ``float`` and a ``"preferences"`` entry added. ``None`` when the
        hash is empty/missing or when a Redis error occurs (the error is
        logged).

    A timestamp field whose stored value is not numeric is dropped from
    the result (with a warning) rather than failing the whole read.
    """
    try:
        data = redis_client.hgetall(_user_key(wa_id))
        if not data:
            return None

        # Normalize types
        for field in _TIMESTAMP_FIELDS:
            if field not in data:
                continue
            try:
                data[field] = float(data[field])
            except (TypeError, ValueError):
                logging.warning(
                    "get_user: dropping non-numeric %s=%r for %s",
                    field, data[field], wa_id,
                )
                del data[field]

        # Preferences
        data["preferences"] = _load_prefs(wa_id)
        return data
    except Exception as e:
        logging.exception("get_user error for %s: %s", wa_id, e)
        return None


def upsert_user(
    wa_id: str,
    display_name: Optional[str] = None,
    phone_e164: Optional[str] = None,
    source: str = "whatsapp",
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Create or update a user profile.

    ``wa_id``, ``source`` and ``updated_at`` are written on every call.
    ``display_name``, ``phone_e164`` and ``extra`` are written only when
    provided. ``created_at`` and ``consent`` ('opt_in'|'opt_out'|'unknown',
    initialised to ``"unknown"``) are written only if the field is not
    already present, so existing values are never overwritten.

    Args:
        wa_id: User identifier; also determines the Redis key.
        display_name: Optional display name to store.
        phone_e164: Optional phone number to store.
        source: Origin label stored in the ``source`` field.
        extra: Optional misc signals (e.g., locale), stored as JSON in
            ``extra_json``. Payloads longer than 4096 characters once
            serialized are skipped with a warning, because a truncated
            JSON string could not be parsed back.

    Returns:
        The profile as returned by ``get_user`` (``{}`` if it cannot be
        read back), or ``{"wa_id": ..., "error": ...}`` on failure.
    """
    now = time.time()
    key = _user_key(wa_id)
    try:
        fields: Dict[str, Any] = {
            "wa_id": wa_id,
            "source": source,
            "updated_at": now,
        }
        if display_name is not None:
            fields["display_name"] = display_name
        if phone_e164 is not None:
            fields["phone_e164"] = phone_e164
        if extra:
            extra_json = json.dumps(extra)
            if len(extra_json) <= _MAX_EXTRA_JSON_LEN:
                fields["extra_json"] = extra_json
            else:
                logging.warning(
                    "upsert_user: extra for %s is %d chars (limit %d); not stored",
                    wa_id, len(extra_json), _MAX_EXTRA_JSON_LEN,
                )

        # One pipeline for all writes. HSETNX makes "set only on first
        # creation" a per-field guarantee, so no separate EXISTS check
        # (and no race between the check and the write) is needed, and a
        # hash first created by another writer still gets these fields.
        with redis_client.pipeline() as pipe:
            pipe.hset(key, mapping={k: str(v) for k, v in fields.items()})
            pipe.hsetnx(key, "created_at", str(now))
            pipe.hsetnx(key, "consent", "unknown")
            pipe.execute()

        return get_user(wa_id) or {}
    except Exception as e:
        logging.exception("upsert_user error for %s: %s", wa_id, e)
        return {"wa_id": wa_id, "error": str(e)}


def set_last_seen(wa_id: str, ts: Optional[float] = None) -> None:
    """Store ``last_seen`` on the user hash (current time when ``ts`` is None).

    Errors are logged and swallowed.
    """
    try:
        seen_at = ts if ts is not None else time.time()
        redis_client.hset(_user_key(wa_id), mapping={"last_seen": str(seen_at)})
    except Exception as e:
        logging.exception("set_last_seen error for %s: %s", wa_id, e)


def record_inbound_message(wa_id: str, message_id: str, ts: Optional[float] = None) -> None:
    """Store the id and timestamp of the latest inbound message.

    Writes ``last_msg_id`` and ``last_msg_ts`` (current time when ``ts`` is
    None) on the user hash. Errors are logged and swallowed.
    """
    try:
        msg_ts = ts if ts is not None else time.time()
        redis_client.hset(
            _user_key(wa_id),
            mapping={"last_msg_id": message_id, "last_msg_ts": str(msg_ts)},
        )
    except Exception as e:
        logging.exception("record_inbound_message error for %s: %s", wa_id, e)


def set_preferences(
    wa_id: str,
    topics: Optional[List[str]] = None,
    regions: Optional[List[str]] = None,
    time_pref: Optional[str] = None,
) -> Dict[str, Any]:
    """Persist user preferences as a small JSON blob.

    Only the arguments that are not ``None`` replace the stored values;
    the rest of the stored preferences are kept.

    Args:
        topics: e.g., ["india","finance"]; stripped, lower-cased, blanks dropped.
        regions: e.g., ["IN","EU"]; stripped, upper-cased, blanks dropped.
        time_pref: e.g., "09:00"; stored as given.

    Returns:
        The user dict from ``get_user`` (or ``{}`` when it returned None)
        with ``"preferences"`` set to the saved value, or
        ``{"wa_id": ..., "error": ...}`` on failure.
    """
    try:
        current = get_user(wa_id) or {}
        # When get_user yields nothing (missing hash or a read error) read
        # the blob directly, so stored preferences are merged, not replaced.
        prefs = dict(current.get("preferences") or _load_prefs(wa_id))

        if topics is not None:
            prefs["topics"] = [t.strip().lower() for t in topics if t.strip()]
        if regions is not None:
            prefs["regions"] = [r.strip().upper() for r in regions if r.strip()]
        if time_pref is not None:
            prefs["time"] = time_pref

        # Save the blob and touch updated_at in a single pipeline.
        with redis_client.pipeline() as pipe:
            pipe.set(_prefs_key(wa_id), json.dumps(prefs))
            pipe.hset(_user_key(wa_id), mapping={"updated_at": str(time.time())})
            pipe.execute()

        current["preferences"] = prefs
        return current
    except Exception as e:
        logging.exception("set_preferences error for %s: %s", wa_id, e)
        return {"wa_id": wa_id, "error": str(e)}


# ------------- Webhook helpers -------------
def _extract_contact_name(payload: Dict[str, Any]) -> Optional[str]:
    """Return the first contact's profile name from a webhook payload.

    Looks at ``entry[].changes[]`` items whose ``field`` is ``"messages"``
    and reads ``value.contacts[0].profile.name``. Returns ``None`` when no
    non-empty name is found or the payload does not have that shape.
    """
    try:
        for entry in payload.get("entry") or []:
            for change in entry.get("changes") or []:
                if change.get("field") != "messages":
                    continue
                value = change.get("value") or {}
                contacts = value.get("contacts") or []
                if not contacts:
                    continue
                profile = contacts[0].get("profile") or {}
                name = profile.get("name")
                if name:
                    return name
    except (AttributeError, TypeError, KeyError, IndexError):
        # Best-effort: an unexpected payload shape just means "no name".
        logging.debug("Could not extract contact name from payload", exc_info=True)
    return None


def ensure_user_from_webhook(
    payload: Dict[str, Any],
    from_number: str,
    message_id: Optional[str],
) -> Dict[str, Any]:
    """Ensure the user exists/updated using webhook payload signals.

    Upserts the profile (``from_number`` is used both as ``wa_id`` and as
    ``phone_e164``), stamps ``last_seen`` with the current time and, when
    ``message_id`` is truthy, records it as the latest inbound message.

    Args:
        payload: Webhook payload; only used to look up the contact name.
        from_number: WhatsApp 'from' (wa_id / E.164).
        message_id: wamid... (for last message stamping).

    Returns:
        The result of ``upsert_user``, taken before the ``last_seen`` /
        last-message stamps are written.
    """
    user = upsert_user(
        wa_id=from_number,
        display_name=_extract_contact_name(payload),
        phone_e164=from_number,
        source="whatsapp",
        extra=None,
    )
    set_last_seen(from_number)
    if message_id:
        record_inbound_message(from_number, message_id)
    return user