# Soulcompass / rag_utils.py
# Rong6693's picture
# Update rag_utils.py
# e713130 verified
# rag_utils.py
import os, json, time
from typing import List, Dict, Tuple
import numpy as np
# Embedding / Vector index
from sentence_transformers import SentenceTransformer
import faiss
# -------- Paths --------
# Source data is read from a "data" directory located next to this module.
ROOT_DIR = os.path.dirname(__file__)
DATA_DIR = os.path.join(ROOT_DIR, "data")
CACHE_DIR = "/tmp"  # writable on Spaces
# Source corpora.
TAROT_JSON = os.path.join(DATA_DIR, "tarot_data_full.json")
NUM_JSON = os.path.join(DATA_DIR, "numerology_data_full.json")
EXT_JSONL = os.path.join(DATA_DIR, "external_chunks.jsonl")  # optional (crawler output)
# Cached artefacts: one FAISS index file plus one JSON metadata file per corpus.
TAROT_IDX = os.path.join(CACHE_DIR, "faiss_tarot.index")
TAROT_META = os.path.join(CACHE_DIR, "faiss_tarot_meta.json")
NUM_IDX = os.path.join(CACHE_DIR, "faiss_num.index")
NUM_META = os.path.join(CACHE_DIR, "faiss_num_meta.json")
EXT_IDX = os.path.join(CACHE_DIR, "faiss_ext.index")
EXT_META = os.path.join(CACHE_DIR, "faiss_ext_meta.json")
# Model name handed to SentenceTransformer by get_model().
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # lightweight and works well
_model = None


def get_model():
    """Return the shared embedding model, creating it on first use.

    The instance is kept in the module-level ``_model`` variable; a new
    ``SentenceTransformer(EMBED_MODEL)`` is only constructed while that
    variable is still ``None``.
    """
    global _model
    if _model is not None:
        return _model
    _model = SentenceTransformer(EMBED_MODEL)
    return _model
# ---------- Utils ----------
def _mtime(path: str) -> float:
    """Return the modification time of *path*, or ``0.0`` if it is unavailable.

    Args:
        path: File system path to inspect.

    Returns:
        The value of ``os.path.getmtime(path)``, or ``0.0`` when the file is
        missing or cannot be stat'ed.
    """
    # Ask once instead of "exists() then getmtime()": a cache file removed
    # between the two calls would otherwise raise FileNotFoundError.
    try:
        return os.path.getmtime(path)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed path such as
        # an embedded NUL byte. os.path.exists() treats both as "not there".
        return 0.0
def _should_rebuild(src_paths: List[str], idx_paths: List[str]) -> bool:
    """Tell whether an index must be (re)built.

    Returns True when any path in *idx_paths* does not exist, or when the
    newest existing source in *src_paths* is newer than the oldest index file.
    Missing or empty source paths are ignored.
    """
    # A missing artefact always forces a rebuild.
    for idx in idx_paths:
        if not os.path.exists(idx):
            return True

    source_times = [_mtime(src) for src in src_paths if src and os.path.exists(src)]
    index_times = [_mtime(idx) for idx in idx_paths if idx and os.path.exists(idx)]
    newest_source = max(source_times) if source_times else 0.0
    oldest_index = min(index_times) if index_times else 0.0
    return newest_source > oldest_index
def _encode(texts: List[str]) -> np.ndarray:
    """Embed *texts* with the shared model and return a float32 array.

    The embeddings are requested with ``normalize_embeddings=True``, in
    batches of 64 and without a progress bar.
    """
    vectors = get_model().encode(
        texts,
        normalize_embeddings=True,
        batch_size=64,
        show_progress_bar=False,
    )
    return np.asarray(vectors, dtype=np.float32)
def _build_index(texts: List[str], dim: int = None) -> "faiss.IndexFlatIP":
    """Embed *texts* and load them into a new FAISS inner-product index.

    Args:
        texts: Non-empty list of strings to embed. Vectors are added in list
            order, so index row ``i`` corresponds to ``texts[i]``.
        dim: Expected index dimensionality. ``None`` (the default) uses the
            width of the embeddings returned by ``_encode``.

    Returns:
        A ``faiss.IndexFlatIP`` containing one vector per text.

    Raises:
        ValueError: If *texts* is empty, or *dim* is given and differs from
            the embedding width.
    """
    if not texts:
        # Fail early with a clear message (and before the model is loaded);
        # an empty batch would otherwise surface as an obscure shape error.
        raise ValueError("cannot build an index from an empty list of texts")
    embs = _encode(texts)
    width = embs.shape[1]
    if dim is not None and dim != width:
        raise ValueError(f"dim={dim} does not match embedding width {width}")
    index = faiss.IndexFlatIP(width)
    index.add(embs)
    return index
