Spaces:
Paused
Paused
""" | |
MCP (Model Context Protocol) Handlers for Yuga Planner | |
This module provides an MCP tool endpoint for external integrations and is separate from the Gradio UI's workflow. | |
Key Features: | |
- Centralized logging integration with debug mode support | |
- Performance timing for API monitoring | |
- Comprehensive error handling for external consumers | |
- Automatic debug mode detection from environment variables | |
Usage: | |
The main endpoint is registered as a Gradio API and can be called by MCP clients: | |
POST /api/process_message_and_attached_file | |
{
"file_content": "<contents of the .ics calendar file>",
"message_body": "Create tasks for this week's meetings",
"file_name": "calendar.ics"
}
Environment Variables: | |
YUGA_DEBUG: Set to "true" to enable detailed debug logging for API requests | |
Logging: | |
- Uses centralized logging system from utils.logging_config | |
- Respects YUGA_DEBUG environment variable (from CLI flag --debug) | |
- Includes performance timing and detailed error information | |
- Provides different log levels for production vs development usage | |
""" | |
import time | |
from utils.extract_calendar import extract_ical_entries | |
from factory.data.provider import generate_mcp_data | |
from services import ScheduleService, StateService | |
from factory.data.formatters import schedule_to_dataframe | |
from utils.logging_config import setup_logging, get_logger, is_debug_enabled | |
# Initialize the shared logging configuration from utils.logging_config at
# import time, before any handler code emits log records.
setup_logging()
# Module-level logger named after this module; used by the MCP handler below.
logger = get_logger(__name__)
async def process_message_and_attached_file(
    file_content: bytes, message_body: str, file_name: str = "calendar.ics"
) -> dict:
    """
    MCP API endpoint for processing calendar files and task descriptions.

    This is a separate workflow from the main Gradio UI and handles external
    API requests. The handler never raises for ordinary failures: every
    ``Exception`` is logged and reported through the returned dict instead.

    Workflow:
        1. Extract calendar entries from ``file_content``.
        2. Generate schedule data from the entries and ``message_body``.
        3. Build the state payload expected by ``ScheduleService``.
        4. Start the solver.
        5. Poll ``StateService`` until a solved schedule is available or the
           poll budget (60 polls, 2 seconds apart) is exhausted.

    Args:
        file_content (bytes): The actual file content bytes (typically an
            .ics calendar file).
        message_body (str): The body of the last chat message, which contains
            the task description.
        file_name (str): Optional filename, used for logging and echoed back
            in the response.

    Returns:
        dict: Always contains ``status``, ``timestamp`` and
        ``processing_time_seconds``. ``status`` is one of:

        - ``"success"``: solved; includes ``schedule`` (list of records),
          ``solution_status``, ``calendar_entries``, ``file_info``,
          ``job_id`` and ``polls_required``.
        - ``"timeout"``: solver did not finish within the poll budget;
          includes ``calendar_entries``, ``file_info`` and ``job_id``.
        - ``"no_file_content"``: ``file_content`` was empty or missing.
        - ``"calendar_parse_failed"``: calendar extraction reported an error.
        - ``"conversion_failed"``: the solved schedule could not be converted
          into the response payload.
        - ``"failed"``: any other exception; includes ``error``.
    """
    # Function-scope import keeps this block self-contained; asyncio is needed
    # for the non-blocking sleep in the polling loop below.
    import asyncio

    # Determine debug mode from environment or default to False for API calls
    debug_mode = is_debug_enabled()

    logger.info("MCP Handler: Processing message with attached file")
    logger.debug("File name: %s", file_name)
    logger.debug(
        "File content size: %d bytes", len(file_content) if file_content else 0
    )
    logger.debug("Message: %s", message_body)
    logger.debug("Debug mode: %s", debug_mode)

    # Track timing for API performance
    start_time = time.time()

    try:
        # Step 1: Extract calendar entries from the file content
        logger.info("Step 1: Extracting calendar entries...")

        if not file_content:
            logger.error("No file content provided")
            return {
                "error": "No file content provided",
                "status": "no_file_content",
                "timestamp": time.time(),
                "processing_time_seconds": time.time() - start_time,
            }

        calendar_entries, error = extract_ical_entries(file_content)

        if error:
            logger.error("Failed to extract calendar entries: %s", error)
            return {
                "error": f"Failed to extract calendar entries: {error}",
                "status": "calendar_parse_failed",
                "timestamp": time.time(),
                "processing_time_seconds": time.time() - start_time,
            }

        logger.info("Extracted %d calendar entries", len(calendar_entries))

        # Log only the first few summaries to keep debug output bounded
        if debug_mode and calendar_entries:
            logger.debug(
                "Calendar entries details: %s",
                [e.get("summary", "No summary") for e in calendar_entries[:5]],
            )

        # Shared by the success and timeout responses
        file_info = {
            "name": file_name,
            "size_bytes": len(file_content),
            "calendar_entries_count": len(calendar_entries),
        }

        # Step 2: Generate MCP data (combines calendar and LLM tasks)
        logger.info("Step 2: Generating tasks using MCP data provider...")

        schedule_data = await generate_mcp_data(
            calendar_entries=calendar_entries,
            user_message=message_body,
            project_id="PROJECT",
            employee_count=1,  # MCP uses single user
            days_in_schedule=365,
        )

        logger.info("Generated schedule with %d total tasks", len(schedule_data))

        # Step 3: Convert to format needed for solving
        logger.info("Step 3: Preparing schedule for solving...")

        # Create state data format expected by ScheduleService
        state_data = {
            "task_df_json": schedule_data.to_json(orient="split"),
            "employee_count": 1,
            "days_in_schedule": 365,
        }

        # Step 4: Start solving the schedule
        logger.info("Step 4: Starting schedule solver...")

        # Only the job id and initial status are needed here; the returned
        # dataframes and updated state are intentionally discarded.
        (
            _emp_df,
            _task_df,
            job_id,
            status,
            _updated_state,
        ) = await ScheduleService.solve_schedule_from_state(
            state_data=state_data,
            job_id=None,
            debug=debug_mode,  # Respect debug mode for MCP calls
        )

        logger.info("Solver started with job_id: %s", job_id)
        logger.debug("Initial status: %s", status)

        # Step 5: Poll until the schedule is solved
        logger.info("Step 5: Polling for solution...")

        max_polls = 60  # Maximum 60 polls (about 2 minutes)
        poll_interval = 2  # Poll every 2 seconds

        for poll_count in range(max_polls):
            if StateService.has_solved_schedule(job_id):
                solved_schedule = StateService.get_solved_schedule(job_id)

                # Check if we have a valid solution
                if solved_schedule is not None:
                    processing_time = time.time() - start_time
                    logger.info(
                        "Schedule solved after %d polls! (Total time: %.2fs)",
                        poll_count + 1,
                        processing_time,
                    )

                    try:
                        # Convert to final dataframe
                        final_df = schedule_to_dataframe(solved_schedule)

                        # Generate status message
                        status_message = ScheduleService.generate_status_message(
                            solved_schedule
                        )

                        logger.info("Final Status: %s", status_message)

                        # Return comprehensive JSON response
                        response_data = {
                            "status": "success",
                            "message": "Schedule solved successfully",
                            "file_info": file_info,
                            "calendar_entries": calendar_entries,
                            "solution_status": status_message,
                            # List of dicts so the payload is JSON-friendly
                            "schedule": final_df.to_dict(orient="records"),
                            "job_id": job_id,
                            "polls_required": poll_count + 1,
                            "processing_time_seconds": processing_time,
                            "timestamp": time.time(),
                            "debug_mode": debug_mode,
                        }

                        logger.debug(
                            "Returning JSON response with %d schedule entries",
                            len(response_data["schedule"]),
                        )

                        return response_data

                    except Exception as e:
                        logger.error(
                            "Error converting schedule to JSON: %s",
                            e,
                            exc_info=debug_mode,
                        )
                        # Return error response instead of raising
                        return {
                            "error": f"Error converting schedule to JSON: {e}",
                            "status": "conversion_failed",
                            "job_id": job_id,
                            "processing_time_seconds": processing_time,
                            "timestamp": time.time(),
                            "debug_mode": debug_mode,
                        }

            if debug_mode:
                logger.debug("Poll %d/%d: Still solving...", poll_count + 1, max_polls)

            # Yield to the event loop while waiting. A blocking time.sleep()
            # here would freeze every other coroutine on this loop for the
            # whole polling period.
            await asyncio.sleep(poll_interval)

        # If we get here, polling timed out
        processing_time = time.time() - start_time
        logger.warning(
            "Polling timed out after %.2fs - returning partial results", processing_time
        )

        return {
            "status": "timeout",
            "message": "Schedule solving timed out after maximum polls",
            "file_info": file_info,
            "calendar_entries": calendar_entries,
            "job_id": job_id,
            "max_polls_reached": max_polls,
            "processing_time_seconds": processing_time,
            "timestamp": time.time(),
            "debug_mode": debug_mode,
        }

    except Exception as e:
        # Top-level boundary for external consumers: report the failure in the
        # response rather than propagating it.
        processing_time = time.time() - start_time
        logger.error(
            "MCP handler error after %.2fs: %s", processing_time, e, exc_info=debug_mode
        )

        return {
            "error": str(e),
            "status": "failed",
            "file_name": file_name,
            "message_body": message_body,
            "processing_time_seconds": processing_time,
            "timestamp": time.time(),
            "debug_mode": debug_mode,
        }