Spaces:
Running
Running
File size: 4,305 Bytes
aaa3e82 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
"""
MCP Handler for processing requests with API key from headers
"""
import json
from typing import Dict, Any, Optional
from utils import A1DAPIClient, prepare_request_data, format_response_with_preview
from config import TOOLS_CONFIG
def get_api_key_from_headers(headers: Dict[str, str]) -> Optional[str]:
"""Extract API key from request headers"""
# Try different header formats
api_key = (headers.get('API_KEY') or
headers.get('api_key') or
headers.get('Api-Key') or
headers.get('X-API-Key') or
headers.get('x-api-key'))
if api_key:
print(f"π‘ Found API key in headers: {api_key[:8]}...")
return api_key
print("β οΈ No API key found in headers")
return None
def process_mcp_request(tool_name: str, params: Dict[str, Any], headers: Dict[str, str]) -> Dict[str, Any]:
"""Process MCP request with API key from headers"""
try:
# Get API key from headers
api_key = get_api_key_from_headers(headers)
if not api_key:
return {
"error": "API key required. Please provide API_KEY in request headers.",
"code": "MISSING_API_KEY"
}
# Validate tool
if tool_name not in TOOLS_CONFIG:
return {
"error": f"Unknown tool: {tool_name}",
"code": "INVALID_TOOL"
}
print(f"π§ Processing MCP request for tool: {tool_name}")
print(f"π Parameters: {params}")
# Create API client with header API key
client = A1DAPIClient(api_key=api_key)
# Prepare request data
data = prepare_request_data(tool_name, **params)
# Make request with result
response = client.make_request_with_result(
TOOLS_CONFIG[tool_name]["api_endpoint"],
data,
timeout=120 if "video" not in tool_name else 300
)
# Format response
message, media_url = format_response_with_preview(response, tool_name)
return {
"success": True,
"message": message,
"media_url": media_url,
"raw_response": response
}
except Exception as e:
print(f"β MCP request error: {str(e)}")
return {
"error": str(e),
"code": "PROCESSING_ERROR"
}
def create_mcp_tool_functions():
"""Create MCP tool functions that can handle header-based API keys"""
def mcp_remove_bg(image_url: str, headers: Dict[str, str] = None):
"""Remove background from images using AI (MCP version)"""
return process_mcp_request("remove_bg", {"image_url": image_url}, headers or {})
def mcp_image_upscaler(image_url: str, scale: int = 2, headers: Dict[str, str] = None):
"""Upscale images using AI (MCP version)"""
return process_mcp_request("image_upscaler", {"image_url": image_url, "scale": scale}, headers or {})
def mcp_video_upscaler(video_url: str, headers: Dict[str, str] = None):
"""Upscale videos using AI (MCP version)"""
return process_mcp_request("video_upscaler", {"video_url": video_url}, headers or {})
def mcp_image_vectorization(image_url: str, headers: Dict[str, str] = None):
"""Convert images to vector format using AI (MCP version)"""
return process_mcp_request("image_vectorization", {"image_url": image_url}, headers or {})
def mcp_image_extends(image_url: str, headers: Dict[str, str] = None):
"""Extend images using AI (MCP version)"""
return process_mcp_request("image_extends", {"image_url": image_url}, headers or {})
def mcp_image_generator(prompt: str, headers: Dict[str, str] = None):
"""Generate images using AI from text prompts (MCP version)"""
return process_mcp_request("image_generator", {"prompt": prompt}, headers or {})
return {
"remove_bg": mcp_remove_bg,
"image_upscaler": mcp_image_upscaler,
"video_upscaler": mcp_video_upscaler,
"image_vectorization": mcp_image_vectorization,
"image_extends": mcp_image_extends,
"image_generator": mcp_image_generator
}
|