from typing import Dict, List
import requests
from bs4 import BeautifulSoup
from transformers import pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
import time
import json
from urllib.parse import urlparse, parse_qs, quote_plus
import logging
import random

logger = logging.getLogger(__name__)


class SearchResult:
    """Simple container for a single search result."""

    def __init__(self, title: str, link: str, snippet: str):
        self.title = title
        self.link = link
        self.snippet = snippet


class ModelManager:
    """Manages different AI models for specific tasks"""

    def __init__(self):
        self.device = "cpu"
        self.models = {}
        self.load_models()

    def load_models(self):
        # Use smaller models for CPU deployment. Note that facebook/bart-base
        # is not fine-tuned for summarization; a similarly sized checkpoint
        # such as sshleifer/distilbart-cnn-12-6 is a drop-in replacement that
        # usually produces better summaries.
        self.models['summarizer'] = pipeline(
            "summarization",
            model="facebook/bart-base",
            device=self.device
        )
        self.models['embeddings'] = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": self.device}
        )
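

# Minimal usage sketch for ModelManager (illustrative, not called by the
# pipeline below; assumes the checkpoints above can be downloaded). The
# summarization pipeline returns a list of dicts with a 'summary_text' key:
#
#     mm = ModelManager()
#     out = mm.models['summarizer']("Some long article text ...",
#                                   max_length=60, min_length=20)
#     print(out[0]['summary_text'])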


class ContentProcessor:
    """Processes and analyzes different types of content"""

    def __init__(self):
        self.model_manager = ModelManager()

    def clean_text(self, text: str) -> str:
        """Clean and normalize text content"""
        # Collapse runs of whitespace into single spaces
        text = ' '.join(text.split())
        # Strip common navigation labels. This is naive substring removal,
        # so these words are also dropped when they occur in normal prose.
        nav_elements = [
            "Skip to content",
            "Search",
            "Menu",
            "Navigation",
            "Subscribe",
            "Browse",
            "Submit",
            "More",
            "About",
            "Contact",
            "Privacy Policy",
            "Terms of Use"
        ]
        for element in nav_elements:
            text = text.replace(element, "")
        return text.strip()

    def extract_main_content(self, soup: BeautifulSoup) -> str:
        """Extract main content from HTML"""
        # Remove navigation, headers, footers and other non-content tags
        for elem in soup.find_all(['nav', 'header', 'footer', 'script', 'style', 'meta', 'link']):
            elem.decompose()
        # Try to find a main content container: semantic tags first, then
        # common ARIA role and CSS hooks; the first match wins
        main_content = None
        content_tags = ['article', 'main', '[role="main"]', '.content', '#content', '.post', '.entry']
        for tag in content_tags:
            main_content = soup.select_one(tag)
            if main_content:
                break
        if not main_content:
            main_content = soup
        # Extract text from paragraphs
        paragraphs = main_content.find_all('p')
        if paragraphs:
            return ' '.join(p.get_text(strip=True) for p in paragraphs)
        # Fall back to all text if no paragraphs were found
        return main_content.get_text(strip=True)

    def process_content(self, content: str, html_content: str = None) -> Dict:
        """Process content and generate insights"""
        try:
            # Clean content
            cleaned_content = self.clean_text(content)
            # If HTML content is provided, try to extract the main content
            if html_content:
                soup = BeautifulSoup(html_content, 'lxml')
                main_content = self.extract_main_content(soup)
                if main_content:
                    cleaned_content = self.clean_text(main_content)
            # Summarize in 1024-character chunks if the content is long
            chunks = [cleaned_content[i:i + 1024] for i in range(0, len(cleaned_content), 1024)]
            summaries = []
            for chunk in chunks[:3]:  # Cap at 3 chunks to bound processing time
                try:
                    summary = self.model_manager.models['summarizer'](
                        chunk,
                        max_length=150,
                        min_length=50,
                        do_sample=False
                    )[0]['summary_text']
                    summaries.append(summary)
                except Exception as e:
                    logger.warning(f"Error summarizing chunk: {str(e)}")
                    continue
            # Combine the chunk summaries
            final_summary = ' '.join(summaries)
            # Generate a short secondary summary of the first chunk; it is
            # split into bullet points later by format_results
            key_points = self.model_manager.models['summarizer'](
                cleaned_content[:1024],
                max_length=100,
                min_length=30,
                num_beams=4,
                do_sample=True
            )[0]['summary_text']
            return {
                'summary': final_summary,
                'key_points': key_points,
                'content': cleaned_content
            }
        except Exception as e:
            return {
                'summary': f"Error processing content: {str(e)}",
                'key_points': "",
                'content': content
            }
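

# Illustrative use of ContentProcessor (values are placeholders; the first
# call downloads the models, which can be slow on CPU):
#
#     processor = ContentProcessor()
#     result = processor.process_content("Long article text ...")
#     result['summary']     # combined chunk summaries
#     result['key_points']  # short secondary summary, bulleted later
#     result['content']     # cleaned input text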


class WebSearchEngine:
    """Main search engine class"""

    def __init__(self):
        self.processor = ContentProcessor()
        self.session = requests.Session()
        self.request_delay = 2.0  # Minimum seconds between outgoing requests
        self.last_request_time = 0
        self.max_retries = 3
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }

    def safe_get(self, url: str, max_retries: int = None) -> requests.Response:
        """Make a GET request with rate limiting, retries and error handling"""
        if max_retries is None:
            max_retries = self.max_retries
        for i in range(max_retries):
            try:
                # Throttle: keep at least request_delay (plus jitter) between requests
                current_time = time.time()
                time_since_last = current_time - self.last_request_time
                if time_since_last < self.request_delay:
                    time.sleep(self.request_delay - time_since_last + random.uniform(0.5, 1.5))
                response = self.session.get(url, headers=self.headers, timeout=10)
                self.last_request_time = time.time()
                if response.status_code == 200:
                    return response
                elif response.status_code == 429:  # Rate limited: wait 5s, 10s, 15s...
                    wait_time = (i + 1) * 5
                    time.sleep(wait_time)
                    continue
                else:
                    response.raise_for_status()
            except Exception:
                if i == max_retries - 1:
                    raise
                time.sleep((i + 1) * 2)  # Linear backoff before the next attempt
        raise Exception(f"Failed to fetch URL after {max_retries} attempts")

    def is_valid_url(self, url: str) -> bool:
        """Check if URL is valid for crawling"""
        try:
            parsed = urlparse(url)
            return bool(parsed.netloc and parsed.scheme)
        except ValueError:
            return False

    def get_metadata(self, soup: BeautifulSoup) -> Dict:
        """Extract title and description metadata from a page"""
        title = soup.title.string.strip() if soup.title and soup.title.string else "No title"
        description = ""
        meta_desc = soup.find("meta", attrs={"name": "description"})
        if meta_desc:
            description = meta_desc.get("content", "")
        return {
            'title': title,
            'description': description
        }

    def process_url(self, url: str) -> Dict:
        """Process a single URL"""
        if not self.is_valid_url(url):
            return {'error': f"Invalid URL: {url}"}
        try:
            response = self.safe_get(url)
            soup = BeautifulSoup(response.text, 'lxml')
            # Get metadata
            metadata = self.get_metadata(soup)
            # Process content with both plain text and raw HTML
            processed = self.processor.process_content(
                soup.get_text(),
                html_content=response.text
            )
            return {
                'url': url,
                'title': metadata['title'],
                'description': metadata['description'],
                'summary': processed['summary'],
                'key_points': processed['key_points'],
                'content': processed['content']
            }
        except Exception as e:
            return {'error': f"Error processing {url}: {str(e)}"}

    def format_results(self, results: List[Dict]) -> Dict:
        """Format search results in a user-friendly way"""
        formatted_insights = []
        formatted_results = []
        for result in results:
            if 'error' not in result:
                # Split key points into bullet points
                if result.get('key_points'):
                    points = result['key_points'].split('. ')
                    formatted_points = [f"• {point.strip()}" for point in points if point.strip()]
                    formatted_insights.extend(formatted_points)
                # Build the per-page result entry
                formatted_result = {
                    'title': result['title'],
                    'url': result['url'],
                    'summary': result['summary'],
                }
                formatted_results.append(formatted_result)
        # Remove duplicate insights while preserving order
        formatted_insights = list(dict.fromkeys(formatted_insights))
        return {
            'insights': '\n'.join(formatted_insights[:10]),  # Top 10 insights
            'results': formatted_results
        }
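
    # format_results returns a dict shaped like (illustrative values):
    #     {
    #         'insights': '• point one\n• point two',  # up to 10 deduplicated bullets
    #         'results': [{'title': ..., 'url': ..., 'summary': ...}, ...]
    #     }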

    def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search DuckDuckGo and parse the HTML results page"""
        search_results = []
        try:
            # Encode query for the URL
            encoded_query = quote_plus(query)
            # DuckDuckGo HTML (no-JavaScript) search endpoint
            search_url = f'https://html.duckduckgo.com/html/?q={encoded_query}'
            # Get search results page
            response = self.safe_get(search_url)
            soup = BeautifulSoup(response.text, 'lxml')
            # Find all result elements
            results = soup.find_all('div', {'class': 'result'})
            for result in results[:max_results]:
                try:
                    # Extract link
                    link_elem = result.find('a', {'class': 'result__a'})
                    if not link_elem:
                        continue
                    link = link_elem.get('href', '')
                    # The HTML endpoint often returns protocol-relative
                    # redirect links (//duckduckgo.com/l/?uddg=<target>);
                    # normalize the scheme and unwrap the real target URL
                    if link.startswith('//'):
                        link = 'https:' + link
                    parsed = urlparse(link)
                    if parsed.netloc.endswith('duckduckgo.com') and parsed.path.startswith('/l/'):
                        target = parse_qs(parsed.query).get('uddg', [''])[0]
                        if target:
                            link = target
                    if not link or not self.is_valid_url(link):
                        continue
                    # Extract title
                    title = link_elem.get_text(strip=True)
                    # Extract snippet
                    snippet_elem = result.find('a', {'class': 'result__snippet'})
                    snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
                    search_results.append({
                        'link': link,
                        'title': title,
                        'snippet': snippet
                    })
                    # Small delay between processing results
                    time.sleep(random.uniform(0.2, 0.5))
                except Exception as e:
                    logger.warning(f"Error processing search result: {str(e)}")
                    continue
            return search_results
        except Exception as e:
            logger.error(f"Error during DuckDuckGo search: {str(e)}")
            return []
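
    # Each entry returned by search_duckduckgo looks like (illustrative):
    #     {'link': 'https://example.com/page', 'title': '...', 'snippet': '...'}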

    def search(self, query: str, max_results: int = 5) -> Dict:
        """Perform a search and process the results"""
        try:
            # Search using the DuckDuckGo HTML endpoint
            search_results = self.search_duckduckgo(query, max_results)
            if not search_results:
                return {'error': 'No results found'}
            results = []
            for result in search_results:
                if 'link' in result:
                    processed = self.process_url(result['link'])
                    if 'error' not in processed:
                        results.append(processed)
                    time.sleep(random.uniform(0.5, 1.0))
            if not results:
                return {'error': 'Failed to process any search results'}
            # Format results in a user-friendly way
            formatted = self.format_results(results)
            return {
                'results': formatted['results'],
                'insights': formatted['insights'],
                'follow_up_questions': [
                    f"What are the recent breakthroughs in {query}?",
                    f"How does {query} impact various industries?",
                    f"What are the future prospects of {query}?"
                ]
            }
        except Exception as e:
            return {'error': f"Search failed: {str(e)}"}


# Main search function
def search(query: str, max_results: int = 5) -> Dict:
    """Main search function. Builds a fresh WebSearchEngine (and therefore
    reloads the models) on every call; reuse one engine instance when issuing
    many queries."""
    engine = WebSearchEngine()
    return engine.search(query, max_results)
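

# Example usage when run as a script. The query is illustrative; running this
# requires network access and downloads the models on first use.
if __name__ == "__main__":
    demo = search("renewable energy storage", max_results=3)
    print(json.dumps(demo, indent=2, ensure_ascii=False))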