# PLAITO / src/RAG.py
# Hosting-page residue kept as a comment: uploader "alibidaran",
# commit "Update src/RAG.py" (31ffb61, verified), file size 9.04 kB.
#from texts import *
from langgraph.graph import StateGraph
from langchain_core.runnables import RunnableLambda
from openai import OpenAI
import os
import openai
from langchain.text_splitter import RecursiveCharacterTextSplitter
import torch
from typing import TypedDict
import requests
import json
from typing import List, Dict, Any
from sklearn.cluster import KMeans
import numpy as np
# Module-level text buffer; starts out empty.
file_text = ""

# Character-based splitter: chunk_size=1024 and chunk_overlap=256, with
# lengths measured by the built-in ``len``.
text_splitter = RecursiveCharacterTextSplitter(
    length_function=len,
    chunk_overlap=256,
    chunk_size=1024,
)

# Module-level key for the ``openai`` package, taken from the environment
# (``None`` when OPENAI_API_KEY is unset).
openai.api_key = os.environ.get("OPENAI_API_KEY")

# Client object used for the chat-completion call in ``summerise_knowledge``.
client = OpenAI()
# def get_embedding(text, model="text-embedding-3-small"):
# # response = openai.embeddings.create(
# # input=[text],
# # model=model
# # )
# embeddings = encoder.encode([text], convert_to_tensor=True, show_progress_bar=True)
# embeddings = embeddings.cpu().numpy()
# return embeddings
# text=chuncked_text+Focusing_text+planning_text+Focusing_text2+Evoking_text
# chunks=text_splitter.split_text(text)
# embeddings=[get_embedding(chunk) for chunk in chunks]
# embedds=np.array(embeddings)
# kmeans=KMeans(n_clusters=3,max_iter=1000)
# kmeans.fit(embedds)
# def embed_document(state,file_text):
# chunks=text_splitter.split_text(file_text)
# embeddings=encoder.encode(chunks, convert_to_tensor=True, show_progress_bar=False)
# embeddings=embeddings.cpu().numpy()
# print(len(embeddings))
# return {'single_query':state['single_query'],'embeddings':embeddings,'chunks':chunks}
def get_knowledge(state, embeddings, chunks, method='cosine', top_k=5):
    """Retrieve the chunks whose embeddings are most similar to the query.

    Args:
        state: Graph state; ``state['embedded_query']`` holds the query
            embedding, either flat (``[dim]``) or as a single row
            (``[1, dim]``).
        embeddings: Chunk embeddings, one row per entry of ``chunks``
            (NumPy array, tensor, or nested sequence of numbers).
        chunks: Text chunks aligned with the rows of ``embeddings``.
        method: Retrieval strategy. Only ``'cosine'`` is implemented.
        top_k: Maximum number of chunks to return. Fewer are returned when
            fewer chunks exist.

    Returns:
        A state update with the unchanged ``'embedded_query'`` and
        ``'knowledge'``: the selected chunks ordered from most to least
        similar.

    Raises:
        ValueError: If ``method`` is not ``'cosine'``.
    """
    if method != 'cosine':
        # An unknown method used to fall through and return None, which is
        # not a valid state update; fail loudly instead.
        raise ValueError(f"Unsupported retrieval method: {method!r}")

    # Cast both sides to float32 on CPU so the matrix product never hits a
    # dtype mismatch (NumPy embeddings are frequently float64).
    query = torch.as_tensor(
        state['embedded_query'], dtype=torch.float32, device='cpu'
    ).reshape(-1)
    matrix = torch.as_tensor(embeddings, dtype=torch.float32, device='cpu')

    # Nothing to search: return an empty selection rather than crashing.
    if matrix.numel() == 0 or len(chunks) == 0:
        return {'embedded_query': state['embedded_query'], 'knowledge': []}

    # L2-normalise; ``normalize`` clamps the norm so zero vectors yield
    # zeros instead of NaN.
    query = torch.nn.functional.normalize(query, dim=0)
    matrix = torch.nn.functional.normalize(matrix, dim=1)

    # Cosine similarity of the query against every chunk: shape [num_chunks].
    similarities = matrix @ query

    # ``topk`` raises when k exceeds the number of rows, so clamp it.
    k = min(top_k, similarities.shape[0])
    best_indices = torch.topk(similarities, k=k).indices.tolist()

    return {
        'embedded_query': state['embedded_query'],
        'knowledge': [chunks[i] for i in best_indices],
    }
