# aiws / rag_engine.py
# fikird
# Add RAG functionality with vector storage and web crawling
# 44198e0
# raw
# history blame
# 3.47 kB
from typing import List, Dict, Any
import numpy as np
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from search_engine import WebSearchEngine
import logging
logger = logging.getLogger(__name__)
class RAGEngine:
    """Combine web search results with a similarity-searchable vector store.

    Text is split into chunks by ``self.text_splitter``, embedded with
    ``self.embeddings`` and kept in ``self.vector_store``. The store starts
    out as ``None`` and is created the first time there are chunks to add.
    """

    def __init__(self):
        """Create the web search client, embedding model and text splitter."""
        self.web_search = WebSearchEngine()
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": "cpu"}
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
        # Built lazily by process_and_store_content once chunks exist.
        self.vector_store = None

    def process_and_store_content(self, content: str, metadata: Dict[str, Any] = None) -> None:
        """Split ``content`` into chunks and add them to the vector store.

        Args:
            content: Raw text to index.
            metadata: Optional metadata attached to every chunk. Each chunk
                receives its own shallow copy, so later mutation of one
                chunk's metadata (or of the caller's dict) does not leak
                into the others.

        Raises:
            Exception: Any error raised while splitting or storing is logged
                and re-raised unchanged.
        """
        try:
            texts = self.text_splitter.split_text(content)
            if not texts:
                # Nothing to index. Creating a store from zero texts would
                # fail, so leave the store exactly as it was.
                return

            # One independent dict per chunk; ``[d] * n`` would alias a
            # single dict across all chunks.
            base_metadata = metadata or {}
            metadatas = [dict(base_metadata) for _ in texts]

            if self.vector_store is None:
                self.vector_store = FAISS.from_texts(texts, self.embeddings, metadatas=metadatas)
            else:
                self.vector_store.add_texts(texts, metadatas=metadatas)
        except Exception as e:
            logger.error("Error processing content: %s", e)
            raise

    async def search_and_process(self, query: str, max_results: int = 5, similarity_k: int = 3) -> Dict:
        """Run a web search, index the results, and attach similar chunks.

        Args:
            query: Search query, also used for the similarity lookup.
            max_results: Forwarded to ``self.web_search.search``.
            similarity_k: Number of similar chunks to retrieve.

        Returns:
            The dict returned by the web search. When a vector store exists,
            a ``'similar_chunks'`` list (same shape as the result of
            ``get_relevant_context``) is added to it.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            # NOTE(review): this call is not awaited; if ``search`` performs
            # blocking I/O it will block the event loop - confirm.
            web_results = self.web_search.search(query, max_results)

            for result in web_results['results']:
                content = result.get('content')
                if not content:
                    # Missing, None or empty content has nothing to index.
                    continue
                self.process_and_store_content(
                    content,
                    metadata={'url': result.get('url'), 'title': result.get('title')}
                )

            # Only add the key when there is a store to query, so callers can
            # distinguish "no store yet" from "no matches".
            if self.vector_store is not None:
                web_results['similar_chunks'] = self.get_relevant_context(query, k=similarity_k)

            return web_results
        except Exception as e:
            logger.error("Error in search_and_process: %s", e)
            raise

    def get_relevant_context(self, query: str, k: int = 3) -> List[Dict]:
        """Return up to ``k`` stored chunks most similar to ``query``.

        Args:
            query: Text to compare against the stored chunks.
            k: Maximum number of chunks to return.

        Returns:
            A list of dicts with ``'content'``, ``'metadata'`` and
            ``'similarity_score'`` keys; an empty list when nothing has been
            stored yet.
        """
        if self.vector_store is None:
            return []

        scored_docs = self.vector_store.similarity_search_with_score(query, k=k)
        # NOTE(review): the score is whatever the store reports; for a
        # default FAISS index this is presumably a distance (lower = closer)
        # - confirm before ranking on it.
        return [
            {
                'content': document.page_content,
                'metadata': document.metadata,
                'similarity_score': score
            }
            for document, score in scored_docs
        ]