from smolagents.tools import Tool

from typing import Dict, Any, Optional
from io import StringIO  # wraps HTML strings for pandas.read_html (avoids passing literal HTML, which newer pandas deprecates)

import requests
from bs4 import BeautifulSoup
import re
import json
import pandas as pd
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WebContentExtractor(Tool):
    """
    Specialized tool for extracting structured content from specific websites.
    Has optimized extractors for Wikipedia, tabular data, and common content patterns.
    """

    name = "web_content_extractor"
    description = "Extracts structured data from websites with specialized handlers for Wikipedia and other content types."
    inputs = {
        'url': {'type': 'string', 'description': 'The URL of the web page to extract content from.'},
        'target_type': {'type': 'string', 'description': 'Type of content to extract: "info", "table", "list", or "specific_data".'},
        'extraction_details': {'type': 'object', 'description': 'Additional details for extraction (e.g., table index, data label).', 'nullable': True}
    }
    outputs = {'result': {'type': 'object', 'description': 'The extracted content as structured data.'}}
    output_type = "object"

    def __init__(self, user_agent="GAIA-Agent/1.0", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.headers = {"User-Agent": user_agent}
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        self.is_initialized = True

    def forward(self, url: str, target_type: str, extraction_details: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Extract specific content from a web page.

        Args:
            url: URL of the web page
            target_type: Type of content to extract ("info", "table", "list", "specific_data")
            extraction_details: Additional details for extraction

        Returns:
            Dict with extracted content or error message
        """
        if not extraction_details:
            extraction_details = {}

        if not url.startswith(('http://', 'https://')):
            return {"error": f"Invalid URL format: {url}"}

        try:
            # Route Wikipedia URLs to the specialized handler
            if 'wikipedia.org' in url:
                return self._extract_from_wikipedia(url, target_type, extraction_details)

            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')

            if target_type == "info":
                return self._extract_general_info(soup, url)
            elif target_type == "table":
                return self._extract_table(soup, url, extraction_details)
            elif target_type == "list":
                return self._extract_list(soup, url, extraction_details)
            elif target_type == "specific_data":
                return self._extract_specific_data(soup, url, extraction_details)
            else:
                return {"error": f"Unknown extraction type: {target_type}"}

        except requests.exceptions.RequestException as e:
            return {"error": f"Request error: {str(e)}"}
        except Exception as e:
            return {"error": f"Extraction error: {str(e)}"}

    def _extract_general_info(self, soup, url):
        """Extract general information from a web page."""
        title = soup.title.string if soup.title else "No title found"

        meta_desc = soup.find('meta', attrs={'name': 'description'})
        description = meta_desc.get('content', '') if meta_desc else "No description found"

        main_headings = [h1.get_text(strip=True) for h1 in soup.find_all('h1')]

        # Collect definition-list (dt/dd) pairs as key facts
        key_facts = {}
        for dl in soup.find_all('dl'):
            for dt, dd in zip(dl.find_all('dt'), dl.find_all('dd')):
                key = dt.get_text(strip=True)
                value = dd.get_text(strip=True)
                if key and value:
                    key_facts[key] = value

        # Build a short summary from the first few substantial paragraphs
        paragraphs = soup.find_all('p')
        summary = ""
        para_count = 0
        for p in paragraphs:
            text = p.get_text(strip=True)
            if len(text) > 50:
                summary += text + "\n\n"
                para_count += 1
                if para_count >= 3:
                    break

        return {
            "title": title,
            "url": url,
            "description": description,
            "main_headings": main_headings,
            "key_facts": key_facts,
            "summary": summary.strip()
        }

    def _extract_table(self, soup, url, details):
        """Extract table data from a web page."""
        table_index = details.get('table_index', 0)

        tables = soup.find_all('table')
        if not tables:
            return {"error": "No tables found on the page"}

        if table_index >= len(tables):
            return {"error": f"Table index {table_index} is out of range. Found {len(tables)} tables."}

        try:
            # Prefer pandas for robust table parsing
            table = tables[table_index]
            dfs = pd.read_html(StringIO(str(table)))

            if not dfs:
                return {"error": "Failed to parse table with pandas"}

            df = dfs[0]
            headers = df.columns.tolist()
            rows = df.values.tolist()

            return {
                "table_data": {
                    "headers": headers,
                    "rows": rows
                },
                "row_count": len(rows),
                "column_count": len(headers),
                "url": url
            }

        except Exception as e:
            # Fall back to manual BeautifulSoup parsing when pandas cannot handle the table
            logger.warning(f"Pandas table extraction failed: {e}. Falling back to manual extraction.")

            table = tables[table_index]
            headers = []
            rows = []

            # Try to read headers from <thead> first
            thead = table.find('thead')
            if thead:
                header_row = thead.find('tr')
                if header_row:
                    headers = [th.get_text(strip=True) for th in header_row.find_all(['th', 'td'])]

            # Otherwise use the first row as headers
            if not headers:
                first_row = table.find('tr')
                if first_row:
                    headers = [th.get_text(strip=True) for th in first_row.find_all(['th', 'td'])]

            # Collect body rows, skipping the header row
            for tr in table.find_all('tr'):
                row = [td.get_text(strip=True) for td in tr.find_all(['td', 'th'])]
                if row and row != headers:
                    rows.append(row)

            return {
                "table_data": {
                    "headers": headers,
                    "rows": rows
                },
                "row_count": len(rows),
                "column_count": len(headers) if headers else (len(rows[0]) if rows else 0),
                "url": url,
                "extraction_method": "manual_fallback"
            }

    def _extract_list(self, soup, url, details):
        """Extract list data from a web page."""
        list_type = details.get('list_type', 'all')
        position = details.get('position', 0)

        list_elements = []
        if list_type == 'ul' or list_type == 'all':
            list_elements.extend(soup.find_all('ul'))
        if list_type == 'ol' or list_type == 'all':
            list_elements.extend(soup.find_all('ol'))

        if not list_elements:
            return {"error": "No lists found on the page"}

        if position >= len(list_elements):
            return {"error": f"List position {position} is out of range. Found {len(list_elements)} lists."}

        target_list = list_elements[position]
        items = []

        for li in target_list.find_all('li', recursive=False):
            # Drop nested lists so each item only contains its own text
            for nested_list in li.find_all(['ul', 'ol']):
                nested_list.decompose()

            item_text = li.get_text(strip=True)
            if item_text:
                items.append(item_text)

        return {
            "list_type": target_list.name,
            "items": items,
            "count": len(items),
            "url": url
        }

    def _extract_specific_data(self, soup, url, details):
        """Extract specific data based on given selectors or patterns."""
        data_label = details.get('data_label', '')
        selector = details.get('selector', '')
        attribute = details.get('attribute', '')
        regex_pattern = details.get('regex_pattern', '')

        result = {
            "url": url,
            "data_label": data_label,
            "found": False
        }

        # 1. CSS selector lookup
        if selector:
            elements = soup.select(selector)
            if elements:
                result["found"] = True

                if attribute:
                    # Extract an attribute value from each matched element
                    values = [elem.get(attribute, '') for elem in elements]
                    result["values"] = values
                else:
                    # Extract the text content of each matched element
                    values = [elem.get_text(strip=True) for elem in elements]
                    result["values"] = values

                # Provide a scalar convenience value when there is a single match
                if len(values) == 1:
                    result["value"] = values[0]

                return result

        # 2. Regex search over the page text
        if regex_pattern:
            page_text = soup.get_text()
            matches = re.findall(regex_pattern, page_text)

            if matches:
                result["found"] = True
                result["matches"] = matches

                if len(matches) == 1:
                    result["value"] = matches[0]

                return result

        # 3. Label-based search: "label: value" patterns, then headings
        if data_label:
            label_pattern = re.compile(rf'{re.escape(data_label)}\s*[:=-]?\s*([\w\s,.()-]+)', re.IGNORECASE)
            page_text = soup.get_text()
            match = label_pattern.search(page_text)

            if match:
                result["found"] = True
                result["value"] = match.group(1).strip()
                return result

            # Look for a heading containing the label, followed by a paragraph
            for heading in soup.find_all(['h1', 'h2', 'h3', 'h4']):
                if data_label.lower() in heading.get_text().lower():
                    next_sibling = heading.find_next_sibling()
                    if next_sibling and next_sibling.name == 'p':
                        result["found"] = True
                        result["value"] = next_sibling.get_text(strip=True)
                        return result

        return result

    def _extract_from_wikipedia(self, url, target_type, details):
        """Specialized extraction for Wikipedia pages using APIs when possible."""
        # The page title is the last path segment; the language code is the subdomain (e.g. "en")
        title = url.split('/')[-1]
        domain = url.split('//')[1].split('.')[0]

        try:
            # Try the Wikipedia REST summary API first
            api_url = f"https://{domain}.wikipedia.org/api/rest_v1/page/summary/{title}"
            response = self.session.get(api_url, timeout=15)
            response.raise_for_status()
            api_data = response.json()

            if target_type == "info":
                return {
                    "title": api_data.get("title", ""),
                    "description": api_data.get("description", ""),
                    "extract": api_data.get("extract", ""),
                    "url": url,
                    "source": "wikipedia_api"
                }

            # For other target types, fetch and parse the HTML page
            html_response = self.session.get(url, timeout=15)
            html_response.raise_for_status()
            soup = BeautifulSoup(html_response.content, 'html.parser')

            if target_type == "table":
                # Extract the infobox as key/value pairs when requested
                if details.get('infobox', False):
                    infobox = {}
                    infobox_div = soup.find('table', {'class': 'infobox'})

                    if infobox_div:
                        for row in infobox_div.find_all('tr'):
                            header = row.find('th')
                            data = row.find('td')
                            if header and data:
                                key = header.get_text(strip=True)
                                value = data.get_text(strip=True)
                                if key and value:
                                    infobox[key] = value

                    return {
                        "title": api_data.get("title", ""),
                        "infobox": infobox,
                        "url": url,
                        "source": "wikipedia_infobox"
                    }

                return self._extract_table(soup, url, details)

            elif target_type == "list":
                return self._extract_list(soup, url, details)

            elif target_type == "specific_data":
                # Check the infobox first, since it holds most labelled facts
                data_label = details.get('data_label', '')

                infobox = soup.find('table', {'class': 'infobox'})
                if infobox and data_label:
                    for row in infobox.find_all('tr'):
                        header = row.find('th')
                        if header and data_label.lower() in header.get_text().lower():
                            data = row.find('td')
                            if data:
                                return {
                                    "found": True,
                                    "value": data.get_text(strip=True),
                                    "source": "wikipedia_infobox",
                                    "url": url
                                }

                return self._extract_specific_data(soup, url, details)

            else:
                return {"error": f"Unknown extraction type: {target_type}"}

        except Exception as e:
            logger.warning(f"Wikipedia API extraction failed: {e}. Falling back to HTML extraction.")

            # Plain HTML fallback when the API path fails
            try:
                response = self.session.get(url, timeout=15)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'html.parser')

                if target_type == "info":
                    return self._extract_general_info(soup, url)
                elif target_type == "table":
                    return self._extract_table(soup, url, details)
                elif target_type == "list":
                    return self._extract_list(soup, url, details)
                elif target_type == "specific_data":
                    return self._extract_specific_data(soup, url, details)
                else:
                    return {"error": f"Unknown extraction type: {target_type}"}

            except Exception as fallback_error:
                return {"error": f"Wikipedia extraction error: {fallback_error}"}