Spaces:
Running
Running
| """ | |
| MCP Handler for processing requests with API key from headers | |
| """ | |
| import json | |
| from typing import Dict, Any, Optional | |
| from utils import A1DAPIClient, prepare_request_data, format_response_with_preview | |
| from config import TOOLS_CONFIG | |
| def get_api_key_from_headers(headers: Dict[str, str]) -> Optional[str]: | |
| """Extract API key from request headers""" | |
| # Try different header formats | |
| api_key = (headers.get('API_KEY') or | |
| headers.get('api_key') or | |
| headers.get('Api-Key') or | |
| headers.get('X-API-Key') or | |
| headers.get('x-api-key')) | |
| if api_key: | |
| print(f"π‘ Found API key in headers: {api_key[:8]}...") | |
| return api_key | |
| print("β οΈ No API key found in headers") | |
| return None | |
| def process_mcp_request(tool_name: str, params: Dict[str, Any], headers: Dict[str, str]) -> Dict[str, Any]: | |
| """Process MCP request with API key from headers""" | |
| try: | |
| # Get API key from headers | |
| api_key = get_api_key_from_headers(headers) | |
| if not api_key: | |
| return { | |
| "error": "API key required. Please provide API_KEY in request headers.", | |
| "code": "MISSING_API_KEY" | |
| } | |
| # Validate tool | |
| if tool_name not in TOOLS_CONFIG: | |
| return { | |
| "error": f"Unknown tool: {tool_name}", | |
| "code": "INVALID_TOOL" | |
| } | |
| print(f"π§ Processing MCP request for tool: {tool_name}") | |
| print(f"π Parameters: {params}") | |
| # Create API client with header API key | |
| client = A1DAPIClient(api_key=api_key) | |
| # Prepare request data | |
| data = prepare_request_data(tool_name, **params) | |
| # Make request with result | |
| response = client.make_request_with_result( | |
| TOOLS_CONFIG[tool_name]["api_endpoint"], | |
| data, | |
| timeout=120 if "video" not in tool_name else 300 | |
| ) | |
| # Format response | |
| message, media_url = format_response_with_preview(response, tool_name) | |
| return { | |
| "success": True, | |
| "message": message, | |
| "media_url": media_url, | |
| "raw_response": response | |
| } | |
| except Exception as e: | |
| print(f"β MCP request error: {str(e)}") | |
| return { | |
| "error": str(e), | |
| "code": "PROCESSING_ERROR" | |
| } | |
| def create_mcp_tool_functions(): | |
| """Create MCP tool functions that can handle header-based API keys""" | |
| def mcp_remove_bg(image_url: str, headers: Dict[str, str] = None): | |
| """Remove background from images using AI (MCP version)""" | |
| return process_mcp_request("remove_bg", {"image_url": image_url}, headers or {}) | |
| def mcp_image_upscaler(image_url: str, scale: int = 2, headers: Dict[str, str] = None): | |
| """Upscale images using AI (MCP version)""" | |
| return process_mcp_request("image_upscaler", {"image_url": image_url, "scale": scale}, headers or {}) | |
| def mcp_video_upscaler(video_url: str, headers: Dict[str, str] = None): | |
| """Upscale videos using AI (MCP version)""" | |
| return process_mcp_request("video_upscaler", {"video_url": video_url}, headers or {}) | |
| def mcp_image_vectorization(image_url: str, headers: Dict[str, str] = None): | |
| """Convert images to vector format using AI (MCP version)""" | |
| return process_mcp_request("image_vectorization", {"image_url": image_url}, headers or {}) | |
| def mcp_image_extends(image_url: str, headers: Dict[str, str] = None): | |
| """Extend images using AI (MCP version)""" | |
| return process_mcp_request("image_extends", {"image_url": image_url}, headers or {}) | |
| def mcp_image_generator(prompt: str, headers: Dict[str, str] = None): | |
| """Generate images using AI from text prompts (MCP version)""" | |
| return process_mcp_request("image_generator", {"prompt": prompt}, headers or {}) | |
| return { | |
| "remove_bg": mcp_remove_bg, | |
| "image_upscaler": mcp_image_upscaler, | |
| "video_upscaler": mcp_video_upscaler, | |
| "image_vectorization": mcp_image_vectorization, | |
| "image_extends": mcp_image_extends, | |
| "image_generator": mcp_image_generator | |
| } | |