Spaces:
Running
Running
# rag_utils.py
import os, json, time | |
from typing import List, Dict, Tuple | |
import numpy as np | |
# Embedding / Vector index
from sentence_transformers import SentenceTransformer | |
import faiss | |
# -------- Paths --------
# Source data ships next to this module; generated indexes go to a scratch dir.
ROOT_DIR = os.path.dirname(__file__)
DATA_DIR = os.path.join(ROOT_DIR, "data")
CACHE_DIR = "/tmp"  # writable on Spaces

# Input corpora.
TAROT_JSON = os.path.join(DATA_DIR, "tarot_data_full.json")
NUM_JSON = os.path.join(DATA_DIR, "numerology_data_full.json")
EXT_JSONL = os.path.join(DATA_DIR, "external_chunks.jsonl")  # optional (crawler output)

# Each corpus gets a FAISS index file plus a JSON metadata file in CACHE_DIR.
TAROT_IDX, TAROT_META = (
    os.path.join(CACHE_DIR, "faiss_tarot.index"),
    os.path.join(CACHE_DIR, "faiss_tarot_meta.json"),
)
NUM_IDX, NUM_META = (
    os.path.join(CACHE_DIR, "faiss_num.index"),
    os.path.join(CACHE_DIR, "faiss_num_meta.json"),
)
EXT_IDX, EXT_META = (
    os.path.join(CACHE_DIR, "faiss_ext.index"),
    os.path.join(CACHE_DIR, "faiss_ext_meta.json"),
)

EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # lightweight and handy
# Process-wide embedding model, created on first use by get_model().
_model = None


def get_model():
    """Return the shared SentenceTransformer, instantiating it on first call.

    The instance is kept in the module-level ``_model`` so that later calls
    reuse it instead of constructing the model named by ``EMBED_MODEL`` again.
    """
    global _model
    if _model is not None:
        return _model
    _model = SentenceTransformer(EMBED_MODEL)
    return _model
# ---------- Utils ----------
def _mtime(path: str) -> float: | |
return os.path.getmtime(path) if os.path.exists(path) else 0.0 | |
def _should_rebuild(src_paths: List[str], idx_paths: List[str]) -> bool:
    """Report whether the index files are missing or older than their sources.

    Args:
        src_paths: Source data files the index is derived from.
        idx_paths: Generated files that together make up the index.

    Returns:
        True if any path in ``idx_paths`` does not exist, or if the newest
        existing source was modified after the oldest existing index file.
    """
    # A single missing output is enough to force a rebuild.
    for target in idx_paths:
        if not os.path.exists(target):
            return True

    src_times = [_mtime(p) for p in src_paths if p and os.path.exists(p)]
    idx_times = [_mtime(p) for p in idx_paths if p and os.path.exists(p)]
    newest_source = max(src_times) if src_times else 0.0
    oldest_index = min(idx_times) if idx_times else 0.0
    return newest_source > oldest_index
def _encode(texts: List[str]) -> np.ndarray:
    """Embed ``texts`` with the shared model and return them as float32.

    Embeddings are requested with ``normalize_embeddings=True`` in batches of
    64, with the progress bar disabled.
    """
    vectors = get_model().encode(
        texts,
        batch_size=64,
        normalize_embeddings=True,
        show_progress_bar=False,
    )
    return np.asarray(vectors, dtype=np.float32)
def _build_index(texts: List[str], dim: int = None) -> faiss.IndexFlatIP:
    """Embed ``texts`` and return an inner-product FAISS index holding them.

    Args:
        texts: Non-empty list of strings to embed and add to the index.
        dim: Expected embedding width. ``None`` (the default) uses the width
            of the embeddings actually produced.

    Returns:
        A ``faiss.IndexFlatIP`` containing one vector per entry of ``texts``,
        in the same order.

    Raises:
        ValueError: If ``texts`` is empty, or if ``dim`` is given and does not
            match the embedding width.
    """
    # An empty batch has no second axis to read the width from; fail with a
    # clear message instead of an IndexError on ``shape[1]``.
    if not texts:
        raise ValueError("cannot build an index from an empty list of texts")
    embs = _encode(texts)
    width = embs.shape[1]
    if dim is not None and dim != width:
        raise ValueError(f"dim={dim} does not match embedding width {width}")
    index = faiss.IndexFlatIP(width)
    index.add(embs)
    return index
