File size: 11,742 Bytes
776e7c0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 |
#!/usr/bin/env python3
"""
Main entry point for MCP Sentiment Analysis Server.
This script provides multiple modes of operation:
1. MCP Server mode - JSON-RPC 2.0 server for AI model integration
2. Gradio Interface mode - Web UI for human interaction
3. Combined mode - Both MCP server and Gradio interface
4. Test mode - Run basic functionality tests
Usage:
python app.py --mode mcp # Run MCP server only
python app.py --mode gradio # Run Gradio interface only
python app.py --mode combined # Run both (default)
python app.py --mode test # Run tests
"""
import asyncio
import argparse
import logging
import sys
import signal
import threading
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
try:
import uvloop
UVLOOP_AVAILABLE = True
except ImportError:
UVLOOP_AVAILABLE = False
from src import (
create_server,
MCPServerRunner,
create_gradio_interface,
get_analyzer
)
class ApplicationRunner:
"""
Main application runner that manages different execution modes.
Supports running MCP server, Gradio interface, or both simultaneously
with proper resource management and graceful shutdown.
"""
def __init__(self):
"""Initialize application runner."""
self.logger = logging.getLogger(__name__)
self.mcp_server = None
self.gradio_interface = None
self.running = False
self.executor = ThreadPoolExecutor(max_workers=2)
# Setup signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle shutdown signals."""
self.logger.info(f"Received signal {signum}, shutting down...")
self.running = False
async def run_mcp_server(self) -> None:
"""Run MCP server only."""
self.logger.info("Starting MCP server mode")
try:
# Create and run MCP server
self.mcp_server = await create_server()
runner = MCPServerRunner(self.mcp_server)
self.running = True
await runner.run()
except Exception as e:
self.logger.error(f"MCP server failed: {e}")
raise
finally:
if self.mcp_server:
await self.mcp_server.stop()
def run_gradio_interface(self, **kwargs) -> None:
"""Run Gradio interface only."""
self.logger.info("Starting Gradio interface mode")
try:
# Create and launch Gradio interface
self.gradio_interface = create_gradio_interface()
# Default launch parameters
launch_params = {
"server_name": "0.0.0.0",
"server_port": 7860,
"share": False,
"debug": False,
"show_error": True,
"quiet": False
}
launch_params.update(kwargs)
self.running = True
self.gradio_interface.launch(**launch_params)
except Exception as e:
self.logger.error(f"Gradio interface failed: {e}")
raise
async def run_combined(self, **gradio_kwargs) -> None:
"""Run both MCP server and Gradio interface."""
self.logger.info("Starting combined mode (MCP server + Gradio interface)")
try:
# Create MCP server
self.mcp_server = await create_server()
# Create Gradio interface
self.gradio_interface = create_gradio_interface()
# Default Gradio launch parameters
launch_params = {
"server_name": "0.0.0.0",
"server_port": 7860,
"share": False,
"debug": False,
"show_error": True,
"quiet": False
}
launch_params.update(gradio_kwargs)
self.running = True
# Run Gradio in thread pool
gradio_future = self.executor.submit(
self.gradio_interface.launch, **launch_params
)
# Run MCP server in main thread
runner = MCPServerRunner(self.mcp_server)
# Start both services
self.logger.info("Both services starting...")
# Wait for either to complete or fail
try:
await runner.run()
except Exception as e:
self.logger.error(f"MCP server error: {e}")
raise
finally:
# Cleanup
if gradio_future:
gradio_future.cancel()
except Exception as e:
self.logger.error(f"Combined mode failed: {e}")
raise
finally:
if self.mcp_server:
await self.mcp_server.stop()
async def run_tests(self) -> bool:
"""Run basic functionality tests."""
self.logger.info("Running functionality tests...")
try:
# Test 1: Sentiment analyzer initialization
self.logger.info("Test 1: Initializing sentiment analyzer...")
analyzer = await get_analyzer("textblob")
self.logger.info(f"β Analyzer initialized with backend: {analyzer.backend}")
# Test 2: Basic sentiment analysis
self.logger.info("Test 2: Basic sentiment analysis...")
test_texts = [
"I love this product!",
"This is terrible.",
"It's okay, nothing special."
]
for text in test_texts:
result = await analyzer.analyze(text)
self.logger.info(f"β '{text}' -> {result.label.value} ({result.confidence:.2f})")
# Test 3: Batch analysis
self.logger.info("Test 3: Batch analysis...")
batch_results = await analyzer.analyze_batch(test_texts)
self.logger.info(f"β Batch analysis completed: {len(batch_results)} results")
# Test 4: MCP tools
self.logger.info("Test 4: MCP tools...")
from src.tools import get_tools
tools = get_tools()
available_tools = tools.get_tools()
self.logger.info(f"β {len(available_tools)} MCP tools available")
# Test 5: Tool execution
self.logger.info("Test 5: Tool execution...")
result = await tools.call_tool("analyze_sentiment", {
"text": "This is a test message",
"backend": "textblob"
})
self.logger.info(f"β Tool execution successful: {result.get('success', False)}")
# Test 6: Health check
self.logger.info("Test 6: Health check...")
health_result = await tools.call_tool("health_check", {})
self.logger.info(f"β Health check: {health_result.get('status', 'unknown')}")
# Cleanup
await analyzer.cleanup()
self.logger.info("π All tests passed!")
return True
except Exception as e:
self.logger.error(f"β Test failed: {e}")
return False
def setup_logging(level: str = "INFO") -> None:
"""
Setup logging configuration.
Args:
level: Logging level
"""
logging.basicConfig(
level=getattr(logging, level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stderr)
]
)
def parse_arguments() -> argparse.Namespace:
"""
Parse command line arguments.
Returns:
Parsed arguments
"""
parser = argparse.ArgumentParser(
description="MCP Sentiment Analysis Server",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python app.py --mode mcp # MCP server only
python app.py --mode gradio # Gradio interface only
python app.py --mode combined # Both services
python app.py --mode test # Run tests
python app.py --mode gradio --port 8080 # Custom port
python app.py --mode gradio --share # Public sharing
python app.py --mode gradio --mcp-server # Gradio with MCP server
Environment Variables:
GRADIO_MCP_SERVER=true # Enable MCP server in Gradio
"""
)
parser.add_argument(
"--mode",
choices=["mcp", "gradio", "combined", "test"],
default="combined",
help="Execution mode (default: combined)"
)
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
default="INFO",
help="Logging level (default: INFO)"
)
# Gradio-specific options
parser.add_argument(
"--port",
type=int,
default=7860,
help="Gradio server port (default: 7860)"
)
parser.add_argument(
"--host",
default="0.0.0.0",
help="Gradio server host (default: 0.0.0.0)"
)
parser.add_argument(
"--share",
action="store_true",
help="Enable Gradio public sharing"
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug mode"
)
parser.add_argument(
"--mcp-server",
action="store_true",
help="Enable MCP server functionality in Gradio interface"
)
return parser.parse_args()
async def main() -> None:
"""Main application entry point."""
# Parse arguments
args = parse_arguments()
# Setup logging
setup_logging(args.log_level)
logger = logging.getLogger(__name__)
# Use uvloop if available for better performance
if UVLOOP_AVAILABLE and args.mode in ["mcp", "combined"]:
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logger.info("Using uvloop for better performance")
# Create application runner
runner = ApplicationRunner()
try:
if args.mode == "mcp":
await runner.run_mcp_server()
elif args.mode == "gradio":
# Gradio runs in sync mode
gradio_kwargs = {
"server_name": args.host,
"server_port": args.port,
"share": args.share,
"debug": args.debug,
"mcp_server": args.mcp_server
}
runner.run_gradio_interface(**gradio_kwargs)
elif args.mode == "combined":
gradio_kwargs = {
"server_name": args.host,
"server_port": args.port,
"share": args.share,
"debug": args.debug,
"mcp_server": args.mcp_server
}
await runner.run_combined(**gradio_kwargs)
elif args.mode == "test":
success = await runner.run_tests()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
logger.info("Application interrupted by user")
except Exception as e:
logger.error(f"Application error: {e}")
sys.exit(1)
finally:
logger.info("Application shutdown complete")
if __name__ == "__main__":
asyncio.run(main()) |