"""
Step 6: Retrieval helper + link extraction.

New in Step 6 (Objective):
- extract_links(hits): finds http/https URLs inside retrieved chunks
- split_form_links(links): filters links that look like "forms" (name or path)
- links_markdown(title, links): renders a clickable list for the UI
"""

from pathlib import Path
import json
import re
from typing import List, Dict, Tuple

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

DATA_DIR = Path("data")
INDEX_PATH = DATA_DIR / "index.faiss"
META_PATH = DATA_DIR / "meta.json"
EMBED_MODEL = "sentence-transformers/all-MiniLM-L6-v2"


class RAGSearcher:
    def __init__(self):
        if not INDEX_PATH.exists() or not META_PATH.exists():
            raise RuntimeError(
                "Index not found. Build it first with the 'Build/Refresh Index' button."
            )
        self.index = faiss.read_index(str(INDEX_PATH))
        self.metas: List[Dict] = json.loads(META_PATH.read_text(encoding="utf-8"))
        self.model = SentenceTransformer(EMBED_MODEL)

    def search(self, query: str, k: int = 6) -> List[Dict]:
        if not query or len(query.strip()) < 3:
            return []
        qvec = self.model.encode([query], convert_to_numpy=True, normalize_embeddings=True)
        qvec = np.asarray(qvec, dtype="float32")  # FAISS requires float32 vectors
        scores, idxs = self.index.search(qvec, k)
        hits: List[Dict] = []
        for score, idx in zip(scores[0], idxs[0]):
            if idx < 0:
                continue
            meta = self.metas[int(idx)]
            text = Path(meta["chunk_file"]).read_text(encoding="utf-8")
            hits.append({"score": float(score), "text": text, "meta": meta})
        return hits


def summarize_hits(hits: List[Dict], max_points: int = 4) -> str:
    if not hits:
        return "I couldn't find relevant information. Try rephrasing your question."
    bullets = []
    for h in hits[:max_points]:
        snippet = " ".join(h["text"].strip().split())
        if len(snippet) > 350:
            snippet = snippet[:350] + "..."
        bullets.append(f"- {snippet}")
    return "\n".join(bullets)


def format_sources(hits: List[Dict], max_files: int = 5) -> str:
    if not hits:
        return "Sources: (none)"
    seen, order = set(), []
    for h in hits:
        f = h["meta"]["file"]
        if f not in seen:
            seen.add(f)
            order.append(f)
        if len(order) >= max_files:
            break
    bullets = [f"- `{Path(f).name}`" for f in order]
    return "Sources:\n" + "\n".join(bullets)


# ---- NEW: Link extraction (Objective)

_URL_RE = re.compile(r"(https?://[^\s\)\]]+)", re.IGNORECASE)

def extract_links(hits: List[Dict], max_links: int = 12) -> List[str]:
    """
    Scan the retrieved text for URLs (http/https).
    - Deduplicate while preserving order.
    - Return up to max_links.
    """
    seen = set()
    ordered: List[str] = []
    for h in hits:
        for m in _URL_RE.findall(h["text"]):
            url = m.strip().rstrip(".,);]")
            if url not in seen:
                seen.add(url)
                ordered.append(url)
            if len(ordered) >= max_links:
                return ordered
    return ordered

_FORM_HINTS = ("form", "application", "apply", "download", "pdf")

def split_form_links(links: List[str]) -> Tuple[List[str], List[str]]:
    """
    Separate links that look like "forms" based on common keywords in
    the URL path or filename. Returns (form_links, other_links).
    """
    forms, others = [], []
    for u in links:
        low = u.lower()
        if any(h in low for h in _FORM_HINTS):
            forms.append(u)
        else:
            others.append(u)
    return forms, others
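
# Example (illustrative URLs, not from the corpus):
# split_form_links(["https://example.gov/forms/i-130.pdf", "https://example.gov/newsroom"])
# puts the first URL in form_links (it contains "form" and "pdf") and the
# second in other_links.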

def links_markdown(title: str, links: List[str]) -> str:
    if not links:
        return f"**{title}:** (none)"
    items = "\n".join([f"- [{u}]({u})" for u in links])
    return f"**{title}:**\n{items}"