# app/rag_system.py
from __future__ import annotations

import os
import re
from pathlib import Path
from typing import List, Tuple

import faiss
import numpy as np
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer

ROOT_DIR = Path(__file__).resolve().parent.parent
DATA_DIR = ROOT_DIR / "data"
UPLOAD_DIR = DATA_DIR / "uploads"
INDEX_DIR = DATA_DIR / "index"
CACHE_DIR = Path(os.getenv("HF_HOME", str(ROOT_DIR / ".cache")))
for d in (DATA_DIR, UPLOAD_DIR, INDEX_DIR, CACHE_DIR):
    d.mkdir(parents=True, exist_ok=True)

MODEL_NAME = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
OUTPUT_LANG = os.getenv("OUTPUT_LANG", "en").lower()

AZ_CHARS = set("əğıöşçüİıĞÖŞÇÜƏ")
NUM_TOK_RE = re.compile(r"\b(\d+[.,]?\d*|%|m²|azn|usd|eur|set|mt)\b", re.IGNORECASE)
GENERIC_Q_RE = re.compile(
    r"(what\s+is\s+(it|this|the\s+document)\s+about\??|what\s+is\s+about\??|summary|overview)",
    re.IGNORECASE,
)

def _split_sentences(text: str) -> List[str]:
    return [s.strip() for s in re.split(r'(?<=[.!?])\s+|[\r\n]+', text) if s.strip()]

def _mostly_numeric(s: str) -> bool:
    alnum = [c for c in s if c.isalnum()]
    if not alnum:
        return True
    digits = sum(c.isdigit() for c in alnum)
    return digits / max(1, len(alnum)) > 0.3

def _tabular_like(s: str) -> bool:
    hits = len(NUM_TOK_RE.findall(s))
    return hits >= 4 or len(s) < 15

def _clean_for_summary(text: str) -> str:
    out = []
    for ln in text.splitlines():
        t = " ".join(ln.split())
        if not t or _mostly_numeric(t) or _tabular_like(t):
            continue
        out.append(t)
    return " ".join(out)

def _sim_jaccard(a: str, b: str) -> float:
    aw = set(a.lower().split())
    bw = set(b.lower().split())
    if not aw or not bw:
        return 0.0
    return len(aw & bw) / len(aw | bw)

def _non_ascii_ratio(s: str) -> float:
    return sum(ord(c) > 127 for c in s) / max(1, len(s))

def _looks_azerbaijani(s: str) -> bool:
    return any(ch in AZ_CHARS for ch in s) or _non_ascii_ratio(s) > 0.15

def _keyword_summary_en(contexts: List[str]) -> List[str]:
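    """Map Azerbaijani/English construction keywords found in the contexts to short
    English bullet points (at most five); used as a fallback when sentence extraction fails."""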
    text = " ".join(contexts).lower()
    bullets: List[str] = []

    def add(b: str):
        if b not in bullets:
            bullets.append(b)

    if ("şüşə" in text) or ("ara kəsm" in text) or ("s/q" in text):
        add("Removal and re-installation of glass partitions in sanitary areas.")
    if "divar kağız" in text:
        add("Wallpaper repair or replacement; some areas replaced with plaster and paint.")
    if ("alçı boya" in text) or ("boya işi" in text) or ("plaster" in text) or ("boya" in text):
        add("Wall plastering and painting works.")
    if "seramik" in text or "ceramic" in text:
        add("Ceramic tiling works (including grouting).")
    if ("dilatasyon" in text) or ("ar 153" in text) or ("ar153" in text):
        add("Installation of AR 153–050 floor expansion joint profile with accessories and insulation.")
    if "daş yunu" in text or "rock wool" in text:
        add("Rock wool insulation installed where required.")
    if ("sütunlarda" in text) or ("üzlüyün" in text) or ("cladding" in text):
        add("Repair of wall cladding on columns.")
    if ("m²" in text) or ("ədəd" in text) or ("azn" in text) or ("unit price" in text):
        add("Bill of quantities style lines with unit prices and measures (m², pcs).")

    if not bullets:
        bullets = [
            "The document appears to be a bill of quantities or a structured list of works.",
            "Scope likely includes demolition/reinstallation, finishing (plaster & paint), tiling, and profiles.",
        ]
    return bullets[:5]

class SimpleRAG:
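    """Minimal retrieval-augmented pipeline: PDF text extraction, sentence-transformer
    embeddings, FAISS inner-product search, and extractive English answer synthesis."""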
    def __init__(
        self,
        index_path: Path = INDEX_DIR / "faiss.index",
        meta_path: Path = INDEX_DIR / "meta.npy",
        model_name: str = MODEL_NAME,
        cache_dir: Path = CACHE_DIR,
    ):
        self.index_path = Path(index_path)
        self.meta_path = Path(meta_path)
        self.model_name = model_name
        self.cache_dir = Path(cache_dir)

        self.model = SentenceTransformer(self.model_name, cache_folder=str(self.cache_dir))
        self.embed_dim = self.model.get_sentence_embedding_dimension()

        self._translator = None  # lazy
        self.index: faiss.Index = faiss.IndexFlatIP(self.embed_dim)
        self.chunks: List[str] = []
        self.last_added: List[str] = []
        self._load()

    def _load(self) -> None:
        if self.meta_path.exists():
            try:
                self.chunks = np.load(self.meta_path, allow_pickle=True).tolist()
            except Exception:
                self.chunks = []
        if self.index_path.exists():
            try:
                idx = faiss.read_index(str(self.index_path))
                if getattr(idx, "d", None) == self.embed_dim:
                    self.index = idx
            except Exception:
                pass

    def _persist(self) -> None:
        faiss.write_index(self.index, str(self.index_path))
        np.save(self.meta_path, np.array(self.chunks, dtype=object))

    @staticmethod
    def _pdf_to_texts(pdf_path: Path, step: int = 1400) -> List[str]:
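        """Extract text with pypdf (falling back to pdfminer) and split it into
        fixed-size character chunks of length `step`."""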
        # 1) pypdf
        pages: List[str] = []
        try:
            reader = PdfReader(str(pdf_path))
            for p in reader.pages:
                t = p.extract_text() or ""
                if t.strip():
                    pages.append(t)
        except Exception:
            pages = []

        full = " ".join(pages).strip()
        if not full:
            # 2) pdfminer fallback
            try:
                from pdfminer.high_level import extract_text as pdfminer_extract_text
                full = (pdfminer_extract_text(str(pdf_path)) or "").strip()
            except Exception:
                full = ""

        if not full:
            return []

        chunks: List[str] = []
        for i in range(0, len(full), step):
            part = full[i : i + step].strip()
            if part:
                chunks.append(part)
        return chunks

    def add_pdf(self, pdf_path: Path) -> int:
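        """Extract, embed, and index a PDF, remembering the extracted chunks in
        `last_added`; persists the index and returns the number of chunks added."""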
        texts = self._pdf_to_texts(pdf_path)
        if not texts:
            # IMPORTANT: do NOT clobber last_added if this PDF had no extractable text
            return 0

        self.last_added = texts[:]  # only set if we actually extracted text
        emb = self.model.encode(texts, convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=False)
        self.index.add(emb.astype(np.float32))
        self.chunks.extend(texts)
        self._persist()
        return len(texts)

    def search(self, query: str, k: int = 5) -> List[Tuple[str, float]]:
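        """Return up to k (chunk, score) pairs; scores are cosine similarities,
        since embeddings are L2-normalized before the inner-product search."""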
        if self.index is None or self.index.ntotal == 0:
            return []
        q = self.model.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
        D, I = self.index.search(q, min(k, max(1, self.index.ntotal)))
        out: List[Tuple[str, float]] = []
        if I.size > 0 and self.chunks:
            for idx, score in zip(I[0], D[0]):
                if 0 <= idx < len(self.chunks):
                    out.append((self.chunks[idx], float(score)))
        return out

    def _translate_to_en(self, texts: List[str]) -> List[str]:
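        """Translate Azerbaijani text to English via a lazily created Helsinki-NLP
        pipeline; on any failure the inputs are returned unchanged."""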
        if not texts:
            return texts
        try:
            from transformers import pipeline
            if self._translator is None:
                self._translator = pipeline(
                    "translation",
                    model="Helsinki-NLP/opus-mt-az-en",
                    model_kwargs={"cache_dir": str(self.cache_dir)},
                    device=-1,
                )
            outs = self._translator(texts, max_length=800)
            return [o["translation_text"].strip() for o in outs]
        except Exception:
            return texts

    def _prepare_contexts(self, question: str, contexts: List[str]) -> List[str]:
        # Generic question or empty search → use last uploaded file snippets
        generic = (len((question or "").split()) <= 5) or bool(GENERIC_Q_RE.search(question or ""))
        if (not contexts or generic) and self.last_added:
            return self.last_added[:5]
        return contexts

    def synthesize_answer(self, question: str, contexts: List[str], max_sentences: int = 4) -> str:
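        """Build a short extractive answer: clean and optionally translate the contexts,
        rank candidate sentences against the question, drop near-duplicates, and fall
        back to keyword bullets when no usable sentences remain."""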
        contexts = self._prepare_contexts(question, contexts)

        if not contexts:
            return "No relevant context found. Please upload a PDF or ask a more specific question."

        # 1) Clean & keep top contexts
        cleaned_contexts = [_clean_for_summary(c) for c in contexts[:5]]
        cleaned_contexts = [c for c in cleaned_contexts if len(c) > 40]
        if not cleaned_contexts:
            bullets = _keyword_summary_en(contexts[:5])
            return "Answer (based on document context):\n" + "\n".join(f"- {b}" for b in bullets)

        # 2) Pre-translate paragraphs to EN when target is EN
        translated = self._translate_to_en(cleaned_contexts) if OUTPUT_LANG == "en" else cleaned_contexts

        # 3) Split into candidate sentences and filter
        candidates: List[str] = []
        for para in translated:
            for s in _split_sentences(para):
                w = s.split()
                if not (6 <= len(w) <= 60):
                    continue
                # full sentence requirement: punctuation at end OR sufficiently long
                if not re.search(r"[.!?](?:[\"'])?$", s) and len(w) < 18:
                    continue
                if _tabular_like(s) or _mostly_numeric(s):
                    continue
                candidates.append(" ".join(w))

        # 4) Fallback if no sentences
        if not candidates:
            bullets = _keyword_summary_en(cleaned_contexts)
            return "Answer (based on document context):\n" + "\n".join(f"- {b}" for b in bullets)

        # 5) Rank by similarity to the question
        q_emb = self.model.encode([question], convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
        cand_emb = self.model.encode(candidates, convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
        scores = (cand_emb @ q_emb.T).ravel()
        order = np.argsort(-scores)

        # 6) Aggressive near-duplicate removal
        selected: List[str] = []
        for i in order:
            s = candidates[i].strip()
            if any(_sim_jaccard(s, t) >= 0.90 for t in selected):
                continue
            selected.append(s)
            if len(selected) >= max_sentences:
                break

        # 7) If still looks non-English, use keyword fallback
        if not selected or (sum(_non_ascii_ratio(s) for s in selected) / len(selected) > 0.10):
            bullets = _keyword_summary_en(cleaned_contexts)
            return "Answer (based on document context):\n" + "\n".join(f"- {b}" for b in bullets)

        bullets = "\n".join(f"- {s}" for s in selected)
        return f"Answer (based on document context):\n{bullets}"

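# Module-level convenience wrapper; note that it constructs a new SimpleRAG
# (reloading the embedding model and index) on each call.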
def synthesize_answer(question: str, contexts: List[str]) -> str:
    return SimpleRAG().synthesize_answer(question, contexts)

__all__ = ["SimpleRAG", "synthesize_answer", "DATA_DIR", "UPLOAD_DIR", "INDEX_DIR", "CACHE_DIR", "MODEL_NAME"]
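
# --- Usage sketch (illustrative, not part of the module API) ---
# A minimal example of how the pieces are typically wired together: index a PDF,
# retrieve context for a question, then synthesize an answer. The file name
# "sample.pdf" is a hypothetical placeholder, not a file shipped with the project.
if __name__ == "__main__":
    rag = SimpleRAG()
    added = rag.add_pdf(UPLOAD_DIR / "sample.pdf")  # hypothetical upload
    print(f"Indexed {added} chunks")

    question = "What works are covered in the document?"
    hits = rag.search(question, k=5)
    contexts = [chunk for chunk, _score in hits]
    print(rag.synthesize_answer(question, contexts))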