"""Integration tests for the factory data provider and the MCP scheduling workflow.

The module is designed to be run with pytest; running it directly executes a
small sample check (see ``_main``).
"""

import asyncio
import pytest
import time
import pandas as pd
import traceback
import sys

from io import StringIO
from datetime import datetime, date, timedelta
from typing import List, Dict, Tuple, Optional, Any

from src.utils.load_secrets import load_secrets

# Import standardized test utilities
from tests.test_utils import get_test_logger, create_test_results

# Initialize standardized test logger
logger = get_test_logger(__name__)

# Load environment variables for agent (if needed).
# This intentionally runs before the project imports below.
load_secrets("tests/secrets/creds.py")

import factory.data.provider as data_provider

from src.utils.extract_calendar import extract_ical_entries
from src.handlers.mcp_backend import process_message_and_attached_file
from src.services import ScheduleService, StateService
from src.services.data import DataService
from src.factory.data.formatters import schedule_to_dataframe

# NOTE(review): the emoji prefixes in the log/print strings of this module look
# mis-encoded (UTF-8 decoded with a legacy code page). They are left untouched
# here because they are runtime text - confirm against version control.


# Add cleanup fixture for proper solver shutdown
@pytest.fixture(scope="session", autouse=True)
def cleanup_solver():
    """Automatically cleanup solver resources after all tests complete.

    Everything after the ``yield`` is best-effort: any error is logged as a
    warning and never fails the test session.
    """
    yield  # Run tests

    # Cleanup: Terminate all active solver jobs and shutdown solver manager
    try:
        from constraint_solvers.timetable.solver import solver_manager
        from src.state import app_state

        logger.info("๐Ÿงน Starting solver cleanup...")

        # Clear all stored schedules first
        app_state.clear_solved_schedules()

        # Terminate all active solver jobs gracefully using the Timefold terminateEarly method
        if hasattr(solver_manager, "terminateEarly"):
            # According to Timefold docs, terminateEarly() affects all jobs for this manager
            try:
                solver_manager.terminateEarly()
                logger.info("๐Ÿงน Terminated all active solver jobs")

                # Give some time for the termination to complete
                time.sleep(0.5)
            except Exception as e:
                logger.warning(f"โš ๏ธ Error terminating solver jobs: {e}")

        # Try additional cleanup methods if available
        if hasattr(solver_manager, "close"):
            try:
                solver_manager.close()
                logger.info("๐Ÿ”’ Closed solver manager")
            except Exception as e:
                logger.warning(f"โš ๏ธ Error closing solver manager: {e}")
        elif hasattr(solver_manager, "shutdown"):
            try:
                solver_manager.shutdown()
                logger.info("๐Ÿ”’ Shutdown solver manager")
            except Exception as e:
                logger.warning(f"โš ๏ธ Error shutting down solver manager: {e}")
        else:
            logger.warning(
                "โš ๏ธ No explicit close/shutdown method found on solver manager"
            )

        # Additional small delay to allow cleanup to complete
        time.sleep(0.2)

        logger.info("โœ… Solver cleanup completed successfully")
    except Exception as e:
        logger.warning(f"โš ๏ธ Error during solver cleanup: {e}")
        # Don't fail tests if cleanup fails, but log it


# Test Configuration
TEST_CONFIG = {
    "valid_calendar": "tests/data/calendar.ics",
    "invalid_calendar": "tests/data/calendar_wrong.ics",
    "default_employee_count": 1,
    "default_project_id": "PROJECT",
    "solver_max_polls": 30,
    "solver_poll_interval": 1,
    "datetime_tolerance_seconds": 60,
}

# Schedule length used when no usable dates are available.
_DEFAULT_SCHEDULE_DAYS = 60


# Fixtures and Helper Functions
@pytest.fixture
def valid_calendar_entries():
    """Load valid calendar entries for testing."""
    return load_calendar_entries(TEST_CONFIG["valid_calendar"])


@pytest.fixture
def invalid_calendar_entries():
    """Load invalid calendar entries for testing."""
    return load_calendar_entries(TEST_CONFIG["invalid_calendar"])


