broadfield-dev commited on
Commit
cb327b0
·
verified ·
1 Parent(s): 9161e19

Create websearch.py

Browse files
Files changed (1) hide show
  1. tools/websearch.py +72 -0
tools/websearch.py ADDED
@@ -0,0 +1,72 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ from bs4 import BeautifulSoup, Comment
3
+ import logging
4
+ import re
5
+ from duckduckgo_search import DDGS
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+ DEFAULT_TIMEOUT = 10
10
+ MAX_CONTENT_LENGTH_PER_URL = 3000
11
+ MAX_TOTAL_SCRAPED_CONTENT = 9000
12
+
13
+ def scrape_url(url_to_scrape):
14
+ try:
15
+ headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'}
16
+ response = requests.get(url_to_scrape, headers=headers, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
17
+ response.raise_for_status()
18
+ if 'html' not in response.headers.get('content-type', '').lower():
19
+ return {"url": url_to_scrape, "error": "Non-HTML content"}
20
+
21
+ soup = BeautifulSoup(response.content, 'html.parser')
22
+ for el_type in ["script", "style", "nav", "footer", "aside", "form", "header", "menu"]:
23
+ for el in soup.select(el_type): el.decompose()
24
+ for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
25
+ comment.extract()
26
+
27
+ main_selectors = ['main', 'article', '.main-content', '.entry-content', 'div[role="main"]']
28
+ content_area = next((soup.select_one(s) for s in main_selectors if soup.select_one(s)), soup.body or soup)
29
+
30
+ text_parts = [el.get_text(separator=' ', strip=True) for el in content_area.find_all(['p', 'h1', 'h2', 'h3', 'li', 'td'])]
31
+ full_text = "\n\n".join(filter(None, text_parts))
32
+ if not full_text: full_text = content_area.get_text(separator='\n', strip=True)
33
+
34
+ full_text = re.sub(r'\s{3,}', ' ', full_text).strip()
35
+ full_text = re.sub(r'(\n\s*){3,}', '\n\n', full_text)
36
+ title = soup.find('title').string.strip() if soup.find('title') and soup.find('title').string else url_to_scrape
37
+
38
+ return {"url": url_to_scrape, "title": title, "content": full_text[:MAX_CONTENT_LENGTH_PER_URL]}
39
+ except requests.exceptions.RequestException as e:
40
+ logger.error(f"WEBSEARCH: Request failed for {url_to_scrape}: {e}")
41
+ return {"url": url_to_scrape, "error": f"Request failed: {e}"}
42
+ except Exception as e:
43
+ logger.error(f"WEBSEARCH: Scraping error for {url_to_scrape}: {e}", exc_info=True)
44
+ return {"url": url_to_scrape, "error": f"Scraping error: {e}"}
45
+
46
+ def search_and_scrape_duckduckgo(search_query, num_results=3):
47
+ scraped_data = []
48
+ total_content_len = 0
49
+ try:
50
+ with DDGS() as ddgs:
51
+ results = list(ddgs.text(search_query, max_results=num_results + 2, region='wt-wt'))
52
+ if not results: return [{"query": search_query, "error": "No search results."}]
53
+
54
+ for res in results[:num_results]:
55
+ if total_content_len >= MAX_TOTAL_SCRAPED_CONTENT: break
56
+ if not res.get('href'): continue
57
+
58
+ scraped_info = scrape_url(res['href'])
59
+ if not scraped_info.get("title") or scraped_info.get("title") == res['href']:
60
+ scraped_info["title"] = res.get('title', res['href'])
61
+
62
+ current_content = scraped_info.get("content", "")
63
+ if res.get('body') and (not current_content or len(current_content) < 150 or scraped_info.get("error")):
64
+ scraped_info["content"] = f"Search snippet: {res['body']}\n\n(Content from page):\n{current_content}"
65
+
66
+ scraped_data.append(scraped_info)
67
+ if scraped_info.get("content") and not scraped_info.get("error"):
68
+ total_content_len += len(scraped_info["content"])
69
+ return scraped_data
70
+ except Exception as e:
71
+ logger.error(f"WEBSEARCH (DDG): Error for '{search_query}': {e}", exc_info=True)
72
+ return [{"query": search_query, "error": f"DDG search failed: {e}"}]