aiws / search_engine.py
fikird
Add RAG functionality with vector storage and web crawling
44198e0
raw
history blame
5.98 kB
import json
import os
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup
from duckduckgo_search import ddg
from langchain.embeddings import HuggingFaceEmbeddings
from transformers import pipeline
class ModelManager:
    """Holds the AI models used by this module, keyed by task name."""

    def __init__(self):
        # All models are placed on the CPU; ``models`` maps task name -> model.
        self.device = "cpu"
        self.models = {}
        self.load_models()

    def load_models(self):
        """Build the summarizer and the embedding model and register both."""
        # Small checkpoints are used so the models remain usable on a CPU.
        summarizer = pipeline(
            "summarization",
            model="facebook/bart-base",
            device=self.device,
        )
        self.models['summarizer'] = summarizer

        embedding_options = {"device": self.device}
        embedder = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs=embedding_options,
        )
        self.models['embeddings'] = embedder
class ContentProcessor:
    """Summarizes raw text with the summarizer held by a ``ModelManager``."""

    def __init__(self):
        self.model_manager = ModelManager()

    def process_content(self, content: str) -> Dict:
        """Summarize ``content`` and wrap the result in a small report dict.

        Only the first 1024 characters of ``content`` are handed to the
        summarizer. Empty, whitespace-only or ``None`` content is never sent
        to the model; it yields the fallback result with an empty summary.

        Args:
            content: Plain text to summarize.

        Returns:
            A dict with the keys ``'summary'``, ``'content_type'`` (always
            ``'text'``) and ``'explanation'``. On success both ``'summary'``
            and ``'explanation'`` hold the model summary. If the summarizer
            raises, ``'summary'`` is the first 200 characters of ``content``
            (followed by ``'...'`` only when text was actually cut off) and
            ``'explanation'`` is a fixed "unable to analyse" message.
        """
        # Nothing to summarize: skip the model call entirely.
        if not content or not content.strip():
            return self._fallback_result("")

        try:
            summarizer = self.model_manager.models['summarizer']
            summary = summarizer(
                content[:1024],
                max_length=100,
                min_length=30,
                do_sample=False,
            )[0]['summary_text']
        except Exception as e:
            # Best effort: a model failure degrades to a plain text preview.
            print(f"Error processing content: {str(e)}")
            return self._fallback_result(content)

        return {
            'summary': summary,
            'content_type': 'text',
            'explanation': summary,
        }

    @staticmethod
    def _fallback_result(content: str) -> Dict:
        """Build the report returned when no model summary is available."""
        # Append the ellipsis only when the preview really is truncated.
        preview = content if len(content) <= 200 else content[:200] + "..."
        return {
            'summary': preview,
            'content_type': 'text',
            'explanation': "Unable to generate detailed analysis.",
        }
