# Node-Brain-BU3 / tools/websearch.py
# Hugging Face file-viewer metadata: uploaded by broadfield-dev,
# commit cb327b0 ("Create websearch.py", verified), 3.78 kB.
import requests
from bs4 import BeautifulSoup, Comment
import logging
import re
from duckduckgo_search import DDGS
# Module-level logger; scrape_url and search_and_scrape_duckduckgo report
# their failures through it.
logger = logging.getLogger(__name__)
# Passed as the `timeout` argument of requests.get() in scrape_url.
DEFAULT_TIMEOUT = 10
# scrape_url truncates the extracted text of one page to this many characters.
MAX_CONTENT_LENGTH_PER_URL = 3000
# search_and_scrape_duckduckgo stops scraping further results once the
# accumulated length of successfully scraped content reaches this value.
MAX_TOTAL_SCRAPED_CONTENT = 9000
def scrape_url(url_to_scrape):
    """Download a web page and extract its readable text.

    The page is fetched with a browser-like User-Agent. Boilerplate elements
    (scripts, styles, navigation, footers, forms, ...) and HTML comments are
    removed, then the text of paragraph, heading, list-item and table-cell
    elements inside the first matching "main content" container is collected.

    Args:
        url_to_scrape: URL of the page to download.

    Returns:
        On success, a dict with the keys ``"url"``, ``"title"`` and
        ``"content"``; the content is truncated to
        ``MAX_CONTENT_LENGTH_PER_URL`` characters.
        On failure (request error, non-HTML response, parsing error), a dict
        with the keys ``"url"`` and ``"error"``. Exceptions are logged and
        converted into such an error dict rather than propagated.
    """
    text_tags = ['p', 'h1', 'h2', 'h3', 'li', 'td']
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'}
        response = requests.get(url_to_scrape, headers=headers, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
        response.raise_for_status()
        if 'html' not in response.headers.get('content-type', '').lower():
            return {"url": url_to_scrape, "error": "Non-HTML content"}

        soup = BeautifulSoup(response.content, 'html.parser')

        # Drop elements that normally carry no article text.
        for el_type in ["script", "style", "nav", "footer", "aside", "form", "header", "menu"]:
            for el in soup.select(el_type):
                el.decompose()
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()

        # Use the first selector that matches; query each selector only once.
        main_selectors = ['main', 'article', '.main-content', '.entry-content', 'div[role="main"]']
        content_area = None
        for selector in main_selectors:
            content_area = soup.select_one(selector)
            if content_area is not None:
                break
        if content_area is None:
            content_area = soup.body or soup

        # find_all() also returns matches nested inside other matches
        # (<li><p>..</p></li>, <td><li>..</li></td>, nested lists). An outer
        # element's get_text() already contains the text of its descendants,
        # so keep only the outermost matches to avoid emitting text twice.
        candidates = content_area.find_all(text_tags)
        candidate_ids = {id(el) for el in candidates}
        text_parts = [
            el.get_text(separator=' ', strip=True)
            for el in candidates
            if not any(id(parent) in candidate_ids for parent in el.parents)
        ]
        full_text = "\n\n".join(filter(None, text_parts))
        if not full_text:
            # No recognised text elements: fall back to all text in the area.
            full_text = content_area.get_text(separator='\n', strip=True)

        # Collapse every run of three or more whitespace characters to one
        # space. After this pass no such run remains, so the blank-line
        # collapse below cannot match; it is kept to leave output unchanged.
        full_text = re.sub(r'\s{3,}', ' ', full_text).strip()
        full_text = re.sub(r'(\n\s*){3,}', '\n\n', full_text)

        # Fall back to the URL when <title> is missing, has no single string
        # child, or contains only whitespace.
        title_tag = soup.find('title')
        title_text = title_tag.string if title_tag is not None else None
        title = (title_text or '').strip() or url_to_scrape

        return {"url": url_to_scrape, "title": title, "content": full_text[:MAX_CONTENT_LENGTH_PER_URL]}
    except requests.exceptions.RequestException as e:
        logger.error("WEBSEARCH: Request failed for %s: %s", url_to_scrape, e)
        return {"url": url_to_scrape, "error": f"Request failed: {e}"}
    except Exception as e:
        # Best-effort scraper: any parsing problem becomes an error result.
        logger.error("WEBSEARCH: Scraping error for %s: %s", url_to_scrape, e, exc_info=True)
        return {"url": url_to_scrape, "error": f"Scraping error: {e}"}
def search_and_scrape_duckduckgo(search_query, num_results=3):
    """Search DuckDuckGo and scrape the pages of the top results.

    Args:
        search_query: Query string passed to ``DDGS.text``.
        num_results: Maximum number of result entries to return. Two extra
            search results are requested as spares for results that carry
            no ``href``.

    Returns:
        A list of dicts, one per scraped result, as produced by
        ``scrape_url`` (``"url"`` plus ``"title"``/``"content"`` or
        ``"error"``). The title falls back to the search result's title, and
        short, missing or failed page content is prefixed with the search
        snippet. If the search returns nothing or raises, a single-element
        list holding a dict with ``"query"`` and ``"error"`` is returned.
    """
    scraped_data = []
    total_content_len = 0
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(search_query, max_results=num_results + 2, region='wt-wt'))
        if not results:
            return [{"query": search_query, "error": "No search results."}]

        # Walk every returned result (including the spares) and stop once
        # enough entries were collected. Slicing to results[:num_results]
        # up front would leave the spare results unused.
        for res in results:
            if len(scraped_data) >= num_results:
                break
            if total_content_len >= MAX_TOTAL_SCRAPED_CONTENT:
                break
            url = res.get('href')
            if not url:
                continue

            scraped_info = scrape_url(url)

            # scrape_url uses the URL itself as title when the page has none;
            # prefer the title reported by the search engine in that case.
            page_title = scraped_info.get("title")
            if not page_title or page_title == url:
                scraped_info["title"] = res.get('title', url)

            # Prepend the search snippet when the page yielded little or
            # nothing, or when scraping failed.
            current_content = scraped_info.get("content", "")
            if res.get('body') and (not current_content or len(current_content) < 150 or scraped_info.get("error")):
                scraped_info["content"] = f"Search snippet: {res['body']}\n\n(Content from page):\n{current_content}"

            scraped_data.append(scraped_info)

            # Only successfully scraped pages count against the total budget.
            if scraped_info.get("content") and not scraped_info.get("error"):
                total_content_len += len(scraped_info["content"])
        return scraped_data
    except Exception as e:
        # Top-level boundary for the tool: report the failure to the caller.
        logger.error("WEBSEARCH (DDG): Error for '%s': %s", search_query, e, exc_info=True)
        return [{"query": search_query, "error": f"DDG search failed: {e}"}]