#!/usr/bin/env python3
"""
Web Scraper MCP Server

A Model Context Protocol server that provides web scraping tools.
Exposes functions to scrape websites, convert content to markdown, and generate sitemaps.
"""
import re
import tempfile
import zipfile
from typing import Optional, Tuple
from urllib.parse import urljoin, urlparse

import gradio as gr
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
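# Assumed dependencies (normally pinned in the Space's requirements.txt):
#   gradio[mcp]   (the "mcp" extra is needed for mcp_server=True)
#   requests
#   beautifulsoup4
#   markdownify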
def scrape_website_content(url: str) -> Tuple[str, str]:
    """
    Scrape a website and return its main content formatted as markdown and a downloadable file path.

    Args:
        url (str): The URL to scrape (can include or omit http/https protocol)

    Returns:
        Tuple[str, str]: The scraped content formatted as markdown, and a file path for download
    """
    try:
        # Validate URL
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Create session with proper headers
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # Make request
        response = session.get(url, timeout=10)
        response.raise_for_status()

        # Parse HTML
        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove unwanted elements
        for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
            element.decompose()

        # Try to find main content area
        main_content = (
            soup.find('main') or
            soup.find('article') or
            soup.find('div', class_=re.compile(r'content|main|post|article')) or
            soup.find('body')
        )

        if main_content:
            # Convert to markdown
            markdown_text = md(str(main_content), heading_style="ATX")

            # Clean up the markdown: collapse excessive newlines, remove empty links,
            # and normalize whitespace
            markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text)
            markdown_text = re.sub(r'\[\s*\]\([^)]*\)', '', markdown_text)
            markdown_text = re.sub(r'[ \t]+', ' ', markdown_text)

            # Add title if available
            title = soup.find('title')
            if title:
                markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text}"

            markdown_text = markdown_text.strip()

            # Write to temp file for download
            with tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8") as f:
                f.write(markdown_text)
                temp_path = f.name

            return markdown_text, temp_path

        return "No main content found on the webpage.", None

    except requests.exceptions.RequestException as e:
        return f"Error fetching URL: {str(e)}", None
    except Exception as e:
        return f"Error processing content: {str(e)}", None
def generate_sitemap(url: str, max_links_per_domain: Optional[int] = None) -> Tuple[str, str]:
    """
    Generate a sitemap from all links found on a webpage and provide a downloadable file path.

    Args:
        url (str): The URL to analyze for links (can include or omit http/https protocol)
        max_links_per_domain (int, optional): Maximum number of links to display per domain.
            If None, shows all links. Defaults to None.

    Returns:
        Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download
    """
    try:
        # Validate URL
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Create session with proper headers
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # Make request
        response = session.get(url, timeout=10)
        response.raise_for_status()

        # Parse HTML
        soup = BeautifulSoup(response.content, 'html.parser')

        # Find all links
        links = soup.find_all('a', href=True)

        # Process links
        sitemap_data = []
        seen_urls = set()

        for link in links:
            href = link.get('href')
            text = link.get_text().strip()

            if not href:
                continue

            # Convert relative URLs to absolute
            full_url = urljoin(url, href)

            # Filter out duplicates, fragments, non-HTTP schemes, and the page itself
            if (full_url in seen_urls or
                    href.startswith(('#', 'javascript:', 'mailto:', 'tel:')) or
                    full_url == url):
                continue

            seen_urls.add(full_url)

            # Create link entry; fall back to the href when the anchor has no text
            if not text:
                text = href
            sitemap_data.append({
                'text': (text[:100] + '...') if len(text) > 100 else text,
                'url': full_url
            })

        # Generate sitemap markdown
        if not sitemap_data:
            return "No links found on this page.", None

        sitemap_md = "# Sitemap\n\n"
        sitemap_md += f"Found {len(sitemap_data)} links:\n\n"

        # Group by domain for better organization
        domain_groups = {}
        parsed_base = urlparse(url)

        for item in sitemap_data:
            parsed_url = urlparse(item['url'])
            if parsed_url.netloc == parsed_base.netloc:
                domain_key = "Internal Links"
            else:
                domain_key = f"External Links ({parsed_url.netloc})"

            if domain_key not in domain_groups:
                domain_groups[domain_key] = []
            domain_groups[domain_key].append(item)

        # Format sitemap
        for domain, domain_links in domain_groups.items():
            sitemap_md += f"## {domain}\n\n"

            # Use the limit parameter, or show all links if it is None
            if max_links_per_domain is None:
                links_to_show = domain_links
                remaining_links = 0
            else:
                links_to_show = domain_links[:max_links_per_domain]
                remaining_links = max(0, len(domain_links) - max_links_per_domain)

            for link in links_to_show:
                sitemap_md += f"- [{link['text']}]({link['url']})\n"

            if remaining_links > 0:
                sitemap_md += f"- ... and {remaining_links} more links\n"

            sitemap_md += "\n"

        # Write to temp file for download
        with tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8") as f:
            f.write(sitemap_md)
            temp_path = f.name

        return sitemap_md, temp_path

    except requests.exceptions.RequestException as e:
        return f"Error fetching URL: {str(e)}", None
    except Exception as e:
        return f"Error processing content: {str(e)}", None
def extract_all_content_as_zip(url: str, max_links: Optional[int] = None) -> Tuple[str, str]:
    """
    Extract text content from all links found on a webpage and create a downloadable zip file.

    Args:
        url (str): The URL to analyze for links (can include or omit http/https protocol)
        max_links (int, optional): Maximum number of links to process. If None, processes all links. Defaults to None.

    Returns:
        Tuple[str, str]: Status message and zip file path for download
    """
    try:
        # Validate URL
        if not url.startswith(('http://', 'https://')):
            url = 'https://' + url

        # Create session with proper headers
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

        # First fetch the page to find all links
        response = session.get(url, timeout=10)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        links = soup.find_all('a', href=True)

        # Process links to get unique URLs
        unique_urls = set()
        parsed_base = urlparse(url)

        for link in links:
            href = link.get('href')
            if not href:
                continue

            full_url = urljoin(url, href)

            # Filter out fragments, non-HTTP schemes, and the page itself
            if (href.startswith(('#', 'javascript:', 'mailto:', 'tel:')) or
                    full_url == url):
                continue

            # Only include internal links to avoid scraping too many external sites
            parsed_url = urlparse(full_url)
            if parsed_url.netloc == parsed_base.netloc:
                unique_urls.add(full_url)

        if not unique_urls:
            return "No internal links found to extract content from.", None

        # Use all URLs, or limit if specified
        urls_to_process = list(unique_urls)
        total_links_found = len(urls_to_process)

        if max_links is not None:
            urls_to_process = urls_to_process[:max_links]
            limited_message = f" (limited to {max_links} out of {total_links_found})"
        else:
            limited_message = ""

        # Create temporary zip file
        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_zip:
            zip_path = temp_zip.name

        successful_extractions = 0
        failed_extractions = 0

        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for i, link_url in enumerate(urls_to_process, 1):
                try:
                    # Get content from each link
                    link_response = session.get(link_url, timeout=10)
                    link_response.raise_for_status()

                    # Parse and extract content
                    link_soup = BeautifulSoup(link_response.content, 'html.parser')

                    # Remove unwanted elements
                    for element in link_soup(['script', 'style', 'nav', 'footer', 'header', 'aside']):
                        element.decompose()

                    # Find main content
                    main_content = (
                        link_soup.find('main') or
                        link_soup.find('article') or
                        link_soup.find('div', class_=re.compile(r'content|main|post|article')) or
                        link_soup.find('body')
                    )

                    if main_content:
                        # Convert to markdown and clean it up
                        markdown_text = md(str(main_content), heading_style="ATX")
                        markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text)
                        markdown_text = re.sub(r'\[\s*\]\([^)]*\)', '', markdown_text)
                        markdown_text = re.sub(r'[ \t]+', ' ', markdown_text)

                        # Add title if available
                        title = link_soup.find('title')
                        if title:
                            markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text}"

                        markdown_text = markdown_text.strip()

                        # Create a safe filename from the URL path
                        parsed_link = urlparse(link_url)
                        safe_filename = re.sub(r'[^\w\-_.]', '_', parsed_link.path or 'index')
                        if not safe_filename.endswith('.md'):
                            safe_filename += '.md'

                        # Ensure unique filename
                        if safe_filename == '.md' or safe_filename == 'index.md':
                            safe_filename = f"page_{i}.md"

                        # Add source URL as header
                        final_content = f"<!-- Source: {link_url} -->\n\n{markdown_text}"

                        # Add to zip
                        zip_file.writestr(safe_filename, final_content)
                        successful_extractions += 1
                    else:
                        failed_extractions += 1
                except Exception:
                    failed_extractions += 1
                    continue

        status_message = f"Successfully extracted content from {successful_extractions} pages{limited_message}"
        if failed_extractions > 0:
            status_message += f", failed to extract from {failed_extractions} pages"
        status_message += f". Created zip file with {successful_extractions} markdown files."

        return status_message, zip_path

    except requests.exceptions.RequestException as e:
        return f"Error fetching URL: {str(e)}", None
    except Exception as e:
        return f"Error processing content: {str(e)}", None
def generate_sitemap_for_ui(url: str) -> Tuple[str, str]:
    """
    Wrapper function for the Gradio UI that shows all links without limitation.

    Args:
        url (str): The URL to analyze for links

    Returns:
        Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download
    """
    return generate_sitemap(url, max_links_per_domain=None)


def generate_sitemap_with_limit(url: str, max_links_per_domain: int) -> Tuple[str, str]:
    """
    Wrapper function for the Gradio UI that allows configurable link limits per domain.

    Args:
        url (str): The URL to analyze for links
        max_links_per_domain (int): Maximum number of links to display per domain (0 = show all)

    Returns:
        Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download
    """
    # gr.Number may deliver a float, so cast defensively before it is used for slicing
    limit = None if max_links_per_domain == 0 else int(max_links_per_domain)
    return generate_sitemap(url, max_links_per_domain=limit)


def extract_all_content_for_ui(url: str) -> Tuple[str, str]:
    """
    Wrapper function for the Gradio UI that extracts content from all internal links without limitation.

    Args:
        url (str): The URL to analyze for links

    Returns:
        Tuple[str, str]: Status message and zip file path for download
    """
    return extract_all_content_as_zip(url, max_links=None)


def extract_limited_content_as_zip(url: str, max_links: int) -> Tuple[str, str]:
    """
    Wrapper function for the Gradio UI that allows configurable link limits for bulk extraction.

    Args:
        url (str): The URL to analyze for links
        max_links (int): Maximum number of links to process (0 = process all)

    Returns:
        Tuple[str, str]: Status message and zip file path for download
    """
    # gr.Number may deliver a float, so cast defensively before it is used for slicing
    limit = None if max_links == 0 else int(max_links)
    return extract_all_content_as_zip(url, max_links=limit)
# Create Gradio interfaces for each function
def create_mcp_interface():
    """Create Gradio interface that exposes web scraping tools as MCP functions."""
    # Create individual interfaces for each tool
    scrape_interface = gr.Interface(
        fn=scrape_website_content,
        inputs=gr.Textbox(
            label="Website URL",
            placeholder="https://example.com or example.com"
        ),
        outputs=[
            gr.Textbox(
                label="Scraped Content",
                lines=20,
                max_lines=50,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download Markdown")
        ],
        title="Website Content Scraper",
        description="Extract and format website content as markdown",
        api_name="scrape_content"
    )

    sitemap_interface = gr.Interface(
        fn=generate_sitemap_for_ui,
        inputs=gr.Textbox(
            label="Website URL",
            placeholder="https://example.com or example.com"
        ),
        outputs=[
            gr.Textbox(
                label="Sitemap",
                lines=20,
                max_lines=50,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download Sitemap")
        ],
        title="Website Sitemap Generator",
        description="Generate a sitemap of all links found on a webpage",
        api_name="generate_sitemap"
    )

    bulk_extract_interface = gr.Interface(
        fn=extract_all_content_for_ui,
        inputs=gr.Textbox(
            label="Website URL",
            placeholder="https://example.com or example.com"
        ),
        outputs=[
            gr.Textbox(
                label="Extraction Status",
                lines=10,
                max_lines=20,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download ZIP Archive")
        ],
        title="Bulk Content Extractor",
        description="Extract text content from all internal links and download as ZIP",
        api_name="extract_all_content"
    )

    # Enhanced sitemap interface with configurable limits
    sitemap_limited_interface = gr.Interface(
        fn=generate_sitemap_with_limit,
        inputs=[
            gr.Textbox(
                label="Website URL",
                placeholder="https://example.com or example.com"
            ),
            gr.Number(
                label="Max Links Per Domain",
                value=0,
                info="Enter 0 to show all links, or a positive number to limit display per domain",
                minimum=0,
                maximum=1000
            )
        ],
        outputs=[
            gr.Textbox(
                label="Sitemap",
                lines=20,
                max_lines=50,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download Sitemap")
        ],
        title="Configurable Sitemap Generator",
        description="Generate a sitemap with optional display limits (0 = show all links)",
        api_name="generate_sitemap_limited"
    )

    # Enhanced bulk extract interface with configurable limits
    bulk_limited_interface = gr.Interface(
        fn=extract_limited_content_as_zip,
        inputs=[
            gr.Textbox(
                label="Website URL",
                placeholder="https://example.com or example.com"
            ),
            gr.Number(
                label="Max Pages to Extract",
                value=0,
                info="Enter 0 to process all pages, or a positive number to limit extraction",
                minimum=0,
                maximum=1000
            )
        ],
        outputs=[
            gr.Textbox(
                label="Extraction Status",
                lines=10,
                max_lines=20,
                show_copy_button=True,
                container=True
            ),
            gr.File(label="Download ZIP Archive")
        ],
        title="Limited Bulk Content Extractor",
        description="Extract text content from internal links with optional processing limits (0 = extract all)",
        api_name="extract_limited_content"
    )

    # Combine into a tabbed interface
    demo = gr.TabbedInterface(
        [scrape_interface, sitemap_interface, sitemap_limited_interface, bulk_extract_interface, bulk_limited_interface],
        ["Content Scraper", "All Links Sitemap", "Limited Sitemap", "Bulk Extractor", "Limited Bulk Extractor"],
        title="🕷️ Web Scraper MCP Server"
    )

    return demo
if __name__ == "__main__": | |
# Create and launch the MCP server | |
app = create_mcp_interface() | |
app.launch( | |
mcp_server=True | |
) | |
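# A minimal sketch of calling one of the exposed endpoints over HTTP with
# gradio_client, assuming the app is running locally on Gradio's default port
# (the URL, port, and target site are assumptions, not part of this Space):
#
#   from gradio_client import Client
#   client = Client("http://localhost:7860")
#   content, file_path = client.predict("https://example.com", api_name="/scrape_content")
#   print(content[:500])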