File size: 3,714 Bytes
2de095a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from mcp.server.fastmcp import FastMCP
from typing import Dict, Any, Optional
import json
from datetime import datetime

# Module-level FastMCP server object; the tool functions in this file attach
# to it through the @mcp.tool() decorator.
# NOTE(review): the __main__ entry point in this file runs with
# transport="stdio"; host/port are presumably only consulted by network
# transports -- confirm against the FastMCP documentation.
mcp = FastMCP(
    "RequestProcessor",  # server name
    port=8001,
    host="0.0.0.0",
    instructions=(
        "You are a request processing assistant that can handle various types "
        "of requests and perform data processing tasks."
    ),
)

@mcp.tool()
async def process_request(message: str, data: Optional[Dict[str, Any]] = None) -> str:
    """
    Summarise a request message and its optional payload as a JSON string.

    The summary always carries the original message, a local ISO-8601
    timestamp, a fixed service version, the length of ``str(data)`` and a
    "success" status. Two keywords in the message (matched case-insensitively)
    add extra fields:

    * "analyze"  -> ``analysis_type`` and a whitespace-split ``word_count``
    * "validate" -> ``validation_type`` and ``is_valid`` (always ``True``;
      no actual check of ``data`` is performed here)

    Args:
        message (str): The main message or request description
        data (Dict[str, Any], optional): Additional data; ``None`` is treated
            as an empty dict

    Returns:
        str: Pretty-printed JSON summary, or a plain-text
        "Error processing request: ..." string if anything raises
    """
    try:
        payload = {} if data is None else data

        summary = {
            "original_message": message,
            "processed_at": datetime.now().isoformat(),
            "service_version": "1.0.0",
            "data_size": len(str(payload)),
            "processing_status": "success",
        }

        # Keyword detection ignores case, so lower the message once up front.
        lowered = message.lower()

        if "analyze" in lowered:
            summary.update(
                analysis_type="text_analysis",
                word_count=len(message.split()),
            )

        if "validate" in lowered:
            summary.update(validation_type="data_validation", is_valid=True)

        return json.dumps(summary, indent=2, ensure_ascii=False)
    except Exception as exc:
        # Failures are reported to the caller as text instead of propagating.
        return f"Error processing request: {exc!s}"

@mcp.tool()
async def get_service_info() -> str:
    """
    Describe this service as a JSON string.

    The description is static: name, version, status, a list of capability
    labels and a one-line summary.

    Returns:
        str: Pretty-printed JSON service description
    """
    capabilities = [
        "request_processing",
        "data_analysis",
        "text_validation",
        "general_data_handling",
    ]

    return json.dumps(
        {
            "service_name": "RequestProcessor",
            "version": "1.0.0",
            "status": "running",
            "capabilities": capabilities,
            "description": "A versatile request processing service that can handle various types of data and requests",
        },
        ensure_ascii=False,
        indent=2,
    )

@mcp.tool()
async def validate_data(data: Dict[str, Any]) -> str:
    """
    Run basic checks on ``data`` and report the outcome as a JSON string.

    Two checks are applied: falsy (e.g. empty) data is reported as
    "Data is empty", and, for dicts, every key whose value is ``None`` is
    reported. The data counts as valid only when no issue was recorded.

    Args:
        data (Dict[str, Any]): Data to validate

    Returns:
        str: Pretty-printed JSON report with validity flag, timestamp, type
        name, keys (dicts only), length of ``str(data)`` and the issue list;
        or a plain-text "Error during validation: ..." string if anything
        raises
    """
    try:
        stamp = datetime.now().isoformat()
        type_name = type(data).__name__
        is_mapping = isinstance(data, dict)
        keys = list(data.keys()) if is_mapping else []
        size = len(str(data))

        problems = []
        if not data:
            problems.append("Data is empty")
        if is_mapping:
            problems.extend(
                f"Key '{name}' has null value"
                for name, item in data.items()
                if item is None
            )

        report = {
            # Valid exactly when nothing was flagged above.
            "is_valid": not problems,
            "validation_timestamp": stamp,
            "data_type": type_name,
            "data_keys": keys,
            "data_size": size,
            "issues": problems,
        }
        return json.dumps(report, indent=2, ensure_ascii=False)
    except Exception as exc:
        # Failures are reported to the caller as text instead of propagating.
        return f"Error during validation: {exc!s}"

if __name__ == "__main__":
    # Runs only when this file is executed as a script (not when imported):
    # start the MCP server using the "stdio" transport.
    mcp.run(transport="stdio")