Spaces:
Sleeping
Sleeping
File size: 3,714 Bytes
2de095a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
from mcp.server.fastmcp import FastMCP
from typing import Dict, Any, Optional
import json
from datetime import datetime
# Initialize FastMCP server
mcp = FastMCP(
"RequestProcessor", # Name of the MCP server
instructions="You are a request processing assistant that can handle various types of requests and perform data processing tasks.",
host="0.0.0.0",
port=8001,
)
@mcp.tool()
async def process_request(message: str, data: Optional[Dict[str, Any]] = None) -> str:
"""
Process various types of requests and perform data processing.
Args:
message (str): The main message or request description
data (Dict[str, Any], optional): Additional data to process
Returns:
str: Processing result and response
"""
try:
if data is None:
data = {}
# Process the request based on message content
processed_data = {
"original_message": message,
"processed_at": datetime.now().isoformat(),
"service_version": "1.0.0",
"data_size": len(str(data)),
"processing_status": "success"
}
# Add any additional processing logic here
if "analyze" in message.lower():
processed_data["analysis_type"] = "text_analysis"
processed_data["word_count"] = len(message.split())
if "validate" in message.lower():
processed_data["validation_type"] = "data_validation"
processed_data["is_valid"] = True
return json.dumps(processed_data, indent=2, ensure_ascii=False)
except Exception as e:
return f"Error processing request: {str(e)}"
@mcp.tool()
async def get_service_info() -> str:
"""
Get information about the request processing service.
Returns:
str: Service information
"""
info = {
"service_name": "RequestProcessor",
"version": "1.0.0",
"status": "running",
"capabilities": [
"request_processing",
"data_analysis",
"text_validation",
"general_data_handling"
],
"description": "A versatile request processing service that can handle various types of data and requests"
}
return json.dumps(info, indent=2, ensure_ascii=False)
@mcp.tool()
async def validate_data(data: Dict[str, Any]) -> str:
"""
Validate the structure and content of provided data.
Args:
data (Dict[str, Any]): Data to validate
Returns:
str: Validation results
"""
try:
validation_result = {
"is_valid": True,
"validation_timestamp": datetime.now().isoformat(),
"data_type": type(data).__name__,
"data_keys": list(data.keys()) if isinstance(data, dict) else [],
"data_size": len(str(data)),
"issues": []
}
# Basic validation logic
if not data:
validation_result["is_valid"] = False
validation_result["issues"].append("Data is empty")
if isinstance(data, dict):
for key, value in data.items():
if value is None:
validation_result["issues"].append(f"Key '{key}' has null value")
if validation_result["issues"]:
validation_result["is_valid"] = False
return json.dumps(validation_result, indent=2, ensure_ascii=False)
except Exception as e:
return f"Error during validation: {str(e)}"
if __name__ == "__main__":
# Start the MCP server with stdio transport
mcp.run(transport="stdio") |