Spaces:
Sleeping
Sleeping
import time | |
import numpy as np | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
class RAGPipeline: | |
def __init__(self, logger): | |
self.logger = logger | |
self.logger("[RAG] Initializing tokenizer and model...") | |
self.tokenizer = AutoTokenizer.from_pretrained("aubmindlab/aragpt2-mega", trust_remote_code=True) | |
self.generator = AutoModelForCausalLM.from_pretrained("aubmindlab/aragpt2-mega", trust_remote_code=True) | |
self.chunk_embeddings = [] | |
self.index = [] | |
self.logger("[RAG] Initialization done.") | |
def build_index(self, chunks): | |
start = time.time() | |
self.logger(f"[RAG] Building index for {len(chunks)} chunks...") | |
self.chunk_embeddings = [] | |
self.index = [] | |
for i, chunk in enumerate(chunks): | |
embedding = self._dummy_embedding(chunk) | |
self.chunk_embeddings.append(embedding) | |
self.index.append(chunk) | |
if (i+1) % 10 == 0 or (i+1) == len(chunks): | |
self.logger(f"[RAG] Processed {i+1}/{len(chunks)} chunks.") | |
self.chunk_embeddings = np.array(self.chunk_embeddings) | |
dim = self.chunk_embeddings.shape[1] if len(self.chunk_embeddings) > 0 else 0 | |
elapsed = time.time() - start | |
self.logger(f"[RAG] Index built with dimension {dim} in {elapsed:.2f}s.") | |
return "Index built successfully." | |
def _dummy_embedding(self, text): | |
return np.random.rand(768) | |
def generate_answer(self, question, top_k=3): | |
start = time.time() | |
self.logger(f"[RAG] Generating answer for question:\n{question}") | |
if len(self.index) == 0: | |
self.logger("[RAG] Warning: index is empty, please build index first.") | |
return "لم يتم بناء الفهرس بعد.", [] | |
# بحث مبسط لأقرب النصوص (dummy - عشوائي) | |
passages = self.index[:top_k] | |
prompt = question + "\n\nالمراجع:\n" + "\n".join(passages) | |
inputs = self.tokenizer(prompt, return_tensors="pt") | |
output = self.generator.generate(inputs.input_ids, max_new_tokens=150, do_sample=True) | |
response = self.tokenizer.decode(output[0], skip_special_tokens=True) | |
elapsed = time.time() - start | |
self.logger(f"[RAG] Answer generated in {elapsed:.2f}s.") | |
return response, passages | |