|
|
|
import os |
|
import time |
|
import json |
|
import logging |
|
from typing import Any, Dict, List, Optional |
|
|
|
import redis |
|
|
|
|
|
# Connection string comes from the environment, with a local Redis as fallback.
REDIS_URL = os.getenv("UPSTASH_REDIS_URL", "redis://localhost:6379")

# Shared module-level client; decode_responses=True makes replies come back as str.
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
|
|
|
|
|
def _user_key(wa_id: str) -> str: |
|
return f"user:{wa_id}" |
|
|
|
def _prefs_key(wa_id: str) -> str: |
|
return f"user:{wa_id}:prefs" |
|
|
|
|
|
|
|
def get_user(wa_id: str) -> Optional[Dict[str, Any]]:
    """
    Load the profile hash for ``wa_id`` together with its preferences.

    Timestamp fields that are present in the hash are converted to ``float``.
    The preferences blob is decoded from JSON; when none is stored, an empty
    default (no topics, no regions, no time) is used instead.

    Returns ``None`` when the hash is empty or when any step raises
    (the error is logged).
    """
    timestamp_fields = ("created_at", "updated_at", "last_seen", "last_msg_ts")
    try:
        profile = redis_client.hgetall(_user_key(wa_id))
        if not profile:
            return None

        # Hash values come back as strings; restore numeric timestamps.
        for field in timestamp_fields:
            if field in profile:
                profile[field] = float(profile[field])

        raw_prefs = redis_client.get(_prefs_key(wa_id))
        if raw_prefs:
            profile["preferences"] = json.loads(raw_prefs)
        else:
            profile["preferences"] = {"topics": [], "regions": [], "time": None}
        return profile
    except Exception as e:
        logging.exception(f"get_user error for {wa_id}: {e}")
        return None
