Spaces:
Running
Running
# mcp_server.py (最小骨架) | |
import json, os | |
from modelcontextprotocol import Server | |
import rag_utils | |
srv = Server(name="soulcompass-mcp") | |
def tarot_search(query: str, k: int = 3): | |
hits = rag_utils.search_tarot(query, k) | |
return {"hits": hits} | |
def num_search(query: str, k: int = 3): | |
hits = rag_utils.search_numerology(query, k) | |
return {"hits": hits} | |
def union_search(query: str, k_each: int = 2): | |
hits = rag_utils.search_union(query, k_each) | |
return {"hits": hits} | |
# 以 UID 區分使用者資料(沿用你 app.py 的結構) | |
DATA_ROOT = "data/users" | |
def journal_append(uid: str, entry: dict): | |
user_dir = os.path.join(DATA_ROOT, uid); os.makedirs(user_dir, exist_ok=True) | |
path = os.path.join(user_dir, "journal.jsonl") | |
with open(path, "a", encoding="utf-8") as f: | |
f.write(json.dumps(entry, ensure_ascii=False) + "\n") | |
return {"ok": True} | |
def journal_query(uid: str, limit: int = 50): | |
path = os.path.join(DATA_ROOT, uid, "journal.jsonl") | |
out = [] | |
if os.path.exists(path): | |
for line in open(path, "r", encoding="utf-8").read().splitlines()[-limit:]: | |
try: out.append(json.loads(line)) | |
except: pass | |
return {"entries": out} | |
if __name__ == "__main__": | |
# 以 stdio 方式啟動(Space 內最簡單) | |
srv.run_stdio() | |