File size: 3,782 Bytes
cb327b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import requests
from bs4 import BeautifulSoup, Comment
import logging
import re
from duckduckgo_search import DDGS

logger = logging.getLogger(__name__)

DEFAULT_TIMEOUT = 10
MAX_CONTENT_LENGTH_PER_URL = 3000
MAX_TOTAL_SCRAPED_CONTENT = 9000

def scrape_url(url_to_scrape):
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'}
        response = requests.get(url_to_scrape, headers=headers, timeout=DEFAULT_TIMEOUT, allow_redirects=True)
        response.raise_for_status()
        if 'html' not in response.headers.get('content-type', '').lower():
            return {"url": url_to_scrape, "error": "Non-HTML content"}

        soup = BeautifulSoup(response.content, 'html.parser')
        for el_type in ["script", "style", "nav", "footer", "aside", "form", "header", "menu"]:
            for el in soup.select(el_type): el.decompose()
        for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
            comment.extract()
        
        main_selectors = ['main', 'article', '.main-content', '.entry-content', 'div[role="main"]']
        content_area = next((soup.select_one(s) for s in main_selectors if soup.select_one(s)), soup.body or soup)
        
        text_parts = [el.get_text(separator=' ', strip=True) for el in content_area.find_all(['p', 'h1', 'h2', 'h3', 'li', 'td'])]
        full_text = "\n\n".join(filter(None, text_parts))
        if not full_text: full_text = content_area.get_text(separator='\n', strip=True)
        
        full_text = re.sub(r'\s{3,}', ' ', full_text).strip()
        full_text = re.sub(r'(\n\s*){3,}', '\n\n', full_text)
        title = soup.find('title').string.strip() if soup.find('title') and soup.find('title').string else url_to_scrape
        
        return {"url": url_to_scrape, "title": title, "content": full_text[:MAX_CONTENT_LENGTH_PER_URL]}
    except requests.exceptions.RequestException as e:
        logger.error(f"WEBSEARCH: Request failed for {url_to_scrape}: {e}")
        return {"url": url_to_scrape, "error": f"Request failed: {e}"}
    except Exception as e:
        logger.error(f"WEBSEARCH: Scraping error for {url_to_scrape}: {e}", exc_info=True)
        return {"url": url_to_scrape, "error": f"Scraping error: {e}"}

def search_and_scrape_duckduckgo(search_query, num_results=3):
    scraped_data = []
    total_content_len = 0
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(search_query, max_results=num_results + 2, region='wt-wt'))
        if not results: return [{"query": search_query, "error": "No search results."}]
        
        for res in results[:num_results]:
            if total_content_len >= MAX_TOTAL_SCRAPED_CONTENT: break
            if not res.get('href'): continue
            
            scraped_info = scrape_url(res['href'])
            if not scraped_info.get("title") or scraped_info.get("title") == res['href']:
                scraped_info["title"] = res.get('title', res['href'])
            
            current_content = scraped_info.get("content", "")
            if res.get('body') and (not current_content or len(current_content) < 150 or scraped_info.get("error")):
                scraped_info["content"] = f"Search snippet: {res['body']}\n\n(Content from page):\n{current_content}"
            
            scraped_data.append(scraped_info)
            if scraped_info.get("content") and not scraped_info.get("error"):
                total_content_len += len(scraped_info["content"])
        return scraped_data
    except Exception as e:
        logger.error(f"WEBSEARCH (DDG): Error for '{search_query}': {e}", exc_info=True)
        return [{"query": search_query, "error": f"DDG search failed: {e}"}]