def load_calendar_entries(file_path: str) -> List[Dict]:
    """Load and extract calendar entries from an iCS file.

    Asserts that extraction reported no error and yielded at least one entry.
    """
    with open(file_path, "rb") as f:
        file_bytes = f.read()

    entries, error = extract_ical_entries(file_bytes)
    assert error is None, f"Calendar extraction failed: {error}"
    assert len(entries) > 0, "No calendar entries found"
    return entries


def print_calendar_entries(entries: List[Dict], title: str = "Calendar Entries"):
    """Log calendar entries (summary, start, end) at debug level."""
    logger.debug(f"๐Ÿ“… {title} ({len(entries)} entries):")
    for position, entry in enumerate(entries, start=1):
        start_dt = entry.get("start_datetime")
        end_dt = entry.get("end_datetime")
        logger.debug(f" {position}. {entry['summary']}: {start_dt} โ†’ {end_dt}")


def _schedule_days_from_dates(dates: List[date], buffer_days: int) -> int:
    """Return the inclusive day span covered by *dates* plus *buffer_days*.

    Falls back to ``_DEFAULT_SCHEDULE_DAYS`` when *dates* is empty.
    """
    if not dates:
        return _DEFAULT_SCHEDULE_DAYS
    calendar_span = (max(dates) - min(dates)).days + 1
    return calendar_span + buffer_days


def calculate_required_schedule_days(
    calendar_entries: List[Dict], buffer_days: int = 30
) -> int:
    """Calculate required schedule days based on calendar entries.

    Only ``start_datetime`` / ``end_datetime`` values that are ``datetime``
    instances are considered. Returns the default length when there are no
    entries or no usable datetimes.
    """
    if not calendar_entries:
        return _DEFAULT_SCHEDULE_DAYS  # Default

    entry_dates = []
    for entry in calendar_entries:
        for dt_key in ("start_datetime", "end_datetime"):
            dt = entry.get(dt_key)
            if isinstance(dt, datetime):
                entry_dates.append(dt.date())

    return _schedule_days_from_dates(entry_dates, buffer_days)


async def generate_mcp_data_helper(
    calendar_entries: List[Dict],
    user_message: str,
    project_id: Optional[str] = None,
    employee_count: Optional[int] = None,
    days_in_schedule: Optional[int] = None,
) -> pd.DataFrame:
    """Helper function to generate MCP data with consistent defaults.

    Falsy ``project_id`` / ``employee_count`` fall back to ``TEST_CONFIG``;
    ``days_in_schedule`` is derived from the calendar entries when ``None``.
    """
    project_id = project_id or TEST_CONFIG["default_project_id"]
    employee_count = employee_count or TEST_CONFIG["default_employee_count"]

    if days_in_schedule is None:
        days_in_schedule = calculate_required_schedule_days(calendar_entries)

    return await data_provider.generate_mcp_data(
        calendar_entries=calendar_entries,
        user_message=user_message,
        project_id=project_id,
        employee_count=employee_count,
        days_in_schedule=days_in_schedule,
    )


def _terminate_solver_job(job_id: Any) -> None:
    """Best-effort termination of one solver job; never raises."""
    try:
        from constraint_solvers.timetable.solver import solver_manager

        # Terminate the specific job to free resources using Timefold's terminateEarly
        if hasattr(solver_manager, "terminateEarly"):
            try:
                solver_manager.terminateEarly(job_id)
                logger.info(f"๐Ÿงน Terminated solver job: {job_id}")
            except Exception as e:
                # If specific job termination fails, try to terminate all jobs
                logger.warning(f"โš ๏ธ Error terminating specific job {job_id}: {e}")
                try:
                    solver_manager.terminateEarly()
                    logger.info(
                        "๐Ÿงน Terminated all solver jobs after specific termination failed"
                    )
                except Exception as e2:
                    logger.warning(f"โš ๏ธ Could not terminate any solver jobs: {e2}")
        else:
            logger.warning(
                "โš ๏ธ terminateEarly method not available on solver_manager"
            )
    except Exception as e:
        logger.warning(f"โš ๏ธ Could not access solver_manager for cleanup: {e}")


