sentiment / src /mcp_server.py
Denys Kanunnikov
update logic
776e7c0
"""
MCP Server implementation for sentiment analysis.
This module implements a Model Context Protocol server that provides
sentiment analysis capabilities through JSON-RPC 2.0 protocol with
async request handling and comprehensive error management.
"""
import asyncio
import json
import logging
import sys
import traceback
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
import uuid
from datetime import datetime
try:
import uvloop
UVLOOP_AVAILABLE = True
except ImportError:
UVLOOP_AVAILABLE = False
from .tools import get_tools, list_tools, call_tool
from .sentiment_analyzer import get_analyzer
class MCPMessageType(Enum):
"""MCP message types."""
REQUEST = "request"
RESPONSE = "response"
NOTIFICATION = "notification"
@dataclass
class MCPRequest:
"""MCP request message structure."""
jsonrpc: str
method: str
params: Optional[Dict[str, Any]] = None
id: Optional[Union[str, int]] = None
@dataclass
class MCPResponse:
"""MCP response message structure."""
jsonrpc: str
id: Optional[Union[str, int]]
result: Optional[Dict[str, Any]] = None
error: Optional[Dict[str, Any]] = None
@dataclass
class MCPError:
"""MCP error structure."""
code: int
message: str
data: Optional[Dict[str, Any]] = None
class MCPErrorCodes:
"""Standard JSON-RPC 2.0 error codes."""
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603
# Custom error codes
TOOL_ERROR = -32000
ANALYZER_ERROR = -32001
VALIDATION_ERROR = -32002
class SentimentMCPServer:
"""
Model Context Protocol server for sentiment analysis.
Implements JSON-RPC 2.0 protocol with async request handling,
tool registration, and comprehensive error management.
"""
def __init__(self, name: str = "sentiment-analyzer", version: str = "1.0.0"):
"""
Initialize MCP server.
Args:
name: Server name
version: Server version
"""
self.name = name
self.version = version
self.logger = logging.getLogger(__name__)
# Server state
self.running = False
self.request_count = 0
self.error_count = 0
self.start_time = None
# Request handlers
self._handlers = {
"initialize": self._handle_initialize,
"tools/list": self._handle_list_tools,
"tools/call": self._handle_call_tool,
"ping": self._handle_ping,
"server/info": self._handle_server_info,
"server/stats": self._handle_server_stats
}
self.logger.info(f"Initialized MCP server '{name}' v{version}")
async def start(self) -> None:
"""Start the MCP server."""
self.running = True
self.start_time = datetime.now()
self.logger.info(f"MCP server '{self.name}' started")
# Pre-load analyzer to improve first request performance
try:
await get_analyzer("auto")
self.logger.info("Sentiment analyzer pre-loaded successfully")
except Exception as e:
self.logger.warning(f"Failed to pre-load analyzer: {e}")
async def stop(self) -> None:
"""Stop the MCP server."""
self.running = False
self.logger.info(f"MCP server '{self.name}' stopped")
# Cleanup resources
try:
analyzer = await get_analyzer("auto")
await analyzer.cleanup()
except Exception as e:
self.logger.error(f"Error during cleanup: {e}")
def _create_error_response(self, request_id: Optional[Union[str, int]],
code: int, message: str,
data: Optional[Dict[str, Any]] = None) -> MCPResponse:
"""
Create error response.
Args:
request_id: Request ID
code: Error code
message: Error message
data: Additional error data
Returns:
MCPResponse with error
"""
error = {
"code": code,
"message": message
}
if data:
error["data"] = data
return MCPResponse(
jsonrpc="2.0",
id=request_id,
error=error
)
def _create_success_response(self, request_id: Optional[Union[str, int]],
result: Dict[str, Any]) -> MCPResponse:
"""
Create success response.
Args:
request_id: Request ID
result: Response result
Returns:
MCPResponse with result
"""
return MCPResponse(
jsonrpc="2.0",
id=request_id,
result=result
)
def _parse_request(self, message: str) -> MCPRequest:
"""
Parse JSON-RPC request message.
Args:
message: JSON message string
Returns:
Parsed MCPRequest
Raises:
ValueError: If parsing fails
"""
try:
data = json.loads(message)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON: {e}")
# Validate required fields
if not isinstance(data, dict):
raise ValueError("Request must be a JSON object")
if data.get("jsonrpc") != "2.0":
raise ValueError("Invalid JSON-RPC version")
if "method" not in data:
raise ValueError("Missing 'method' field")
return MCPRequest(
jsonrpc=data["jsonrpc"],
method=data["method"],
params=data.get("params"),
id=data.get("id")
)
async def process_request(self, message: str) -> str:
"""
Process incoming JSON-RPC request.
Args:
message: JSON-RPC request message
Returns:
JSON-RPC response message
"""
request_id = None
try:
# Parse request
try:
request = self._parse_request(message)
request_id = request.id
except ValueError as e:
response = self._create_error_response(
None, MCPErrorCodes.PARSE_ERROR, str(e)
)
return json.dumps(response.__dict__)
# Update stats
self.request_count += 1
# Log request
self.logger.debug(f"Processing request: {request.method} (ID: {request_id})")
# Handle request
if request.method in self._handlers:
handler = self._handlers[request.method]
result = await handler(request.params or {})
response = self._create_success_response(request_id, result)
else:
response = self._create_error_response(
request_id,
MCPErrorCodes.METHOD_NOT_FOUND,
f"Method '{request.method}' not found"
)
except Exception as e:
self.error_count += 1
self.logger.error(f"Request processing failed: {e}")
self.logger.debug(traceback.format_exc())
response = self._create_error_response(
request_id,
MCPErrorCodes.INTERNAL_ERROR,
"Internal server error",
{"error": str(e), "type": type(e).__name__}
)
# Convert response to JSON
response_dict = response.__dict__
# Remove None values
response_dict = {k: v for k, v in response_dict.items() if v is not None}
return json.dumps(response_dict)
async def _handle_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle initialize request.
Args:
params: Initialize parameters
Returns:
Server capabilities
"""
client_info = params.get("clientInfo", {})
self.logger.info(f"Client connected: {client_info}")
return {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {
"listChanged": False
},
"logging": {},
"prompts": {
"listChanged": False
},
"resources": {
"subscribe": False,
"listChanged": False
}
},
"serverInfo": {
"name": self.name,
"version": self.version,
"description": "Sentiment analysis server using TextBlob and Transformers"
}
}
async def _handle_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle tools/list request.
Args:
params: List tools parameters
Returns:
Available tools
"""
try:
tools = await list_tools()
return {"tools": tools}
except Exception as e:
raise RuntimeError(f"Failed to list tools: {e}")
async def _handle_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle tools/call request.
Args:
params: Tool call parameters
Returns:
Tool execution result
"""
try:
name = params.get("name")
arguments = params.get("arguments", {})
if not name:
raise ValueError("Tool name is required")
result = await call_tool(name, arguments)
return {
"content": [
{
"type": "text",
"text": json.dumps(result, indent=2)
}
],
"isError": not result.get("success", True)
}
except Exception as e:
self.logger.error(f"Tool call failed: {e}")
return {
"content": [
{
"type": "text",
"text": json.dumps({
"success": False,
"error": str(e),
"error_type": type(e).__name__
}, indent=2)
}
],
"isError": True
}
async def _handle_ping(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle ping request.
Args:
params: Ping parameters
Returns:
Pong response
"""
return {
"pong": True,
"timestamp": datetime.now().isoformat(),
"server": self.name,
"version": self.version
}
async def _handle_server_info(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle server/info request.
Args:
params: Server info parameters
Returns:
Server information
"""
try:
analyzer = await get_analyzer("auto")
analyzer_info = analyzer.get_info()
except Exception as e:
analyzer_info = {"error": str(e)}
return {
"server": {
"name": self.name,
"version": self.version,
"running": self.running,
"start_time": self.start_time.isoformat() if self.start_time else None
},
"analyzer": analyzer_info,
"capabilities": {
"sentiment_analysis": True,
"batch_processing": True,
"multiple_backends": True,
"async_processing": True
}
}
async def _handle_server_stats(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle server/stats request.
Args:
params: Server stats parameters
Returns:
Server statistics
"""
uptime = None
if self.start_time:
uptime = (datetime.now() - self.start_time).total_seconds()
return {
"requests_processed": self.request_count,
"errors_encountered": self.error_count,
"success_rate": (
(self.request_count - self.error_count) / self.request_count
if self.request_count > 0 else 0
),
"uptime_seconds": uptime,
"running": self.running
}
class MCPServerRunner:
"""
Runner for MCP server with stdio communication.
Handles stdin/stdout communication for MCP protocol.
"""
def __init__(self, server: SentimentMCPServer):
"""
Initialize server runner.
Args:
server: MCP server instance
"""
self.server = server
self.logger = logging.getLogger(__name__)
async def run(self) -> None:
"""Run the MCP server with stdio communication."""
self.logger.info("Starting MCP server with stdio communication")
# Start server
await self.server.start()
try:
# Set up event loop optimization
if UVLOOP_AVAILABLE:
self.logger.info("Using uvloop for better performance")
# Handle stdin/stdout communication
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await asyncio.get_event_loop().connect_read_pipe(
lambda: protocol, sys.stdin
)
writer_transport, writer_protocol = await asyncio.get_event_loop().connect_write_pipe(
asyncio.streams.FlowControlMixin, sys.stdout
)
writer = asyncio.StreamWriter(writer_transport, writer_protocol, reader, asyncio.get_event_loop())
self.logger.info("MCP server ready for requests")
# Process requests
while self.server.running:
try:
# Read request
line = await reader.readline()
if not line:
break
message = line.decode().strip()
if not message:
continue
# Process request
response = await self.server.process_request(message)
# Send response
writer.write((response + '\n').encode())
await writer.drain()
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Communication error: {e}")
break
finally:
await self.server.stop()
self.logger.info("MCP server stopped")
async def create_server(name: str = "sentiment-analyzer",
version: str = "1.0.0") -> SentimentMCPServer:
"""
Create and configure MCP server.
Args:
name: Server name
version: Server version
Returns:
Configured MCP server
"""
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stderr) # Use stderr to avoid interfering with stdio protocol
]
)
# Create server
server = SentimentMCPServer(name, version)
return server
async def main() -> None:
"""Main entry point for MCP server."""
# Use uvloop if available for better performance
if UVLOOP_AVAILABLE:
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Create and run server
server = await create_server()
runner = MCPServerRunner(server)
try:
await runner.run()
except KeyboardInterrupt:
logging.info("Server interrupted by user")
except Exception as e:
logging.error(f"Server error: {e}")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())