Update rag_search.py
rag_search.py CHANGED (+55 -42)
@@ -1,22 +1,21 @@
 """
-Step
+Step 6: Retrieval helper + link extraction.
 
-
--
--
--
-- format_sources(hits): collapses to a neat "Sources:" list
+New in Step 6 (Objective):
+- extract_links(hits): finds http/https URLs inside retrieved chunks
+- split_form_links(links): filters links that look like "forms" (name or path)
+- links_markdown(title, links): renders a clickable list for the UI
 """
 
 from pathlib import Path
 import json
-
+import re
+from typing import List, Dict, Tuple
 
 import faiss
 import numpy as np
 from sentence_transformers import SentenceTransformer
 
-# Paths must match indexer.py
 DATA_DIR = Path("data")
 INDEX_PATH = DATA_DIR / "index.faiss"
 META_PATH = DATA_DIR / "meta.json"
@@ -24,58 +23,31 @@ EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
 
 
 class RAGSearcher:
-    """
-    Loads the FAISS index + metadata and performs semantic search.
-    If files are missing, it raises a RuntimeError (the UI will catch this and show a friendly message).
-    """
-
     def __init__(self):
         if not INDEX_PATH.exists() or not META_PATH.exists():
             raise RuntimeError(
                 "Index not found. Build it first with the 'Build/Refresh Index' button."
             )
-        # Load FAISS index and metadata
         self.index = faiss.read_index(str(INDEX_PATH))
         self.metas: List[Dict] = json.loads(META_PATH.read_text(encoding="utf-8"))
-        # Load the embedding model (small + fast)
         self.model = SentenceTransformer(EMBED_MODEL)
 
     def search(self, query: str, k: int = 6) -> List[Dict]:
-        """
-        Returns top-k hits with score, text, and meta fields.
-        - score ~ cosine similarity (because we normalized at indexing time)
-        """
         if not query or len(query.strip()) < 3:
             return []
-
-        # Encode the query to the same space used by the index
-        qvec = self.model.encode(
-            [query], convert_to_numpy=True, normalize_embeddings=True
-        )
+        qvec = self.model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
         scores, idxs = self.index.search(qvec, k)
-
         hits: List[Dict] = []
         for score, idx in zip(scores[0], idxs[0]):
             if idx < 0:
                 continue
             meta = self.metas[int(idx)]
             text = Path(meta["chunk_file"]).read_text(encoding="utf-8")
-            hits.append(
-                {
-                    "score": float(score),
-                    "text": text,
-                    "meta": meta,  # contains: file, chunk_file, chunk_id
-                }
-            )
+            hits.append({"score": float(score), "text": text, "meta": meta})
         return hits
 
 
 def summarize_hits(hits: List[Dict], max_points: int = 4) -> str:
-    """
-    Very small, safe extractive "summary":
-    - Take the first few hits and slice the first ~350 chars of each as bullet points.
-    - This is a placeholder. In Step 5, we'll replace with Evo synthesis.
-    """
     if not hits:
         return "I couldn't find relevant information. Try rephrasing your question."
     bullets = []
@@ -88,13 +60,9 @@ def summarize_hits(hits: List[Dict], max_points: int = 4) -> str:
 
 
 def format_sources(hits: List[Dict], max_files: int = 5) -> str:
-    """
-    Collapses the hit list to unique source files, and returns a short bulleted list.
-    """
     if not hits:
         return "Sources: (none)"
-    seen = []
-    order = []
+    seen, order = [], []
     for h in hits:
         f = h["meta"]["file"]
         if f not in seen:
@@ -104,3 +72,48 @@
             break
     bullets = [f"- `{Path(f).name}`" for f in order]
     return "Sources:\n" + "\n".join(bullets)
+
+
+# ---- NEW: Link extraction (Objective)
+
+_URL_RE = re.compile(r"(https?://[^\s\)\]]+)", re.IGNORECASE)
+
+def extract_links(hits: List[Dict], max_links: int = 12) -> List[str]:
+    """
+    Scan the retrieved text for URLs (http/https).
+    - Deduplicate while preserving order.
+    - Return up to max_links.
+    """
+    seen = set()
+    ordered: List[str] = []
+    for h in hits:
+        for m in _URL_RE.findall(h["text"]):
+            url = m.strip().rstrip(".,);]")
+            if url not in seen:
+                seen.add(url)
+                ordered.append(url)
+            if len(ordered) >= max_links:
+                return ordered
+    return ordered
+
+_FORM_HINTS = ("form", "application", "apply", "download", "pdf")
+
+def split_form_links(links: List[str]) -> Tuple[List[str], List[str]]:
+    """
+    Separate links that look like "forms" based on common keywords in
+    the URL path or filename. Returns (form_links, other_links).
+    """
+    forms, others = [], []
+    for u in links:
+        low = u.lower()
+        if any(h in low for h in _FORM_HINTS):
+            forms.append(u)
+        else:
+            others.append(u)
+    return forms, others
+
+def links_markdown(title: str, links: List[str]) -> str:
+    if not links:
+        return f"**{title}:** (none)"
+    items = "\n".join([f"- [{u}]({u})" for u in links])
+    return f"**{title}:**\n{items}"
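
For orientation, here is a minimal usage sketch of the public surface this commit leaves in rag_search.py. The answer() wrapper and the way its outputs are joined are illustrative assumptions (the Space's actual UI code is not part of this diff); only the imported names and their documented behavior come from the file above.

# Hypothetical caller, not part of this commit: wires the existing retrieval
# helpers together with the new Step 6 link helpers.
from rag_search import (
    RAGSearcher,
    summarize_hits,
    format_sources,
    extract_links,
    split_form_links,
    links_markdown,
)

def answer(query: str) -> str:
    try:
        searcher = RAGSearcher()            # loads data/index.faiss + data/meta.json
    except RuntimeError as err:
        return str(err)                     # "Index not found. Build it first ..."

    hits = searcher.search(query, k=6)      # top-k chunks; score ~ cosine similarity
    summary = summarize_hits(hits)          # placeholder extractive summary
    sources = format_sources(hits)          # "Sources:" list of unique files

    links = extract_links(hits)             # deduped http/https URLs from the chunks
    form_links, other_links = split_form_links(links)
    forms_md = links_markdown("Forms", form_links)
    other_md = links_markdown("Other links", other_links)

    return "\n\n".join([summary, sources, forms_md, other_md])

Note that split_form_links is purely keyword-based (_FORM_HINTS = ("form", "application", "apply", "download", "pdf")), so a hypothetical URL such as https://example.org/forms/application.pdf would be grouped under "Forms", while any other extracted link falls through to the second list.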
|