async def solve_schedule_with_polling(
    initial_df: pd.DataFrame, employee_count: Optional[int] = None
) -> Optional[pd.DataFrame]:
    """Solve schedule with polling and return the result.

    Returns the solved schedule as a DataFrame, or ``None`` when the solver
    either reported violated constraints or produced no solution within
    ``solver_max_polls`` polls. The solver job is always terminated on exit.
    """
    employee_count = employee_count or TEST_CONFIG["default_employee_count"]
    required_days = calculate_required_schedule_days([])  # Use default

    # Extract date range from pinned tasks for better schedule length calculation.
    # The column is checked first: indexing with a scalar False (what
    # ``df.get("Pinned", False) == True`` yields without the column) raises KeyError.
    if "Pinned" in initial_df.columns:
        pinned_tasks = initial_df[initial_df["Pinned"] == True]  # noqa: E712
        if not pinned_tasks.empty:
            required_days = calculate_required_schedule_days_from_df(pinned_tasks)

    state_data = {
        "task_df_json": initial_df.to_json(orient="split"),
        "employee_count": employee_count,
        "days_in_schedule": required_days,
    }

    # Start solving
    (
        _emp_df,
        _task_df,
        job_id,
        status,
        state_data,
    ) = await ScheduleService.solve_schedule_from_state(
        state_data=state_data, job_id=None, debug=True
    )

    logger.info(f"Solver started with job_id: {job_id}")
    logger.debug(f"Initial status: {status}")

    # Poll for solution using the correct StateService methods
    max_polls = TEST_CONFIG["solver_max_polls"]
    poll_interval = TEST_CONFIG["solver_poll_interval"]

    final_df = None

    try:
        for poll_count in range(1, max_polls + 1):
            logger.debug(f" Polling {poll_count}/{max_polls}...")
            # Yield to the event loop instead of blocking it while waiting.
            await asyncio.sleep(poll_interval)

            # Use StateService to check for completed solution
            if not StateService.has_solved_schedule(job_id):
                continue

            solved_schedule = StateService.get_solved_schedule(job_id)
            if solved_schedule is None:
                continue

            logger.info(f"โœ… Schedule solved after {poll_count} polls!")

            # Convert solved schedule to DataFrame
            final_df = schedule_to_dataframe(solved_schedule)

            # Generate status message to check for failures
            status_message = ScheduleService.generate_status_message(solved_schedule)

            if "CONSTRAINTS VIOLATED" in status_message:
                logger.warning(f"โŒ Solver failed: {status_message}")
                final_df = None
            else:
                logger.info(f"โœ… Solver succeeded: {status_message}")

            break
        else:
            # Only reached when no solution showed up within max_polls;
            # a constraint failure is reported above and is not a timeout.
            logger.warning("โฐ Solver timed out after max polls")
    finally:
        # Clean up: Ensure solver job is terminated
        _terminate_solver_job(job_id)

    return final_df


def calculate_required_schedule_days_from_df(
    pinned_df: pd.DataFrame, buffer_days: int = 30
) -> int:
    """Calculate required schedule days from DataFrame with pinned tasks.

    Reads the ``Start`` and ``End`` values of every row. Values that cannot
    be converted to a date are skipped. Returns the default length when no
    usable date is found.
    """
    task_dates = []

    for _, row in pinned_df.iterrows():
        for date_col in ("Start", "End"):
            date_val = row.get(date_col)
            if date_val is None:
                continue

            try:
                if isinstance(date_val, str):
                    dt = datetime.fromisoformat(date_val.replace("Z", "+00:00"))
                else:
                    dt = pd.to_datetime(date_val).to_pydatetime()
                task_dates.append(dt.date())
            except (ValueError, TypeError, AttributeError, OverflowError):
                # Unparseable or missing values are deliberately ignored.
                continue

    return _schedule_days_from_dates(task_dates, buffer_days)


def analyze_schedule_dataframe(
    df: pd.DataFrame, title: str = "Schedule Analysis"
) -> Dict[str, Any]:
    """Analyze a schedule DataFrame and return summary information.

    Splits rows by ``Project`` value (``EXISTING`` vs ``PROJECT``) and returns
    the counts together with the two filtered DataFrames.
    """
    existing_tasks = df[df["Project"] == "EXISTING"]
    project_tasks = df[df["Project"] == "PROJECT"]

    analysis = {
        "total_tasks": len(df),
        "existing_tasks": len(existing_tasks),
        "project_tasks": len(project_tasks),
        "existing_df": existing_tasks,
        "project_df": project_tasks,
    }

    logger.debug(f"\n๐Ÿ“Š {title} ({analysis['total_tasks']} tasks):")
    logger.debug(f" - EXISTING (calendar): {analysis['existing_tasks']} tasks")
    logger.debug(f" - PROJECT (LLM): {analysis['project_tasks']} tasks")

    return analysis


