fastAPIv2 / storage /users.py
ragV98's picture
idempotency integration
5e03773
raw
history blame
5.91 kB
# components/store/users.py
import os
import time
import json
import logging
from typing import Any, Dict, List, Optional
import redis
# Shared Redis connection for this module (reuses the Upstash Redis instance).
# The URL is read from the UPSTASH_REDIS_URL environment variable and falls
# back to a local Redis on the default port when that variable is not set.
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
# decode_responses=True asks redis-py to return str instead of bytes; the
# helpers below look fields up by str keys (e.g. "created_at"), so they rely on it.
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# Key helpers
def _user_key(wa_id: str) -> str:
return f"user:{wa_id}"
def _prefs_key(wa_id: str) -> str:
return f"user:{wa_id}:prefs"
# ------------- Core CRUD -------------
def get_user(wa_id: str) -> Optional[Dict[str, Any]]:
    """
    Load a user's profile hash and preferences from Redis.

    Returns the hash fields as a dict with the timestamp fields converted to
    float and a "preferences" entry added. Returns None when the hash is
    empty/missing, and also when any step raises (the error is logged).
    """
    try:
        record = redis_client.hgetall(_user_key(wa_id))
        if not record:
            return None

        # Hash values come back as strings; turn the known timestamps into floats.
        for ts_field in ("created_at", "updated_at", "last_seen", "last_msg_ts"):
            if ts_field in record:
                record[ts_field] = float(record[ts_field])

        # Preferences live under a separate key as a JSON blob.
        raw_prefs = redis_client.get(_prefs_key(wa_id))
        if raw_prefs:
            record["preferences"] = json.loads(raw_prefs)
        else:
            record["preferences"] = {"topics": [], "regions": [], "time": None}
        return record
    except Exception as e:
        logging.exception(f"get_user error for {wa_id}: {e}")
        return None
