|
import json
import logging
from io import StringIO
from urllib.parse import urlparse

import pandas as pd
import requests
from bs4 import BeautifulSoup

from smolagents.tools import Tool
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
class WebBrowser(Tool): |
|
""" |
|
Retrieves information from online sources by browsing web pages. |
|
Useful for extracting or summarizing web content, with special handling for structured data. |
|
Can extract tables, lists, and key information from web pages. |
|
""" |
|
name = "web_browser" |
|
description = "Fetches content from web pages with improved structured data handling. Has specialized extraction for Wikipedia. Returns text content or structured data." |
|
inputs = { |
|
'url': {'type': 'string', 'description': 'The URL of the web page to browse.'}, |
|
'extraction_mode': {'type': 'string', 'description': 'Mode for data extraction: "text" (default), "tables", "lists", or "structured".', 'nullable': True} |
|
} |
|
outputs = {'content': {'type': 'object', 'description': 'The extracted content from the web page, either as text or structured data.'}} |
|
output_type = "object" |
|
|
|
def __init__(self, user_agent="GAIA-Agent/1.0", *args, **kwargs): |
|
""" |
|
Initializes the web browser with a user agent. |
|
Args: |
|
user_agent (str): The User-Agent string to use for requests. |
|
""" |
|
super().__init__(*args, **kwargs) |
|
self.headers = {"User-Agent": user_agent} |
|
self.is_initialized = True |
|
|
|
self.session = requests.Session() |
|
self.session.headers.update(self.headers) |
|
|
|
def forward(self, url: str, extraction_mode: str = "text") -> dict: |
|
""" |
|
Fetches the content of a web page and extracts information based on the specified mode. |
|
|
|
Args: |
|
url (str): The URL of the web page to browse. |
|
extraction_mode (str): The mode for data extraction - "text" (default), "tables", "lists", or "structured" |
|
|
|
Returns: |
|
dict: The extracted content or an error message |
|
""" |
|
|
|
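        # Basic validation: only absolute http(s) URLs are accepted.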
if not url.startswith(('http://', 'https://')): |
|
return {"error": f"Invalid URL format. URL must start with http:// or https://. Received: {url}"} |
|
|
|
try: |
|
|
|
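            # Wikipedia pages go through the REST-API handler; everything else is scraped directly.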
if 'wikipedia.org' in url: |
|
return self._handle_wikipedia(url, extraction_mode) |
|
|
|
|
|
return self._process_regular_webpage(url, extraction_mode) |
|
|
|
except requests.exceptions.HTTPError as http_err: |
|
return {"error": f"HTTP error occurred while fetching {url}: {http_err}"} |
|
except requests.exceptions.ConnectionError as conn_err: |
|
return {"error": f"Connection error occurred while fetching {url}: {conn_err}"} |
|
except requests.exceptions.Timeout as timeout_err: |
|
return {"error": f"Timeout occurred while fetching {url}: {timeout_err}"} |
|
except requests.exceptions.RequestException as req_err: |
|
return {"error": f"An unexpected error occurred while fetching {url}: {req_err}"} |
|
except Exception as e: |
|
return {"error": f"An unexpected error occurred during parsing of {url}: {e}"} |
|
|
|
def _process_regular_webpage(self, url, extraction_mode): |
|
"""Process a regular (non-Wikipedia) webpage""" |
|
response = self.session.get(url, timeout=15) |
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
|
|
|
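        # Drop <script> and <style> elements so they do not pollute the extracted content.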
for script_or_style in soup(["script", "style"]): |
|
script_or_style.decompose() |
|
|
|
if extraction_mode == "text": |
|
return self._extract_text(soup, url) |
|
elif extraction_mode == "tables": |
|
return self._extract_tables(soup, url) |
|
elif extraction_mode == "lists": |
|
return self._extract_lists(soup, url) |
|
elif extraction_mode == "structured": |
|
return self._extract_structured_data(soup, url) |
|
else: |
|
return {"error": f"Unknown extraction mode: {extraction_mode}"} |
|
|
|
def _handle_wikipedia(self, url, extraction_mode): |
|
"""Special handling for Wikipedia pages""" |
|
|
|
parsed_url = urlparse(url) |
|
if not parsed_url.netloc.endswith('wikipedia.org'): |
|
return self._process_regular_webpage(url, extraction_mode) |
|
|
|
|
|
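        # Wikipedia article URLs look like /wiki/<Title>; anything else falls back to generic scraping.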
path_parts = parsed_url.path.split('/') |
|
if len(path_parts) < 3 or path_parts[1] != 'wiki': |
|
|
|
return self._process_regular_webpage(url, extraction_mode) |
|
|
|
title = path_parts[2] |
|
lang = parsed_url.netloc.split('.')[0] |
|
|
|
|
|
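        # Query the Wikipedia REST v1 summary endpoint for the article's title, description, and extract.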
api_url = f"https://{lang}.wikipedia.org/api/rest_v1/page/summary/{title}" |
|
|
|
try: |
|
logger.info(f"Fetching Wikipedia API data from {api_url}") |
|
api_response = self.session.get(api_url, timeout=15) |
|
api_response.raise_for_status() |
|
api_data = api_response.json() |
|
|
|
|
|
wiki_data = { |
|
"title": api_data.get("title", ""), |
|
"description": api_data.get("description", ""), |
|
"extract": api_data.get("extract", ""), |
|
"url": api_data.get("content_urls", {}).get("desktop", {}).get("page", url) |
|
} |
|
|
|
|
|
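            # Richer modes also fetch the article HTML: tables for both, plus infobox and sections for "structured".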
if extraction_mode in ["tables", "structured"]: |
|
|
|
response = self.session.get(url, timeout=15) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
|
|
|
tables = self._extract_tables(soup, url, return_raw=False) |
|
wiki_data["tables"] = tables.get("tables", []) |
|
|
|
|
|
if extraction_mode == "structured": |
|
wiki_data["infobox"] = self._extract_wikipedia_infobox(soup) |
|
wiki_data["sections"] = self._extract_wikipedia_sections(soup) |
|
|
|
return { |
|
"source": "wikipedia_api_enhanced", |
|
"url": url, |
|
"data": wiki_data |
|
} |
|
|
|
|
|
return { |
|
"source": "wikipedia_api", |
|
"url": url, |
|
"data": wiki_data |
|
} |
|
|
|
except (requests.exceptions.RequestException, ValueError) as e: |
|
logger.warning(f"Wikipedia API request failed: {e}. Falling back to HTML scraping.") |
|
|
|
return self._process_regular_webpage(url, extraction_mode) |
|
|
|
def _extract_text(self, soup, url): |
|
"""Extract clean text from the page""" |
|
text_from_soup = soup.get_text(separator='\n', strip=True) |
|
|
|
|
|
cleaned_lines = [] |
|
for line in text_from_soup.splitlines(): |
|
line = line.strip() |
|
if line: |
|
|
|
cleaned_line = ' '.join(line.split()) |
|
cleaned_lines.append(cleaned_line) |
|
|
|
text = '\n'.join(cleaned_lines) |
|
|
|
if not text: |
|
return {"error": f"No text content found at {url}."} |
|
|
|
return { |
|
"source": "web_page", |
|
"url": url, |
|
"content_type": "text", |
|
"text": text |
|
} |
|
|
|
def _extract_tables(self, soup, url, return_raw=True): |
|
"""Extract tables from the page""" |
|
tables = [] |
|
|
|
|
|
html_tables = soup.find_all('table') |
|
|
|
for i, table in enumerate(html_tables): |
|
try: |
|
|
|
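                # Let pandas parse the table first; it copes with headers, colspan, and rowspan reasonably well.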
                dfs = pd.read_html(StringIO(str(table)))  # wrap in StringIO: pandas deprecates passing literal HTML strings
|
|
|
if dfs: |
|
|
|
for j, df in enumerate(dfs): |
|
|
|
df.columns = [str(col).strip() for col in df.columns] |
|
|
|
|
|
table_dict = { |
|
"table_id": f"table_{i}_{j}", |
|
"headers": df.columns.tolist(), |
|
"rows": df.values.tolist(), |
|
} |
|
tables.append(table_dict) |
|
except Exception as e: |
|
logger.warning(f"Failed to parse table {i}: {e}") |
|
|
|
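                # Fall back to manual <tr>/<th>/<td> parsing when pandas cannot read the table.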
try: |
|
headers = [] |
|
header_row = table.find('tr') |
|
if header_row: |
|
headers = [th.get_text(strip=True) for th in header_row.find_all(['th', 'td'])] |
|
|
|
rows = [] |
|
for tr in table.find_all('tr'): |
|
row = [td.get_text(strip=True) for td in tr.find_all(['td', 'th'])] |
|
if row and row != headers: |
|
rows.append(row) |
|
|
|
if headers or rows: |
|
tables.append({ |
|
"table_id": f"table_{i}_manual", |
|
"headers": headers, |
|
"rows": rows |
|
}) |
|
except Exception: |
|
continue |
|
|
|
if return_raw: |
|
return { |
|
"source": "web_page", |
|
"url": url, |
|
"content_type": "tables", |
|
"table_count": len(tables), |
|
"tables": tables |
|
} |
|
else: |
|
return {"tables": tables} |
|
|
|
def _extract_lists(self, soup, url): |
|
"""Extract lists from the page""" |
|
lists = [] |
|
|
|
|
|
for list_type in ['ul', 'ol']: |
|
list_elements = soup.find_all(list_type, recursive=True) |
|
|
|
for i, list_elem in enumerate(list_elements): |
|
|
|
                # Only report top-level lists: skip lists nested inside another list, and skip
                # elements already destroyed when an earlier item's sub-lists were decomposed below.
                parent = getattr(list_elem, "parent", None)
                if parent is None or parent.name in ['li', 'ul', 'ol']:
                    continue
|
|
|
items = [] |
|
for li in list_elem.find_all('li', recursive=False): |
|
|
|
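                    # Remove nested sub-lists so only the item's own text is captured.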
for nested_list in li.find_all(['ul', 'ol']): |
|
nested_list.decompose() |
|
|
|
item_text = li.get_text(strip=True) |
|
if item_text: |
|
items.append(item_text) |
|
|
|
if items: |
|
lists.append({ |
|
"list_id": f"{list_type}_{i}", |
|
"list_type": "ordered" if list_type == "ol" else "unordered", |
|
"items": items |
|
}) |
|
|
|
return { |
|
"source": "web_page", |
|
"url": url, |
|
"content_type": "lists", |
|
"list_count": len(lists), |
|
"lists": lists |
|
} |
|
|
|
def _extract_structured_data(self, soup, url): |
|
"""Extract various types of structured data from the page""" |
|
result = { |
|
"source": "web_page", |
|
"url": url, |
|
"content_type": "structured", |
|
"title": soup.title.string if soup.title else "", |
|
"meta_description": "", |
|
} |
|
|
|
|
|
meta_desc = soup.find('meta', attrs={'name': 'description'}) |
|
if meta_desc: |
|
result["meta_description"] = meta_desc.get('content', '') |
|
|
|
|
|
text_result = self._extract_text(soup, url) |
|
if "text" in text_result: |
|
result["text"] = text_result["text"] |
|
|
|
|
|
tables_result = self._extract_tables(soup, url, return_raw=False) |
|
result["tables"] = tables_result.get("tables", []) |
|
|
|
|
|
lists_result = self._extract_lists(soup, url) |
|
result["lists"] = lists_result.get("lists", []) |
|
|
|
|
|
headings = [] |
|
for i, heading in enumerate(soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])): |
|
headings.append({ |
|
"id": f"heading_{i}", |
|
"level": int(heading.name[1]), |
|
"text": heading.get_text(strip=True) |
|
}) |
|
result["headings"] = headings |
|
|
|
|
|
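        # Capture any JSON-LD (schema.org) blocks from <script type="application/ld+json"> tags.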
json_ld_data = [] |
|
for script in soup.find_all('script', type='application/ld+json'): |
|
try: |
|
                json_data = json.loads(script.string or "")  # script.string can be None; treat it as empty/invalid JSON
|
json_ld_data.append(json_data) |
|
except (json.JSONDecodeError, ValueError): |
|
continue |
|
|
|
if json_ld_data: |
|
result["structured_data"] = json_ld_data |
|
|
|
return result |
|
|
|
def _extract_wikipedia_infobox(self, soup): |
|
"""Extract information from Wikipedia infobox""" |
|
infobox = {} |
|
|
|
|
|
infobox_table = soup.find('table', class_=['infobox', 'vcard']) |
|
if infobox_table: |
|
for row in infobox_table.find_all('tr'): |
|
|
|
header = row.find('th') |
|
value = row.find('td') |
|
|
|
if header and value: |
|
key = header.get_text(strip=True) |
|
|
|
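                    # Remove citation superscripts (e.g. [1]) before reading the value text.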
for sup in value.find_all('sup'): |
|
sup.decompose() |
|
|
|
val = value.get_text(strip=True) |
|
if key and val: |
|
infobox[key] = val |
|
|
|
return infobox |
|
|
|
def _extract_wikipedia_sections(self, soup): |
|
"""Extract sections and their content from Wikipedia""" |
|
sections = [] |
|
current_section = None |
|
|
|
|
|
headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']) |
|
|
|
for heading in headings: |
|
|
|
if heading.get('id') in ['firstHeading', 'mw-toc-heading']: |
|
continue |
|
|
|
level = int(heading.name[1]) |
|
title = heading.get_text(strip=True) |
|
|
|
|
|
current_section = { |
|
"level": level, |
|
"title": title, |
|
"content": "" |
|
} |
|
|
|
|
|
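            # Gather paragraph and list text that follows this heading, up to the next heading.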
content_elements = [] |
|
sibling = heading.next_sibling |
|
|
|
while sibling and not (sibling.name and sibling.name.startswith('h')): |
|
if sibling.name in ['p', 'ul', 'ol']: |
|
content_elements.append(sibling.get_text(strip=True)) |
|
sibling = sibling.next_sibling |
|
|
|
if content_elements: |
|
current_section["content"] = "\n".join(content_elements) |
|
sections.append(current_section) |
|
|
|
return sections |
|
|
|
if __name__ == '__main__': |
|
browser = WebBrowser() |
|
|
test_url_wikipedia = "https://en.wikipedia.org/wiki/Mercedes_Sosa" |
|
print(f"--- Browsing: {test_url_wikipedia} ---") |
|
|
|
    content_wikipedia = browser.forward(test_url_wikipedia)

    if "error" in content_wikipedia:
        print(content_wikipedia["error"])
    else:
        # forward() returns a dict, so render it as JSON and truncate long output for readability.
        rendered = json.dumps(content_wikipedia, indent=2, ensure_ascii=False)
        print(rendered[:1000] + "..." if len(rendered) > 1000 else rendered)
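
    # Extra illustrative example (a sketch, not part of the original test run): request the same
    # article in "structured" mode, which also returns the infobox, tables, and sections.
    content_structured = browser.forward(test_url_wikipedia, extraction_mode="structured")
    if "error" in content_structured:
        print(content_structured["error"])
    else:
        data = content_structured.get("data", {})
        infobox = data.get("infobox", {})
        tables = data.get("tables", [])
        print(f"Infobox fields: {list(infobox.keys())[:10]}")
        print(f"Tables found: {len(tables)}")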
|
|
|
print("\n--- Example with a non-existent page ---") |
|
test_url_non_existent = "http://example.com/nonexistentpage12345.html" |
|
content_non_existent = browser.forward(test_url_non_existent) |
|
print(content_non_existent) |
|
|
|
print("\n--- Example with an invalid URL format ---") |
|
test_url_invalid_format = "www.google.com" |
|
content_invalid_format = browser.forward(test_url_invalid_format) |
|
print(content_invalid_format) |