def verify_calendar_tasks_pinned(existing_tasks_df: pd.DataFrame) -> bool:
    """Verify that all calendar tasks are pinned.

    Returns ``True`` only when every row has a truthy ``Pinned`` value.
    """
    logger.debug("\n๐Ÿ”’ Verifying calendar tasks are pinned:")
    all_pinned = True

    for _, task in existing_tasks_df.iterrows():
        is_pinned = task.get("Pinned", False)
        task_name = task["Task"]
        logger.debug(f" - {task_name}: pinned = {is_pinned}")

        if not is_pinned:
            all_pinned = False
            logger.warning(" โŒ Calendar task should be pinned!")
        else:
            logger.info(" โœ… Calendar task properly pinned")

    return all_pinned


def verify_time_preservation(
    original_times: Dict, final_tasks_df: pd.DataFrame
) -> bool:
    """Verify that calendar tasks preserved their original times.

    Each row's ``Start`` is compared with the stored original start for the
    same task name; a task missing from *original_times* counts as a failure.
    """
    logger.debug("\n๐Ÿ” Verifying calendar tasks preserved their original times:")
    time_preserved = True

    for _, task in final_tasks_df.iterrows():
        task_name = task["Task"]
        final_start = task["Start"]

        original = original_times.get(task_name)
        if original is None:
            logger.warning(f" - {task_name}: โŒ Not found in original data")
            time_preserved = False
            continue

        # Normalize and compare times
        preserved = compare_datetime_values(original["start"], final_start)

        logger.debug(f" - {task_name}:")
        logger.debug(f" Original: {original['start']}")
        logger.debug(f" Final: {final_start}")
        logger.debug(f" Preserved: {'โœ…' if preserved else 'โŒ'}")

        if not preserved:
            time_preserved = False

    return time_preserved


def compare_datetime_values(
    dt1: Any, dt2: Any, tolerance_seconds: Optional[int] = None
) -> bool:
    """Compare two datetime values with tolerance for timezone differences.

    Strings are parsed as ISO-8601 (a trailing ``Z`` is accepted). When exactly
    one value is timezone-aware its tzinfo is dropped before comparing.
    Returns ``False`` when the values cannot be parsed or compared.
    """
    tolerance = tolerance_seconds or TEST_CONFIG["datetime_tolerance_seconds"]

    # Convert to comparable datetime objects
    try:
        if isinstance(dt1, str):
            dt1 = datetime.fromisoformat(dt1.replace("Z", "+00:00"))
        if isinstance(dt2, str):
            dt2 = datetime.fromisoformat(dt2.replace("Z", "+00:00"))

        # Normalize timezones for comparison
        if dt1.tzinfo is not None and dt2.tzinfo is None:
            dt1 = dt1.replace(tzinfo=None)
        elif dt1.tzinfo is None and dt2.tzinfo is not None:
            dt2 = dt2.replace(tzinfo=None)

        return abs((dt1 - dt2).total_seconds()) < tolerance
    except (ValueError, TypeError, AttributeError):
        # Unparseable strings or non-datetime inputs are treated as "not equal".
        return False


def store_original_calendar_times(existing_tasks_df: pd.DataFrame) -> Dict[str, Dict]:
    """Store original calendar task times for later comparison.

    Returns a mapping of task name to its ``start``, ``end`` and ``pinned``
    values. Rows sharing a task name overwrite each other (last one wins).
    """
    original_times = {}

    for _, task in existing_tasks_df.iterrows():
        original_times[task["Task"]] = {
            "start": task["Start"],
            "end": task["End"],
            "pinned": task.get("Pinned", False),
        }

    logger.debug("\n๐Ÿ“Œ Original calendar task times:")
    for task_name, times in original_times.items():
        logger.debug(
            f" - {task_name}: {times['start']} โ†’ {times['end']} (pinned: {times['pinned']})"
        )

    return original_times


