"""Web search helper: scrape DuckDuckGo's HTML results and summarize each page.

The module exposes :class:`WebSearchEngine` plus a convenience
:func:`search` function. Pages are fetched with ``requests``, parsed with
BeautifulSoup and summarized with a Hugging Face summarization pipeline.
"""

from typing import Dict, List, Any, Optional
import functools
import json
import logging
import os
import random
import re
import time
from urllib.parse import parse_qs, quote_plus, urlparse

import requests
from bs4 import BeautifulSoup
from transformers import pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings

logger = logging.getLogger(__name__)

# Boilerplate phrases stripped from page text by ContentProcessor.clean_text.
_NAV_PHRASES = (
    "Skip to content", "Search", "Menu", "Navigation", "Subscribe",
    "Browse", "Submit", "More", "About", "Contact",
    "Privacy Policy", "Terms of Use",
)

# Whole-word, case-sensitive matcher for the phrases above. Word boundaries
# keep ordinary words such as "Searching" or "Moreover" intact.
_NAV_PATTERN = re.compile(
    r"\b(?:" + "|".join(re.escape(phrase) for phrase in _NAV_PHRASES) + r")\b"
)


class SearchResult:
    """Plain container for one search hit (title, link and snippet)."""

    def __init__(self, title: str, link: str, snippet: str):
        self.title = title
        self.link = link
        self.snippet = snippet


class ModelManager:
    """Manages different AI models for specific tasks.

    After construction ``self.models`` holds two entries:
    ``'summarizer'`` (a transformers summarization pipeline) and
    ``'embeddings'`` (a ``HuggingFaceEmbeddings`` instance).
    """

    def __init__(self, device: str = "cpu"):
        """Load all models onto *device* (defaults to ``"cpu"``)."""
        self.device = device
        self.models = {}
        self.load_models()

    def load_models(self):
        """Instantiate the summarizer and embedding models into ``self.models``."""
        # Use smaller models for CPU deployment
        self.models['summarizer'] = pipeline(
            "summarization",
            model="facebook/bart-base",
            device=self.device,
        )
        self.models['embeddings'] = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": self.device},
        )


