Spaces:
Paused
Paused
File size: 2,960 Bytes
e2685f7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
"""
MCP Client Service for Yuga Planner
This service handles Model Context Protocol client operations for scheduling tools.
Provides a clean interface for integrating MCP scheduling functionality.
"""
import base64
from typing import Dict, Any
from utils.logging_config import setup_logging, get_logger
from handlers.mcp_backend import process_message_and_attached_file
# Configure logging when this module is imported, then create the
# module-level logger used by MCPClientService below.
# NOTE(review): setup_logging() runs as an import side effect; presumably it
# is safe to call more than once — confirm in utils.logging_config.
setup_logging()
logger = get_logger(__name__)
class MCPClientService:
    """Service for MCP client operations and scheduling tool integration.

    Holds the tool definitions in ``self.tools`` and forwards scheduling
    requests to ``process_message_and_attached_file``.
    """

    def __init__(self) -> None:
        """Build the list of tool definitions exposed by this service."""
        # Each entry describes one callable function: its name, a
        # human-readable description, and the schema of its parameters.
        self.tools = [
            {
                "type": "function",
                "function": {
                    "name": "schedule_tasks_with_calendar",
                    "description": "Create an optimized schedule by analyzing calendar events and breaking down tasks. Upload a calendar .ics file and describe the task you want to schedule.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "task_description": {
                                "type": "string",
                                "description": "Description of the task or project to schedule (e.g., 'Create a new EC2 instance on AWS')",
                            },
                            "calendar_file_content": {
                                "type": "string",
                                "description": "Base64 encoded content of the .ics calendar file, or 'none' if no calendar provided",
                            },
                        },
                        "required": ["task_description", "calendar_file_content"],
                    },
                },
            }
        ]

    @staticmethod
    def _decode_calendar_content(calendar_file_content: Any) -> bytes:
        """Turn the tool's ``calendar_file_content`` argument into raw bytes.

        Args:
            calendar_file_content: Base64 text (``str`` or ASCII ``bytes``),
                the sentinel ``'none'`` in any letter case, an empty string,
                or ``None``.

        Returns:
            The decoded bytes, or ``b""`` when no calendar was supplied.

        Raises:
            ValueError: If the payload is not valid base64
                (``binascii.Error`` and ``UnicodeDecodeError`` are both
                subclasses of ``ValueError``).
        """
        if calendar_file_content is None:
            return b""

        # Bytes input was accepted by the previous implementation; keep it
        # working by normalising to text before the checks below.
        if isinstance(calendar_file_content, (bytes, bytearray)):
            calendar_file_content = bytes(calendar_file_content).decode("ascii")

        # Trim before comparing with the sentinel: "none" is itself decodable
        # base64, so " none " would otherwise be decoded into garbage bytes.
        trimmed = calendar_file_content.strip()
        if not trimmed or trimmed.lower() == "none":
            return b""

        # Drop embedded whitespace so line-wrapped base64 is still accepted,
        # then validate strictly so a malformed payload raises instead of
        # being silently decoded into a corrupted calendar.
        compact = "".join(trimmed.split())
        return base64.b64decode(compact, validate=True)

    async def call_scheduling_tool(
        self, task_description: str, calendar_file_content: str
    ) -> Dict[str, Any]:
        """
        Call the scheduling backend tool.

        Args:
            task_description: Description of the task to schedule
            calendar_file_content: Base64 encoded calendar content or 'none'

        Returns:
            Dict containing the scheduling result. If decoding the calendar
            or the backend call raises, a dict with ``"error"`` (the
            exception text) and ``"status": "failed"`` is returned instead
            of propagating the exception.
        """
        try:
            file_content = self._decode_calendar_content(calendar_file_content)

            logger.debug(f"Calling MCP backend with task: {task_description}")
            result = await process_message_and_attached_file(
                file_content=file_content,
                message_body=task_description,
                file_name="calendar.ics",
            )
            # NOTE(review): assumes the backend returns a dict-like object
            # (it must support .get) — confirm in handlers.mcp_backend.
            logger.debug(
                f"MCP backend returned status: {result.get('status', 'unknown')}"
            )
            return result
        except Exception as e:
            # Service boundary: report the failure to the caller as data,
            # but keep the traceback in the log for diagnosis.
            logger.exception(f"Error calling scheduling tool: {e}")
            return {"error": str(e), "status": "failed"}
|