import os
import json

import gradio as gr
import chromadb
import fitz  # PyMuPDF
import dspy
from sentence_transformers import SentenceTransformer
from dspy import Example
from dspy.evaluate import answer_exact_match
from dspy.teleprompt import MIPROv2
# Load the Hugging Face token from the Space's secrets.
HF_TOKEN = os.environ["HF_TOKEN"]

# Configure the language model. DSPy routes requests through LiteLLM, so the
# Hugging Face Inference API is selected via the "huggingface/" model prefix.
dspy.settings.configure(
    lm=dspy.LM(
        model="huggingface/HuggingFaceH4/zephyr-7b-beta",  # any supported instruct model
        api_key=HF_TOKEN,
    )
)
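
# Optional smoke test (a sketch; assumes DSPy >= 2.5, where the configured LM
# is callable and returns a list of completion strings):
# print(dspy.settings.lm("مرحبا"))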

# Set up the persistent Chroma vector store.
client = chromadb.PersistentClient(path="./chroma_db")
col = client.get_or_create_collection(name="arabic_docs")

# LaBSE is a multilingual sentence encoder with strong Arabic coverage.
embedder = SentenceTransformer("sentence-transformers/LaBSE")

def process_pdf(pdf_bytes):
    """Extract text from a PDF and split it into paragraph-sized chunks."""
    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    texts = []
    for page in doc:
        text = page.get_text()
        # Split on blank lines and drop fragments too short to be useful context.
        for chunk in text.split("\n\n"):
            if len(chunk.strip()) > 50:
                texts.append(chunk.strip())
    return texts

def ingest(pdf_file):
    texts = process_pdf(pdf_file)
    embeddings = embedder.encode(texts, show_progress_bar=True)
    # NOTE: ids restart at chunk_0 on every upload, so re-ingesting overwrites
    # earlier chunks; derive ids from the file name to keep several documents.
    for i, (chunk, emb) in enumerate(zip(texts, embeddings)):
        col.add(ids=[f"chunk_{i}"], embeddings=[emb.tolist()], metadatas=[{"text": chunk}])
    return f"✅ تمت إضافة {len(texts)} مقطعاً."

def retrieve_context(question):
    embedding = embedder.encode([question])[0]
    results = col.query(query_embeddings=[embedding.tolist()], n_results=3)
    # Chroma returns one list of metadata dicts per query embedding.
    context_list = [m["text"] for m in results["metadatas"][0]]
    return "\n\n".join(context_list)

class RagSig(dspy.Signature):
    """Answer the question using only the given context."""

    question: str = dspy.InputField()
    context: str = dspy.InputField()
    answer: str = dspy.OutputField()

class RagMod(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predictor = dspy.Predict(RagSig)

    def forward(self, question):
        # Retrieve the nearest chunks first, then let the LM answer over them.
        context = retrieve_context(question)
        return self.predictor(question=question, context=context)

model = RagMod()

def answer(question):
    out = model(question)
    return out.answer
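
# Illustrative use (made-up question): answer("ما عاصمة مصر؟") returns the
# predictor's `answer` field as a plain string; retrieval happens internally.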

def load_dataset(path):
    with open(path, "r", encoding="utf-8") as f:
        return [Example(**json.loads(l)).with_inputs("question") for l in f]
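
# Each JSONL line becomes one dspy.Example; with the exact-match metric used
# below, every record is assumed to hold "question" and "answer" keys, e.g.
# (a made-up sample line): {"question": "ما عاصمة مصر؟", "answer": "القاهرة"}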

def optimize(train_file, val_file):
    global model
    trainset = load_dataset(train_file)
    valset = load_dataset(val_file)
    tp = MIPROv2(metric=answer_exact_match, auto="light", num_threads=4)
    optimized = tp.compile(model, trainset=trainset, valset=valset)
    model = optimized
    return "✅ تم تحسين النموذج!"

with gr.Blocks() as demo:
    gr.Markdown("## 🧠 نظام RAG عربي باستخدام DSPy + ChromaDB + Hugging Face Inference")

    with gr.Tab("📥 تحميل وتخزين"):
        pdf_input = gr.File(label="ارفع ملف PDF", type="binary")
        ingest_btn = gr.Button("إضافة إلى قاعدة البيانات")
        ingest_out = gr.Textbox(label="نتيجة الإضافة")
        ingest_btn.click(ingest, inputs=pdf_input, outputs=ingest_out)

    with gr.Tab("❓ سؤال"):
        q = gr.Textbox(label="اكتب سؤالك بالعربية")
        answer_btn = gr.Button("احصل على الإجابة")
        out = gr.Textbox(label="الإجابة")
        answer_btn.click(answer, inputs=q, outputs=out)

    with gr.Tab("⚙️ تحسين النموذج"):
        # "filepath" hands the handler plain paths, matching load_dataset(path).
        train_file = gr.File(label="trainset.jsonl", type="filepath")
        val_file = gr.File(label="valset.jsonl", type="filepath")
        opt_btn = gr.Button("ابدأ التحسين")
        result = gr.Textbox(label="نتيجة التحسين")
        opt_btn.click(optimize, inputs=[train_file, val_file], outputs=result)

demo.launch()