sentiment / app.py
Denys Kanunnikov
fix merge sytax error
b2c2818
#!/usr/bin/env python3
"""
Main entry point for MCP Sentiment Analysis Server.
This script provides multiple modes of operation:
1. MCP Server mode - JSON-RPC 2.0 server for AI model integration
2. Gradio Interface mode - Web UI for human interaction
3. Combined mode - Both MCP server and Gradio interface
4. Test mode - Run basic functionality tests
Usage:
python app.py --mode mcp # Run MCP server only
python app.py --mode gradio # Run Gradio interface only
python app.py --mode combined # Run both (default)
python app.py --mode test # Run tests
"""
import asyncio
import argparse
import logging
import sys
import signal
import threading
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
try:
import uvloop
UVLOOP_AVAILABLE = True
except ImportError:
UVLOOP_AVAILABLE = False
from src import (
create_server,
MCPServerRunner,
create_gradio_interface,
get_analyzer
)
class ApplicationRunner:
"""
Main application runner that manages different execution modes.
Supports running MCP server, Gradio interface, or both simultaneously
with proper resource management and graceful shutdown.
"""
def __init__(self):
"""Initialize application runner."""
self.logger = logging.getLogger(__name__)
self.mcp_server = None
self.gradio_interface = None
self.running = False
self.executor = ThreadPoolExecutor(max_workers=2)
# Setup signal handlers
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum, frame):
"""Handle shutdown signals."""
self.logger.info(f"Received signal {signum}, shutting down...")
self.running = False
async def run_mcp_server(self) -> None:
"""Run MCP server only."""
self.logger.info("Starting MCP server mode")
try:
# Create and run MCP server
self.mcp_server = await create_server()
runner = MCPServerRunner(self.mcp_server)
self.running = True
await runner.run()
except Exception as e:
self.logger.error(f"MCP server failed: {e}")
raise
finally:
if self.mcp_server:
await self.mcp_server.stop()
def run_gradio_interface(self, **kwargs) -> None:
"""Run Gradio interface only."""
self.logger.info("Starting Gradio interface mode")
try:
# Create and launch Gradio interface
self.gradio_interface = create_gradio_interface()
# Default launch parameters
launch_params = {
"server_name": "0.0.0.0",
"server_port": 7860,
"share": False,
"debug": False,
"show_error": True,
"quiet": False
}
launch_params.update(kwargs)
self.running = True
self.gradio_interface.launch(**launch_params)
except Exception as e:
self.logger.error(f"Gradio interface failed: {e}")
raise
async def run_combined(self, **gradio_kwargs) -> None:
"""Run both MCP server and Gradio interface."""
self.logger.info("Starting combined mode (MCP server + Gradio interface)")
try:
# Create MCP server
self.mcp_server = await create_server()
# Create Gradio interface
self.gradio_interface = create_gradio_interface()
# Default Gradio launch parameters
launch_params = {
"server_name": "0.0.0.0",
"server_port": 7860,
"share": False,
"debug": False,
"show_error": True,
"quiet": False
}
launch_params.update(gradio_kwargs)
self.running = True
# Run Gradio in thread pool
gradio_future = self.executor.submit(
self.gradio_interface.launch, **launch_params
)
# Run MCP server in main thread
runner = MCPServerRunner(self.mcp_server)
# Start both services
self.logger.info("Both services starting...")
# Wait for either to complete or fail
try:
await runner.run()
except Exception as e:
self.logger.error(f"MCP server error: {e}")
raise
finally:
# Cleanup
if gradio_future:
gradio_future.cancel()
except Exception as e:
self.logger.error(f"Combined mode failed: {e}")
raise
finally:
if self.mcp_server:
await self.mcp_server.stop()
async def run_tests(self) -> bool:
"""Run basic functionality tests."""
self.logger.info("Running functionality tests...")
try:
# Test 1: Sentiment analyzer initialization
self.logger.info("Test 1: Initializing sentiment analyzer...")
analyzer = await get_analyzer("textblob")
self.logger.info(f"✓ Analyzer initialized with backend: {analyzer.backend}")
# Test 2: Basic sentiment analysis
self.logger.info("Test 2: Basic sentiment analysis...")
test_texts = [
"I love this product!",
"This is terrible.",
"It's okay, nothing special."
]
for text in test_texts:
result = await analyzer.analyze(text)
self.logger.info(f"✓ '{text}' -> {result.label.value} ({result.confidence:.2f})")
# Test 3: Batch analysis
self.logger.info("Test 3: Batch analysis...")
batch_results = await analyzer.analyze_batch(test_texts)
self.logger.info(f"✓ Batch analysis completed: {len(batch_results)} results")
# Test 4: MCP tools
self.logger.info("Test 4: MCP tools...")
from src.tools import get_tools
tools = get_tools()
available_tools = tools.get_tools()
self.logger.info(f"✓ {len(available_tools)} MCP tools available")
# Test 5: Tool execution
self.logger.info("Test 5: Tool execution...")
result = await tools.call_tool("analyze_sentiment", {
"text": "This is a test message",
"backend": "textblob"
})
self.logger.info(f"✓ Tool execution successful: {result.get('success', False)}")
# Test 6: Health check
self.logger.info("Test 6: Health check...")
health_result = await tools.call_tool("health_check", {})
self.logger.info(f"✓ Health check: {health_result.get('status', 'unknown')}")
# Cleanup
await analyzer.cleanup()
self.logger.info("🎉 All tests passed!")
return True
except Exception as e:
self.logger.error(f"❌ Test failed: {e}")
return False
def setup_logging(level: str = "INFO") -> None:
"""
Setup logging configuration.
Args:
level: Logging level
"""
logging.basicConfig(
level=getattr(logging, level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stderr)
]
)
def parse_arguments() -> argparse.Namespace:
"""
Parse command line arguments.
Returns:
Parsed arguments
"""
parser = argparse.ArgumentParser(
description="MCP Sentiment Analysis Server",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python app.py --mode mcp # MCP server only
python app.py --mode gradio # Gradio interface only
python app.py --mode combined # Both services
python app.py --mode test # Run tests
python app.py --mode gradio --port 8080 # Custom port
python app.py --mode gradio --share # Public sharing
python app.py --mode gradio --mcp-server # Gradio with MCP server
Environment Variables:
GRADIO_MCP_SERVER=true # Enable MCP server in Gradio
"""
)
parser.add_argument(
"--mode",
choices=["mcp", "gradio", "combined", "test"],
default="combined",
help="Execution mode (default: combined)"
)
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
default="INFO",
help="Logging level (default: INFO)"
)
# Gradio-specific options
parser.add_argument(
"--port",
type=int,
default=7860,
help="Gradio server port (default: 7860)"
)
parser.add_argument(
"--host",
default="0.0.0.0",
help="Gradio server host (default: 0.0.0.0)"
)
parser.add_argument(
"--share",
action="store_true",
help="Enable Gradio public sharing"
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug mode"
)
parser.add_argument(
"--mcp-server",
action="store_true",
help="Enable MCP server functionality in Gradio interface"
)
return parser.parse_args()
async def main() -> None:
"""Main application entry point."""
# Parse arguments
args = parse_arguments()
# Setup logging
setup_logging(args.log_level)
logger = logging.getLogger(__name__)
# Use uvloop if available for better performance
if UVLOOP_AVAILABLE and args.mode in ["mcp", "combined"]:
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
logger.info("Using uvloop for better performance")
# Create application runner
runner = ApplicationRunner()
try:
if args.mode == "mcp":
await runner.run_mcp_server()
elif args.mode == "gradio":
# Gradio runs in sync mode
gradio_kwargs = {
"server_name": args.host,
"server_port": args.port,
"share": args.share,
"debug": args.debug,
"mcp_server": args.mcp_server
}
runner.run_gradio_interface(**gradio_kwargs)
elif args.mode == "combined":
gradio_kwargs = {
"server_name": args.host,
"server_port": args.port,
"share": args.share,
"debug": args.debug,
"mcp_server": args.mcp_server
}
await runner.run_combined(**gradio_kwargs)
elif args.mode == "test":
success = await runner.run_tests()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
logger.info("Application interrupted by user")
except Exception as e:
logger.error(f"Application error: {e}")
sys.exit(1)
finally:
logger.info("Application shutdown complete")
if __name__ == "__main__":
asyncio.run(main())