def _save_index(index: faiss.Index, idx_path: str, meta: List[Dict], meta_path: str):
    """Write a FAISS index and its metadata list to disk.

    Args:
        index: Index to serialize with ``faiss.write_index``.
        idx_path: Destination file for the index.
        meta: Metadata records stored alongside the index.
        meta_path: Destination file for ``meta``, written as indented UTF-8
            JSON with non-ASCII characters kept as-is.
    """
    faiss.write_index(index, idx_path)
    with open(meta_path, "w", encoding="utf-8") as meta_file:
        json.dump(meta, meta_file, ensure_ascii=False, indent=2)
def _search(idx_path: str, meta_path: str, query: str, k: int) -> List[Dict]:
    """Return the metadata records of the ``k`` vectors closest to ``query``.

    Args:
        idx_path: Path of a FAISS index written by ``_save_index``.
        meta_path: Path of the JSON metadata list saved with that index.
        query: Text to embed and search for.
        k: Maximum number of hits to return.

    Returns:
        A list of copies of the matching metadata records, each extended with
        ``"score"`` (float) and ``"rank"`` (1-based position in the FAISS
        result). Empty if ``k`` is not positive, either file is missing, or
        the index holds no vectors.
    """
    # FAISS rejects a non-positive k, so answer that case ourselves.
    if k <= 0:
        return []
    if not (os.path.exists(idx_path) and os.path.exists(meta_path)):
        return []
    index = faiss.read_index(idx_path)
    with open(meta_path, "r", encoding="utf-8") as f:
        meta = json.load(f)
    if index.ntotal == 0:
        return []
    q = _encode([query])
    # Asking for more neighbours than stored vectors only yields -1 padding.
    scores, ids = index.search(q, min(k, index.ntotal))
    out = []
    for rank, (score, j) in enumerate(zip(scores[0], ids[0]), 1):
        j = int(j)
        # Skip padding (-1) and ids with no metadata (index/meta out of sync).
        if j < 0 or j >= len(meta):
            continue
        hit = dict(meta[j])
        hit["score"] = float(score)
        hit["rank"] = rank
        out.append(hit)
    return out
# ---------- Tarot ----------
def _build_tarot():
    """Build the tarot index and metadata from ``TAROT_JSON``.

    Each record becomes one text of the form ``"<name> - <fields...>"``, where
    the fields are the non-empty meaning/description/advice strings followed
    by any keyword lists or strings. The index is saved to ``TAROT_IDX`` and
    the ``{"card_name", "text"}`` records to ``TAROT_META``.
    """
    with open(TAROT_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)
    items = []
    for i, c in enumerate(data):
        # str() keeps the concatenation below safe if a name is not a string.
        name = str(c.get("name") or c.get("card_name") or f"Card {i}")
        # Collect every usable free-text field.
        parts = []
        for key in ("meaning_upright", "meaning_reversed", "description", "advice"):
            val = c.get(key)
            if isinstance(val, str) and val.strip():
                parts.append(val.strip())
        # Keyword fields may be a list or a plain string.
        for key in ("keywords_upright", "keywords_reversed", "keywords"):
            val = c.get(key)
            if isinstance(val, list) and val:
                # Coerce items so a stray number/None cannot break the join.
                parts.append(" ".join(str(v) for v in val))
            elif isinstance(val, str) and val.strip():
                parts.append(val.strip())
        text = (name + " - " + " ".join(parts)).strip()
        items.append({"card_name": name, "text": text})
    texts = [it["text"] for it in items]
    index = _build_index(texts)
    _save_index(index, TAROT_IDX, items, TAROT_META)
def search_tarot(query: str, k: int = 3) -> List[Dict]:
    """Return up to ``k`` tarot records most similar to ``query``.

    Runs ``ensure_indexes()`` first, then searches the tarot index files.
    """
    ensure_indexes()
    hits = _search(TAROT_IDX, TAROT_META, query, k)
    return hits
