""" | |
RAG-based search engine with intelligent answer synthesis. | |
""" | |
from typing import List, Dict, Any, Optional | |
import asyncio | |
from langchain.chains import RetrievalQAWithSourcesChain | |
from langchain.embeddings import HuggingFaceEmbeddings | |
from langchain.vectorstores import FAISS | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.docstore.document import Document | |
from duckduckgo_search import DDGS | |
from googlesearch import search as gsearch | |
import requests | |
from bs4 import BeautifulSoup | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
class SearchEngine:
    def __init__(self):
        # Sentence-transformer model used to embed document chunks.
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-mpnet-base-v2"
        )
        # Overlapping chunks preserve context across split boundaries.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
    async def search_web(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
        """Perform a web search using multiple search engines."""
        results = []

        # DuckDuckGo results carry the URL under an "href" key; normalize it
        # to "link" so both engines yield the same result shape.
        try:
            with DDGS() as ddgs:
                results.extend(
                    {"link": r.get("href", ""), "title": r.get("title", "")}
                    for r in ddgs.text(query, max_results=max_results)
                )
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")

        # Google search yields bare URLs, so reuse the URL as the title.
        try:
            google_results = gsearch(query, num_results=max_results)
            results.extend({"link": url, "title": url} for url in google_results)
        except Exception as e:
            print(f"Google search error: {e}")

        return results[:max_results]
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    def _get(self, url: str) -> requests.Response:
        """HTTP GET with exponential-backoff retries."""
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        return response

    async def fetch_content(self, url: str) -> Optional[str]:
        """Fetch and extract readable text from a webpage."""
        try:
            # Run the blocking HTTP call in a worker thread so the event
            # loop stays responsive.
            response = await asyncio.to_thread(self._get, url)
            soup = BeautifulSoup(response.text, "html.parser")
            # Drop non-content elements before extracting text.
            for element in soup(["script", "style", "nav", "footer", "header"]):
                element.decompose()
            return soup.get_text(separator="\n", strip=True)
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None
    async def process_search_results(self, query: str) -> Dict[str, Any]:
        """Process search results and build a RAG-based answer."""
        # Perform the web search.
        search_results = await self.search_web(query)

        # Fetch each result and split its content into embeddable chunks.
        documents = []
        for result in search_results:
            url = result.get("link")
            if not url:
                continue
            content = await self.fetch_content(url)
            if content:
                for chunk in self.text_splitter.split_text(content):
                    documents.append(Document(
                        page_content=chunk,
                        metadata={"source": url, "title": result.get("title", url)}
                    ))

        if not documents:
            return {
                "answer": "I couldn't find any relevant information.",
                "sources": []
            }

        # Index the chunks in an in-memory FAISS vector store and retrieve
        # the chunks most similar to the query.
        vectorstore = FAISS.from_documents(documents, self.embeddings)
        relevant_docs = vectorstore.as_retriever().get_relevant_documents(query)

        # For now, return the most relevant chunks and their sources; LLM
        # answer synthesis can be layered on top of this retrieval step.
        sources = []
        content = []
        for doc in relevant_docs[:3]:
            if doc.metadata["source"] not in sources:
                sources.append(doc.metadata["source"])
            content.append(doc.page_content)

        return {
            "answer": "\n\n".join(content),
            "sources": sources
        }
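
    # A hedged sketch of the "custom answer synthesis" step left open above:
    # join the retrieved chunks into a grounded prompt and delegate to any
    # text-generation callable. `generate` is a placeholder the caller
    # supplies, not an API from the libraries imported here.
    def synthesize_answer(self, question: str, chunks: List[str], generate) -> str:
        context = "\n\n".join(chunks)
        prompt = (
            "Answer the question using only the excerpts below.\n\n"
            f"Excerpts:\n{context}\n\n"
            f"Question: {question}\nAnswer:"
        )
        return generate(prompt)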
    async def search(self, query: str) -> Dict[str, Any]:
        """Main search interface."""
        try:
            return await self.process_search_results(query)
        except Exception as e:
            return {
                "answer": f"An error occurred: {str(e)}",
                "sources": []
            }
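
# A minimal usage sketch, assuming network access and the packages above are
# installed; the query string is only illustrative.
if __name__ == "__main__":
    engine = SearchEngine()
    result = asyncio.run(engine.search("what is retrieval-augmented generation?"))
    print(result["answer"])
    print("Sources:", result["sources"])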