File size: 2,960 Bytes
e2685f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""
MCP Client Service for Yuga Planner

This service handles Model Context Protocol client operations for scheduling tools.
Provides a clean interface for integrating MCP scheduling functionality.
"""

import base64
from typing import Dict, Any

from utils.logging_config import setup_logging, get_logger
from handlers.mcp_backend import process_message_and_attached_file

# Initialize logging
setup_logging()
logger = get_logger(__name__)


class MCPClientService:
    """Service for MCP client operations and scheduling tool integration"""

    def __init__(self):
        self.tools = [
            {
                "type": "function",
                "function": {
                    "name": "schedule_tasks_with_calendar",
                    "description": "Create an optimized schedule by analyzing calendar events and breaking down tasks. Upload a calendar .ics file and describe the task you want to schedule.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "task_description": {
                                "type": "string",
                                "description": "Description of the task or project to schedule (e.g., 'Create a new EC2 instance on AWS')",
                            },
                            "calendar_file_content": {
                                "type": "string",
                                "description": "Base64 encoded content of the .ics calendar file, or 'none' if no calendar provided",
                            },
                        },
                        "required": ["task_description", "calendar_file_content"],
                    },
                },
            }
        ]

    async def call_scheduling_tool(
        self, task_description: str, calendar_file_content: str
    ) -> Dict[str, Any]:
        """
        Call the scheduling backend tool.

        Args:
            task_description: Description of the task to schedule
            calendar_file_content: Base64 encoded calendar content or 'none'

        Returns:
            Dict containing the scheduling result
        """
        try:
            if calendar_file_content.lower() == "none":
                file_content = b""
            else:
                file_content = base64.b64decode(calendar_file_content)

            logger.debug(f"Calling MCP backend with task: {task_description}")
            result = await process_message_and_attached_file(
                file_content=file_content,
                message_body=task_description,
                file_name="calendar.ics",
            )

            logger.debug(
                f"MCP backend returned status: {result.get('status', 'unknown')}"
            )
            return result

        except Exception as e:
            logger.error(f"Error calling scheduling tool: {e}")
            return {"error": str(e), "status": "failed"}