|
|
|
""" |
|
Main entry point for MCP Sentiment Analysis Server. |
|
|
|
This script provides multiple modes of operation: |
|
1. MCP Server mode - JSON-RPC 2.0 server for AI model integration |
|
2. Gradio Interface mode - Web UI for human interaction |
|
3. Combined mode - Both MCP server and Gradio interface |
|
4. Test mode - Run basic functionality tests |
|
|
|
Usage: |
|
python app.py --mode mcp # Run MCP server only |
|
python app.py --mode gradio # Run Gradio interface only |
|
python app.py --mode combined # Run both (default) |
|
python app.py --mode test # Run tests |
|
""" |
|
|
|
import asyncio |
|
import argparse |
|
import logging |
|
import sys |
|
import signal |
|
import threading |
|
from typing import Optional |
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
try: |
|
import uvloop |
|
UVLOOP_AVAILABLE = True |
|
except ImportError: |
|
UVLOOP_AVAILABLE = False |
|
|
|
from src import ( |
|
create_server, |
|
MCPServerRunner, |
|
create_gradio_interface, |
|
get_analyzer |
|
) |
|
|
|
|
|
class ApplicationRunner: |
|
""" |
|
Main application runner that manages different execution modes. |
|
|
|
Supports running MCP server, Gradio interface, or both simultaneously |
|
with proper resource management and graceful shutdown. |
|
""" |
|
|
|
def __init__(self): |
|
"""Initialize application runner.""" |
|
self.logger = logging.getLogger(__name__) |
|
self.mcp_server = None |
|
self.gradio_interface = None |
|
self.running = False |
|
self.executor = ThreadPoolExecutor(max_workers=2) |
|
|
|
|
|
signal.signal(signal.SIGINT, self._signal_handler) |
|
signal.signal(signal.SIGTERM, self._signal_handler) |
|
|
|
def _signal_handler(self, signum, frame): |
|
"""Handle shutdown signals.""" |
|
self.logger.info(f"Received signal {signum}, shutting down...") |
|
self.running = False |
|
|
|
async def run_mcp_server(self) -> None: |
|
"""Run MCP server only.""" |
|
self.logger.info("Starting MCP server mode") |
|
|
|
try: |
|
|
|
self.mcp_server = await create_server() |
|
runner = MCPServerRunner(self.mcp_server) |
|
|
|
self.running = True |
|
await runner.run() |
|
|
|
except Exception as e: |
|
self.logger.error(f"MCP server failed: {e}") |
|
raise |
|
finally: |
|
if self.mcp_server: |
|
await self.mcp_server.stop() |
|
|
|
def run_gradio_interface(self, **kwargs) -> None: |
|
"""Run Gradio interface only.""" |
|
self.logger.info("Starting Gradio interface mode") |
|
|
|
try: |
|
|
|
self.gradio_interface = create_gradio_interface() |
|
|
|
|
|
launch_params = { |
|
"server_name": "0.0.0.0", |
|
"server_port": 7860, |
|
"share": False, |
|
"debug": False, |
|
"show_error": True, |
|
"quiet": False |
|
} |
|
launch_params.update(kwargs) |
|
|
|
self.running = True |
|
self.gradio_interface.launch(**launch_params) |
|
|
|
except Exception as e: |
|
self.logger.error(f"Gradio interface failed: {e}") |
|
raise |
|
|
|
async def run_combined(self, **gradio_kwargs) -> None: |
|
"""Run both MCP server and Gradio interface.""" |
|
self.logger.info("Starting combined mode (MCP server + Gradio interface)") |
|
|
|
try: |
|
|
|
self.mcp_server = await create_server() |
|
|
|
|
|
self.gradio_interface = create_gradio_interface() |
|
|
|
|
|
launch_params = { |
|
"server_name": "0.0.0.0", |
|
"server_port": 7860, |
|
"share": False, |
|
"debug": False, |
|
"show_error": True, |
|
"quiet": False |
|
} |
|
launch_params.update(gradio_kwargs) |
|
|
|
self.running = True |
|
|
|
|
|
gradio_future = self.executor.submit( |
|
self.gradio_interface.launch, **launch_params |
|
) |
|
|
|
|
|
runner = MCPServerRunner(self.mcp_server) |
|
|
|
|
|
self.logger.info("Both services starting...") |
|
|
|
|
|
try: |
|
await runner.run() |
|
except Exception as e: |
|
self.logger.error(f"MCP server error: {e}") |
|
raise |
|
finally: |
|
|
|
if gradio_future: |
|
gradio_future.cancel() |
|
|
|
except Exception as e: |
|
self.logger.error(f"Combined mode failed: {e}") |
|
raise |
|
finally: |
|
if self.mcp_server: |
|
await self.mcp_server.stop() |
|
|
|
async def run_tests(self) -> bool: |
|
"""Run basic functionality tests.""" |
|
self.logger.info("Running functionality tests...") |
|
|
|
try: |
|
|
|
self.logger.info("Test 1: Initializing sentiment analyzer...") |
|
analyzer = await get_analyzer("textblob") |
|
self.logger.info(f"✓ Analyzer initialized with backend: {analyzer.backend}") |
|
|
|
|
|
self.logger.info("Test 2: Basic sentiment analysis...") |
|
test_texts = [ |
|
"I love this product!", |
|
"This is terrible.", |
|
"It's okay, nothing special." |
|
] |
|
|
|
for text in test_texts: |
|
result = await analyzer.analyze(text) |
|
self.logger.info(f"✓ '{text}' -> {result.label.value} ({result.confidence:.2f})") |
|
|
|
|
|
self.logger.info("Test 3: Batch analysis...") |
|
batch_results = await analyzer.analyze_batch(test_texts) |
|
self.logger.info(f"✓ Batch analysis completed: {len(batch_results)} results") |
|
|
|
|
|
self.logger.info("Test 4: MCP tools...") |
|
from src.tools import get_tools |
|
tools = get_tools() |
|
available_tools = tools.get_tools() |
|
self.logger.info(f"✓ {len(available_tools)} MCP tools available") |
|
|
|
|
|
self.logger.info("Test 5: Tool execution...") |
|
result = await tools.call_tool("analyze_sentiment", { |
|
"text": "This is a test message", |
|
"backend": "textblob" |
|
}) |
|
self.logger.info(f"✓ Tool execution successful: {result.get('success', False)}") |
|
|
|
|
|
self.logger.info("Test 6: Health check...") |
|
health_result = await tools.call_tool("health_check", {}) |
|
self.logger.info(f"✓ Health check: {health_result.get('status', 'unknown')}") |
|
|
|
|
|
await analyzer.cleanup() |
|
|
|
self.logger.info("🎉 All tests passed!") |
|
return True |
|
|
|
except Exception as e: |
|
self.logger.error(f"❌ Test failed: {e}") |
|
return False |
|
|
|
|
|
def setup_logging(level: str = "INFO") -> None: |
|
""" |
|
Setup logging configuration. |
|
|
|
Args: |
|
level: Logging level |
|
""" |
|
logging.basicConfig( |
|
level=getattr(logging, level.upper()), |
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', |
|
handlers=[ |
|
logging.StreamHandler(sys.stderr) |
|
] |
|
) |
|
|
|
|
|
def parse_arguments() -> argparse.Namespace: |
|
""" |
|
Parse command line arguments. |
|
|
|
Returns: |
|
Parsed arguments |
|
""" |
|
parser = argparse.ArgumentParser( |
|
description="MCP Sentiment Analysis Server", |
|
formatter_class=argparse.RawDescriptionHelpFormatter, |
|
epilog=""" |
|
Examples: |
|
python app.py --mode mcp # MCP server only |
|
python app.py --mode gradio # Gradio interface only |
|
python app.py --mode combined # Both services |
|
python app.py --mode test # Run tests |
|
python app.py --mode gradio --port 8080 # Custom port |
|
python app.py --mode gradio --share # Public sharing |
|
python app.py --mode gradio --mcp-server # Gradio with MCP server |
|
|
|
Environment Variables: |
|
GRADIO_MCP_SERVER=true # Enable MCP server in Gradio |
|
""" |
|
) |
|
|
|
parser.add_argument( |
|
"--mode", |
|
choices=["mcp", "gradio", "combined", "test"], |
|
default="combined", |
|
help="Execution mode (default: combined)" |
|
) |
|
|
|
parser.add_argument( |
|
"--log-level", |
|
choices=["DEBUG", "INFO", "WARNING", "ERROR"], |
|
default="INFO", |
|
help="Logging level (default: INFO)" |
|
) |
|
|
|
|
|
parser.add_argument( |
|
"--port", |
|
type=int, |
|
default=7860, |
|
help="Gradio server port (default: 7860)" |
|
) |
|
|
|
parser.add_argument( |
|
"--host", |
|
default="0.0.0.0", |
|
help="Gradio server host (default: 0.0.0.0)" |
|
) |
|
|
|
parser.add_argument( |
|
"--share", |
|
action="store_true", |
|
help="Enable Gradio public sharing" |
|
) |
|
|
|
parser.add_argument( |
|
"--debug", |
|
action="store_true", |
|
help="Enable debug mode" |
|
) |
|
|
|
parser.add_argument( |
|
"--mcp-server", |
|
action="store_true", |
|
help="Enable MCP server functionality in Gradio interface" |
|
) |
|
|
|
return parser.parse_args() |
|
|
|
|
|
async def main() -> None: |
|
"""Main application entry point.""" |
|
|
|
args = parse_arguments() |
|
|
|
|
|
setup_logging(args.log_level) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
if UVLOOP_AVAILABLE and args.mode in ["mcp", "combined"]: |
|
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) |
|
logger.info("Using uvloop for better performance") |
|
|
|
|
|
runner = ApplicationRunner() |
|
|
|
try: |
|
if args.mode == "mcp": |
|
await runner.run_mcp_server() |
|
|
|
elif args.mode == "gradio": |
|
|
|
gradio_kwargs = { |
|
"server_name": args.host, |
|
"server_port": args.port, |
|
"share": args.share, |
|
"debug": args.debug, |
|
"mcp_server": args.mcp_server |
|
} |
|
runner.run_gradio_interface(**gradio_kwargs) |
|
|
|
elif args.mode == "combined": |
|
gradio_kwargs = { |
|
"server_name": args.host, |
|
"server_port": args.port, |
|
"share": args.share, |
|
"debug": args.debug, |
|
"mcp_server": args.mcp_server |
|
} |
|
await runner.run_combined(**gradio_kwargs) |
|
|
|
elif args.mode == "test": |
|
success = await runner.run_tests() |
|
sys.exit(0 if success else 1) |
|
|
|
except KeyboardInterrupt: |
|
logger.info("Application interrupted by user") |
|
except Exception as e: |
|
logger.error(f"Application error: {e}") |
|
sys.exit(1) |
|
finally: |
|
logger.info("Application shutdown complete") |
|
|
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |