Spaces:
Paused
Paused
import os | |
import pandas as pd | |
from datetime import date | |
from random import Random | |
pd.set_option("display.max_columns", None) | |
from factory.data.formatters import schedule_to_dataframe | |
from factory.data.generators import * | |
from factory.data.models import * | |
from factory.agents.task_composer_agent import TaskComposerAgent | |
from constraint_solvers.timetable.domain import * | |
from utils.logging_config import setup_logging, get_logger | |
from utils.extract_calendar import ( | |
get_earliest_calendar_date, | |
datetime_to_slot, | |
validate_calendar_working_hours, | |
) | |
# Initialize logging | |
setup_logging() | |
logger = get_logger(__name__) | |
# ========================= | |
# CONSTANTS | |
# ========================= | |
# Import working hours configuration | |
from constraint_solvers.timetable.working_hours import SLOTS_PER_WORKING_DAY | |
# ========================= | |
# DEMO PARAMS | |
# ========================= | |
SKILL_SET = SkillSet( | |
required_skills=("Frontend Engineer", "Backend Engineer", "Cloud Engineer"), | |
optional_skills=( | |
"Security Expert", | |
"DevOps Engineer", | |
"Data Engineer", | |
"Network Engineer", | |
"AI Engineer", | |
), | |
) | |
DATA_PARAMS = TimeTableDataParameters( | |
skill_set=SKILL_SET, | |
days_in_schedule=365, | |
employee_count=12, | |
optional_skill_distribution=( | |
CountDistribution(count=1, weight=3), | |
CountDistribution(count=2, weight=1), | |
), | |
availability_count_distribution=( | |
CountDistribution(count=5, weight=4), | |
CountDistribution(count=10, weight=3), | |
CountDistribution(count=15, weight=2), | |
CountDistribution(count=20, weight=1), | |
), | |
random_seed=37, | |
) | |
MCP_PARAMS = TimeTableDataParameters( | |
skill_set=SKILL_SET, | |
days_in_schedule=365, | |
# In this case, we only have one user | |
employee_count=1, | |
optional_skill_distribution=( | |
CountDistribution(count=len(SKILL_SET.optional_skills), weight=1), | |
), | |
availability_count_distribution=( | |
# Full availability for one user | |
CountDistribution(count=20, weight=1), | |
), | |
random_seed=37, | |
) | |
# ========================= | |
# AGENT DATA | |
# ========================= | |
async def generate_agent_data( | |
file, project_id: str = "", employee_count: int = None, days_in_schedule: int = None | |
) -> EmployeeSchedule: | |
# Use DATA_PARAMS, but allow override | |
parameters = DATA_PARAMS | |
if employee_count is not None or days_in_schedule is not None: | |
parameters = TimeTableDataParameters( | |
skill_set=parameters.skill_set, | |
days_in_schedule=days_in_schedule | |
if days_in_schedule is not None | |
else parameters.days_in_schedule, | |
employee_count=employee_count | |
if employee_count is not None | |
else parameters.employee_count, | |
optional_skill_distribution=parameters.optional_skill_distribution, | |
availability_count_distribution=parameters.availability_count_distribution, | |
random_seed=parameters.random_seed, | |
) | |
start_date: date = earliest_monday_on_or_after(date.today()) | |
randomizer: Random = Random(parameters.random_seed) | |
employees: list[Employee] = generate_employees(parameters, randomizer) | |
total_slots: int = parameters.days_in_schedule * SLOTS_PER_WORKING_DAY | |
logger.debug("Processing file object: %s (type: %s)", file, type(file)) | |
match file: | |
case file if hasattr(file, "read"): | |
input_str = file.read() | |
case bytes(): | |
input_str = file.decode("utf-8") | |
case str() if os.path.exists(file): | |
with open(file, "r", encoding="utf-8") as f: | |
input_str = f.read() | |
case str(): | |
input_str = file | |
case _: | |
raise ValueError(f"Unsupported file type: {type(file)}") | |
agent_output = await run_task_composer_agent(input_str, parameters) | |
tasks = tasks_from_agent_output(agent_output, parameters, project_id) | |
generate_employee_availability(employees, parameters, start_date, randomizer) | |
return EmployeeSchedule( | |
employees=employees, | |
tasks=tasks, | |
schedule_info=ScheduleInfo(total_slots=total_slots), | |
) | |
async def generate_mcp_data( | |
calendar_entries, | |
user_message: str, | |
project_id: str = "PROJECT", | |
employee_count: int = None, | |
days_in_schedule: int = None, | |
): | |
parameters = MCP_PARAMS | |
# --- DETERMINE START DATE AND REQUIRED SCHEDULE LENGTH FROM CALENDAR --- | |
# Validate calendar entries are within working hours first | |
if calendar_entries: | |
is_valid, error_msg = validate_calendar_working_hours(calendar_entries) | |
if not is_valid: | |
logger.error(f"β Calendar validation failed: {error_msg}") | |
raise ValueError( | |
f"Calendar entries violate working hours constraints:\n{error_msg}" | |
) | |
else: | |
logger.info( | |
f"β All {len(calendar_entries)} calendar entries are within working hours (8:00-18:00)" | |
) | |
# Use earliest calendar date as the base, or fall back to next Monday if no calendar | |
earliest_calendar_date = ( | |
get_earliest_calendar_date(calendar_entries) if calendar_entries else None | |
) | |
if earliest_calendar_date: | |
start_date: date = earliest_calendar_date | |
# Calculate required schedule length to accommodate all calendar entries | |
if calendar_entries and days_in_schedule is None: | |
# Find the latest calendar date to determine required schedule length | |
latest_date = earliest_calendar_date | |
for entry in calendar_entries: | |
end_dt = entry.get("end_datetime") | |
if end_dt and end_dt.date() > latest_date: | |
latest_date = end_dt.date() | |
# Calculate days needed plus buffer for LLM tasks | |
calendar_days_span = (latest_date - earliest_calendar_date).days + 1 | |
min_required_days = ( | |
calendar_days_span + 30 | |
) # Add 30 days buffer for LLM tasks | |
# Use the larger of user-specified or calculated requirement | |
calculated_days = max(min_required_days, parameters.days_in_schedule) | |
logger.info( | |
f"π Calendar span: {calendar_days_span} days, using {calculated_days} total schedule days" | |
) | |
else: | |
calculated_days = ( | |
days_in_schedule if days_in_schedule else parameters.days_in_schedule | |
) | |
else: | |
start_date: date = earliest_monday_on_or_after(date.today()) | |
calculated_days = ( | |
days_in_schedule if days_in_schedule else parameters.days_in_schedule | |
) | |
# Update parameters with calculated values | |
if employee_count is not None or calculated_days != parameters.days_in_schedule: | |
parameters = TimeTableDataParameters( | |
skill_set=parameters.skill_set, | |
days_in_schedule=calculated_days, | |
employee_count=employee_count | |
if employee_count is not None | |
else parameters.employee_count, | |
optional_skill_distribution=parameters.optional_skill_distribution, | |
availability_count_distribution=parameters.availability_count_distribution, | |
random_seed=parameters.random_seed, | |
) | |
randomizer: Random = Random(parameters.random_seed) | |
total_slots: int = parameters.days_in_schedule * SLOTS_PER_WORKING_DAY | |
# --- CALENDAR TASKS --- | |
calendar_tasks = generate_tasks_from_calendar( | |
parameters, randomizer, calendar_entries, base_date=start_date | |
) | |
# Validate that all calendar tasks have valid slot assignments | |
for task in calendar_tasks: | |
if task.start_slot >= total_slots: | |
logger.error( | |
f"Calendar task '{task.description}' has slot {task.start_slot} >= {total_slots}" | |
) | |
raise ValueError( | |
f"Calendar task slot {task.start_slot} exceeds schedule length {total_slots}. " | |
f"Increase days_in_schedule or check calendar dates." | |
) | |
# Assign project_id 'EXISTING' to all calendar tasks | |
for t in calendar_tasks: | |
t.sequence_number = 0 # will be overwritten later | |
t.project_id = "EXISTING" | |
# --- LLM TASKS --- | |
llm_tasks = [] | |
if user_message: | |
agent_output = await run_task_composer_agent(user_message, parameters) | |
llm_tasks = tasks_from_agent_output(agent_output, parameters, "PROJECT") | |
for t in llm_tasks: | |
t.sequence_number = 0 # will be overwritten later | |
t.project_id = "PROJECT" | |
# --- ANALYZE REQUIRED SKILLS --- | |
all_tasks = calendar_tasks + llm_tasks | |
required_skills_needed = set() | |
for task in all_tasks: | |
if hasattr(task, "required_skill") and task.required_skill: | |
required_skills_needed.add(task.required_skill) | |
# --- GENERATE EMPLOYEES WITH REQUIRED SKILLS --- | |
employees: list[Employee] = generate_employees( | |
parameters, randomizer, required_skills_needed | |
) | |
# Set the single employee's name to 'Chatbot User' | |
if len(employees) == 1: | |
employees[0].name = "Chatbot User" | |
else: | |
raise ValueError("MCP data provider only supports one employee") | |
# Ensure all date sets are empty | |
for emp in employees: | |
emp.unavailable_dates.clear() | |
emp.undesired_dates.clear() | |
emp.desired_dates.clear() | |
# --- ASSIGN EMPLOYEES TO TASKS --- | |
for t in all_tasks: | |
t.employee = employees[0] | |
# Create DataFrames for debugging | |
calendar_df = pd.DataFrame( | |
[ | |
{ | |
"id": t.id, | |
"description": t.description, | |
"duration_slots": t.duration_slots, | |
"start_slot": t.start_slot, | |
"required_skill": t.required_skill, | |
"sequence_number": t.sequence_number, | |
"employee": t.employee.name if hasattr(t.employee, "name") else None, | |
"project_id": t.project_id, | |
} | |
for t in calendar_tasks | |
] | |
) | |
logger.debug("Generated calendar tasks DataFrame:\n%s", calendar_df) | |
llm_df = pd.DataFrame( | |
[ | |
{ | |
"id": t.id, | |
"description": t.description, | |
"duration_slots": t.duration_slots, | |
"start_slot": t.start_slot, | |
"required_skill": t.required_skill, | |
"sequence_number": t.sequence_number, | |
"employee": t.employee.name if hasattr(t.employee, "name") else None, | |
"project_id": t.project_id, | |
} | |
for t in llm_tasks | |
] | |
) | |
logger.debug("Generated LLM tasks DataFrame:\n%s", llm_df) | |
# --- ASSIGN SEQUENCE NUMBERS --- | |
existing_seq = 0 | |
project_seq = 0 | |
for t in all_tasks: | |
if t.project_id == "EXISTING": | |
t.sequence_number = existing_seq | |
existing_seq += 1 | |
elif t.project_id == "PROJECT": | |
t.sequence_number = project_seq | |
project_seq += 1 | |
schedule = EmployeeSchedule( | |
employees=employees, | |
tasks=all_tasks, | |
schedule_info=ScheduleInfo( | |
total_slots=total_slots, base_date=start_date, base_timezone=None | |
), | |
) | |
final_df = schedule_to_dataframe(schedule) | |
logger.debug("Final schedule DataFrame (MCP-aligned):\n%s", final_df) | |
return final_df | |
async def run_task_composer_agent( | |
input_str: str, parameters: TimeTableDataParameters | |
) -> list: | |
"""Runs the task composition agent with the given input and parameters.""" | |
try: | |
# Initialize the agent | |
agent = TaskComposerAgent() | |
# Run the agent with the input | |
output = await agent.compose_tasks(input_str, parameters) | |
logger.debug("Agent output: %s", output) | |
return output | |
except Exception as e: | |
logger.error("Error running task composer agent: %s", e) | |
# Return empty list on error | |
return [] | |