Spaces:
Running
Running
""" | |
MCP Handler for processing requests with API key from headers | |
""" | |
import json | |
from typing import Dict, Any, Optional | |
from utils import A1DAPIClient, prepare_request_data, format_response_with_preview | |
from config import TOOLS_CONFIG | |
def get_api_key_from_headers(headers: Dict[str, str]) -> Optional[str]:
    """Extract the API key from request headers.

    The five exact spellings this handler has always accepted are tried
    first, in their original priority order. If none of them holds a
    non-empty value, header names are compared case-insensitively, so
    spellings such as ``X-Api-Key`` or ``API-KEY`` are found as well.

    Args:
        headers: Mapping of header name to header value. ``None`` or an
            empty mapping is treated as "no key supplied".

    Returns:
        The first non-empty API key value found, or ``None`` when no
        recognised header carries one.
    """
    exact_names = ('API_KEY', 'api_key', 'Api-Key', 'X-API-Key', 'x-api-key')
    # Lower-cased equivalents of the names above, in the same priority order.
    folded_names = ('api_key', 'api-key', 'x-api-key')

    api_key = None
    if headers:
        # Pass 1: exact spellings, so existing callers get identical results.
        for name in exact_names:
            value = headers.get(name)
            if value:
                api_key = value
                break

        # Pass 2: header names are case-insensitive on the wire, so accept
        # any casing of the recognised names.
        if not api_key and hasattr(headers, 'items'):
            folded = {}
            for name, value in headers.items():
                if isinstance(name, str) and value:
                    # setdefault keeps the first value seen for a given name.
                    folded.setdefault(name.lower(), value)
            api_key = next(
                (folded[name] for name in folded_names if name in folded),
                None,
            )

    if api_key:
        # NOTE(review): this prints the first 8 characters of the secret;
        # consider dropping the prefix if these logs are shared.
        print(f"π‘ Found API key in headers: {api_key[:8]}...")
        return api_key

    print("β οΈ No API key found in headers")
    return None
def process_mcp_request(tool_name: str, params: Dict[str, Any], headers: Dict[str, str]) -> Dict[str, Any]:
    """Run a single MCP tool call, authenticating with the key from headers.

    Args:
        tool_name: Key into ``TOOLS_CONFIG`` naming the tool to run.
        params: Keyword arguments forwarded to ``prepare_request_data``.
        headers: Request headers that are searched for the API key.

    Returns:
        On success, a dict with ``success``, ``message``, ``media_url`` and
        ``raw_response``. On failure, a dict with ``error`` and ``code``,
        where ``code`` is ``MISSING_API_KEY``, ``INVALID_TOOL`` or
        ``PROCESSING_ERROR``. Exceptions raised while processing are
        caught and reported through the ``PROCESSING_ERROR`` form.
    """
    try:
        # Authentication comes first: without a key nothing else is checked.
        key = get_api_key_from_headers(headers)
        if not key:
            return dict(
                error="API key required. Please provide API_KEY in request headers.",
                code="MISSING_API_KEY",
            )

        # Reject tools that have no configuration entry.
        if tool_name not in TOOLS_CONFIG:
            return dict(
                error=f"Unknown tool: {tool_name}",
                code="INVALID_TOOL",
            )

        print(f"π§ Processing MCP request for tool: {tool_name}")
        print(f"π Parameters: {params}")

        # The client is built per request so it carries the caller's key.
        api_client = A1DAPIClient(api_key=key)
        payload = prepare_request_data(tool_name, **params)

        endpoint = TOOLS_CONFIG[tool_name]["api_endpoint"]
        # Tools with "video" in their name are given the longer timeout.
        if "video" in tool_name:
            request_timeout = 300
        else:
            request_timeout = 120
        result = api_client.make_request_with_result(
            endpoint,
            payload,
            timeout=request_timeout,
        )

        text, preview_url = format_response_with_preview(result, tool_name)
        return dict(
            success=True,
            message=text,
            media_url=preview_url,
            raw_response=result,
        )
    except Exception as exc:
        # Boundary handler: callers always receive a dict, never an exception.
        print(f"β MCP request error: {str(exc)}")
        return dict(error=str(exc), code="PROCESSING_ERROR")
def create_mcp_tool_functions():
    """Build the MCP tool callables that accept an optional headers mapping.

    Each callable forwards its arguments, together with the supplied headers
    (or an empty dict when none are given), to ``process_mcp_request``.

    Returns:
        A dict mapping each tool name to its callable, in this order:
        ``remove_bg``, ``image_upscaler``, ``video_upscaler``,
        ``image_vectorization``, ``image_extends``, ``image_generator``.
    """

    def mcp_remove_bg(image_url: str, headers: Dict[str, str] = None):
        """Remove background from images using AI (MCP version)"""
        arguments = {"image_url": image_url}
        request_headers = headers or {}
        return process_mcp_request("remove_bg", arguments, request_headers)

    def mcp_image_upscaler(image_url: str, scale: int = 2, headers: Dict[str, str] = None):
        """Upscale images using AI (MCP version)"""
        arguments = {"image_url": image_url, "scale": scale}
        request_headers = headers or {}
        return process_mcp_request("image_upscaler", arguments, request_headers)

    def mcp_video_upscaler(video_url: str, headers: Dict[str, str] = None):
        """Upscale videos using AI (MCP version)"""
        arguments = {"video_url": video_url}
        request_headers = headers or {}
        return process_mcp_request("video_upscaler", arguments, request_headers)

    def mcp_image_vectorization(image_url: str, headers: Dict[str, str] = None):
        """Convert images to vector format using AI (MCP version)"""
        arguments = {"image_url": image_url}
        request_headers = headers or {}
        return process_mcp_request("image_vectorization", arguments, request_headers)

    def mcp_image_extends(image_url: str, headers: Dict[str, str] = None):
        """Extend images using AI (MCP version)"""
        arguments = {"image_url": image_url}
        request_headers = headers or {}
        return process_mcp_request("image_extends", arguments, request_headers)

    def mcp_image_generator(prompt: str, headers: Dict[str, str] = None):
        """Generate images using AI from text prompts (MCP version)"""
        arguments = {"prompt": prompt}
        request_headers = headers or {}
        return process_mcp_request("image_generator", arguments, request_headers)

    # Keyword order fixes the insertion order of the returned registry.
    registry = dict(
        remove_bg=mcp_remove_bg,
        image_upscaler=mcp_image_upscaler,
        video_upscaler=mcp_video_upscaler,
        image_vectorization=mcp_image_vectorization,
        image_extends=mcp_image_extends,
        image_generator=mcp_image_generator,
    )
    return registry