def _save_index(index: faiss.Index, idx_path: str, meta: List[Dict], meta_path: str):
    """Write a FAISS index and its metadata list to disk.

    The index goes to *idx_path* via ``faiss.write_index``; *meta* is stored
    at *meta_path* as UTF-8 JSON, indented by two spaces, with non-ASCII
    characters written unescaped.
    """
    faiss.write_index(index, idx_path)
    with open(meta_path, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(meta, ensure_ascii=False, indent=2))
def _search(idx_path: str, meta_path: str, query: str, k: int) -> List[Dict]:
    """Return up to *k* metadata records closest to *query*.

    Args:
        idx_path: Path of a FAISS index written by ``_save_index``.
        meta_path: Path of the JSON metadata list saved alongside the index.
        query: Free-text query; it is embedded with ``_encode``.
        k: Maximum number of hits to return.

    Returns:
        Copies of the matching metadata dicts in FAISS result order, each
        extended with ``"score"`` (float) and ``"rank"`` (1-based position in
        the result list). Empty when *k* is not positive, when either file is
        missing, or when the index holds no vectors.
    """
    if k <= 0:
        # Nothing requested; also avoids handing FAISS a non-positive k.
        return []
    if not (os.path.exists(idx_path) and os.path.exists(meta_path)):
        return []
    index = faiss.read_index(idx_path)
    with open(meta_path, "r", encoding="utf-8") as f:
        meta = json.load(f)
    # Never request more neighbours than the index holds: the surplus would
    # only come back as -1 placeholders that are discarded below anyway.
    k = min(k, index.ntotal)
    if k <= 0:
        return []
    scores, ids = index.search(_encode([query]), k)
    out = []
    for rank, (score, j) in enumerate(zip(scores[0], ids[0]), 1):
        j = int(j)  # plain int rather than a numpy integer for list indexing
        if j < 0 or j >= len(meta):
            # -1 marks "no result"; out-of-range ids mean index and metadata
            # are out of sync, so the hit cannot be described.
            continue
        m = dict(meta[j])  # copy so the loaded metadata is not mutated
        m["score"] = float(score)
        m["rank"] = rank
        out.append(m)
    return out
# ---------- Tarot ----------
def _build_tarot():
    """Build the tarot index from ``TAROT_JSON`` and save it to the cache.

    Every card yields one text of the form ``"<name> - <fields>"`` built from
    its meaning/description/advice strings and its keyword fields. The index
    is written to ``TAROT_IDX`` and the per-card metadata
    (``card_name``, ``text``) to ``TAROT_META``.

    Raises:
        ValueError: If the data file contains no cards.
        OSError / json.JSONDecodeError: If the data file cannot be read or
            parsed.
    """
    with open(TAROT_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)
    items = []
    for i, c in enumerate(data):
        # Coerce to str so a numeric name cannot break the concatenation below.
        name = str(c.get("name") or c.get("card_name") or f"Card {i}")
        # Collect every usable free-text field.
        parts = []
        for key in ("meaning_upright", "meaning_reversed", "description", "advice"):
            val = c.get(key)
            if isinstance(val, str) and val.strip():
                parts.append(val.strip())
        # Keyword fields may be a list or a plain string.
        for key in ("keywords_upright", "keywords_reversed", "keywords"):
            val = c.get(key)
            if isinstance(val, list) and val:
                # str() each entry: one non-string keyword must not abort
                # the whole build with a TypeError.
                parts.append(" ".join(str(v) for v in val if v is not None))
            elif isinstance(val, str) and val.strip():
                parts.append(val.strip())
        text = (name + " - " + " ".join(parts)).strip()
        items.append({"card_name": name, "text": text})
    if not items:
        # Fail with a clear message before any embedding work is attempted.
        raise ValueError(f"no tarot cards found in {TAROT_JSON}")
    texts = [it["text"] for it in items]
    index = _build_index(texts)
    _save_index(index, TAROT_IDX, items, TAROT_META)
def search_tarot(query: str, k: int = 3) -> List[Dict]:
    """Return up to *k* tarot records matching *query*.

    Runs ``ensure_indexes()`` first, then searches the cached tarot index.
    """
    ensure_indexes()
    hits = _search(idx_path=TAROT_IDX, meta_path=TAROT_META, query=query, k=k)
    return hits
# ---------- Numerology ----------
def _build_num():
    """Build the numerology index from ``NUM_JSON`` and save it to the cache.

    Every record with a usable ``number`` yields one text made of the number,
    its name/description/advice strings and its keywords. The index is
    written to ``NUM_IDX`` and the metadata (``number``, ``text``) to
    ``NUM_META``. Records whose ``number`` is missing or not convertible to
    ``int`` are skipped.

    Raises:
        ValueError: If no usable record is found.
        OSError / json.JSONDecodeError: If the data file cannot be read or
            parsed.
    """
    with open(NUM_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)
    items = []
    for r in data:
        num = r.get("number")
        if num is None:
            continue
        try:
            num_int = int(num)
        except (TypeError, ValueError, OverflowError):
            # One malformed record must not abort the whole build.
            continue
        parts = [str(num_int)]
        for key in ("name", "description", "advice"):
            val = r.get(key)
            if isinstance(val, str) and val.strip():
                parts.append(val.strip())
        kws = r.get("keywords", [])
        if isinstance(kws, list) and kws:
            # str() each entry so non-string keywords cannot break the join.
            parts.append(" ".join(str(kw) for kw in kws if kw is not None))
        elif isinstance(kws, str) and kws.strip():
            # Accept a plain string too, as the tarot builder does.
            parts.append(kws.strip())
        text = " ".join(parts)
        items.append({"number": num_int, "text": text})
    if not items:
        # Fail with a clear message before any embedding work is attempted.
        raise ValueError(f"no numerology records found in {NUM_JSON}")
    texts = [it["text"] for it in items]
    index = _build_index(texts)
    _save_index(index, NUM_IDX, items, NUM_META)
def search_numerology(query: str, k: int = 3) -> List[Dict]:
    """Return up to *k* numerology records matching *query*.

    Runs ``ensure_indexes()`` first, then searches the cached numerology index.
    """
    ensure_indexes()
    hits = _search(idx_path=NUM_IDX, meta_path=NUM_META, query=query, k=k)
    return hits
# ---------- External corpus (optional) ----------
def _build_external():
    """Build the optional external-corpus index from ``EXT_JSONL``.

    The file holds one JSON object per line: ``{id, url, title, text}``.
    Blank lines, lines that are not valid JSON objects, and records without
    any title/text content are skipped. The index is written to ``EXT_IDX``
    and the metadata (``id``, ``url``, ``title``, ``text``) to ``EXT_META``.
    When nothing usable is found, no files are written.
    """
    items, texts = [], []
    with open(EXT_JSONL, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                j = json.loads(line)
            except (ValueError, RecursionError):
                # Malformed line (json.JSONDecodeError is a ValueError): skip it.
                continue
            if not isinstance(j, dict):
                # Valid JSON but not an object (e.g. a bare list or string).
                continue
            # "title"/"text" may be absent or null; treat both as empty
            # instead of letting None.strip() abort the whole build.
            title = j.get("title")
            body = j.get("text")
            title_part = title.strip() if isinstance(title, str) else ""
            body_part = body.strip() if isinstance(body, str) else ""
            text = (title_part + " " + body_part).strip()
            if not text:
                continue
            items.append({
                "id": j.get("id"),
                "url": j.get("url"),
                "title": j.get("title"),
                "text": text,
            })
            texts.append(text)
    if not texts:
        # Nothing to index: return without building so no error is raised.
        return
    index = _build_index(texts)
    _save_index(index, EXT_IDX, items, EXT_META)
def search_external(query: str, k: int = 3) -> List[Dict]:
    """Return up to *k* external-corpus records matching *query*.

    Returns an empty list, without raising, when no external index is
    available (for example because ``EXT_JSONL`` does not exist).
    """
    # Match search_tarot/search_numerology: make sure the index has had a
    # chance to be built, so a standalone call does not return [] merely
    # because no other search ran first.
    ensure_indexes()
    # _search itself returns [] when the index or metadata file is missing.
    return _search(EXT_IDX, EXT_META, query, k)
# ---------- Ensure / Union ----------
def ensure_indexes():
    """Create the cache directory and rebuild any missing or stale index.

    Each corpus is checked with ``_should_rebuild`` and rebuilt if needed.
    A failing build is reported with ``print`` and does not stop the
    remaining builds. The external corpus is only considered when
    ``EXT_JSONL`` exists.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)

    def _attempt(builder, failure_prefix):
        # Best effort: report the failure and carry on.
        try:
            builder()
        except Exception as e:
            print(failure_prefix, e)

    # Tarot
    if _should_rebuild([TAROT_JSON], [TAROT_IDX, TAROT_META]):
        _attempt(_build_tarot, "[rag_utils] build tarot failed:")

    # Numerology
    if _should_rebuild([NUM_JSON], [NUM_IDX, NUM_META]):
        _attempt(_build_num, "[rag_utils] build numerology failed:")

    # External (optional)
    if os.path.exists(EXT_JSONL):
        if _should_rebuild([EXT_JSONL], [EXT_IDX, EXT_META]):
            _attempt(_build_external, "[rag_utils] build external failed:")
def search_union(query: str, k_each: int = 2) -> List[Dict]:
    """Search tarot, numerology and external corpora and merge the hits.

    Up to *k_each* hits are taken from each source. The combined list is
    ordered by descending ``score`` and cut to its first ``max(3, k_each)``
    entries.
    """
    ensure_indexes()
    merged = [
        *search_tarot(query, k_each),
        *search_numerology(query, k_each),
        *search_external(query, k_each),
    ]
    # Highest score first; hits without a score sort as 0.0.
    ranked = sorted(merged, key=lambda hit: hit.get("score", 0.0), reverse=True)
    limit = max(3, k_each)
    return ranked[:limit]