class ContentProcessor:
    """Processes and analyzes different types of content."""

    # Tags removed before text extraction (navigation, chrome, non-content).
    BOILERPLATE_TAGS = ['nav', 'header', 'footer', 'script', 'style', 'meta', 'link']
    # CSS selectors tried, in order, to locate the main content container.
    CONTENT_SELECTORS = ['article', 'main', '[role="main"]', '.content',
                         '#content', '.post', '.entry']
    # Summarization works on fixed-size character chunks of the cleaned text.
    CHUNK_SIZE = 1024
    # Only the first few chunks are summarized to bound processing time.
    MAX_CHUNKS = 3

    def __init__(self):
        self.model_manager = ModelManager()

    def clean_text(self, text: str) -> str:
        """Clean and normalize text content.

        Collapses all whitespace runs to single spaces and removes the common
        navigation phrases listed in ``_NAV_PHRASES``. Phrases are matched
        case-sensitively and only as whole words, so they are not cut out of
        the middle of longer words. Note that a phrase such as "More" is still
        removed when it appears as a standalone capitalized word in real
        content; this is a heuristic, not a parser.

        Args:
            text: Raw text to clean.

        Returns:
            The cleaned text with no leading/trailing or repeated whitespace.
        """
        # Normalize whitespace first so multi-word phrases match reliably.
        text = ' '.join(text.split())
        text = _NAV_PATTERN.sub('', text)
        # Removing phrases can leave double spaces behind; collapse again.
        return ' '.join(text.split())

    def extract_main_content(self, soup: BeautifulSoup) -> str:
        """Extract main content from HTML.

        Note: this mutates *soup* - boilerplate tags are removed in place.

        Args:
            soup: Parsed document.

        Returns:
            Text of all ``<p>`` elements inside the first matching content
            container (or the whole document when none matches). Falls back
            to all text of that container when the paragraphs yield nothing.
        """
        # Remove navigation, headers, footers
        for elem in soup.find_all(self.BOILERPLATE_TAGS):
            elem.decompose()

        # Try to find main content container
        main_content = None
        for selector in self.CONTENT_SELECTORS:
            main_content = soup.select_one(selector)
            if main_content is not None:
                break
        if main_content is None:
            main_content = soup

        # A separator is required: with strip=True alone, text from adjacent
        # inline elements is concatenated without any space between words.
        paragraph_text = ' '.join(
            p.get_text(separator=' ', strip=True)
            for p in main_content.find_all('p')
        ).strip()
        if paragraph_text:
            return paragraph_text

        # Fallback to all text if no (non-empty) paragraphs found
        return main_content.get_text(separator=' ', strip=True)

    def process_content(self, content: str, html_content: Optional[str] = None) -> Dict:
        """Process content and generate insights.

        Args:
            content: Plain text of the page.
            html_content: Optional raw HTML. When given and main content can
                be extracted from it, that text replaces *content*.

        Returns:
            A dict with keys ``'summary'``, ``'key_points'`` and
            ``'content'``. On an unexpected failure ``'summary'`` carries an
            error message, ``'key_points'`` is empty and ``'content'`` is the
            original *content* argument.
        """
        try:
            cleaned_content = self.clean_text(content or "")

            # If HTML content is provided, try to extract main content
            if html_content:
                soup = BeautifulSoup(html_content, 'lxml')
                main_content = self.extract_main_content(soup)
                if main_content:
                    cleaned_content = self.clean_text(main_content)

            # Nothing to summarize: skip the model calls entirely.
            if not cleaned_content:
                return {'summary': '', 'key_points': '', 'content': ''}

            summarizer = self.model_manager.models['summarizer']

            # Generate summary in chunks if content is too long
            chunks = [
                cleaned_content[start:start + self.CHUNK_SIZE]
                for start in range(0, len(cleaned_content), self.CHUNK_SIZE)
            ]
            summaries = []
            for chunk in chunks[:self.MAX_CHUNKS]:
                try:
                    summary = summarizer(
                        chunk,
                        max_length=150,
                        min_length=50,
                        do_sample=False,
                    )[0]['summary_text']
                    summaries.append(summary)
                except Exception as e:
                    # Best effort: one bad chunk must not lose the others.
                    logger.warning("Error summarizing chunk: %s", e)

            # Combine summaries
            final_summary = ' '.join(summaries)

            # Key points are a second, shorter summary of the first chunk.
            # A failure here must not discard the summaries built above.
            try:
                key_points = summarizer(
                    cleaned_content[:self.CHUNK_SIZE],
                    max_length=100,
                    min_length=30,
                    num_beams=4,
                    do_sample=True,
                )[0]['summary_text']
            except Exception as e:
                logger.warning("Error extracting key points: %s", e)
                key_points = ""

            return {
                'summary': final_summary,
                'key_points': key_points,
                'content': cleaned_content,
            }
        except Exception as e:
            logger.exception("Error processing content")
            return {
                'summary': f"Error processing content: {str(e)}",
                'key_points': "",
                'content': content,
            }