def verify_llm_tasks_scheduled(project_tasks_df: pd.DataFrame) -> bool:
    """Verify that LLM tasks are properly scheduled and not pinned.

    Returns ``True`` only when no row is pinned and every row has a usable
    ``Start`` value (not ``None``, empty string, or NaN/NaT).
    """
    logger.debug("\n๐Ÿ”„ Verifying LLM tasks were properly scheduled:")
    all_scheduled = True

    for _, task in project_tasks_df.iterrows():
        task_name = task["Task"]
        start_time = task["Start"]
        is_pinned = task.get("Pinned", False)

        logger.debug(f" - {task_name}:")
        logger.debug(f" Scheduled at: {start_time}")
        logger.debug(f" Pinned: {is_pinned}")

        # LLM tasks should not be pinned
        if is_pinned:
            all_scheduled = False
            logger.warning(" โŒ LLM task should not be pinned!")
        else:
            logger.info(" โœ… LLM task properly unpinned")

        # LLM tasks should have been scheduled to actual times.
        # Missing values in a DataFrame usually surface as NaN/NaT, not None.
        if start_time is None or start_time == "" or pd.isna(start_time):
            all_scheduled = False
            logger.warning(" โŒ LLM task was not scheduled!")
        else:
            logger.info(" โœ… LLM task was scheduled")

    return all_scheduled


# Test Functions
@pytest.mark.asyncio
async def test_factory_demo_agent():
    """Generate agent data from a plain description and check basic invariants."""
    # Use a simple string as the project description
    test_input = "Test project for schedule generation."

    # Generate schedule data using generate_agent_data
    schedule = await data_provider.generate_agent_data(test_input)

    # Assert basic schedule properties
    assert len(schedule.employees) > 0
    assert schedule.schedule_info.total_slots > 0
    assert len(schedule.tasks) > 0

    # Verify employee skills
    for employee in schedule.employees:
        assert len(employee.skills) > 0
        # Check that each employee has at least one required skill
        assert any(
            skill in data_provider.SKILL_SET.required_skills
            for skill in employee.skills
        )

    # Verify task properties
    for task in schedule.tasks:
        assert task.duration_slots > 0
        assert task.required_skill
        assert hasattr(task, "project_id")

    # Print schedule details for debugging
    logger.info(f"Employee names: {[e.name for e in schedule.employees]}")
    logger.info(f"Tasks count: {len(schedule.tasks)}")
    logger.info(f"Total slots: {schedule.schedule_info.total_slots}")


@pytest.mark.asyncio
async def test_factory_mcp(valid_calendar_entries):
    """Generate MCP data from a valid calendar and check it is non-empty."""
    print_calendar_entries(valid_calendar_entries, "Loaded Calendar Entries")

    # Use a made-up user message
    user_message = "Create a new AWS VPC."

    # Call generate_mcp_data directly
    df = await generate_mcp_data_helper(valid_calendar_entries, user_message)

    # Assert the DataFrame is not empty
    assert df is not None
    assert not df.empty

    # Print the DataFrame for debug
    logger.debug(df)