|
|
|
|
|
def upsert_user(
    wa_id: str,
    display_name: Optional[str] = None,
    phone_e164: Optional[str] = None,
    source: str = "whatsapp",
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Create or update a user profile. Non-destructive: only updates provided fields.

    ``wa_id``, ``source`` and ``updated_at`` are written on every call.
    ``display_name``, ``phone_e164`` and ``extra`` (stored as ``extra_json``)
    are written only when supplied. ``created_at`` and ``consent`` are written
    with HSETNX, so they are filled in once and never overwritten afterwards,
    even if another function created the hash first.

    Returns the profile as read back by ``get_user`` (``{}`` if that read
    yields nothing), or ``{"wa_id": ..., "error": ...}`` if the write raised.
    """
    now = time.time()
    key = _user_key(wa_id)

    try:
        fields: Dict[str, Any] = {
            "wa_id": wa_id,
            "source": source,
            "updated_at": now,
        }
        if display_name is not None:
            fields["display_name"] = display_name
        if phone_e164 is not None:
            fields["phone_e164"] = phone_e164
        if extra:
            # NOTE(review): the serialized blob is cut at 4096 characters; a
            # blob longer than that is stored truncated and may not parse as JSON.
            fields["extra_json"] = json.dumps(extra)[:4096]

        # One pipeline for all writes. HSETNX replaces the former separate
        # EXISTS check, which raced with concurrent writers and skipped
        # created_at/consent whenever the hash already existed without them.
        with redis_client.pipeline() as pipe:
            pipe.hset(key, mapping={k: str(v) for k, v in fields.items()})
            pipe.hsetnx(key, "created_at", str(now))
            pipe.hsetnx(key, "consent", "unknown")
            pipe.execute()

        return get_user(wa_id) or {}
    except Exception as e:
        logging.exception(f"upsert_user error for {wa_id}: {e}")
        return {"wa_id": wa_id, "error": str(e)}
|
|
|
|
|
def set_last_seen(wa_id: str, ts: Optional[float] = None) -> None:
    """
    Stamp ``last_seen`` on the profile hash of ``wa_id``.

    Uses ``ts`` when given, otherwise the current time. Errors are logged
    and not re-raised.
    """
    try:
        seen_at = time.time() if ts is None else ts
        stamp = {"last_seen": str(seen_at)}
        redis_client.hset(_user_key(wa_id), mapping=stamp)
    except Exception as e:
        logging.exception(f"set_last_seen error for {wa_id}: {e}")
|
|
|
|
|
def record_inbound_message(wa_id: str, message_id: str, ts: Optional[float] = None) -> None:
    """
    Record the id and timestamp of the latest inbound message for ``wa_id``.

    Writes ``last_msg_id`` and ``last_msg_ts`` to the profile hash, using
    ``ts`` when given and the current time otherwise. Errors are logged and
    not re-raised.
    """
    try:
        received_at = time.time() if ts is None else ts
        stamp = {
            "last_msg_id": message_id,
            "last_msg_ts": str(received_at),
        }
        redis_client.hset(_user_key(wa_id), mapping=stamp)
    except Exception as e:
        logging.exception(f"record_inbound_message error for {wa_id}: {e}")
|
|
|
|
|
def set_preferences(wa_id: str, topics: Optional[List[str]] = None,
                    regions: Optional[List[str]] = None, time_pref: Optional[str] = None) -> Dict[str, Any]:
    """
    Persist user preferences as a small JSON blob.

    - topics: e.g., ["india","finance"]  (stripped, lower-cased, blanks dropped)
    - regions: e.g., ["IN","EU"]         (stripped, upper-cased, blanks dropped)
    - time_pref: e.g., "09:00"

    Arguments left as ``None`` keep their previously stored value. The stored
    blob always carries the ``topics``, ``regions`` and ``time`` keys, whether
    or not a profile hash existed beforehand.

    Returns the profile from ``get_user`` (or an empty dict when there is
    none) with ``preferences`` set to the new value, or
    ``{"wa_id": ..., "error": ...}`` if anything raised.
    """
    try:
        current = get_user(wa_id) or {}

        # Start from the full default shape so the blob has a stable schema,
        # then overlay whatever was stored (ignored unless it is a dict).
        prefs: Dict[str, Any] = {"topics": [], "regions": [], "time": None}
        stored = current.get("preferences")
        if isinstance(stored, dict):
            prefs.update(stored)

        if topics is not None:
            prefs["topics"] = [t.strip().lower() for t in topics if t.strip()]
        if regions is not None:
            prefs["regions"] = [r.strip().upper() for r in regions if r.strip()]
        if time_pref is not None:
            prefs["time"] = time_pref

        redis_client.set(_prefs_key(wa_id), json.dumps(prefs))

        # Touch the profile so consumers can see that something changed.
        redis_client.hset(_user_key(wa_id), mapping={"updated_at": str(time.time())})

        current["preferences"] = prefs
        return current
    except Exception as e:
        logging.exception(f"set_preferences error for {wa_id}: {e}")
        return {"wa_id": wa_id, "error": str(e)}
|
|
|
|
|
|
|
def _extract_contact_name(payload: Dict[str, Any]) -> Optional[str]: |
|
""" |
|
Try multiple shapes: |
|
value.contacts[0].profile.name |
|
""" |
|
try: |
|
for entry in payload.get("entry", []): |
|
for change in entry.get("changes", []): |
|
if change.get("field") != "messages": |
|
continue |
|
val = change.get("value", {}) or {} |
|
contacts = val.get("contacts", []) or [] |
|
if contacts: |
|
prof = contacts[0].get("profile", {}) or {} |
|
name = prof.get("name") |
|
if name: |
|
return name |
|
except Exception: |
|
pass |
|
return None |
|
|
|
|
|
def ensure_user_from_webhook(payload: Dict[str, Any], from_number: str, message_id: Optional[str]) -> Dict[str, Any]:
    """
    Upsert the sender of a webhook event and stamp their latest activity.

    - payload: webhook body, used only to look up the contact's display name
    - from_number: WhatsApp 'from' (wa_id / E.164), used as both id and phone
    - message_id: wamid... (for last message stamping); skipped when falsy

    Returns whatever ``upsert_user`` returned; the last-seen and last-message
    stamps are written after that value was produced.
    """
    profile = upsert_user(
        wa_id=from_number,
        display_name=_extract_contact_name(payload),
        phone_e164=from_number,
        source="whatsapp",
        extra=None,
    )

    set_last_seen(from_number)
    if message_id:
        record_inbound_message(from_number, message_id)

    return profile
|
|