# app/rag_system.py
from __future__ import annotations

import os
import re
from pathlib import Path
from typing import List, Tuple

import faiss
import numpy as np
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer

ROOT_DIR = Path(__file__).resolve().parent.parent
DATA_DIR = ROOT_DIR / "data"
UPLOAD_DIR = DATA_DIR / "uploads"
INDEX_DIR = DATA_DIR / "index"
CACHE_DIR = Path(os.getenv("HF_HOME", str(ROOT_DIR / ".cache")))
for d in (DATA_DIR, UPLOAD_DIR, INDEX_DIR, CACHE_DIR):
    d.mkdir(parents=True, exist_ok=True)

MODEL_NAME = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")

# Output language – keep "en" for English output (default: "en")
OUTPUT_LANG = os.getenv("OUTPUT_LANG", "en").lower()

# --- utility functions ---
NUM_PAT = re.compile(r"(\d+([.,]\d+)?|%|m²|AZN|usd|eur|\bset\b|\bmt\b)", re.IGNORECASE)

def _split_sentences(text: str) -> List[str]:
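    """Split text into sentences at terminal punctuation or line breaks."""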
    return [s.strip() for s in re.split(r'(?<=[\.\!\?])\s+|[\r\n]+', text) if s.strip()]

def _mostly_numeric(s: str) -> bool:
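    """True if the string has no alphanumeric characters, or digits exceed 30% of them."""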
    # fairly aggressive threshold: >30% digits marks the line as numeric
    alnum = [c for c in s if c.isalnum()]
    if not alnum:
        return True
    digits = sum(c.isdigit() for c in alnum)
    return digits / max(1, len(alnum)) > 0.3

def _tabular_like(s: str) -> bool:
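    """Heuristic for table-like noise: numeric/unit tokens, page markers, or very short lines."""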
    # drop lines packed with numbers/measurements/currency
    hits = len(NUM_PAT.findall(s))
    return hits >= 2 or "Page" in s or len(s) < 20

def _clean_for_summary(text: str) -> str:
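    """Strip numeric and table-like lines, collapsing what remains into one string."""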
    lines = []
    for ln in text.splitlines():
        t = " ".join(ln.split())
        if not t:
            continue
        if _mostly_numeric(t) or _tabular_like(t):
            continue
        lines.append(t)
    return " ".join(lines)

class SimpleRAG:
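    """Minimal local RAG pipeline: chunk PDFs, embed with SentenceTransformers,
    search a FAISS inner-product index, and extract answer sentences."""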
    def __init__(
        self,
        index_path: Path = INDEX_DIR / "faiss.index",
        meta_path: Path = INDEX_DIR / "meta.npy",
        model_name: str = MODEL_NAME,
        cache_dir: Path = CACHE_DIR,
    ):
        self.index_path = Path(index_path)
        self.meta_path = Path(meta_path)
        self.model_name = model_name
        self.cache_dir = Path(cache_dir)

        self.model = SentenceTransformer(self.model_name, cache_folder=str(self.cache_dir))
        self.embed_dim = self.model.get_sentence_embedding_dimension()

        # translator lazy-load
        self._translator = None

        self.index: faiss.Index = None  # type: ignore
        self.chunks: List[str] = []
        self._load()

    # ---- translator (az->en) ----
    def _translate_to_en(self, texts: List[str]) -> List[str]:
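        """Translate sentences to English with Helsinki-NLP/opus-mt-az-en; on any failure, return the input unchanged."""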
        if OUTPUT_LANG != "en" or not texts:
            return texts
        try:
            if self._translator is None:
                from transformers import pipeline
                # Helsinki-NLP az->en
                self._translator = pipeline(
                    "translation",
                    model="Helsinki-NLP/opus-mt-az-en",
                    model_kwargs={"cache_dir": str(self.cache_dir)},  # pipeline() has no cache_dir kwarg; route it to from_pretrained
                    device=-1,
                )
            outs = self._translator(texts, max_length=400)
            return [o["translation_text"] for o in outs]
        except Exception:
            # translation failed; return the originals unchanged
            return texts

    def _load(self) -> None:
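        """Restore persisted chunks and FAISS index; fall back to an empty IndexFlatIP on mismatch or error."""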
        if self.meta_path.exists():
            try:
                self.chunks = np.load(self.meta_path, allow_pickle=True).tolist()
            except Exception:
                self.chunks = []
        if self.index_path.exists():
            try:
                idx = faiss.read_index(str(self.index_path))
                self.index = idx if getattr(idx, "d", None) == self.embed_dim else faiss.IndexFlatIP(self.embed_dim)
            except Exception:
                self.index = faiss.IndexFlatIP(self.embed_dim)
        else:
            self.index = faiss.IndexFlatIP(self.embed_dim)

    def _persist(self) -> None:
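        """Write the FAISS index and the chunk list to disk."""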
        faiss.write_index(self.index, str(self.index_path))
        np.save(self.meta_path, np.array(self.chunks, dtype=object))

    @staticmethod
    def _pdf_to_texts(pdf_path: Path, step: int = 800) -> List[str]:
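        """Extract text per page and slice it into fixed-size character chunks."""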
        reader = PdfReader(str(pdf_path))
        pages = []
        for p in reader.pages:
            t = p.extract_text() or ""
            if t.strip():
                pages.append(t)
        chunks: List[str] = []
        for txt in pages:
            for i in range(0, len(txt), step):
                part = txt[i:i+step].strip()
                if part:
                    chunks.append(part)
        return chunks

    def add_pdf(self, pdf_path: Path) -> int:
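        """Chunk, embed, and index a PDF; persists the index and returns the number of chunks added."""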
        texts = self._pdf_to_texts(pdf_path)
        if not texts:
            return 0
        emb = self.model.encode(texts, convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=False)
        self.index.add(emb.astype(np.float32))
        self.chunks.extend(texts)
        self._persist()
        return len(texts)

    def search(self, query: str, k: int = 5) -> List[Tuple[str, float]]:
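        """Return up to k (chunk, score) pairs; with normalized embeddings, inner product equals cosine similarity."""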
        if self.index is None or self.index.ntotal == 0:
            return []
        q = self.model.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
        D, I = self.index.search(q, min(k, max(1, self.index.ntotal)))
        out: List[Tuple[str, float]] = []
        if I.size > 0 and self.chunks:
            for idx, score in zip(I[0], D[0]):
                if 0 <= idx < len(self.chunks):
                    out.append((self.chunks[idx], float(score)))
        return out

    def synthesize_answer(self, question: str, contexts: List[str], max_sentences: int = 5) -> str:
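        """Extractive answer: filter and split contexts into sentences, rank them against
        the question, dedupe, translate if configured, and return them as bullets."""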
        if not contexts:
            return "No relevant context found. Please upload a PDF or ask a more specific question."

        # Candidate sentences (clean + split)
        candidates: List[str] = []
        for c in contexts[:5]:
            cleaned = _clean_for_summary(c)
            for s in _split_sentences(cleaned):
                if 40 <= len(s) <= 240 and not _tabular_like(s):
                    candidates.append(s)

        if not candidates:
            return "The document appears largely tabular/numeric; couldn't extract readable sentences."

        # Rank by similarity
        q_emb = self.model.encode([question], convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
        cand_emb = self.model.encode(candidates, convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
        scores = (cand_emb @ q_emb.T).ravel()
        order = np.argsort(-scores)

        # Pick top sentences with dedup by lowercase
        selected: List[str] = []
        seen = set()
        for i in order:
            s = candidates[i].strip()
            key = s.lower()
            if key in seen:
                continue
            seen.add(key)
            selected.append(s)
            if len(selected) >= max_sentences:
                break

        # Translate to EN if needed
        if OUTPUT_LANG == "en":
            selected = self._translate_to_en(selected)

        bullets = "\n".join(f"- {s}" for s in selected)
        return f"Answer (based on document context):\n{bullets}"


def synthesize_answer(question: str, contexts: List[str]) -> str:
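    """Convenience wrapper that instantiates SimpleRAG (reloading the persisted index) on every call."""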
    return SimpleRAG().synthesize_answer(question, contexts)


__all__ = ["SimpleRAG", "synthesize_answer", "DATA_DIR", "UPLOAD_DIR", "INDEX_DIR", "CACHE_DIR", "MODEL_NAME"]
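

# A minimal usage sketch (assumption: "example.pdf" is a hypothetical upload,
# not a file this repo ships). Indexes the PDF if present, then runs a search
# and prints an extractive answer; with an empty index it prints the fallback message.
if __name__ == "__main__":
    rag = SimpleRAG()
    sample = UPLOAD_DIR / "example.pdf"  # hypothetical sample document
    if sample.exists():
        print(f"Indexed {rag.add_pdf(sample)} chunks")
    question = "What is the project scope?"
    contexts = [chunk for chunk, _ in rag.search(question, k=3)]
    print(rag.synthesize_answer(question, contexts))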