class WebSearchEngine:
    """Main search engine class.

    Searches DuckDuckGo's HTML endpoint, fetches each result page with a
    rate-limited ``requests`` session and summarizes it through
    :class:`ContentProcessor`.
    """

    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        # Minimum spacing, in seconds, between two outgoing requests.
        self.request_delay = 2.0
        self.last_request_time = 0
        self.max_retries = 3
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }

    def _throttle(self) -> None:
        """Sleep as needed so requests are at least ``request_delay`` apart."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.request_delay:
            # Random jitter makes the request cadence less regular.
            time.sleep(self.request_delay - elapsed + random.uniform(0.5, 1.5))

    def safe_get(self, url: str, max_retries: int = 3) -> requests.Response:
        """Make a GET request with retries and error handling.

        Retries network errors, HTTP 429 and 5xx responses with a growing
        back-off. Client errors other than 429 are raised immediately since
        repeating the same request cannot succeed.

        Args:
            url: URL to fetch.
            max_retries: Maximum number of attempts.

        Returns:
            The first non-error response.

        Raises:
            requests.RequestException: On a non-retryable HTTP error, or when
                the final attempt fails with a network/HTTP error.
            Exception: When every attempt was rate limited (HTTP 429) or
                *max_retries* is not positive.
        """
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            self._throttle()
            try:
                response = self.session.get(url, headers=self.headers, timeout=10)
            except requests.RequestException as e:
                # Count failed attempts for throttling as well.
                self.last_request_time = time.time()
                if is_last_attempt:
                    raise
                logger.warning("Request to %s failed (attempt %d): %s",
                               url, attempt + 1, e)
                time.sleep((attempt + 1) * 2)
                continue

            self.last_request_time = time.time()
            status = response.status_code

            if status == 429:  # Rate limit
                if not is_last_attempt:
                    time.sleep((attempt + 1) * 5)
                continue
            if 400 <= status < 500:
                # Not retryable: raises requests.HTTPError right away.
                response.raise_for_status()
            if status >= 500:
                if is_last_attempt:
                    response.raise_for_status()
                time.sleep((attempt + 1) * 2)
                continue

            # Any remaining status (200, other 2xx, unfollowed 3xx) is a
            # usable response and must be returned, not retried.
            return response

        raise Exception(f"Failed to fetch URL after {max_retries} attempts")

    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling.

        A URL qualifies when it has a network location and an ``http`` or
        ``https`` scheme, the only schemes the ``requests`` session can fetch.
        """
        try:
            parsed = urlparse(url)
        except (ValueError, TypeError, AttributeError):
            # Malformed URLs (ValueError) or non-string input.
            return False
        return bool(parsed.netloc) and parsed.scheme in ('http', 'https')

    def get_metadata(self, soup: BeautifulSoup) -> Dict:
        """Extract metadata from page.

        Returns:
            A dict with ``'title'`` (``"No title"`` when the page has no
            usable ``<title>``) and ``'description'`` (content of the
            description meta tag, or an empty string).
        """
        title = "No title"
        # .string is None when <title> is empty or contains nested markup.
        if soup.title is not None and soup.title.string:
            title = str(soup.title.string).strip() or "No title"

        description = ""
        meta_tag = soup.find("meta", attrs={"name": "description"})
        if meta_tag is not None:
            description = meta_tag.get("content", "")

        return {
            'title': title,
            'description': description,
        }

    def process_url(self, url: str) -> Dict:
        """Process a single URL.

        Returns:
            On success a dict with ``'url'``, ``'title'``, ``'description'``,
            ``'summary'``, ``'key_points'`` and ``'content'``; otherwise a
            dict with a single ``'error'`` message.
        """
        if not self.is_valid_url(url):
            return {'error': f"Invalid URL: {url}"}

        try:
            response = self.safe_get(url)
            soup = BeautifulSoup(response.text, 'lxml')

            # Get metadata
            metadata = self.get_metadata(soup)

            # Process content with both text and HTML
            processed = self.processor.process_content(
                soup.get_text(),
                html_content=response.text,
            )

            return {
                'url': url,
                'title': metadata['title'],
                'description': metadata['description'],
                'summary': processed['summary'],
                'key_points': processed['key_points'],
                'content': processed['content'],
            }
        except Exception as e:
            logger.warning("Error processing %s: %s", url, e)
            return {'error': f"Error processing {url}: {str(e)}"}

    def format_results(self, results: List[Dict]) -> Dict:
        """Format search results in a user-friendly way.

        Args:
            results: Dicts as returned by :meth:`process_url`; entries that
                contain an ``'error'`` key are skipped.

        Returns:
            A dict with ``'insights'`` (up to 10 unique bullet lines built
            from the key points) and ``'results'`` (title/url/summary dicts).
        """
        formatted_insights = []
        formatted_results = []

        for result in results:
            if 'error' in result:
                continue

            # Format key points
            if result.get('key_points'):
                formatted_insights.extend(
                    f"• {point.strip()}"
                    for point in result['key_points'].split('. ')
                    if point.strip()
                )

            # Format detailed result
            formatted_results.append({
                'title': result['title'],
                'url': result['url'],
                'summary': result['summary'],
            })

        # Remove duplicates while preserving order
        formatted_insights = list(dict.fromkeys(formatted_insights))

        return {
            'insights': '\n'.join(formatted_insights[:10]),  # Top 10 insights
            'results': formatted_results,
        }

    @staticmethod
    def _resolve_result_link(href: str) -> str:
        """Turn a DuckDuckGo result href into the target URL.

        The HTML endpoint may return protocol-relative redirect links of the
        form ``//duckduckgo.com/l/?uddg=<encoded target>``; the real
        destination is then the ``uddg`` query parameter. Other hrefs are
        returned unchanged (apart from completing a protocol-relative URL).
        """
        if href.startswith('//'):
            href = 'https:' + href
        parsed = urlparse(href)
        is_ddg_host = not parsed.netloc or parsed.netloc.endswith('duckduckgo.com')
        if is_ddg_host and parsed.path.startswith('/l/'):
            # parse_qs already percent-decodes the values.
            targets = parse_qs(parsed.query).get('uddg')
            if targets:
                return targets[0]
        return href

    def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search DuckDuckGo and parse HTML results.

        Args:
            query: Search terms.
            max_results: Maximum number of results to return.

        Returns:
            Up to *max_results* dicts with ``'link'``, ``'title'`` and
            ``'snippet'``. An empty list is returned when the search fails.
        """
        search_results = []
        if max_results <= 0:
            return search_results

        try:
            # Encode query for URL
            encoded_query = quote_plus(query)

            # DuckDuckGo HTML search URL
            search_url = f'https://html.duckduckgo.com/html/?q={encoded_query}'

            # Get search results page
            response = self.safe_get(search_url)
            soup = BeautifulSoup(response.text, 'lxml')

            # Walk every result element and stop once enough usable ones are
            # collected, so skipped entries do not shrink the result count.
            for result in soup.find_all('div', {'class': 'result'}):
                if len(search_results) >= max_results:
                    break
                try:
                    # Extract link
                    link_elem = result.find('a', {'class': 'result__a'})
                    if not link_elem:
                        continue

                    link = self._resolve_result_link(link_elem.get('href', ''))
                    if not link or not self.is_valid_url(link):
                        continue

                    # Extract title
                    title = link_elem.get_text(strip=True)

                    # Extract snippet
                    snippet_elem = result.find(class_='result__snippet')
                    snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""

                    search_results.append({
                        'link': link,
                        'title': title,
                        'snippet': snippet,
                    })
                except Exception as e:
                    # Best effort: a malformed entry must not abort the search.
                    logger.warning("Error processing search result: %s", e)
                    continue

            return search_results

        except Exception as e:
            logger.error("Error during DuckDuckGo search: %s", e)
            return []

    def search(self, query: str, max_results: int = 5) -> Dict:
        """Perform search and process results.

        Returns:
            On success a dict with ``'results'``, ``'insights'`` and
            ``'follow_up_questions'``; otherwise a dict with an ``'error'``
            message.
        """
        if not query or not query.strip():
            return {'error': 'Empty search query'}

        try:
            # Search using DuckDuckGo HTML
            search_results = self.search_duckduckgo(query, max_results)

            if not search_results:
                return {'error': 'No results found'}

            results = []
            for result in search_results:
                if 'link' in result:
                    processed = self.process_url(result['link'])
                    if 'error' not in processed:
                        results.append(processed)
                    time.sleep(random.uniform(0.5, 1.0))

            if not results:
                return {'error': 'Failed to process any search results'}

            # Format results in a user-friendly way
            formatted = self.format_results(results)

            return {
                'results': formatted['results'],
                'insights': formatted['insights'],
                'follow_up_questions': [
                    f"What are the recent breakthroughs in {query}?",
                    f"How does {query} impact various industries?",
                    f"What are the future prospects of {query}?"
                ]
            }

        except Exception as e:
            logger.exception("Search failed")
            return {'error': f"Search failed: {str(e)}"}


@functools.lru_cache(maxsize=1)
def _get_engine() -> WebSearchEngine:
    """Return the shared engine, creating it (and its models) on first use."""
    return WebSearchEngine()


# Main search function
def search(query: str, max_results: int = 5) -> Dict:
    """Main search function.

    Reuses one lazily created :class:`WebSearchEngine`, so the models are
    loaded on the first call only instead of on every call.
    """
    return _get_engine().search(query, max_results)