# engines/search.py
"""
RAG-based search engine with intelligent answer synthesis.
"""
from typing import List, Dict, Any, Optional
import asyncio
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from duckduckgo_search import DDGS
from googlesearch import search as gsearch
import requests
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential
class SearchEngine:
    """Web search and retrieval pipeline backing the RAG answers."""

    def __init__(self):
        # Sentence-transformer model used to embed fetched page chunks.
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-mpnet-base-v2"
        )
        # Overlapping chunks preserve context across split boundaries.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def search_web(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
        """Perform a web search across multiple search engines."""
        results = []

        # DuckDuckGo search. DDGS yields dicts keyed "title"/"href"/"body",
        # so normalize "href" to "link" to match the Google results below.
        # The call blocks, so run it in a worker thread.
        try:
            with DDGS() as ddgs:
                ddg_results = await asyncio.to_thread(
                    lambda: list(ddgs.text(query, max_results=max_results))
                )
            results.extend(
                {"link": r.get("href"), "title": r.get("title", r.get("href", ""))}
                for r in ddg_results
            )
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")

        # Google search returns bare URLs, so reuse the URL as the title.
        # gsearch also blocks, so it goes to a worker thread as well.
        try:
            google_results = await asyncio.to_thread(
                gsearch, query, num_results=max_results
            )
            results.extend({"link": url, "title": url} for url in google_results)
        except Exception as e:
            print(f"Google search error: {e}")

        return results[:max_results]
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def fetch_content(self, url: str) -> Optional[str]:
        """Fetch a web page and extract its visible text."""
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            # requests blocks, so run it in a worker thread to keep the
            # event loop responsive.
            response = await asyncio.to_thread(
                requests.get, url, headers=headers, timeout=10
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            # Drop chrome elements that rarely carry answer content.
            for element in soup(["script", "style", "nav", "footer", "header"]):
                element.decompose()
            return soup.get_text(separator="\n", strip=True)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None
    async def process_search_results(self, query: str) -> Dict[str, Any]:
        """Search the web, index the results, and build a RAG-based answer."""
        # Step 1: gather candidate URLs from the search engines.
        search_results = await self.search_web(query)

        # Step 2: fetch each page and split its text into indexable chunks.
        documents = []
        for result in search_results:
            url = result.get("link")
            if not url:
                continue
            content = await self.fetch_content(url)
            if content:
                for chunk in self.text_splitter.split_text(content):
                    documents.append(Document(
                        page_content=chunk,
                        metadata={"source": url, "title": result.get("title", url)}
                    ))

        if not documents:
            return {
                "answer": "I couldn't find any relevant information.",
                "sources": []
            }

        # Step 3: embed the chunks into an in-memory FAISS index and
        # retrieve the chunks most similar to the query.
        vectorstore = FAISS.from_documents(documents, self.embeddings)
        relevant_docs = vectorstore.as_retriever().get_relevant_documents(query)

        # Step 4: for now, return the top chunks verbatim along with their
        # deduplicated sources; LLM answer synthesis can be layered on later.
        sources = []
        content = []
        for doc in relevant_docs[:3]:
            if doc.metadata["source"] not in sources:
                sources.append(doc.metadata["source"])
            content.append(doc.page_content)

        return {
            "answer": "\n\n".join(content),
            "sources": sources
        }
    async def search(self, query: str) -> Dict[str, Any]:
        """Main search interface."""
        try:
            return await self.process_search_results(query)
        except Exception as e:
            return {
                "answer": f"An error occurred: {str(e)}",
                "sources": []
            }
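
# A minimal usage sketch (illustrative addition, not part of the original
# module): drive the async API with asyncio.run. The query string is a
# made-up example; any question works.
if __name__ == "__main__":
    async def _demo():
        engine = SearchEngine()
        result = await engine.search("What is retrieval-augmented generation?")
        print(result["answer"])
        print("Sources:", result["sources"])

    asyncio.run(_demo())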