"""
RAG-based search engine with intelligent answer synthesis.
"""
from typing import List, Dict, Any, Optional
import asyncio
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from duckduckgo_search import DDGS
from googlesearch import search as gsearch
import requests
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential
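
# Pipeline: web search (DuckDuckGo + Google) -> page fetch -> chunking ->
# FAISS similarity retrieval -> the top chunks are returned with their source URLs.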

class SearchEngine:
    def __init__(self):
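        # Sentence-transformer embeddings for semantic similarity over fetched
        # pages; ~500-character chunks with 50-character overlap keep retrieval
        # granular while preserving context across chunk boundaries.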
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-mpnet-base-v2"
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50
        )
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def search_web(self, query: str, max_results: int = 10) -> List[Dict[str, str]]:
        """Perform web search using multiple search engines."""
        results = []
        
        # DuckDuckGo Search (DDGS yields dicts keyed "title"/"href"/"body";
        # normalize them to the "link"/"title" shape used downstream)
        try:
            with DDGS() as ddgs:
                ddg_results = [
                    {"link": r.get("href"), "title": r.get("title", "")}
                    for r in ddgs.text(query, max_results=max_results)
                ]
                results.extend(ddg_results)
        except Exception as e:
            print(f"DuckDuckGo search error: {e}")
        
        # Google Search
        try:
            google_results = gsearch(query, num_results=max_results)
            results.extend([{"link": url, "title": url} for url in google_results])
        except Exception as e:
            print(f"Google search error: {e}")
        
        return results[:max_results]
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def fetch_content(self, url: str) -> Optional[str]:
        """Fetch and extract content from a webpage."""
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.text, "html.parser")
            
            # Remove unwanted elements
            for element in soup(["script", "style", "nav", "footer", "header"]):
                element.decompose()
            
            text = soup.get_text(separator="\n", strip=True)
            return text
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None
    
    async def process_search_results(self, query: str) -> Dict[str, Any]:
        """Process search results and create a RAG-based answer."""
        # Perform web search
        search_results = await self.search_web(query)
        
        # Fetch content from search results
        documents = []
        for result in search_results:
            url = result.get("link")
            if not url:
                continue
                
            content = await self.fetch_content(url)
            if content:
                # Split content into chunks
                chunks = self.text_splitter.split_text(content)
                for chunk in chunks:
                    doc = Document(
                        page_content=chunk,
                        metadata={"source": url, "title": result.get("title", url)}
                    )
                    documents.append(doc)
        
        if not documents:
            return {
                "answer": "I couldn't find any relevant information.",
                "sources": []
            }
        
        # Create vector store
        vectorstore = FAISS.from_documents(documents, self.embeddings)
        
        # Retrieve the most relevant chunks directly from the vector store;
        # custom answer synthesis can be layered on top of these later.
        retriever = vectorstore.as_retriever()
        relevant_docs = retriever.get_relevant_documents(query)
        
        # For now, return the most relevant chunks and sources
        sources = []
        content = []
        for doc in relevant_docs[:3]:
            if doc.metadata["source"] not in sources:
                sources.append(doc.metadata["source"])
            content.append(doc.page_content)
        
        return {
            "answer": "\n\n".join(content),
            "sources": sources
        }
    
    async def search(self, query: str) -> Dict[str, Any]:
        """Main search interface."""
        try:
            return await self.process_search_results(query)
        except Exception as e:
            return {
                "answer": f"An error occurred: {str(e)}",
                "sources": []
            }
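
# Minimal usage sketch (illustrative only; the demo query and the __main__
# entry point are assumptions, not part of the original module).
if __name__ == "__main__":
    async def _demo() -> None:
        engine = SearchEngine()
        result = await engine.search("what is retrieval-augmented generation")
        print(result["answer"])
        print("Sources:", result["sources"])

    asyncio.run(_demo())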