|
""" |
|
MCP Server implementation for sentiment analysis. |
|
|
|
This module implements a Model Context Protocol server that provides |
|
sentiment analysis capabilities through JSON-RPC 2.0 protocol with |
|
async request handling and comprehensive error management. |
|
""" |
|
|
|
import asyncio |
|
import json |
|
import logging |
|
import sys |
|
import traceback |
|
from typing import Dict, Any, List, Optional, Union |
|
from dataclasses import dataclass |
|
from enum import Enum |
|
import uuid |
|
from datetime import datetime |
|
|
|
try: |
|
import uvloop |
|
UVLOOP_AVAILABLE = True |
|
except ImportError: |
|
UVLOOP_AVAILABLE = False |
|
|
|
from .tools import get_tools, list_tools, call_tool |
|
from .sentiment_analyzer import get_analyzer |
|
|
|
|
|
class MCPMessageType(Enum): |
|
"""MCP message types.""" |
|
REQUEST = "request" |
|
RESPONSE = "response" |
|
NOTIFICATION = "notification" |
|
|
|
|
|
@dataclass |
|
class MCPRequest: |
|
"""MCP request message structure.""" |
|
jsonrpc: str |
|
method: str |
|
params: Optional[Dict[str, Any]] = None |
|
id: Optional[Union[str, int]] = None |
|
|
|
|
|
@dataclass |
|
class MCPResponse: |
|
"""MCP response message structure.""" |
|
jsonrpc: str |
|
id: Optional[Union[str, int]] |
|
result: Optional[Dict[str, Any]] = None |
|
error: Optional[Dict[str, Any]] = None |
|
|
|
|
|
@dataclass |
|
class MCPError: |
|
"""MCP error structure.""" |
|
code: int |
|
message: str |
|
data: Optional[Dict[str, Any]] = None |
|
|
|
|
|
class MCPErrorCodes: |
|
"""Standard JSON-RPC 2.0 error codes.""" |
|
PARSE_ERROR = -32700 |
|
INVALID_REQUEST = -32600 |
|
METHOD_NOT_FOUND = -32601 |
|
INVALID_PARAMS = -32602 |
|
INTERNAL_ERROR = -32603 |
|
|
|
|
|
TOOL_ERROR = -32000 |
|
ANALYZER_ERROR = -32001 |
|
VALIDATION_ERROR = -32002 |
|
|
|
|
|
class SentimentMCPServer: |
|
""" |
|
Model Context Protocol server for sentiment analysis. |
|
|
|
Implements JSON-RPC 2.0 protocol with async request handling, |
|
tool registration, and comprehensive error management. |
|
""" |
|
|
|
def __init__(self, name: str = "sentiment-analyzer", version: str = "1.0.0"): |
|
""" |
|
Initialize MCP server. |
|
|
|
Args: |
|
name: Server name |
|
version: Server version |
|
""" |
|
self.name = name |
|
self.version = version |
|
self.logger = logging.getLogger(__name__) |
|
|
|
|
|
self.running = False |
|
self.request_count = 0 |
|
self.error_count = 0 |
|
self.start_time = None |
|
|
|
|
|
self._handlers = { |
|
"initialize": self._handle_initialize, |
|
"tools/list": self._handle_list_tools, |
|
"tools/call": self._handle_call_tool, |
|
"ping": self._handle_ping, |
|
"server/info": self._handle_server_info, |
|
"server/stats": self._handle_server_stats |
|
} |
|
|
|
self.logger.info(f"Initialized MCP server '{name}' v{version}") |
|
|
|
async def start(self) -> None: |
|
"""Start the MCP server.""" |
|
self.running = True |
|
self.start_time = datetime.now() |
|
self.logger.info(f"MCP server '{self.name}' started") |
|
|
|
|
|
try: |
|
await get_analyzer("auto") |
|
self.logger.info("Sentiment analyzer pre-loaded successfully") |
|
except Exception as e: |
|
self.logger.warning(f"Failed to pre-load analyzer: {e}") |
|
|
|
async def stop(self) -> None: |
|
"""Stop the MCP server.""" |
|
self.running = False |
|
self.logger.info(f"MCP server '{self.name}' stopped") |
|
|
|
|
|
try: |
|
analyzer = await get_analyzer("auto") |
|
await analyzer.cleanup() |
|
except Exception as e: |
|
self.logger.error(f"Error during cleanup: {e}") |
|
|
|
def _create_error_response(self, request_id: Optional[Union[str, int]], |
|
code: int, message: str, |
|
data: Optional[Dict[str, Any]] = None) -> MCPResponse: |
|
""" |
|
Create error response. |
|
|
|
Args: |
|
request_id: Request ID |
|
code: Error code |
|
message: Error message |
|
data: Additional error data |
|
|
|
Returns: |
|
MCPResponse with error |
|
""" |
|
error = { |
|
"code": code, |
|
"message": message |
|
} |
|
if data: |
|
error["data"] = data |
|
|
|
return MCPResponse( |
|
jsonrpc="2.0", |
|
id=request_id, |
|
error=error |
|
) |
|
|
|
def _create_success_response(self, request_id: Optional[Union[str, int]], |
|
result: Dict[str, Any]) -> MCPResponse: |
|
""" |
|
Create success response. |
|
|
|
Args: |
|
request_id: Request ID |
|
result: Response result |
|
|
|
Returns: |
|
MCPResponse with result |
|
""" |
|
return MCPResponse( |
|
jsonrpc="2.0", |
|
id=request_id, |
|
result=result |
|
) |
|
|
|
def _parse_request(self, message: str) -> MCPRequest: |
|
""" |
|
Parse JSON-RPC request message. |
|
|
|
Args: |
|
message: JSON message string |
|
|
|
Returns: |
|
Parsed MCPRequest |
|
|
|
Raises: |
|
ValueError: If parsing fails |
|
""" |
|
try: |
|
data = json.loads(message) |
|
except json.JSONDecodeError as e: |
|
raise ValueError(f"Invalid JSON: {e}") |
|
|
|
|
|
if not isinstance(data, dict): |
|
raise ValueError("Request must be a JSON object") |
|
|
|
if data.get("jsonrpc") != "2.0": |
|
raise ValueError("Invalid JSON-RPC version") |
|
|
|
if "method" not in data: |
|
raise ValueError("Missing 'method' field") |
|
|
|
return MCPRequest( |
|
jsonrpc=data["jsonrpc"], |
|
method=data["method"], |
|
params=data.get("params"), |
|
id=data.get("id") |
|
) |
|
|
|
async def process_request(self, message: str) -> str: |
|
""" |
|
Process incoming JSON-RPC request. |
|
|
|
Args: |
|
message: JSON-RPC request message |
|
|
|
Returns: |
|
JSON-RPC response message |
|
""" |
|
request_id = None |
|
|
|
try: |
|
|
|
try: |
|
request = self._parse_request(message) |
|
request_id = request.id |
|
except ValueError as e: |
|
response = self._create_error_response( |
|
None, MCPErrorCodes.PARSE_ERROR, str(e) |
|
) |
|
return json.dumps(response.__dict__) |
|
|
|
|
|
self.request_count += 1 |
|
|
|
|
|
self.logger.debug(f"Processing request: {request.method} (ID: {request_id})") |
|
|
|
|
|
if request.method in self._handlers: |
|
handler = self._handlers[request.method] |
|
result = await handler(request.params or {}) |
|
response = self._create_success_response(request_id, result) |
|
else: |
|
response = self._create_error_response( |
|
request_id, |
|
MCPErrorCodes.METHOD_NOT_FOUND, |
|
f"Method '{request.method}' not found" |
|
) |
|
|
|
except Exception as e: |
|
self.error_count += 1 |
|
self.logger.error(f"Request processing failed: {e}") |
|
self.logger.debug(traceback.format_exc()) |
|
|
|
response = self._create_error_response( |
|
request_id, |
|
MCPErrorCodes.INTERNAL_ERROR, |
|
"Internal server error", |
|
{"error": str(e), "type": type(e).__name__} |
|
) |
|
|
|
|
|
response_dict = response.__dict__ |
|
|
|
response_dict = {k: v for k, v in response_dict.items() if v is not None} |
|
|
|
return json.dumps(response_dict) |
|
|
|
async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
""" |
|
Handle initialize request. |
|
|
|
Args: |
|
params: Initialize parameters |
|
|
|
Returns: |
|
Server capabilities |
|
""" |
|
client_info = params.get("clientInfo", {}) |
|
self.logger.info(f"Client connected: {client_info}") |
|
|
|
return { |
|
"protocolVersion": "2024-11-05", |
|
"capabilities": { |
|
"tools": { |
|
"listChanged": False |
|
}, |
|
"logging": {}, |
|
"prompts": { |
|
"listChanged": False |
|
}, |
|
"resources": { |
|
"subscribe": False, |
|
"listChanged": False |
|
} |
|
}, |
|
"serverInfo": { |
|
"name": self.name, |
|
"version": self.version, |
|
"description": "Sentiment analysis server using TextBlob and Transformers" |
|
} |
|
} |
|
|
|
async def _handle_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
""" |
|
Handle tools/list request. |
|
|
|
Args: |
|
params: List tools parameters |
|
|
|
Returns: |
|
Available tools |
|
""" |
|
try: |
|
tools = await list_tools() |
|
return {"tools": tools} |
|
except Exception as e: |
|
raise RuntimeError(f"Failed to list tools: {e}") |
|
|
|
async def _handle_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
""" |
|
Handle tools/call request. |
|
|
|
Args: |
|
params: Tool call parameters |
|
|
|
Returns: |
|
Tool execution result |
|
""" |
|
try: |
|
name = params.get("name") |
|
arguments = params.get("arguments", {}) |
|
|
|
if not name: |
|
raise ValueError("Tool name is required") |
|
|
|
result = await call_tool(name, arguments) |
|
|
|
return { |
|
"content": [ |
|
{ |
|
"type": "text", |
|
"text": json.dumps(result, indent=2) |
|
} |
|
], |
|
"isError": not result.get("success", True) |
|
} |
|
|
|
except Exception as e: |
|
self.logger.error(f"Tool call failed: {e}") |
|
return { |
|
"content": [ |
|
{ |
|
"type": "text", |
|
"text": json.dumps({ |
|
"success": False, |
|
"error": str(e), |
|
"error_type": type(e).__name__ |
|
}, indent=2) |
|
} |
|
], |
|
"isError": True |
|
} |
|
|
|
async def _handle_ping(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
""" |
|
Handle ping request. |
|
|
|
Args: |
|
params: Ping parameters |
|
|
|
Returns: |
|
Pong response |
|
""" |
|
return { |
|
"pong": True, |
|
"timestamp": datetime.now().isoformat(), |
|
"server": self.name, |
|
"version": self.version |
|
} |
|
|
|
async def _handle_server_info(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
""" |
|
Handle server/info request. |
|
|
|
Args: |
|
params: Server info parameters |
|
|
|
Returns: |
|
Server information |
|
""" |
|
try: |
|
analyzer = await get_analyzer("auto") |
|
analyzer_info = analyzer.get_info() |
|
except Exception as e: |
|
analyzer_info = {"error": str(e)} |
|
|
|
return { |
|
"server": { |
|
"name": self.name, |
|
"version": self.version, |
|
"running": self.running, |
|
"start_time": self.start_time.isoformat() if self.start_time else None |
|
}, |
|
"analyzer": analyzer_info, |
|
"capabilities": { |
|
"sentiment_analysis": True, |
|
"batch_processing": True, |
|
"multiple_backends": True, |
|
"async_processing": True |
|
} |
|
} |
|
|
|
async def _handle_server_stats(self, params: Dict[str, Any]) -> Dict[str, Any]: |
|
""" |
|
Handle server/stats request. |
|
|
|
Args: |
|
params: Server stats parameters |
|
|
|
Returns: |
|
Server statistics |
|
""" |
|
uptime = None |
|
if self.start_time: |
|
uptime = (datetime.now() - self.start_time).total_seconds() |
|
|
|
return { |
|
"requests_processed": self.request_count, |
|
"errors_encountered": self.error_count, |
|
"success_rate": ( |
|
(self.request_count - self.error_count) / self.request_count |
|
if self.request_count > 0 else 0 |
|
), |
|
"uptime_seconds": uptime, |
|
"running": self.running |
|
} |
|
|
|
|
|
class MCPServerRunner: |
|
""" |
|
Runner for MCP server with stdio communication. |
|
|
|
Handles stdin/stdout communication for MCP protocol. |
|
""" |
|
|
|
def __init__(self, server: SentimentMCPServer): |
|
""" |
|
Initialize server runner. |
|
|
|
Args: |
|
server: MCP server instance |
|
""" |
|
self.server = server |
|
self.logger = logging.getLogger(__name__) |
|
|
|
async def run(self) -> None: |
|
"""Run the MCP server with stdio communication.""" |
|
self.logger.info("Starting MCP server with stdio communication") |
|
|
|
|
|
await self.server.start() |
|
|
|
try: |
|
|
|
if UVLOOP_AVAILABLE: |
|
self.logger.info("Using uvloop for better performance") |
|
|
|
|
|
reader = asyncio.StreamReader() |
|
protocol = asyncio.StreamReaderProtocol(reader) |
|
await asyncio.get_event_loop().connect_read_pipe( |
|
lambda: protocol, sys.stdin |
|
) |
|
|
|
writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe( |
|
asyncio.streams.FlowControlMixin, sys.stdout |
|
) |
|
writer = asyncio.StreamWriter(writer_transport, writer_protocol, reader, asyncio.get_event_loop()) |
|
|
|
self.logger.info("MCP server ready for requests") |
|
|
|
|
|
while self.server.running: |
|
try: |
|
|
|
line = await reader.readline() |
|
if not line: |
|
break |
|
|
|
message = line.decode().strip() |
|
if not message: |
|
continue |
|
|
|
|
|
response = await self.server.process_request(message) |
|
|
|
|
|
writer.write((response + '\n').encode()) |
|
await writer.drain() |
|
|
|
except asyncio.CancelledError: |
|
break |
|
except Exception as e: |
|
self.logger.error(f"Communication error: {e}") |
|
break |
|
|
|
finally: |
|
await self.server.stop() |
|
self.logger.info("MCP server stopped") |
|
|
|
|
|
async def create_server(name: str = "sentiment-analyzer", |
|
version: str = "1.0.0") -> SentimentMCPServer: |
|
""" |
|
Create and configure MCP server. |
|
|
|
Args: |
|
name: Server name |
|
version: Server version |
|
|
|
Returns: |
|
Configured MCP server |
|
""" |
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.StreamHandler(sys.stderr) |
|
] |
|
) |
|
|
|
|
|
server = SentimentMCPServer(name, version) |
|
|
|
return server |
|
|
|
|
|
async def main() -> None: |
|
"""Main entry point for MCP server.""" |
|
|
|
if UVLOOP_AVAILABLE: |
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) |
|
|
|
|
|
server = await create_server() |
|
runner = MCPServerRunner(server) |
|
|
|
try: |
|
await runner.run() |
|
except KeyboardInterrupt: |
|
logging.info("Server interrupted by user") |
|
except Exception as e: |
|
logging.error(f"Server error: {e}") |
|
sys.exit(1) |
|
|
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |