import os
import shutil
from dotenv import load_dotenv
import gradio as gr

# Read credentials from .env; the variable names match the clients configured below
load_dotenv()
DEEPINFRA_API_KEY = os.getenv("DEEPINFRA_API_KEY")
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")

if not (DEEPINFRA_API_KEY and WEAVIATE_URL and WEAVIATE_API_KEY):
    raise ValueError("Please set all required keys in .env")

# DeepInfra client (OpenAI-compatible endpoint)
from openai import OpenAI

openai = OpenAI(
    api_key=DEEPINFRA_API_KEY,
    base_url="https://api.deepinfra.com/v1/openai",
)

# Weaviate client
import weaviate
from weaviate.classes.init import Auth
from contextlib import contextmanager

@contextmanager
def weaviate_client():
    """Connect to Weaviate Cloud and guarantee the connection is closed."""
    client = weaviate.connect_to_weaviate_cloud(
        cluster_url=WEAVIATE_URL,
        auth_credentials=Auth.api_key(WEAVIATE_API_KEY),
    )
    try:
        yield client
    finally:
        client.close()
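
# Each helper below opens its own short-lived connection through this manager, e.g.:
#
#     with weaviate_client() as client:
#         client.collections.list_all()
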
# Global path tracker
last_uploaded_path = None

# Embed function
def embed_texts(texts: list[str], batch_size: int = 50) -> list[list[float]]:
    """Embed texts in batches; a failed batch yields empty placeholder vectors."""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i : i + batch_size]
        try:
            resp = openai.embeddings.create(
                model="Qwen/Qwen3-Embedding-8B",
                input=batch,
                encoding_format="float",
            )
            batch_embs = [item.embedding for item in resp.data]
            all_embeddings.extend(batch_embs)
        except Exception as e:
            print(f"Embedding error: {e}")
            all_embeddings.extend([[] for _ in batch])
    return all_embeddings
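
# For example, embed_texts(["chunk one", "chunk two"]) returns one vector per
# input, with [] standing in for any chunk whose batch failed; callers such as
# encode_query below treat those placeholders as misses.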

def encode_query(query: str) -> list[float] | None:
    embs = embed_texts([query], batch_size=1)
    if embs and embs[0]:
        return embs[0]
    return None

def old_Document(query: str, top_k: int = 1) -> dict:
    """Vector search over the Old_Documents collection; returns matching chunks."""
    qe = encode_query(query)
    if not qe:
        return {"answer": []}
    try:
        with weaviate_client() as client:
            coll = client.collections.get("Old_Documents")
            res = coll.query.near_vector(
                near_vector=qe,
                limit=top_k,
                return_properties=["text"],
            )
        if not getattr(res, "objects", None):
            return {"answer": []}
        return {
            "answer": [obj.properties.get("text", "[No Text]") for obj in res.objects]
        }
    except Exception as e:
        print("RAG Error:", e)
        return {"answer": []}
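
# For example, old_Document("what does the deed say about boundaries?") yields
# {"answer": [...]} with up to top_k text chunks, or {"answer": []} on a miss
# or error (the query string here is only an illustration).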

# New functions to support the Gradio app
def ingest_file(path: str) -> str:
    """Record the most recently uploaded document path."""
    global last_uploaded_path
    last_uploaded_path = path
    return f"Old document ingested: {os.path.basename(path)}"

def answer_question(query: str) -> str:
    try:
        rag_resp = old_Document(query)
        chunks = rag_resp.get("answer", [])
        if not chunks:
            return "Sorry, I couldn't find relevant content in the old document."
        return "\n".join(f"- {c}" for c in chunks)
    except Exception as e:
        return f"Error processing your request: {e}"

# Gradio interface for Old Documents
with gr.Blocks(title="Old Documents RAG") as demo:
    gr.Markdown("## Old Documents RAG")
    query = gr.Textbox(placeholder="Your question...", lines=2, label="Ask about Old Documents")
    doc_file = gr.File(label="Upload Old Document (PDF, DOCX, TXT)")
    btn = gr.Button("Submit")
    out = gr.Textbox(label="Answer from Old Documents", lines=8, interactive=False)

    def process_old_doc(query, doc_file):
        if doc_file:
            # Save and ingest the uploaded file. gr.File may hand back a path
            # string or a temp-file object depending on the Gradio version, so
            # copy it by path rather than reading the handle.
            upload_dir = os.path.join(os.path.dirname(__file__), "uploaded_docs")
            os.makedirs(upload_dir, exist_ok=True)
            src_path = doc_file if isinstance(doc_file, str) else doc_file.name
            safe_filename = os.path.basename(src_path)
            save_path = os.path.join(upload_dir, safe_filename)
            shutil.copyfile(src_path, save_path)
            status = ingest_file(save_path)
            answer = answer_question(query)
            return f"{status}\n\n{answer}"
        else:
            # Use the last uploaded file, or ask for one if none exists
            if last_uploaded_path:
                answer = answer_question(query)
                return f"[Using previously uploaded document: {os.path.basename(last_uploaded_path)}]\n\n{answer}"
            else:
                return "No document uploaded. Please upload an old document to proceed."

    btn.click(fn=process_old_doc, inputs=[query, doc_file], outputs=out)

if __name__ == "__main__":
    demo.launch(debug=True)