"""
Web Search MCP Server - Feed LLMs with fresh sources
====================================================

Prerequisites
-------------
$ pip install "gradio[mcp]" httpx trafilatura python-dateutil limits

Environment
-----------
export SERPER_API_KEY="YOUR-KEY-HERE"

Usage
-----
python app_mcp.py
Then connect to: http://localhost:7860/gradio_api/mcp/sse
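
For example, an MCP client configuration might look like the following
(the exact shape varies by client; "web-search" is just an illustrative name):

    {
      "mcpServers": {
        "web-search": {
          "url": "http://localhost:7860/gradio_api/mcp/sse"
        }
      }
    }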
"""

import os
import asyncio
from datetime import datetime
from typing import Optional

import httpx
import trafilatura
import gradio as gr
from dateutil import parser as dateparser
from limits import parse
from limits.aio.storage import MemoryStorage
from limits.aio.strategies import MovingWindowRateLimiter


# Serper API configuration (separate endpoints for general search and news)
SERPER_API_KEY = os.getenv("SERPER_API_KEY")
SERPER_SEARCH_ENDPOINT = "https://google.serper.dev/search"
SERPER_NEWS_ENDPOINT = "https://google.serper.dev/news"
HEADERS = {"X-API-KEY": SERPER_API_KEY, "Content-Type": "application/json"}


# In-memory rate limiting shared by all callers of this server
storage = MemoryStorage()
limiter = MovingWindowRateLimiter(storage)
rate_limit = parse("360/hour")
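
# A moving window counts hits over the trailing hour rather than resetting at
# fixed boundaries, so bursts right after a reset are still capped. Usage
# sketch (the string identifier namespaces the counter; this server uses a
# single shared "global" bucket):
#
#     if await limiter.hit(rate_limit, "global"):
#         ...  # under the limit: proceed
#     else:
#         ...  # 360 hits already recorded in the past hour: reject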


async def search_web(
    query: str, search_type: str = "search", num_results: Optional[int] = 4
) -> str:
    """
    Search the web for information or fresh news, returning extracted content.

    This tool can perform two types of searches:
    - "search" (default): General web search for diverse, relevant content from various sources
    - "news": Specifically searches for fresh news articles and breaking stories

    Use "news" mode when looking for:
    - Breaking news or very recent events
    - Time-sensitive information
    - Current affairs and latest developments
    - Today's/this week's happenings

    Use "search" mode (default) for:
    - General information and research
    - Technical documentation or guides
    - Historical information
    - Diverse perspectives from various sources

    Args:
        query (str): The search query. This is REQUIRED. Examples: "apple inc earnings",
            "climate change 2024", "AI developments"
        search_type (str): Type of search. This is OPTIONAL. Default is "search".
            Options: "search" (general web search) or "news" (fresh news articles).
            Use "news" for time-sensitive, breaking news content.
        num_results (int): Number of results to fetch. This is OPTIONAL. Default is 4.
            Range: 1-20. More results mean more context but a longer response time.

    Returns:
        str: Formatted text containing extracted content with metadata (title,
            source, date, URL, and main text) for each result, separated by dividers.
            Returns an error message if the API key is missing or the search fails.

    Examples:
        - search_web("OpenAI GPT-5", "news") - Get 4 fresh news articles about OpenAI (default count)
        - search_web("python tutorial", "search") - Get 4 general results about Python (default count)
        - search_web("stock market today", "news", 10) - Get 10 news articles about today's market
        - search_web("machine learning basics") - Get 4 general search results (all defaults)
    """
    if not SERPER_API_KEY:
        return "Error: SERPER_API_KEY environment variable is not set. Please set it to use this tool."

    # Clamp the requested result count to the documented 1-20 range
    if num_results is None:
        num_results = 4
    num_results = max(1, min(20, num_results))

    # Fall back to a general search for any unrecognized search type
    if search_type not in ["search", "news"]:
        search_type = "search"

    try:
        # Enforce the shared rate limit before spending a Serper API call
        if not await limiter.hit(rate_limit, "global"):
            print(f"[{datetime.now().isoformat()}] Rate limit exceeded")
            return "Error: Rate limit exceeded. Please try again later (limit: 360 requests per hour)."

        endpoint = (
            SERPER_NEWS_ENDPOINT if search_type == "news" else SERPER_SEARCH_ENDPOINT
        )

        payload = {"q": query, "num": num_results}
        if search_type == "news":
            payload["type"] = "news"
            payload["page"] = 1

        async with httpx.AsyncClient(timeout=15) as client:
            resp = await client.post(endpoint, headers=HEADERS, json=payload)

        if resp.status_code != 200:
            return f"Error: Search API returned status {resp.status_code}. Please check your API key and try again."

        # News results live under "news"; general results under "organic"
        if search_type == "news":
            results = resp.json().get("news", [])
        else:
            results = resp.json().get("organic", [])

        if not results:
            return f"No {search_type} results found for query: '{query}'. Try a different search term or search type."

        # Fetch all result pages concurrently; exceptions are captured per URL
        urls = [r["link"] for r in results]
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            tasks = [client.get(u) for u in urls]
            responses = await asyncio.gather(*tasks, return_exceptions=True)

        chunks = []
        successful_extractions = 0

        for meta, response in zip(results, responses):
            # Skip pages that failed to download
            if isinstance(response, Exception):
                continue

            # Pull the main article text, dropping boilerplate and comments
            body = trafilatura.extract(
                response.text, include_formatting=False, include_comments=False
            )

            if not body:
                continue

            successful_extractions += 1
            print(
                f"[{datetime.now().isoformat()}] Successfully extracted content from {meta['link']}"
            )

            if search_type == "news":
                # Normalize Serper's free-form date string to YYYY-MM-DD
                try:
                    date_str = meta.get("date", "")
                    if date_str:
                        date_iso = dateparser.parse(date_str, fuzzy=True).strftime(
                            "%Y-%m-%d"
                        )
                    else:
                        date_iso = "Unknown"
                except Exception:
                    date_iso = "Unknown"

                chunk = (
                    f"## {meta['title']}\n"
                    f"**Source:** {meta.get('source', 'Unknown')} "
                    f"**Date:** {date_iso}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )
            else:
                # Derive a bare domain name from the result URL
                domain = meta["link"].split("/")[2].replace("www.", "")

                chunk = (
                    f"## {meta['title']}\n"
                    f"**Domain:** {domain}\n"
                    f"**URL:** {meta['link']}\n\n"
                    f"{body.strip()}\n"
                )

            chunks.append(chunk)

        if not chunks:
            return f"Found {len(results)} {search_type} results for '{query}', but couldn't extract readable content from any of them. The websites might be blocking automated access."

        result = "\n---\n".join(chunks)
        summary = f"Successfully extracted content from {successful_extractions} out of {len(results)} {search_type} results for query: '{query}'\n\n---\n\n"

        print(
            f"[{datetime.now().isoformat()}] Extraction complete: {successful_extractions}/{len(results)} successful for query '{query}'"
        )
        return summary + result

    except Exception as e:
        return f"Error occurred while searching: {str(e)}. Please try again or check your query."


with gr.Blocks(title="Web Search MCP Server") as demo:
    gr.HTML(
        """
        <div style="background-color: rgba(59, 130, 246, 0.1); border: 1px solid rgba(59, 130, 246, 0.3); border-radius: 8px; padding: 12px; margin-bottom: 16px; text-align: center;">
            <p style="color: rgb(59, 130, 246); margin: 0; font-size: 14px; font-weight: 500;">
                🤝 Community resource – please use responsibly to keep this service available for everyone
            </p>
        </div>
        """
    )

    gr.Markdown(
        """
        # 🔍 Web Search MCP Server

        This MCP server provides web search capabilities to LLMs. It can perform general web searches
        or specifically search for fresh news articles, extracting the main content from results.

        **Search Types:**
        - **General Search**: Diverse results from various sources (blogs, docs, articles, etc.)
        - **News Search**: Fresh news articles and breaking stories from news sources

        **Note:** This interface is primarily designed for MCP tool usage by LLMs, but you can
        also test it manually below.
        """
    )

    with gr.Row():
        with gr.Column(scale=3):
            query_input = gr.Textbox(
                label="Search Query",
                placeholder='e.g. "OpenAI news", "climate change 2024", "AI developments"',
                info="Required: Enter your search query",
            )
        with gr.Column(scale=1):
            search_type_input = gr.Radio(
                choices=["search", "news"],
                value="search",
                label="Search Type",
                info="Choose search type",
            )

    with gr.Row():
        num_results_input = gr.Slider(
            minimum=1,
            maximum=20,
            value=4,
            step=1,
            label="Number of Results",
            info="Optional: How many results to fetch (default: 4)",
        )

    search_button = gr.Button("Search", variant="primary")

    output = gr.Textbox(
        label="Extracted Content",
        lines=25,
        max_lines=50,
        info="The extracted article content will appear here",
    )

    gr.Examples(
        examples=[
            ["OpenAI GPT-5 latest developments", "news", 5],
            ["React hooks useState", "search", 4],
            ["Tesla stock price today", "news", 6],
            ["Apple Vision Pro reviews", "search", 4],
            ["best Italian restaurants NYC", "search", 4],
        ],
        inputs=[query_input, search_type_input, num_results_input],
        outputs=output,
        fn=search_web,
        cache_examples=False,
    )

    search_button.click(
        fn=search_web,
        inputs=[query_input, search_type_input, num_results_input],
        outputs=output,
    )


if __name__ == "__main__":
    demo.launch(mcp_server=True, show_api=True)
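    # Note: recent Gradio releases can also enable the MCP server by setting
    # the GRADIO_MCP_SERVER=True environment variable instead of passing
    # mcp_server=True here (an assumption; verify against your installed version).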