@pytest.mark.asyncio
async def test_mcp_workflow_calendar_pinning(valid_calendar_entries):
    """
    Test that verifies calendar tasks (EXISTING) remain pinned to their original times
    while LLM tasks (PROJECT) are rescheduled around them in the MCP workflow.
    """
    logger.debug("\n" + "=" * 60)
    logger.debug("Testing MCP Workflow: Calendar Task Pinning vs LLM Task Scheduling")
    logger.debug("=" * 60)

    print_calendar_entries(valid_calendar_entries, "Loaded Calendar Entries")

    # Generate initial MCP data
    user_message = "Set up CI/CD pipeline and configure monitoring system"
    initial_df = await generate_mcp_data_helper(valid_calendar_entries, user_message)

    # Analyze initial schedule
    analysis = analyze_schedule_dataframe(initial_df, "Generated Initial Data")

    # Store original calendar task times and verify they're pinned
    original_times = store_original_calendar_times(analysis["existing_df"])
    calendar_pinned = verify_calendar_tasks_pinned(analysis["existing_df"])
    assert calendar_pinned, "Calendar tasks should be pinned!"

    # Solve the schedule
    logger.debug("\n๐Ÿ”ง Running MCP workflow to solve schedule...")
    solved_schedule_df = await solve_schedule_with_polling(initial_df)

    if solved_schedule_df is None:
        logger.warning("โฐ Solver timed out - this might be due to complex constraints")
        logger.warning("โš ๏ธ Skipping verification steps for timeout case")
        # Report the outcome as skipped so a missing solution is not counted as a pass.
        pytest.skip("Solver produced no usable schedule; verification skipped")

    # Analyze final schedule (solved_schedule_df is already a DataFrame)
    final_analysis = analyze_schedule_dataframe(solved_schedule_df, "Final Schedule")

    # Verify calendar tasks preserved their times
    time_preserved = verify_time_preservation(
        original_times, final_analysis["existing_df"]
    )

    # Verify LLM tasks were properly scheduled
    llm_scheduled = verify_llm_tasks_scheduled(final_analysis["project_df"])

    # Final assertions
    assert time_preserved, "Calendar tasks did not preserve their original times!"
    assert llm_scheduled, "LLM tasks were not properly scheduled!"

    logger.info("\n๐ŸŽ‰ MCP Workflow Test Results:")
    logger.info(f"โœ… Calendar tasks preserved original times: {time_preserved}")
    logger.info(f"โœ… LLM tasks were properly scheduled: {llm_scheduled}")
    logger.info(
        "๐ŸŽฏ MCP workflow test passed! Calendar tasks are pinned, LLM tasks are flexible."
    )


@pytest.mark.asyncio
async def test_calendar_validation_rejects_invalid_entries(invalid_calendar_entries):
    """
    Test that calendar validation properly rejects entries that violate working hours constraints.
    """
    logger.debug("\n" + "=" * 60)
    logger.debug("Testing Calendar Validation: Constraint Violations")
    logger.debug("=" * 60)

    print_calendar_entries(invalid_calendar_entries, "Invalid Calendar Entries")

    # Test that generate_mcp_data raises an error due to validation failure
    user_message = "Simple test task"

    logger.debug(
        "\nโŒ Attempting to generate MCP data with invalid calendar (should fail)..."
    )

    with pytest.raises(ValueError) as exc_info:
        await generate_mcp_data_helper(invalid_calendar_entries, user_message)

    error_message = str(exc_info.value)
    logger.debug("\nโœ… Validation correctly rejected invalid calendar:")
    logger.debug(f"Error: {error_message}")

    # Verify the error message contains expected constraint violations
    assert "Calendar entries violate working constraints" in error_message

    # Check for specific violations that should be detected
    assert (
        "Early Morning Meeting" in error_message
        or "07:00" in error_message
        or "before 9:00" in error_message
    ), f"Should detect early morning violation in: {error_message}"

    assert (
        "Evening Meeting" in error_message
        or "21:00" in error_message
        or "after 18:00" in error_message
    ), f"Should detect evening violation in: {error_message}"

    assert (
        "Very Late Meeting" in error_message or "22:00" in error_message
    ), f"Should detect very late violation in: {error_message}"

    logger.info("โœ… All expected constraint violations were detected!")


@pytest.mark.asyncio
async def test_calendar_validation_accepts_valid_entries(valid_calendar_entries):
    """
    Test that calendar validation accepts valid entries and processing continues normally.
    """
    logger.debug("\n" + "=" * 60)
    logger.debug("Testing Calendar Validation: Valid Entries")
    logger.debug("=" * 60)

    print_calendar_entries(valid_calendar_entries, "Valid Calendar Entries")

    # Test that generate_mcp_data succeeds with valid calendar
    user_message = "Simple test task"

    logger.debug(
        "\nโœ… Attempting to generate MCP data with valid calendar (should succeed)..."
    )

    # Only the generation call is guarded, so assertion failures below keep
    # their own messages instead of being reported as a validation error.
    try:
        initial_df = await generate_mcp_data_helper(
            valid_calendar_entries, user_message
        )
    except Exception as e:
        pytest.fail(f"Valid calendar should not raise an error, but got: {e}")

    logger.debug(
        f"โœ… Validation passed! Generated {len(initial_df)} tasks successfully"
    )

    # Analyze and verify the result
    analysis = analyze_schedule_dataframe(initial_df, "Generated Schedule")

    assert analysis["existing_tasks"] > 0, "Should have calendar tasks"
    assert analysis["project_tasks"] > 0, "Should have LLM tasks"

    # Verify all calendar tasks are pinned
    calendar_pinned = verify_calendar_tasks_pinned(analysis["existing_df"])
    assert calendar_pinned, "All calendar tasks should be properly pinned!"


