import os
import asyncio
import shutil

from dotenv import load_dotenv
import gradio as gr
from openai import OpenAI

# Load credentials from a local .env file into the environment.
load_dotenv()
DEEPINFRA_API_KEY = os.getenv("DEEPINFRA_API_KEY")

# DeepInfra exposes an OpenAI-compatible endpoint, so the official OpenAI
# client works once it is pointed at DeepInfra's base URL.
openai = OpenAI(
    api_key=DEEPINFRA_API_KEY,
    base_url="https://api.deepinfra.com/v1/openai",
)

import weaviate
from weaviate.classes.init import Auth
from contextlib import contextmanager

# Weaviate Cloud credentials, assumed to be supplied via the same .env file.
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")


@contextmanager
def weaviate_client():
    """Yield a Weaviate Cloud client and close it when the block exits."""
    client = weaviate.connect_to_weaviate_cloud(
        cluster_url=WEAVIATE_URL,
        auth_credentials=Auth.api_key(WEAVIATE_API_KEY),
    )
    try:
        yield client
    finally:
        client.close()
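
# Usage sketch (hypothetical call): the context manager guarantees the
# connection is closed even if the body raises, e.g.:
#
#     with weaviate_client() as client:
#         print(client.collections.list_all())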


# Module-level record of the most recently uploaded document's path.
last_uploaded_path = None


def embed_texts(texts: list[str], batch_size: int = 50) -> list[list[float]]:
    """Embed texts in batches via DeepInfra, returning one vector per input."""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i : i + batch_size]
        try:
            resp = openai.embeddings.create(
                model="Qwen/Qwen3-Embedding-8B",
                input=batch,
                encoding_format="float",
            )
            batch_embs = [item.embedding for item in resp.data]
            all_embeddings.extend(batch_embs)
        except Exception as e:
            print(f"Embedding error: {e}")
            # Pad with empty lists so results stay aligned with the inputs.
            all_embeddings.extend([[] for _ in batch])
    return all_embeddings
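
# Example (hypothetical inputs): callers can spot failed embeddings by
# checking for the empty placeholder vectors, e.g.:
#
#     vecs = embed_texts(["first chunk", "second chunk"])
#     failed = [i for i, v in enumerate(vecs) if not v]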


def encode_query(query: str) -> list[float] | None:
    """Embed a single query string, or return None if embedding failed."""
    embs = embed_texts([query], batch_size=1)
    if embs and embs[0]:
        return embs[0]
    return None


async def old_Document(query: str, top_k: int = 1) -> dict:
    """Retrieve the top_k nearest chunks from the Old_Documents collection."""
    qe = encode_query(query)
    if not qe:
        return {"answer": []}

    try:
        with weaviate_client() as client:
            coll = client.collections.get("Old_Documents")
            res = coll.query.near_vector(
                near_vector=qe,
                limit=top_k,
                return_properties=["text"],
            )
            if not getattr(res, "objects", None):
                return {"answer": []}
            return {
                "answer": [obj.properties.get("text", "[No Text]") for obj in res.objects]
            }
    except Exception as e:
        print("RAG Error:", e)
        return {"answer": []}
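
# The result always has the shape {"answer": [<chunk text>, ...]}; a caller
# might invoke it as (hypothetical query):
#
#     asyncio.run(old_Document("What does the deed describe?", top_k=3))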


def ingest_file(path: str) -> str:
    """Record the uploaded document's path; chunking and indexing into
    Weaviate are assumed to happen in a separate pipeline."""
    global last_uploaded_path
    last_uploaded_path = path
    return f"Old document ingested: {os.path.basename(path)}"


def answer_question(query: str) -> str:
    """Run retrieval for the query and format the chunks as a bulleted answer."""
    try:
        rag_resp = asyncio.run(old_Document(query))
        chunks = rag_resp.get("answer", [])
        if not chunks:
            return "Sorry, I couldn't find relevant content in the old document."
        return "\n".join(f"- {c}" for c in chunks)
    except Exception as e:
        return f"Error processing your request: {e}"


with gr.Blocks(title="Old Documents RAG") as demo:
    gr.Markdown("## Old Documents RAG")
    query = gr.Textbox(placeholder="Your question...", lines=2, label="Ask about Old Documents")
    doc_file = gr.File(label="Upload Old Document (PDF, DOCX, TXT)")
    btn = gr.Button("Submit")
    out = gr.Textbox(label="Answer from Old Documents", lines=8, interactive=False)

    def process_old_doc(query, doc_file):
        if doc_file:
            # Persist the upload next to this script, outside Gradio's temp dir.
            upload_dir = os.path.join(os.path.dirname(__file__), "uploaded_docs")
            os.makedirs(upload_dir, exist_ok=True)
            # Depending on the Gradio version, the File component returns either
            # a temp-file wrapper with a .name attribute or a plain path string;
            # copying from the underlying path avoids relying on .read().
            src_path = doc_file.name if hasattr(doc_file, "name") else doc_file
            safe_filename = os.path.basename(src_path)
            save_path = os.path.join(upload_dir, safe_filename)
            shutil.copy(src_path, save_path)
            status = ingest_file(save_path)
            answer = answer_question(query)
            return f"{status}\n\n{answer}"
        else:
            # No new upload: fall back to the most recently ingested document.
            if last_uploaded_path:
                answer = answer_question(query)
                return f"[Using previously uploaded document: {os.path.basename(last_uploaded_path)}]\n\n{answer}"
            return "No document uploaded. Please upload an old document to proceed."

    btn.click(fn=process_old_doc, inputs=[query, doc_file], outputs=out)


if __name__ == "__main__":
    demo.launch(debug=True)
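
# To run this demo: pip install gradio openai weaviate-client python-dotenv,
# then set DEEPINFRA_API_KEY, WEAVIATE_URL and WEAVIATE_API_KEY in a .env file.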