def upsert_user(
    wa_id: str,
    display_name: Optional[str] = None,
    phone_e164: Optional[str] = None,
    source: str = "whatsapp",
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Create or update a user profile hash.

    ``wa_id``, ``source`` and ``updated_at`` are written on every call;
    ``display_name``, ``phone_e164`` and ``extra`` are written only when
    provided. ``created_at`` and ``consent`` are written only if the hash does
    not already have them.

    Args:
        wa_id: User identifier; also used to build the Redis key.
        display_name: Optional profile name to store.
        phone_e164: Optional phone number to store.
        source: Origin label stored in the ``source`` field.
        extra: Optional dict of misc signals (e.g., locale), stored as JSON in
            ``extra_json``. Skipped (with a warning) if the serialized form
            exceeds 4096 characters, because a truncated blob would not be
            valid JSON.

    Returns:
        The user as returned by ``get_user`` (``{}`` if it cannot be read
        back), or ``{"wa_id": ..., "error": ...}`` if the write fails.
    """
    now = time.time()
    key = _user_key(wa_id)
    try:
        fields: Dict[str, Any] = {
            "wa_id": wa_id,
            "source": source,
            "updated_at": now,
        }
        if display_name is not None:
            fields["display_name"] = display_name
        if phone_e164 is not None:
            fields["phone_e164"] = phone_e164
        if extra:
            # store a small JSON for misc signals (e.g., locale)
            extra_json = json.dumps(extra)
            if len(extra_json) <= 4096:
                fields["extra_json"] = extra_json
            else:
                # Slicing the string would persist broken JSON; keep whatever
                # was stored before instead.
                logging.warning(
                    "upsert_user: extra for %s is %d chars (> 4096); not stored",
                    wa_id, len(extra_json),
                )

        # One pipeline so all writes are sent together. HSETNX only writes a
        # field that is absent, so the create-only fields need no separate
        # EXISTS round-trip (which raced with concurrent upserts), and a hash
        # that was first created by another helper still gets them.
        with redis_client.pipeline() as pipe:
            pipe.hset(key, mapping={k: str(v) for k, v in fields.items()})
            pipe.hsetnx(key, "created_at", str(now))
            pipe.hsetnx(key, "consent", "unknown")  # 'opt_in'|'opt_out'|'unknown'
            pipe.execute()

        return get_user(wa_id) or {}
    except Exception as e:
        logging.exception(f"upsert_user error for {wa_id}: {e}")
        return {"wa_id": wa_id, "error": str(e)}
def set_last_seen(wa_id: str, ts: Optional[float] = None) -> None:
    """
    Stamp the ``last_seen`` field on the user's hash.

    Uses ``ts`` when given, otherwise the current time. Errors are logged and
    not raised.
    """
    try:
        seen_at = time.time() if ts is None else ts
        update = {"last_seen": str(seen_at)}
        redis_client.hset(_user_key(wa_id), mapping=update)
    except Exception as e:
        logging.exception(f"set_last_seen error for {wa_id}: {e}")
def record_inbound_message(wa_id: str, message_id: str, ts: Optional[float] = None) -> None:
    """
    Record the latest inbound message on the user's hash.

    Writes ``last_msg_id`` and ``last_msg_ts`` (``ts`` when given, otherwise
    the current time). Errors are logged and not raised.
    """
    try:
        received_at = time.time() if ts is None else ts
        update = {
            "last_msg_id": message_id,
            "last_msg_ts": str(received_at),
        }
        redis_client.hset(_user_key(wa_id), mapping=update)
    except Exception as e:
        logging.exception(f"record_inbound_message error for {wa_id}: {e}")
def set_preferences(wa_id: str, topics: Optional[List[str]] = None,
                    regions: Optional[List[str]] = None, time_pref: Optional[str] = None) -> Dict[str, Any]:
    """
    Persist user preferences as a small JSON blob.
    - topics: e.g., ["india","finance"] (stripped, lower-cased, blanks dropped)
    - regions: e.g., ["IN","EU"] (stripped, upper-cased, blanks dropped)
    - time_pref: e.g., "09:00"

    Only the arguments that are not None are changed; the others keep their
    stored value. The stored blob always carries the "topics", "regions" and
    "time" keys, matching the default shape get_user() reports. A single
    string passed for topics/regions is treated as a one-item list.

    Returns the user dict from get_user() (or {} if there is none) with the
    updated "preferences", or {"wa_id": ..., "error": ...} on failure.
    """
    try:
        current = get_user(wa_id) or {}
        stored = current.get("preferences") or {}
        if not isinstance(stored, dict):
            stored = {}
        # Start from the default shape so a first write for a user without a
        # readable profile does not persist a blob that is missing keys.
        prefs = {"topics": [], "regions": [], "time": None, **stored}

        if topics is not None:
            # A bare string would otherwise be iterated character by character.
            items = [topics] if isinstance(topics, str) else topics
            prefs["topics"] = [t.strip().lower() for t in items if t.strip()]
        if regions is not None:
            items = [regions] if isinstance(regions, str) else regions
            prefs["regions"] = [r.strip().upper() for r in items if r.strip()]
        if time_pref is not None:
            prefs["time"] = time_pref

        redis_client.set(_prefs_key(wa_id), json.dumps(prefs))
        # Touch updated_at
        redis_client.hset(_user_key(wa_id), mapping={"updated_at": str(time.time())})
        current["preferences"] = prefs
        return current
    except Exception as e:
        logging.exception(f"set_preferences error for {wa_id}: {e}")
        return {"wa_id": wa_id, "error": str(e)}
# ------------- Webhook helpers -------------
def _extract_contact_name(payload: Dict[str, Any]) -> Optional[str]:
"""
Try multiple shapes:
value.contacts[0].profile.name
"""
try:
for entry in payload.get("entry", []):
for change in entry.get("changes", []):
if change.get("field") != "messages":
continue
val = change.get("value", {}) or {}
contacts = val.get("contacts", []) or []
if contacts:
prof = contacts[0].get("profile", {}) or {}
name = prof.get("name")
if name:
return name
except Exception:
pass
return None
def ensure_user_from_webhook(payload: Dict[str, Any], from_number: str, message_id: Optional[str]) -> Dict[str, Any]:
    """
    Create or refresh the user behind an inbound webhook and return it.

    - payload: webhook body; used only to look up the contact's display name
    - from_number: WhatsApp 'from' (wa_id / E.164); used as both wa_id and phone
    - message_id: wamid... ; when truthy it is stamped as the last inbound message

    The returned dict is whatever upsert_user() returned; the last-seen and
    last-message stamps are written afterwards and are not reflected in it.
    """
    contact_name = _extract_contact_name(payload)
    profile = upsert_user(
        wa_id=from_number,
        display_name=contact_name,
        phone_e164=from_number,
        source="whatsapp",
        extra=None,
    )

    set_last_seen(from_number)
    if not message_id:
        return profile
    record_inbound_message(from_number, message_id)
    return profile