evo-gov-copilot-mu / rag_search.py
"""
Step 6: Retrieval helper + link extraction.
New in Step 6 (Objective):
- extract_links(hits): finds http/https URLs inside retrieved chunks
- split_form_links(links): filters links that look like "forms" (name or path)
- links_markdown(title, links): renders a clickable list for the UI
"""
from pathlib import Path
import json
import re
from typing import List, Dict, Tuple

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

DATA_DIR = Path("data")
INDEX_PATH = DATA_DIR / "index.faiss"
META_PATH = DATA_DIR / "meta.json"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"


class RAGSearcher:
    """Loads the FAISS index plus chunk metadata and answers top-k queries."""

    def __init__(self):
        if not INDEX_PATH.exists() or not META_PATH.exists():
            raise RuntimeError(
                "Index not found. Build it first with the 'Build/Refresh Index' button."
            )
        self.index = faiss.read_index(str(INDEX_PATH))
        self.metas: List[Dict] = json.loads(META_PATH.read_text(encoding="utf-8"))
        self.model = SentenceTransformer(EMBED_MODEL)

    def search(self, query: str, k: int = 6) -> List[Dict]:
        """Return up to k hits as dicts with 'score', 'text', and 'meta' keys."""
        if not query or len(query.strip()) < 3:
            return []
        qvec = self.model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
        scores, idxs = self.index.search(qvec, k)
        hits: List[Dict] = []
        for score, idx in zip(scores[0], idxs[0]):
            if idx < 0:  # FAISS pads results with -1 when fewer than k vectors match
                continue
            meta = self.metas[int(idx)]
            text = Path(meta["chunk_file"]).read_text(encoding="utf-8")
            hits.append({"score": float(score), "text": text, "meta": meta})
        return hits
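

# Illustrative only: a minimal sketch of how the app layer might call the searcher,
# assuming data/index.faiss and data/meta.json were already built by the
# 'Build/Refresh Index' step. The query string below is made up for the example.
#
#     searcher = RAGSearcher()
#     hits = searcher.search("How do I apply for a residence permit?", k=4)
#     for h in hits:
#         print(f'{h["score"]:.3f}  {h["meta"]["file"]}')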


def summarize_hits(hits: List[Dict], max_points: int = 4) -> str:
    """Turn the top hits into a short bulleted summary for the chat answer."""
    if not hits:
        return "I couldn't find relevant information. Try rephrasing your question."
    bullets = []
    for h in hits[:max_points]:
        snippet = " ".join(h["text"].strip().split())  # collapse newlines/extra spaces
        if len(snippet) > 350:
            snippet = snippet[:350] + "..."
        bullets.append(f"- {snippet}")
    return "\n".join(bullets)


def format_sources(hits: List[Dict], max_files: int = 5) -> str:
    """List the distinct source files behind the hits, in first-seen order."""
    if not hits:
        return "Sources: (none)"
    order: List[str] = []
    for h in hits:
        f = h["meta"]["file"]
        if f not in order:
            order.append(f)
        if len(order) >= max_files:
            break
    bullets = [f"- `{Path(f).name}`" for f in order]
    return "Sources:\n" + "\n".join(bullets)


# ---- NEW: Link extraction (Objective)
_URL_RE = re.compile(r"(https?://[^\s\)\]]+)", re.IGNORECASE)


def extract_links(hits: List[Dict], max_links: int = 12) -> List[str]:
    """
    Scan the retrieved text for URLs (http/https).
    - Deduplicate while preserving order.
    - Return up to max_links.
    """
    seen = set()
    ordered: List[str] = []
    for h in hits:
        for m in _URL_RE.findall(h["text"]):
            url = m.strip().rstrip(".,);]")
            if url not in seen:
                seen.add(url)
                ordered.append(url)
                if len(ordered) >= max_links:
                    return ordered
    return ordered
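

# Sketch of the expected behavior on a made-up hit (not taken from the real corpus):
#
#     demo = [{"text": "Apply at https://example.gov/permits/form.pdf or read "
#                      "https://example.gov/permits/overview first."}]
#     extract_links(demo)
#     # -> ['https://example.gov/permits/form.pdf', 'https://example.gov/permits/overview']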


_FORM_HINTS = ("form", "application", "apply", "download", "pdf")


def split_form_links(links: List[str]) -> Tuple[List[str], List[str]]:
    """
    Separate links that look like "forms" based on common keywords in
    the URL path or filename. Returns (form_links, other_links).
    """
    forms, others = [], []
    for u in links:
        low = u.lower()
        if any(h in low for h in _FORM_HINTS):
            forms.append(u)
        else:
            others.append(u)
    return forms, others
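

# Continuing the made-up example above: "form.pdf" matches the "form"/"pdf" hints,
# while the overview page matches none of them.
#
#     split_form_links(['https://example.gov/permits/form.pdf',
#                       'https://example.gov/permits/overview'])
#     # -> (['https://example.gov/permits/form.pdf'], ['https://example.gov/permits/overview'])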


def links_markdown(title: str, links: List[str]) -> str:
    if not links:
        return f"**{title}:** (none)"
    items = "\n".join([f"- [{u}]({u})" for u in links])
    return f"**{title}:**\n{items}"