class WebSearchEngine:
    """Search the web for a query and summarize the pages that come back.

    A query is sent to DuckDuckGo through ``ddg``. Every hit with an http(s)
    URL is downloaded through a shared ``requests`` session, throttled so
    that requests are at least ``request_delay`` seconds apart. The text of
    its ``<p>`` elements is summarized by a ``ContentProcessor``, and the
    per-page summaries are condensed into one ``insights`` string.
    """

    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        # Minimum number of seconds between two outgoing page requests.
        self.request_delay = 1.0
        # time.time() stamp of the latest request attempt; 0 means none yet.
        self.last_request_time = 0

    def is_valid_url(self, url: str) -> bool:
        """Return True only for URLs with an http/https scheme and a host.

        Malformed input (including non-string values) yields False instead
        of raising.
        """
        try:
            parsed = urlparse(url)
        except (ValueError, TypeError, AttributeError):
            # urlparse rejects e.g. broken IPv6 literals and non-text input.
            return False
        return bool(parsed.netloc and parsed.scheme in ('http', 'https'))

    def get_metadata(self, soup: "BeautifulSoup") -> Dict:
        """Extract the page title and meta description from parsed HTML.

        Returns:
            ``{"title": ..., "description": ...}``; a value that is missing
            from the page is returned as an empty string, never ``None``.
        """
        title_tag = soup.title
        # ``.string`` is None for an empty <title> or one with nested markup.
        title = (title_tag.string or "") if title_tag else ""

        meta = soup.find("meta", attrs={"name": "description"})
        description = meta.get("content", "") if meta is not None else ""

        return {
            "title": title,
            "description": description
        }

    def process_url(self, url: str) -> Optional[Dict]:
        """Download one page, extract its paragraph text and summarize it.

        Args:
            url: Address of the page to fetch.

        Returns:
            A dict with the keys ``'url'``, ``'title'``, ``'snippet'`` (the
            first 200 characters of the text, with ``'...'`` appended only
            when truncated) and ``'processed_content'`` (the processor's
            report plus a ``'metadata'`` entry). ``None`` is returned when
            the request fails, the response is not OK, the page has no
            paragraph text, or any other error occurs.
        """
        try:
            self._wait_for_rate_limit()
            try:
                response = self.session.get(url, timeout=10)
            finally:
                # Failed attempts count too, so errors cannot bypass the
                # throttle.
                self.last_request_time = time.time()

            if not response.ok:
                return None

            soup = BeautifulSoup(response.text, 'lxml')
            metadata = self.get_metadata(soup)

            # Extract main content
            content = ' '.join(p.get_text() for p in soup.find_all('p'))
            if not content.strip():
                return None

            processed_content = self.processor.process_content(content)
            processed_content['metadata'] = metadata

            return {
                'url': url,
                'title': metadata['title'],
                'snippet': self._preview(content),
                'processed_content': processed_content
            }
        except Exception as e:
            # Best effort: one bad page must not abort the whole search.
            print(f"Error processing {url}: {str(e)}")
            return None

    def search(self, query: str, max_results: int = 5) -> Dict:
        """Run a web search and summarize every page that could be processed.

        Args:
            query: Search terms passed to DuckDuckGo.
            max_results: Maximum number of hits requested from the search.

        Returns:
            A dict with the keys ``'results'`` (list of ``process_url``
            results), ``'insights'`` (summary of the per-page summaries, or a
            fixed notice when no page could be processed),
            ``'follow_up_questions'`` and ``'similar_queries'`` (always an
            empty list). On an unexpected error the lists are empty and
            ``'insights'`` carries the error message.
        """
        try:
            # Search using DuckDuckGo; treat a None answer like "no hits"
            # (assumed possible for some library versions).
            search_results = ddg(query, max_results=max_results) or []

            # Process results
            processed_results = []
            for result in search_results:
                link = self._result_url(result)
                if not self.is_valid_url(link):
                    continue
                processed = self.process_url(link)
                if processed:
                    processed_results.append(processed)

            # Generate insights; with no pages there is nothing to summarize.
            if processed_results:
                all_content = ' '.join(
                    r['processed_content']['summary'] for r in processed_results
                )
                insights = self.processor.process_content(all_content)['summary']
            else:
                insights = "No results could be retrieved for this query."

            # Generate follow-up questions
            follow_up_questions = [
                f"What are the key differences between {query} and related topics?",
                f"How has {query} evolved over time?",
                f"What are the practical applications of {query}?"
            ]

            return {
                'results': processed_results,
                'insights': insights,
                'follow_up_questions': follow_up_questions,
                'similar_queries': []
            }
        except Exception as e:
            print(f"Error during search: {str(e)}")
            return {
                'results': [],
                'insights': f"Error performing search: {str(e)}",
                'follow_up_questions': [],
                'similar_queries': []
            }

    def _wait_for_rate_limit(self) -> None:
        """Sleep until ``request_delay`` seconds have passed since the last request."""
        remaining = self.request_delay - (time.time() - self.last_request_time)
        if remaining > 0:
            # Cap the wait so a backwards clock jump cannot stall the crawl.
            time.sleep(min(remaining, self.request_delay))

    @staticmethod
    def _result_url(result: Dict) -> str:
        """Return the URL of one search hit, or ``''`` when it has none."""
        # ddg() hits are expected to carry the URL under 'href'; 'link' is
        # accepted as well for result dicts that use that key.
        return result.get('href') or result.get('link') or ''

    @staticmethod
    def _preview(text: str, limit: int = 200) -> str:
        """Return ``text`` cut to ``limit`` characters, adding '...' only if cut."""
        return text if len(text) <= limit else text[:limit] + "..."
# Main search function

# Engine shared by every call to search(). Constructing a WebSearchEngine
# builds a ContentProcessor and with it a ModelManager that loads the models,
# so the engine is created lazily once and then reused.
_search_engine = None


def search(query: str, max_results: int = 5) -> Dict:
    """Search the web for ``query`` using a shared ``WebSearchEngine``.

    Args:
        query: Search terms.
        max_results: Maximum number of search hits to request.

    Returns:
        The result dict produced by ``WebSearchEngine.search``.
    """
    global _search_engine
    if _search_engine is None:
        # If construction raises, the global stays None and the next call
        # simply tries again.
        _search_engine = WebSearchEngine()
    return _search_engine.search(query, max_results)