# elif method=='Kmeans':
# query_emb = np.array(get_embedding(state['single_query']))
# # Predict the closest cluster
# cluster_idx = kmeans.predict([query_emb])[0]
# # Find indices of documents in this cluster
# cluster_doc_indices = np.where(kmeans.labels_ == cluster_idx)[0]
# # Compute L2 (Euclidean) distance within the cluster
# cluster_embs = np.array(embeddings)[cluster_doc_indices]
# distances = np.linalg.norm(cluster_embs - query_emb, axis=1)
# top_k=5
# # Get top_k most similar documents (smallest distances)
# top_indices = distances.argsort()[:top_k]
# return {'single_query':state['single_query'],'knowledge':[chunks[cluster_doc_indices[i]] for i in top_indices]}
def summerise_knowledge(state):
    """Summarise the retrieved chunks with the OpenAI chat model.

    Args:
        state: Graph state; reads ``state['knowledge']`` (an iterable of text
            chunks) and ``state['embedded_query']``.

    Returns:
        A state update with the unchanged ``'embedded_query'`` and
        ``'summary'``: the text of the model's first choice, or ``''`` when
        the reply carries no text content.
    """
    # NOTE(review): a commented-out fallback that used to sit in this function
    # embedded a literal bearer token. It has been removed; the token should
    # be treated as leaked and revoked.
    prompt = (
        "\n"
        "[system]\n"
        "## Instructions:\n"
        "You are skillfull text analysist. Summerise the extracted information from uploaded file by the user. Make your summary as concice as possible.\n"
        "### Inputs:\n"
        "Extracted Knowledge:\n"
        "{}\n"
        "### Output:\n"
    )

    # Chunks are concatenated back-to-back, with no separator.
    text = "".join(state['knowledge'])

    response = client.chat.completions.create(
        model="gpt-5-mini",
        messages=[{'role': 'user', 'content': prompt.format(text)}],
    )

    # ``content`` may be None (e.g. a refusal); keep ``summary`` a string.
    summary = response.choices[0].message.content or ''
    return {'embedded_query': state['embedded_query'], 'summary': summary}
# def making_instructions(state):
# prompt="""
# [system]
# ## Instructions:
# You are skill full certificated psychologist. Create an instruction for the practitioner to help them how to behaive and respond to the client effectively.
# ## Input:
# [summary]
# {}
# ### Output:
# """
# response=client.chat.completions.create(
# model="gpt-4o-mini",
# # api_base="https://litellm.llemma.net/",
# # api_key=os.getenv("LITELLM_API_KEY"),  # NOTE(review): a literal key was committed on this line; revoke it
# messages =[{'role':'user','content':prompt.format(state['summary'])}],
# temperature=0.7,
# top_p=0.95,
# max_tokens=800)
# return {'query':state['query'],'instruction':response.choices[0].message}
def respond(state):
    """Request the final answer from the remote ``/predict`` endpoint.

    Only the formatted user prompt is sent, as form data under the
    ``user_prompt`` field.

    Args:
        state: Graph state; reads ``state['single_query']`` and
            ``state['summary']``.

    Returns:
        ``{'final_response': ...}`` holding the first element of the
        ``'output'`` list in the service's JSON reply.

    Raises:
        requests.HTTPError: If the service answers with an error status.
        requests.Timeout: If the service does not connect or reply in time.
    """
    # NOTE(review): ``'single_query'`` is not declared in ``MyState`` and this
    # node is not registered in ``load_graph`` -- confirm the expected state
    # keys before wiring it in.
    # NOTE(review): commented-out fallbacks that used to follow this function
    # embedded a bearer token and an API key. They have been removed; both
    # credentials should be treated as leaked and revoked.
    user_prompt = (
        "\n"
        "## Instructions:\n"
        "{}\n"
        "## Summary:\n"
        "{}\n"
    )
    url = "https://8000-01jy9pekct42qjmqxcap35g81s.cloudspaces.litng.ai/predict"
    payload = {
        "user_prompt": user_prompt.format(state['single_query'], state['summary'])
    }

    # (connect, read) timeout: without one, a stalled service blocks forever.
    response = requests.post(url, data=payload, timeout=(10, 300))
    # Surface HTTP failures directly instead of as a JSON/Key error below.
    response.raise_for_status()

    return {'final_response': response.json()['output'][0]}
def end_node(state):
    """Final node: expose the summary under the ``'knowledge'`` key.

    Args:
        state: Graph state; reads ``state['summary']``.

    Returns:
        A one-entry dict mapping ``'knowledge'`` to the summary.
    """
    final_summary = state["summary"]
    return dict(knowledge=final_summary)
class MyState(TypedDict):
    """State dictionary passed between the graph nodes."""

    # Embedding of the query; read by ``get_knowledge`` and passed along.
    embedded_query: list

    # Chunks selected by ``get_knowledge``. ``end_node`` later stores the
    # summary under this same key.
    knowledge: list

    # Summary text produced by ``summerise_knowledge``.
    summary: str
def load_graph(embeddings, chunks):
    """Assemble the retrieve -> summarise -> end graph without compiling it.

    Args:
        embeddings: Chunk embeddings handed to ``get_knowledge`` on each run.
        chunks: Text chunks handed to ``get_knowledge`` on each run.

    Returns:
        The ``StateGraph`` builder. It is returned uncompiled, so the caller
        has to call ``.compile()`` itself.
    """
    builder = StateGraph(state_schema=MyState)
    builder.set_entry_point('get_knowledge')

    # The retrieval node closes over the document's embeddings and chunks.
    retrieval = RunnableLambda(
        lambda state: get_knowledge(state, embeddings, chunks, method='cosine')
    )
    node_table = (
        ("get_knowledge", retrieval),
        ("summarise", RunnableLambda(summerise_knowledge)),
        ("end", RunnableLambda(end_node)),
    )
    for node_name, runnable in node_table:
        builder.add_node(node_name, runnable)

    # Linear flow; the ``respond`` step is deliberately not wired in.
    edge_table = (
        ("get_knowledge", "summarise"),
        ("summarise", "end"),
    )
    for source, target in edge_table:
        builder.add_edge(source, target)

    return builder