""" MCP Server implementation for sentiment analysis. This module implements a Model Context Protocol server that provides sentiment analysis capabilities through JSON-RPC 2.0 protocol with async request handling and comprehensive error management. """ import asyncio import json import logging import sys import traceback from typing import Dict, Any, List, Optional, Union from dataclasses import dataclass from enum import Enum import uuid from datetime import datetime try: import uvloop UVLOOP_AVAILABLE = True except ImportError: UVLOOP_AVAILABLE = False from .tools import get_tools, list_tools, call_tool from .sentiment_analyzer import get_analyzer class MCPMessageType(Enum): """MCP message types.""" REQUEST = "request" RESPONSE = "response" NOTIFICATION = "notification" @dataclass class MCPRequest: """MCP request message structure.""" jsonrpc: str method: str params: Optional[Dict[str, Any]] = None id: Optional[Union[str, int]] = None @dataclass class MCPResponse: """MCP response message structure.""" jsonrpc: str id: Optional[Union[str, int]] result: Optional[Dict[str, Any]] = None error: Optional[Dict[str, Any]] = None @dataclass class MCPError: """MCP error structure.""" code: int message: str data: Optional[Dict[str, Any]] = None class MCPErrorCodes: """Standard JSON-RPC 2.0 error codes.""" PARSE_ERROR = -32700 INVALID_REQUEST = -32600 METHOD_NOT_FOUND = -32601 INVALID_PARAMS = -32602 INTERNAL_ERROR = -32603 # Custom error codes TOOL_ERROR = -32000 ANALYZER_ERROR = -32001 VALIDATION_ERROR = -32002 class SentimentMCPServer: """ Model Context Protocol server for sentiment analysis. Implements JSON-RPC 2.0 protocol with async request handling, tool registration, and comprehensive error management. """ def __init__(self, name: str = "sentiment-analyzer", version: str = "1.0.0"): """ Initialize MCP server. Args: name: Server name version: Server version """ self.name = name self.version = version self.logger = logging.getLogger(__name__) # Server state self.running = False self.request_count = 0 self.error_count = 0 self.start_time = None # Request handlers self._handlers = { "initialize": self._handle_initialize, "tools/list": self._handle_list_tools, "tools/call": self._handle_call_tool, "ping": self._handle_ping, "server/info": self._handle_server_info, "server/stats": self._handle_server_stats } self.logger.info(f"Initialized MCP server '{name}' v{version}") async def start(self) -> None: """Start the MCP server.""" self.running = True self.start_time = datetime.now() self.logger.info(f"MCP server '{self.name}' started") # Pre-load analyzer to improve first request performance try: await get_analyzer("auto") self.logger.info("Sentiment analyzer pre-loaded successfully") except Exception as e: self.logger.warning(f"Failed to pre-load analyzer: {e}") async def stop(self) -> None: """Stop the MCP server.""" self.running = False self.logger.info(f"MCP server '{self.name}' stopped") # Cleanup resources try: analyzer = await get_analyzer("auto") await analyzer.cleanup() except Exception as e: self.logger.error(f"Error during cleanup: {e}") def _create_error_response(self, request_id: Optional[Union[str, int]], code: int, message: str, data: Optional[Dict[str, Any]] = None) -> MCPResponse: """ Create error response. Args: request_id: Request ID code: Error code message: Error message data: Additional error data Returns: MCPResponse with error """ error = { "code": code, "message": message } if data: error["data"] = data return MCPResponse( jsonrpc="2.0", id=request_id, error=error ) def _create_success_response(self, request_id: Optional[Union[str, int]], result: Dict[str, Any]) -> MCPResponse: """ Create success response. Args: request_id: Request ID result: Response result Returns: MCPResponse with result """ return MCPResponse( jsonrpc="2.0", id=request_id, result=result ) def _parse_request(self, message: str) -> MCPRequest: """ Parse JSON-RPC request message. Args: message: JSON message string Returns: Parsed MCPRequest Raises: ValueError: If parsing fails """ try: data = json.loads(message) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON: {e}") # Validate required fields if not isinstance(data, dict): raise ValueError("Request must be a JSON object") if data.get("jsonrpc") != "2.0": raise ValueError("Invalid JSON-RPC version") if "method" not in data: raise ValueError("Missing 'method' field") return MCPRequest( jsonrpc=data["jsonrpc"], method=data["method"], params=data.get("params"), id=data.get("id") ) async def process_request(self, message: str) -> str: """ Process incoming JSON-RPC request. Args: message: JSON-RPC request message Returns: JSON-RPC response message """ request_id = None try: # Parse request try: request = self._parse_request(message) request_id = request.id except ValueError as e: response = self._create_error_response( None, MCPErrorCodes.PARSE_ERROR, str(e) ) return json.dumps(response.__dict__) # Update stats self.request_count += 1 # Log request self.logger.debug(f"Processing request: {request.method} (ID: {request_id})") # Handle request if request.method in self._handlers: handler = self._handlers[request.method] result = await handler(request.params or {}) response = self._create_success_response(request_id, result) else: response = self._create_error_response( request_id, MCPErrorCodes.METHOD_NOT_FOUND, f"Method '{request.method}' not found" ) except Exception as e: self.error_count += 1 self.logger.error(f"Request processing failed: {e}") self.logger.debug(traceback.format_exc()) response = self._create_error_response( request_id, MCPErrorCodes.INTERNAL_ERROR, "Internal server error", {"error": str(e), "type": type(e).__name__} ) # Convert response to JSON response_dict = response.__dict__ # Remove None values response_dict = {k: v for k, v in response_dict.items() if v is not None} return json.dumps(response_dict) async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle initialize request. Args: params: Initialize parameters Returns: Server capabilities """ client_info = params.get("clientInfo", {}) self.logger.info(f"Client connected: {client_info}") return { "protocolVersion": "2024-11-05", "capabilities": { "tools": { "listChanged": False }, "logging": {}, "prompts": { "listChanged": False }, "resources": { "subscribe": False, "listChanged": False } }, "serverInfo": { "name": self.name, "version": self.version, "description": "Sentiment analysis server using TextBlob and Transformers" } } async def _handle_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle tools/list request. Args: params: List tools parameters Returns: Available tools """ try: tools = await list_tools() return {"tools": tools} except Exception as e: raise RuntimeError(f"Failed to list tools: {e}") async def _handle_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle tools/call request. Args: params: Tool call parameters Returns: Tool execution result """ try: name = params.get("name") arguments = params.get("arguments", {}) if not name: raise ValueError("Tool name is required") result = await call_tool(name, arguments) return { "content": [ { "type": "text", "text": json.dumps(result, indent=2) } ], "isError": not result.get("success", True) } except Exception as e: self.logger.error(f"Tool call failed: {e}") return { "content": [ { "type": "text", "text": json.dumps({ "success": False, "error": str(e), "error_type": type(e).__name__ }, indent=2) } ], "isError": True } async def _handle_ping(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle ping request. Args: params: Ping parameters Returns: Pong response """ return { "pong": True, "timestamp": datetime.now().isoformat(), "server": self.name, "version": self.version } async def _handle_server_info(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle server/info request. Args: params: Server info parameters Returns: Server information """ try: analyzer = await get_analyzer("auto") analyzer_info = analyzer.get_info() except Exception as e: analyzer_info = {"error": str(e)} return { "server": { "name": self.name, "version": self.version, "running": self.running, "start_time": self.start_time.isoformat() if self.start_time else None }, "analyzer": analyzer_info, "capabilities": { "sentiment_analysis": True, "batch_processing": True, "multiple_backends": True, "async_processing": True } } async def _handle_server_stats(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Handle server/stats request. Args: params: Server stats parameters Returns: Server statistics """ uptime = None if self.start_time: uptime = (datetime.now() - self.start_time).total_seconds() return { "requests_processed": self.request_count, "errors_encountered": self.error_count, "success_rate": ( (self.request_count - self.error_count) / self.request_count if self.request_count > 0 else 0 ), "uptime_seconds": uptime, "running": self.running } class MCPServerRunner: """ Runner for MCP server with stdio communication. Handles stdin/stdout communication for MCP protocol. """ def __init__(self, server: SentimentMCPServer): """ Initialize server runner. Args: server: MCP server instance """ self.server = server self.logger = logging.getLogger(__name__) async def run(self) -> None: """Run the MCP server with stdio communication.""" self.logger.info("Starting MCP server with stdio communication") # Start server await self.server.start() try: # Set up event loop optimization if UVLOOP_AVAILABLE: self.logger.info("Using uvloop for better performance") # Handle stdin/stdout communication reader = asyncio.StreamReader() protocol = asyncio.StreamReaderProtocol(reader) await asyncio.get_event_loop().connect_read_pipe( lambda: protocol, sys.stdin ) writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe( asyncio.streams.FlowControlMixin, sys.stdout ) writer = asyncio.StreamWriter(writer_transport, writer_protocol, reader, asyncio.get_event_loop()) self.logger.info("MCP server ready for requests") # Process requests while self.server.running: try: # Read request line = await reader.readline() if not line: break message = line.decode().strip() if not message: continue # Process request response = await self.server.process_request(message) # Send response writer.write((response + '\n').encode()) await writer.drain() except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Communication error: {e}") break finally: await self.server.stop() self.logger.info("MCP server stopped") async def create_server(name: str = "sentiment-analyzer", version: str = "1.0.0") -> SentimentMCPServer: """ Create and configure MCP server. Args: name: Server name version: Server version Returns: Configured MCP server """ # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stderr) # Use stderr to avoid interfering with stdio protocol ] ) # Create server server = SentimentMCPServer(name, version) return server async def main() -> None: """Main entry point for MCP server.""" # Use uvloop if available for better performance if UVLOOP_AVAILABLE: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # Create and run server server = await create_server() runner = MCPServerRunner(server) try: await runner.run() except KeyboardInterrupt: logging.info("Server interrupted by user") except Exception as e: logging.error(f"Server error: {e}") sys.exit(1) if __name__ == "__main__": asyncio.run(main())