@pytest.mark.asyncio
async def test_mcp_backend_end_to_end():
    """
    Test the complete MCP backend workflow using the actual handler function.
    This tests the full process_message_and_attached_file flow.
    """
    logger.debug("\n" + "=" * 50)
    logger.debug("Testing MCP Backend End-to-End")
    logger.debug("=" * 50)

    # Test message for LLM tasks
    message_body = "Implement user authentication and setup database migrations"
    file_path = TEST_CONFIG["valid_calendar"]

    # Read the actual file content as bytes (MCP backend expects bytes, not file path)
    with open(file_path, "rb") as f:
        file_content = f.read()

    # Run the MCP backend handler
    logger.debug(f"๐Ÿ“จ Processing message: '{message_body}'")
    logger.debug(f"๐Ÿ“ Using calendar file: {file_path}")
    logger.debug(f"๐Ÿ“„ File content size: {len(file_content)} bytes")

    result = await process_message_and_attached_file(file_content, message_body)

    # Verify the result structure
    assert isinstance(result, dict), "Result should be a dictionary"
    status = result.get("status")

    if status == "success":
        logger.info("โœ… MCP backend completed successfully!")

        # Verify result contains expected fields
        assert "schedule" in result, "Result should contain schedule data"
        assert "calendar_entries" in result, "Result should contain calendar entries"
        assert "file_info" in result, "Result should contain file info"

        schedule = result["schedule"]
        calendar_entries = result["calendar_entries"]

        logger.info(f"๐Ÿ“… Calendar entries processed: {len(calendar_entries)}")
        logger.info(f"๐Ÿ“‹ Total scheduled tasks: {len(schedule)}")

        # Analyze the schedule
        existing_tasks = [t for t in schedule if t.get("Project") == "EXISTING"]
        project_tasks = [t for t in schedule if t.get("Project") == "PROJECT"]

        print(f"๐Ÿ”’ EXISTING (calendar) tasks: {len(existing_tasks)}")
        print(f"๐Ÿ”ง PROJECT (LLM) tasks: {len(project_tasks)}")

        # Verify we have both types of tasks
        assert len(existing_tasks) > 0, "Should have calendar tasks"
        assert len(project_tasks) > 0, "Should have LLM-generated tasks"

        # Check that project tasks exist and are scheduled
        for task in project_tasks:
            task_name = task.get("Task", "Unknown")
            start_time = task.get("Start")
            print(f"โฐ LLM task '{task_name}': scheduled at {start_time}")
            assert (
                start_time is not None
            ), f"LLM task '{task_name}' should have a scheduled start time"

        print("๐ŸŽฏ MCP backend end-to-end test passed!")

    elif status == "timeout":
        print("โฐ MCP backend timed out - this is acceptable for testing")
        print("The solver may need more time for complex schedules")

        # Still verify basic structure
        assert "calendar_entries" in result, "Result should contain calendar entries"
        assert "file_info" in result, "Result should contain file info"

    else:
        # Handle error cases. This branch must stay reachable so that the
        # backend's own error text shows up in the failure report.
        error_msg = result.get("error", "Unknown error")
        print(f"โŒ MCP backend failed: {error_msg}")
        pytest.fail(f"Unexpected status: {status} - MCP backend failed: {error_msg}")

    print("โœ… MCP backend structure and behavior verified!")


