"""MCP server exposing simple request-processing and data-validation tools.

Three tools are registered on a FastMCP instance:

* ``process_request`` - summarise a message (and optional data) as JSON.
* ``get_service_info`` - report static service metadata as JSON.
* ``validate_data`` - run basic emptiness / null-value checks on a dict.

Run the module directly to start the server on the stdio transport.
"""

import json
from datetime import datetime
from typing import Any, Dict, Optional

from mcp.server.fastmcp import FastMCP

# Single source of truth for values reported by several tools.
SERVICE_NAME = "RequestProcessor"
SERVICE_VERSION = "1.0.0"

# Initialize FastMCP server
mcp = FastMCP(
    SERVICE_NAME,  # Name of the MCP server
    instructions="You are a request processing assistant that can handle various types of requests and perform data processing tasks.",
    host="0.0.0.0",
    port=8001,
)


def _now_iso() -> str:
    """Return the current local time as a timezone-aware ISO-8601 string."""
    # astimezone() attaches the local UTC offset so the timestamp is
    # unambiguous when compared across hosts in different time zones.
    return datetime.now().astimezone().isoformat()


def _to_json(payload: Dict[str, Any]) -> str:
    """Serialize *payload* the way every tool in this module reports results."""
    return json.dumps(payload, indent=2, ensure_ascii=False)


@mcp.tool()
async def process_request(message: str, data: Optional[Dict[str, Any]] = None) -> str:
    """
    Process various types of requests and perform data processing.

    The message is scanned case-insensitively for the keywords "analyze"
    and "validate"; each keyword adds its own fields to the result.

    Args:
        message (str): The main message or request description
        data (Dict[str, Any], optional): Additional data to process;
            treated as an empty dict when omitted

    Returns:
        str: JSON document describing the processed request, or a plain
        "Error processing request: ..." string if processing raised.
    """
    try:
        if data is None:
            data = {}

        # Process the request based on message content
        processed_data = {
            "original_message": message,
            "processed_at": _now_iso(),
            "service_version": SERVICE_VERSION,
            # Size is the length of the dict's string representation.
            "data_size": len(str(data)),
            "processing_status": "success",
        }

        # Lower-case once; both keyword checks are case-insensitive.
        lowered = message.lower()

        # Add any additional processing logic here
        if "analyze" in lowered:
            processed_data["analysis_type"] = "text_analysis"
            processed_data["word_count"] = len(message.split())

        if "validate" in lowered:
            processed_data["validation_type"] = "data_validation"
            processed_data["is_valid"] = True

        return _to_json(processed_data)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error processing request: {str(e)}"


@mcp.tool()
async def get_service_info() -> str:
    """
    Get information about the request processing service.

    Returns:
        str: JSON document with the service name, version, status,
        capabilities and description
    """
    info = {
        "service_name": SERVICE_NAME,
        "version": SERVICE_VERSION,
        "status": "running",
        "capabilities": [
            "request_processing",
            "data_analysis",
            "text_validation",
            "general_data_handling",
        ],
        "description": "A versatile request processing service that can handle various types of data and requests",
    }
    return _to_json(info)


@mcp.tool()
async def validate_data(data: Dict[str, Any]) -> str:
    """
    Validate the structure and content of provided data.

    Two checks are applied: falsy data is reported as empty, and every
    dict key whose value is None is reported as a null value.

    Args:
        data (Dict[str, Any]): Data to validate

    Returns:
        str: JSON document with the validation outcome and the list of
        issues found, or a plain "Error during validation: ..." string
        if validation raised.
    """
    try:
        validation_result = {
            "is_valid": True,
            "validation_timestamp": _now_iso(),
            "data_type": type(data).__name__,
            "data_keys": list(data.keys()) if isinstance(data, dict) else [],
            "data_size": len(str(data)),
            "issues": [],
        }
        issues = validation_result["issues"]

        # Basic validation logic
        if not data:
            issues.append("Data is empty")

        if isinstance(data, dict):
            issues.extend(
                f"Key '{key}' has null value"
                for key, value in data.items()
                if value is None
            )

        # Data is valid exactly when no issue was recorded.
        validation_result["is_valid"] = not issues

        return _to_json(validation_result)
    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        return f"Error during validation: {str(e)}"


if __name__ == "__main__":
    # Start the MCP server with stdio transport
    # NOTE(review): host/port are configured on the FastMCP instance above but
    # the server is started on stdio - presumably they only matter for
    # network transports; confirm this is intended.
    mcp.run(transport="stdio")