Spaces:
Paused
Paused
import json | |
import re | |
import asyncio | |
from typing import Dict, List, Any, Optional | |
from utils.logging_config import setup_logging, get_logger | |
# Initialize logging | |
setup_logging() | |
logger = get_logger(__name__) | |
class ToolCallAssembler: | |
"""Handles streaming tool call assembly from API responses""" | |
def __init__(self): | |
self.tool_calls: Dict[int, Dict[str, Any]] = {} | |
self.reset() | |
def reset(self): | |
"""Reset the assembler for a new conversation""" | |
self.tool_calls = {} | |
def process_delta(self, delta: Dict[str, Any]) -> None: | |
"""Process a single delta from streaming response""" | |
if "tool_calls" not in delta: | |
return | |
for tool_call_delta in delta["tool_calls"]: | |
index = tool_call_delta.get("index", 0) | |
# Initialize tool call if not exists | |
if index not in self.tool_calls: | |
self.tool_calls[index] = { | |
"id": "", | |
"type": "function", | |
"function": {"name": "", "arguments": ""}, | |
} | |
# Update tool call components | |
if "id" in tool_call_delta: | |
self.tool_calls[index]["id"] = tool_call_delta["id"] | |
if "type" in tool_call_delta: | |
self.tool_calls[index]["type"] = tool_call_delta["type"] | |
if "function" in tool_call_delta: | |
if "name" in tool_call_delta["function"]: | |
self.tool_calls[index]["function"]["name"] = tool_call_delta[ | |
"function" | |
]["name"] | |
if "arguments" in tool_call_delta["function"]: | |
# Append arguments (they come in chunks) | |
self.tool_calls[index]["function"]["arguments"] += tool_call_delta[ | |
"function" | |
]["arguments"] | |
def get_completed_tool_calls(self) -> List[Dict[str, Any]]: | |
"""Get list of completed tool calls""" | |
completed = [] | |
for tool_call in self.tool_calls.values(): | |
# Check if tool call is complete (has name and valid JSON arguments) | |
if tool_call["function"]["name"] and tool_call["function"]["arguments"]: | |
try: | |
# Validate JSON arguments | |
json.loads(tool_call["function"]["arguments"]) | |
completed.append(tool_call) | |
except json.JSONDecodeError as e: | |
logger.warning( | |
f"Tool call {tool_call['id']} has invalid JSON arguments: {e}" | |
) | |
logger.debug(f"Arguments: {tool_call['function']['arguments']}") | |
# Enhanced debugging for character 805 issue | |
args = tool_call["function"]["arguments"] | |
error_pos = getattr(e, "pos", 804) # Get error position | |
if error_pos > 0: | |
# Show context around the error | |
start = max(0, error_pos - 50) | |
end = min(len(args), error_pos + 50) | |
context = args[start:end] | |
logger.error(f"JSON Error Context (around char {error_pos}):") | |
logger.error( | |
f" Before error: '{args[max(0, error_pos-20):error_pos]}'" | |
) | |
logger.error( | |
f" At error: '{args[error_pos:error_pos+1] if error_pos < len(args) else 'END'}'" | |
) | |
logger.error( | |
f" After error: '{args[error_pos+1:error_pos+21] if error_pos < len(args) else ''}'" | |
) | |
logger.error(f" Full context: '{context}'") | |
# Check if it's the calendar data causing issues | |
if "calendar_file_content" in args: | |
# Find where calendar data starts and ends | |
cal_start = args.find('"calendar_file_content":"') | |
if cal_start != -1: | |
cal_data_start = cal_start + len( | |
'"calendar_file_content":"' | |
) | |
# Look for the closing quote | |
cal_end = args.find('"', cal_data_start + 1) | |
if cal_end != -1: | |
logger.error( | |
f"Calendar data length: {cal_end - cal_data_start}" | |
) | |
logger.error( | |
f"Calendar data starts at: {cal_data_start}" | |
) | |
logger.error(f"Calendar data ends at: {cal_end}") | |
logger.error( | |
f"Error position relative to cal data: {error_pos - cal_data_start}" | |
) | |
else: | |
logger.error("Calendar data has no closing quote!") | |
else: | |
logger.error( | |
"No calendar_file_content found in arguments" | |
) | |
# Try to repair the JSON by attempting common fixes | |
try: | |
repaired_args = self._attempt_json_repair(args) | |
if repaired_args: | |
json.loads(repaired_args) # Test if repair worked | |
logger.info( | |
f"Successfully repaired JSON for tool call {tool_call['id']}" | |
) | |
# Update the tool call with repaired arguments | |
tool_call["function"]["arguments"] = repaired_args | |
completed.append(tool_call) | |
continue | |
except: | |
logger.debug("JSON repair attempt failed") | |
return completed | |
def _attempt_json_repair(self, broken_json: str) -> str: | |
"""Attempt to repair common JSON issues""" | |
try: | |
# Common issue: Missing closing brace | |
if not broken_json.strip().endswith("}"): | |
return broken_json.strip() + "}" | |
# Common issue: Malformed calendar data causing JSON breaks | |
# The error is likely around character 795 in the base64 data | |
if '"calendar_file_content":"' in broken_json: | |
# Try to find and fix the calendar field | |
start_pattern = '"calendar_file_content":"' | |
start_idx = broken_json.find(start_pattern) | |
if start_idx != -1: | |
content_start = start_idx + len(start_pattern) | |
# Find the actual end of the JSON by looking for the pattern | |
# that should follow calendar content: "} or ", | |
remaining = broken_json[content_start:] | |
# Look for what appears to be a duplicate task_description field | |
# This indicates where the JSON got corrupted | |
duplicate_pattern = r'"[\s\S]*?\{\s*"task_description"' | |
match = re.search(duplicate_pattern, remaining) | |
if match: | |
# The base64 data should end before this match | |
clean_end_pos = content_start + match.start() | |
# Extract the clean calendar data (everything before the corrupted part) | |
clean_calendar_data = ( | |
broken_json[content_start:clean_end_pos] | |
.rstrip('"') | |
.rstrip() | |
) | |
# Extract the task description from the duplicate section | |
dup_start = content_start + match.start() | |
dup_content = broken_json[dup_start:] | |
# Find the actual JSON structure in the duplicate content | |
dup_json_start = dup_content.find('{"task_description"') | |
if dup_json_start != -1: | |
clean_rest = dup_content[dup_json_start:] | |
# Remove any trailing garbage that might cause JSON issues | |
if not clean_rest.endswith("}"): | |
# Try to find the proper ending | |
brace_count = 0 | |
proper_end = -1 | |
for i, char in enumerate(clean_rest): | |
if char == "{": | |
brace_count += 1 | |
elif char == "}": | |
brace_count -= 1 | |
if brace_count == 0: | |
proper_end = i + 1 | |
break | |
if proper_end != -1: | |
clean_rest = clean_rest[:proper_end] | |
else: | |
clean_rest = clean_rest.rstrip() + "}" | |
# Parse the clean rest to extract just the fields we need | |
try: | |
rest_obj = json.loads(clean_rest) | |
task_desc = rest_obj.get("task_description", "") | |
# Reconstruct the proper JSON | |
repaired_obj = { | |
"task_description": task_desc, | |
"calendar_file_content": clean_calendar_data, | |
} | |
repaired = json.dumps(repaired_obj) | |
logger.debug( | |
f"Successfully reconstructed JSON with task: {task_desc[:50]}..." | |
) | |
logger.debug( | |
f"Calendar data length: {len(clean_calendar_data)}" | |
) | |
return repaired | |
except json.JSONDecodeError as e: | |
logger.debug( | |
f"Failed to parse extracted duplicate section: {e}" | |
) | |
# Fallback to simple reconstruction | |
repaired = f'{{"task_description":"","calendar_file_content":"{clean_calendar_data}"}}' | |
logger.debug( | |
f"Fallback repair with calendar data length: {len(clean_calendar_data)}" | |
) | |
return repaired | |
# Alternative: Look for more traditional closing patterns | |
# Find where the base64 data properly ends with quote+comma or quote+brace | |
for i, char in enumerate(remaining): | |
if char == '"': | |
# Check what follows the quote | |
next_chars = ( | |
remaining[i : i + 3] | |
if i + 3 <= len(remaining) | |
else remaining[i:] | |
) | |
if next_chars.startswith('",') or next_chars.startswith( | |
'"}' | |
): | |
# This looks like a proper end | |
calendar_data = remaining[:i] | |
rest = remaining[i:] | |
repaired = ( | |
broken_json[:content_start] + calendar_data + rest | |
) | |
logger.debug( | |
f"Pattern-based repair: calendar data length {len(calendar_data)}" | |
) | |
return repaired | |
# If calendar repair didn't work, try other common fixes | |
# Remove any stray characters that might have been inserted | |
cleaned = re.sub( | |
r"[^\x20-\x7E]", "", broken_json | |
) # Remove non-printable chars | |
if cleaned != broken_json: | |
logger.debug("Attempted to remove non-printable characters") | |
return cleaned | |
# Last resort: Try to extract valid JSON from the string | |
# Look for the first { and try to find matching } | |
first_brace = broken_json.find("{") | |
if first_brace != -1: | |
brace_count = 0 | |
for i in range(first_brace, len(broken_json)): | |
if broken_json[i] == "{": | |
brace_count += 1 | |
elif broken_json[i] == "}": | |
brace_count -= 1 | |
if brace_count == 0: | |
candidate = broken_json[first_brace : i + 1] | |
try: | |
json.loads(candidate) | |
logger.debug( | |
f"Extracted valid JSON segment of length {len(candidate)}" | |
) | |
return candidate | |
except: | |
continue | |
return None | |
except Exception as e: | |
logger.debug(f"JSON repair attempt failed: {e}") | |
return None | |
def debug_info(self) -> Dict[str, Any]: | |
"""Get debug information about current tool calls""" | |
info = { | |
"total_tool_calls": len(self.tool_calls), | |
"completed_tool_calls": len(self.get_completed_tool_calls()), | |
"tool_calls_detail": {}, | |
} | |
for index, tool_call in self.tool_calls.items(): | |
info["tool_calls_detail"][index] = { | |
"id": tool_call["id"], | |
"function_name": tool_call["function"]["name"], | |
"arguments_length": len(tool_call["function"]["arguments"]), | |
"arguments_preview": tool_call["function"]["arguments"][:100] + "..." | |
if len(tool_call["function"]["arguments"]) > 100 | |
else tool_call["function"]["arguments"], | |
"is_json_valid": self._is_valid_json( | |
tool_call["function"]["arguments"] | |
), | |
} | |
return info | |
def _is_valid_json(self, json_str: str) -> bool: | |
"""Check if string is valid JSON""" | |
try: | |
json.loads(json_str) | |
return True | |
except: | |
return False | |
class ToolCallProcessor: | |
"""Processes completed tool calls""" | |
def __init__(self, mcp_client): | |
self.mcp_client = mcp_client | |
self.loop = None | |
try: | |
self.loop = asyncio.get_event_loop() | |
except RuntimeError: | |
self.loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(self.loop) | |
def process_tool_calls(self, tool_calls: List[Dict[str, Any]], message: str) -> str: | |
"""Process a list of tool calls and return response text""" | |
if not tool_calls: | |
return "" | |
response_parts = [] | |
for tool_call in tool_calls: | |
logger.info(f"Processing tool call: {tool_call}") | |
try: | |
# Extract function details | |
function_name = tool_call.get("function", {}).get("name", "") | |
function_args_str = tool_call.get("function", {}).get("arguments", "{}") | |
if function_name == "schedule_tasks_with_calendar": | |
result = self._process_scheduling_tool(function_args_str, message) | |
response_parts.append(result) | |
else: | |
logger.debug(f"Ignoring non-scheduling tool: {function_name}") | |
except Exception as e: | |
logger.error(f"Error processing tool call: {e}") | |
response_parts.append(f"\n\nβ **Error processing tool call:** {str(e)}") | |
return "".join(response_parts) | |
def _process_scheduling_tool(self, function_args_str: str, message: str) -> str: | |
"""Process scheduling tool call""" | |
try: | |
# Parse arguments | |
args = json.loads(function_args_str) | |
task_description = args.get("task_description", "") | |
calendar_content = args.get("calendar_file_content", "none") | |
# Extract calendar data from message if available (override args) | |
calendar_match = re.search(r"\[CALENDAR_DATA:([^\]]+)\]", message) | |
if calendar_match: | |
calendar_content = calendar_match.group(1) | |
logger.debug("Found calendar data in message, overriding args") | |
logger.info(f"Calling MCP scheduling tool with task: {task_description}") | |
# Call the scheduling tool | |
result = self.loop.run_until_complete( | |
self.mcp_client.call_scheduling_tool(task_description, calendar_content) | |
) | |
logger.info( | |
f"MCP tool completed with status: {result.get('status', 'unknown')}" | |
) | |
return self._format_scheduling_result(result, task_description) | |
except json.JSONDecodeError as e: | |
logger.error(f"JSON decode error in tool arguments: {e}") | |
logger.debug(f"Raw arguments: {function_args_str}") | |
return f"\n\nβ **Error parsing tool arguments:** {str(e)}\n\nRaw arguments preview: {function_args_str[:200]}..." | |
except Exception as e: | |
logger.error(f"Tool execution error: {e}") | |
return f"\n\nβ **Error processing scheduling request:** {str(e)}" | |
def _format_scheduling_result( | |
self, result: Dict[str, Any], task_description: str | |
) -> str: | |
"""Format the scheduling result for display""" | |
if result.get("status") == "success": | |
schedule = result.get("schedule", []) | |
calendar_entries = result.get("calendar_entries", []) | |
return f""" | |
π **Schedule Generated Successfully!** | |
**Task:** {task_description} | |
**Calendar Events Processed:** {len(calendar_entries)} | |
**Total Scheduled Items:** {len(schedule)} | |
**Summary:** | |
- β Existing calendar events preserved at original times | |
- π New tasks optimized around existing commitments | |
- β° All scheduling respects business hours (9:00-18:00) | |
- π Complete schedule integration | |
To see the detailed schedule, ask me to "show the schedule as a table" or "format the schedule results". | |
""" | |
elif result.get("status") == "timeout": | |
return f""" | |
β° **Scheduling Analysis In Progress** | |
The schedule optimizer is still working on your complex task: "{task_description}" | |
This indicates a sophisticated scheduling challenge with multiple constraints. The system is finding the optimal arrangement for your tasks around existing calendar commitments. | |
You can check back in a few moments or try with a simpler task description. | |
""" | |
else: | |
error_msg = result.get("error", "Unknown error") | |
return f""" | |
β **Scheduling Error** | |
I encountered an issue while processing your scheduling request: {error_msg} | |
Please try: | |
- Simplifying your task description | |
- Checking if you have calendar conflicts | |
- Ensuring your .ics file is valid (if uploaded) | |
""" | |
def create_tool_call_handler(mcp_client): | |
"""Factory function to create a complete tool call handler""" | |
assembler = ToolCallAssembler() | |
processor = ToolCallProcessor(mcp_client) | |
return assembler, processor | |