@pytest.mark.asyncio
async def test_mcp_datetime_debug(valid_calendar_entries):
    """
    Debug test to isolate the datetime conversion issue in MCP workflow.
    """
    print("\n" + "=" * 50)
    print("Testing MCP Datetime Conversion Debug")
    print("=" * 50)

    print("\n๐Ÿ“… Calendar entries debug:")
    for position, entry in enumerate(valid_calendar_entries, start=1):
        print(f" {position}. {entry['summary']}:")
        print(
            f" start_datetime: {entry.get('start_datetime')} (type: {type(entry.get('start_datetime'))})"
        )
        print(
            f" end_datetime: {entry.get('end_datetime')} (type: {type(entry.get('end_datetime'))})"
        )

    # Generate MCP data and check the DataFrame structure
    user_message = "Simple test task"

    try:
        # Generate data with calculated schedule length
        required_days = calculate_required_schedule_days(
            valid_calendar_entries, buffer_days=10
        )
        print(f"๐Ÿ“Š Using {required_days} total schedule days")

        initial_df = await generate_mcp_data_helper(
            valid_calendar_entries, user_message, days_in_schedule=required_days
        )

        print(f"\n๐Ÿ“Š Generated DataFrame columns: {list(initial_df.columns)}")
        print(f"๐Ÿ“Š DataFrame shape: {initial_df.shape}")
        print(f"๐Ÿ“Š DataFrame dtypes:\n{initial_df.dtypes}")

        # Check the Start and End column formats
        print("\n๐Ÿ•’ Start column sample:")
        for i, row in initial_df.head(3).iterrows():
            start_val = row.get("Start")
            print(f" Row {i}: {start_val} (type: {type(start_val)})")

        # Test conversion to JSON and back
        json_str = initial_df.to_json(orient="split")
        print("\n๐Ÿ“„ JSON conversion successful")

        # Test parsing back
        task_df_back = pd.read_json(StringIO(json_str), orient="split")
        print("๐Ÿ“„ JSON parsing back successful")
        print(f"๐Ÿ“„ Parsed dtypes:\n{task_df_back.dtypes}")

        # Test task conversion with minimal error handling
        print("\n๐Ÿ”„ Testing task conversion...")

        # Only try with the first task to isolate issues
        single_task_df = task_df_back.head(1)
        print(f"Single task for testing:\n{single_task_df}")

        tasks = DataService.convert_dataframe_to_tasks(single_task_df)
        print(f"โœ… Successfully converted {len(tasks)} tasks")

        for task in tasks:
            print(f" Task: {task.description}")
            print(f" start_slot: {task.start_slot} (type: {type(task.start_slot)})")
            print(f" pinned: {task.pinned}")
            print(f" project_id: {task.project_id}")

    except Exception as e:
        print(f"โŒ Error in MCP data generation/conversion: {e}")
        traceback.print_exc()
        raise

    print("๐ŸŽฏ MCP datetime debug test completed!")


def _main() -> int:
    """Run a small sample check outside pytest and return a process exit code.

    Returns 0 when the results tracker reports that everything passed, 1 otherwise.
    """
    logger.section("Factory Integration Tests")
    logger.info(
        "Note: This test suite is designed for pytest. For best results, run with:"
    )
    logger.info(" pytest tests/test_factory.py -v")
    logger.info(" YUGA_DEBUG=true pytest tests/test_factory.py -v -s")

    # Create test results tracker
    results = create_test_results(logger)

    try:
        # Load test data
        logger.info("Loading test calendar data...")
        calendar_entries = load_calendar_entries(TEST_CONFIG["valid_calendar"])
        logger.info(f"โœ… Loaded {len(calendar_entries)} calendar entries")

        # Run a sample factory test
        logger.info("Running sample factory tests...")

        async def run_sample_tests():
            # Test MCP data generation
            try:
                logger.info("Testing MCP data generation...")
                df = await generate_mcp_data_helper(
                    calendar_entries=calendar_entries,
                    user_message="Create sample tasks for testing",
                )
                logger.info(f"โœ… Generated MCP data with {len(df)} tasks")
                return True
            except Exception as e:
                logger.error(f"โŒ MCP data generation failed: {e}")
                return False

        # Run the async test
        success = asyncio.run(run_sample_tests())
        results.add_result("mcp_data_generation", success)

        logger.info("โœ… Completed sample factory tests")

    except Exception as e:
        logger.error(f"Failed to run factory tests: {e}")
        results.add_result("factory_tests_setup", False, str(e))

    # Generate summary and exit with appropriate code
    all_passed = results.summary()

    if not all_passed:
        logger.info("๐Ÿ’ก Hint: Use 'pytest tests/test_factory.py' for full test coverage")

    return 0 if all_passed else 1


if __name__ == "__main__":
    # Direct execution for non-pytest testing
    sys.exit(_main())