# ---------- Numerology ----------
def _build_num():
    """Build the numerology index and metadata from ``NUM_JSON``.

    Records without a ``"number"`` are ignored, and records whose number
    cannot be converted to ``int`` are skipped with a message. Each remaining
    record becomes one text made of the number, its name/description/advice
    strings and its keywords. The index is saved to ``NUM_IDX`` and the
    ``{"number", "text"}`` records to ``NUM_META``.
    """
    with open(NUM_JSON, "r", encoding="utf-8") as f:
        data = json.load(f)
    items = []
    for r in data:
        num = r.get("number")
        if num is None:
            continue
        try:
            num_int = int(num)
        except (TypeError, ValueError, OverflowError):
            # One malformed record should not abort the whole build.
            print("[rag_utils] skip numerology record with invalid number:", repr(num))
            continue
        parts = [str(num_int)]
        for key in ("name", "description", "advice"):
            val = r.get(key)
            if isinstance(val, str) and val.strip():
                parts.append(val.strip())
        kws = r.get("keywords", [])
        if isinstance(kws, list) and kws:
            # Coerce items so a stray number/None cannot break the join.
            parts.append(" ".join(str(kw) for kw in kws))
        text = " ".join(parts)
        items.append({"number": num_int, "text": text})
    texts = [it["text"] for it in items]
    index = _build_index(texts)
    _save_index(index, NUM_IDX, items, NUM_META)
def search_numerology(query: str, k: int = 3) -> List[Dict]:
    """Return up to ``k`` numerology records most similar to ``query``.

    Runs ``ensure_indexes()`` first, then searches the numerology index files.
    """
    ensure_indexes()
    hits = _search(NUM_IDX, NUM_META, query, k)
    return hits
# ---------- External corpus (optional) ----------
def _build_external():
    """Build the external-corpus index from ``data/external_chunks.jsonl``.

    The file holds one JSON object per line: ``{id, url, title, text}``.
    Blank lines, lines that are not valid JSON, lines that are not JSON
    objects, and records with no usable title/text are skipped. If nothing
    usable remains, no index is written.
    """
    items, texts = [], []
    with open(EXT_JSONL, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                j = json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if not isinstance(j, dict):
                continue
            # "title"/"text" may be missing, null or non-string; treat all of
            # those as empty instead of failing on .strip().
            title = j.get("title")
            body = j.get("text")
            title = title.strip() if isinstance(title, str) else ""
            body = body.strip() if isinstance(body, str) else ""
            text = (title + " " + body).strip()
            if not text:
                continue
            items.append({
                "id": j.get("id"),
                "url": j.get("url"),
                "title": j.get("title"),
                "text": text
            })
            texts.append(text)
    if not texts:
        # Nothing to index: write no files rather than fail on empty input.
        return
    index = _build_index(texts)
    _save_index(index, EXT_IDX, items, EXT_META)
def search_external(query: str, k: int = 3) -> List[Dict]:
    """Return up to ``k`` external-corpus records most similar to ``query``.

    Like the tarot and numerology searches, this runs ``ensure_indexes()``
    first, so a direct call in a fresh process can still use an external
    corpus that has not been indexed yet. If no external index exists after
    that, the result is an empty list rather than an error.
    """
    ensure_indexes()
    # _search() itself returns [] when either index file is missing.
    return _search(EXT_IDX, EXT_META, query, k)
# ---------- Ensure / Union ----------
def ensure_indexes():
    """Rebuild any index whose files are missing or older than its source.

    Creates ``CACHE_DIR`` if needed, then handles tarot, numerology and the
    optional external corpus in that order. The external corpus is only
    considered when ``EXT_JSONL`` exists. A failing build is reported with
    ``print`` and does not stop the remaining builds.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)

    # (failure message, source file, index files, builder, source is optional)
    jobs = (
        ("[rag_utils] build tarot failed:", TAROT_JSON, [TAROT_IDX, TAROT_META], _build_tarot, False),
        ("[rag_utils] build numerology failed:", NUM_JSON, [NUM_IDX, NUM_META], _build_num, False),
        ("[rag_utils] build external failed:", EXT_JSONL, [EXT_IDX, EXT_META], _build_external, True),
    )
    for message, source, targets, builder, optional in jobs:
        if optional and not os.path.exists(source):
            continue
        if not _should_rebuild([source], targets):
            continue
        try:
            builder()
        except Exception as err:
            print(message, err)
def search_union(query: str, k_each: int = 2) -> List[Dict]:
    """Search tarot, numerology and external indexes and merge the hits.

    Up to ``k_each`` hits are taken from each source, the combined list is
    ordered by ``"score"`` (highest first, missing scores count as 0.0), and
    at most ``max(3, k_each)`` hits are returned, so the cap is never below 3.
    """
    ensure_indexes()
    merged = [
        *search_tarot(query, k_each),
        *search_numerology(query, k_each),
        *search_external(query, k_each),
    ]
    # Highest score first; ties keep their tarot/numerology/external order.
    ranked = sorted(merged, key=lambda hit: hit.get("score", 0.0), reverse=True)
    limit = max(3, k_each)
    return ranked[:limit]