File size: 1,601 Bytes
ffe126a d9c3f2e ffe126a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
import os
import asyncio
import httpx
from tavily import TavilyClient
from mcp.server.fastmcp import FastMCP
import nest_asyncio
from dotenv import load_dotenv
# Apply nested asyncio patch so that asyncio.run() can be used even when an
# event loop is already running in this process.
nest_asyncio.apply()

# Load variables from a local .env file (if one exists) into the process
# environment, then read the Tavily API key from there. Previously
# TAVILY_API_KEY was referenced without ever being defined, which raised
# NameError as soon as the module was imported.
load_dotenv()
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")

# Initialize Tavily client and FastMCP
# NOTE(review): if the key is missing, TAVILY_API_KEY is None here and the
# Tavily client is expected to reject it at construction time -- confirm.
tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
mcp = FastMCP("tavily_search")
# Define the search tool
@mcp.tool()
async def search_autism(query: str) -> dict:
    """Perform a Tavily web search for information about autism.

    Args:
        query: Free-text search query to send to Tavily.

    Returns:
        On success, a dict with two keys: "results" (the result entries
        returned by Tavily, or an empty list when absent) and "answer"
        (the generated answer, or an empty string when absent).
        On failure, a dict with a single "error" key holding a message.
    """
    try:
        # tavily_client.search is a synchronous (blocking) call; run it in a
        # worker thread so the event loop serving other requests is not
        # stalled for the duration of the HTTP round trip.
        #
        # The former name=/description= keyword arguments were removed: they
        # are tool metadata, not Tavily search parameters, and were being
        # forwarded to the search request.
        response = await asyncio.to_thread(
            tavily_client.search,
            query=query,
            max_results=5,
            search_depth="advanced",
            topic="general",
            include_answer=True,
        )
        return {
            "results": response.get("results", []),
            "answer": response.get("answer", ""),
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller as data instead of
        # letting the exception propagate out of the tool.
        return {"error": f"Search failed: {str(e)}"}
# Main entry point
async def main():
    """Run one sample search and print its results.

    Intended as a manual smoke test when the file is executed as a script.
    Prints the error message and stops if the search reports a failure.
    """
    query = "autism symptoms and treatments"  # Replace with dynamic input if needed
    result = await search_autism(query)

    # search_autism reports failures as {"error": ...} rather than raising,
    # so surface that instead of printing an empty result list.
    if "error" in result:
        print(result["error"])
        return

    print("Search Results:")
    for res in result.get("results", []):
        print(f"- {res.get('title')} ({res.get('url')})")
    print("\nAnswer:")
    print(result.get("answer", "No answer provided."))


# Run the script
if __name__ == "__main__":
    asyncio.run(main())
|