# rag_utils.py
import os, json, time
from typing import List, Dict, Tuple

import numpy as np

# Embedding / Vector index
from sentence_transformers import SentenceTransformer
import faiss

# -------- Paths --------
ROOT_DIR = os.path.dirname(__file__)
DATA_DIR = os.path.join(ROOT_DIR, "data")
CACHE_DIR = "/tmp"  # writable on Spaces

TAROT_JSON = os.path.join(DATA_DIR, "tarot_data_full.json")
NUM_JSON = os.path.join(DATA_DIR, "numerology_data_full.json")
EXT_JSONL = os.path.join(DATA_DIR, "external_chunks.jsonl")  # optional (crawler output)

TAROT_IDX = os.path.join(CACHE_DIR, "faiss_tarot.index")
TAROT_META = os.path.join(CACHE_DIR, "faiss_tarot_meta.json")
NUM_IDX = os.path.join(CACHE_DIR, "faiss_num.index")
NUM_META = os.path.join(CACHE_DIR, "faiss_num_meta.json")
EXT_IDX = os.path.join(CACHE_DIR, "faiss_ext.index")
EXT_META = os.path.join(CACHE_DIR, "faiss_ext_meta.json")

EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # lightweight and effective

_model = None


def get_model():
    global _model
    if _model is None:
        _model = SentenceTransformer(EMBED_MODEL)
    return _model


# ---------- Utils ----------
def _mtime(path: str) -> float:
    return os.path.getmtime(path) if os.path.exists(path) else 0.0


def _should_rebuild(src_paths: List[str], idx_paths: List[str]) -> bool:
    """Rebuild when any source file is newer than the index, or the index is missing."""
    src_time = max((_mtime(p) for p in src_paths if p and os.path.exists(p)), default=0.0)
    idx_time = min((_mtime(p) for p in idx_paths if p and os.path.exists(p)), default=0.0)
    if not all(os.path.exists(p) for p in idx_paths):
        return True
    return src_time > idx_time


def _encode(texts: List[str]) -> np.ndarray:
    model = get_model()
    embs = model.encode(texts, normalize_embeddings=True, batch_size=64, show_progress_bar=False)
    return np.asarray(embs, dtype="float32")


def _build_index(texts: List[str], dim: int = None) -> faiss.IndexFlatIP:
    embs = _encode(texts)
    index = faiss.IndexFlatIP(embs.shape[1] if dim is None else dim)
    index.add(embs)
    return index


def _save_index(index: faiss.Index, idx_path: str, meta: List[Dict], meta_path: str):
    faiss.write_index(index, idx_path)
    with open(meta_path, "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)


def _search(idx_path: str, meta_path: str, query: str, k: int) -> List[Dict]:
    if not (os.path.exists(idx_path) and os.path.exists(meta_path)):
        return []
    index = faiss.read_index(idx_path)
    with open(meta_path, "r", encoding="utf-8") as f:
        meta = json.load(f)
    q = _encode([query])
    D, I = index.search(q, k)
    out = []
    for rank, (score, j) in enumerate(zip(D[0], I[0]), 1):
        if j < 0 or j >= len(meta):
            continue
        m = dict(meta[j])
        m["score"] = float(score)
        m["rank"] = rank
        out.append(m)
    return out


# ---------- Tarot ----------
def _build_tarot():
    with open(TAROT_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)
    items = []
    for i, c in enumerate(data):
        name = c.get("name") or c.get("card_name") or f"Card {i}"
        # Collect the usable text fields
        parts = []
        for key in ("meaning_upright", "meaning_reversed", "description", "advice"):
            val = c.get(key)
            if isinstance(val, str) and val.strip():
                parts.append(val.strip())
        # keywords may be a list
        for key in ("keywords_upright", "keywords_reversed", "keywords"):
            val = c.get(key)
            if isinstance(val, list) and val:
                parts.append(" ".join(val))
            elif isinstance(val, str) and val.strip():
                parts.append(val.strip())
        text = (name + " - " + " ".join(parts)).strip()
        items.append({"card_name": name, "text": text})
    texts = [it["text"] for it in items]
    index = _build_index(texts)
    _save_index(index, TAROT_IDX, items, TAROT_META)


def search_tarot(query: str, k: int = 3) -> List[Dict]:
    ensure_indexes()
    return _search(TAROT_IDX, TAROT_META, query, k)


# ---------- Numerology ----------
def _build_num():
    with open(NUM_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)
    items = []
    for r in data:
        num = r.get("number")
        if num is None:
            continue
        num_int = int(num)
        parts = [str(num_int)]
        for key in ("name", "description", "advice"):
            val = r.get(key)
            if isinstance(val, str) and val.strip():
                parts.append(val.strip())
        kws = r.get("keywords", [])
        if isinstance(kws, list) and kws:
            parts.append(" ".join(kws))
        text = " ".join(parts)
        items.append({"number": num_int, "text": text})
    texts = [it["text"] for it in items]
    index = _build_index(texts)
    _save_index(index, NUM_IDX, items, NUM_META)


def search_numerology(query: str, k: int = 3) -> List[Dict]:
    ensure_indexes()
    return _search(NUM_IDX, NUM_META, query, k)


# ---------- External corpus (optional) ----------
def _build_external():
    """Read data/external_chunks.jsonl (one JSON object per line: {id, url, title, text})."""
    items, texts = [], []
    with open(EXT_JSONL, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                j = json.loads(line)
            except Exception:
                continue
            text = (j.get("title", "").strip() + " " + j.get("text", "").strip()).strip()
            if not text:
                continue
            items.append({
                "id": j.get("id"),
                "url": j.get("url"),
                "title": j.get("title"),
                "text": text
            })
            texts.append(text)
    if not texts:
        # Nothing to index; skip building so later lookups simply return no results
        return
    index = _build_index(texts)
    _save_index(index, EXT_IDX, items, EXT_META)


def search_external(query: str, k: int = 3) -> List[Dict]:
    """Return an empty list (instead of raising) when the external index/files are missing."""
    if not (os.path.exists(EXT_IDX) and os.path.exists(EXT_META)):
        return []
    return _search(EXT_IDX, EXT_META, query, k)


# ---------- Ensure / Union ----------
def ensure_indexes():
    os.makedirs(CACHE_DIR, exist_ok=True)
    # Tarot
    if _should_rebuild([TAROT_JSON], [TAROT_IDX, TAROT_META]):
        try:
            _build_tarot()
        except Exception as e:
            print("[rag_utils] build tarot failed:", e)
    # Numerology
    if _should_rebuild([NUM_JSON], [NUM_IDX, NUM_META]):
        try:
            _build_num()
        except Exception as e:
            print("[rag_utils] build numerology failed:", e)
    # External (optional)
    if os.path.exists(EXT_JSONL) and _should_rebuild([EXT_JSONL], [EXT_IDX, EXT_META]):
        try:
            _build_external()
        except Exception as e:
            print("[rag_utils] build external failed:", e)


def search_union(query: str, k_each: int = 2) -> List[Dict]:
    """Merge Tarot + Numerology + External hits (k_each from each source), sorted by score."""
    ensure_indexes()
    out = []
    out += search_tarot(query, k_each)
    out += search_numerology(query, k_each)
    out += search_external(query, k_each)
    # Sort: highest score first
    out.sort(key=lambda x: x.get("score", 0.0), reverse=True)
    return out[: max(3, k_each)]  # return at least 3 results
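

# ---------- Example usage (illustrative sketch, not part of the original module) ----------
# A minimal smoke test, assuming the tarot/numerology JSON files exist under data/
# as configured above; the sample query string is arbitrary. It builds (or reuses)
# the FAISS indexes in /tmp and prints the merged top hits from search_union().
if __name__ == "__main__":
    ensure_indexes()
    hits = search_union("career change and new beginnings", k_each=2)
    for i, hit in enumerate(hits, 1):
        # Each hit carries the metadata of its source corpus plus "score"/"rank"
        label = hit.get("card_name") or hit.get("number") or hit.get("title") or "?"
        print(f"{i}. {label} (score={hit.get('score', 0.0):.3f})")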