from datetime import date, timedelta import random from random import Random from itertools import product from factory.data.models import * from constraint_solvers.timetable.domain import * from utils.extract_calendar import datetime_to_slot, calculate_duration_slots ### EMPLOYEES ### FIRST_NAMES = ("Amy", "Beth", "Carl", "Dan", "Elsa", "Flo", "Gus", "Hugo", "Ivy", "Jay") LAST_NAMES = ( "Cole", "Fox", "Green", "Jones", "King", "Li", "Poe", "Rye", "Smith", "Watt", ) def generate_employees( parameters: TimeTableDataParameters, random: Random, required_skills_needed: set[str] = None, ) -> list[Employee]: """ Generates a list of Employee objects with random names and skills. Ensures that collectively the employees have all required_skills_needed. """ name_permutations = [ f"{first_name} {last_name}" for first_name, last_name in product(FIRST_NAMES, LAST_NAMES) ] random.shuffle(name_permutations) employees = [] # If specific skills are needed, ensure they're covered if required_skills_needed: skills_needed = set(required_skills_needed) # For single employee (MCP case), give them all needed skills plus some random ones if parameters.employee_count == 1: all_available_skills = list(parameters.skill_set.required_skills) + list( parameters.skill_set.optional_skills ) # Give all available skills to the single employee to handle any task employees.append( Employee(name=name_permutations[0], skills=set(all_available_skills)) ) return employees # For multiple employees, distribute needed skills and add random skills for i in range(parameters.employee_count): (count,) = random.choices( population=counts(parameters.optional_skill_distribution), weights=weights(parameters.optional_skill_distribution), ) count = min(count, len(parameters.skill_set.optional_skills)) skills = [] # Ensure each employee gets at least one required skill skills += random.sample(parameters.skill_set.required_skills, 1) # Add random optional skills skills += random.sample(parameters.skill_set.optional_skills, count) # If there are still skills needed and this is one of the first employees, # ensure they get some of the needed skills if skills_needed and i < len(skills_needed): needed_skill = skills_needed.pop() if needed_skill not in skills: skills.append(needed_skill) employees.append(Employee(name=name_permutations[i], skills=set(skills))) else: # Original random generation when no specific skills are needed for i in range(parameters.employee_count): (count,) = random.choices( population=counts(parameters.optional_skill_distribution), weights=weights(parameters.optional_skill_distribution), ) count = min(count, len(parameters.skill_set.optional_skills)) skills = [] skills += random.sample(parameters.skill_set.optional_skills, count) skills += random.sample(parameters.skill_set.required_skills, 1) employees.append(Employee(name=name_permutations[i], skills=set(skills))) return employees def generate_employee_availability( employees: list[Employee], parameters: TimeTableDataParameters, start_date: date, random: Random, ) -> None: """ Sets up random availability preferences for employees proportional to schedule length. For 365 days: - Max 21 unavailable days per employee - Max 0-12 undesired days per employee - Desired dates remain flexible (0-12 days) Scales proportionally for different schedule lengths. """ days_in_schedule = parameters.days_in_schedule # Calculate proportional limits based on 365-day baseline max_unavailable_per_employee = round((21 / 365) * days_in_schedule) max_undesired_per_employee = round((12 / 365) * days_in_schedule) max_desired_per_employee = round((12 / 365) * days_in_schedule) # Ensure minimum reasonable values max_unavailable_per_employee = max(1, max_unavailable_per_employee) max_undesired_per_employee = max(0, max_undesired_per_employee) max_desired_per_employee = max(0, max_desired_per_employee) # Generate all possible dates in the schedule all_dates = [start_date + timedelta(days=i) for i in range(days_in_schedule)] for employee in employees: # Randomly assign unavailable dates (1 to max_unavailable_per_employee) num_unavailable = random.randint(1, max_unavailable_per_employee) unavailable_dates = random.sample( all_dates, min(num_unavailable, len(all_dates)) ) employee.unavailable_dates.update(unavailable_dates) # Remove unavailable dates from remaining pool for other preferences remaining_dates = [d for d in all_dates if d not in employee.unavailable_dates] # Randomly assign undesired dates (0 to max_undesired_per_employee) if max_undesired_per_employee > 0 and remaining_dates: num_undesired = random.randint( 0, min(max_undesired_per_employee, len(remaining_dates)) ) if num_undesired > 0: undesired_dates = random.sample(remaining_dates, num_undesired) employee.undesired_dates.update(undesired_dates) remaining_dates = [ d for d in remaining_dates if d not in employee.undesired_dates ] # Randomly assign desired dates (0 to max_desired_per_employee) if max_desired_per_employee > 0 and remaining_dates: num_desired = random.randint( 0, min(max_desired_per_employee, len(remaining_dates)) ) if num_desired > 0: desired_dates = random.sample(remaining_dates, num_desired) employee.desired_dates.update(desired_dates) def generate_employee_availability_mcp( employees: list[Employee], ) -> None: """ For MCP data generator: does not set any unavailable, desired, or undesired days for employees. All availability sets remain empty. """ for employee in employees: employee.unavailable_dates.clear() employee.undesired_dates.clear() employee.desired_dates.clear() def generate_tasks( parameters: TimeTableDataParameters, random: Random, task_tuples: list[tuple[str, int]], ) -> list[Task]: """ Given a list of (description, duration) tuples, generate Task objects with randomized required_skill. """ tasks: list[Task] = [] ids = generate_task_ids() for description, duration in task_tuples: if random.random() >= 0.5: required_skill = random.choice(parameters.skill_set.required_skills) else: required_skill = random.choice(parameters.skill_set.optional_skills) tasks.append( Task( id=next(ids), description=description, duration_slots=duration, start_slot=0, # This will be assigned by the solver required_skill=required_skill, ) ) return tasks def generate_tasks_from_calendar( parameters: TimeTableDataParameters, random: Random, calendar_entries: list[dict], base_date: date = None, ) -> list[Task]: """ Generate Task objects from calendar entries with Skills. Calendar tasks are pinned to their original datetime slots. """ tasks: list[Task] = [] ids = generate_task_ids() for entry in calendar_entries: # Get skill from entry or randomly assign required_skill = entry.get("skill") if not required_skill: if random.random() >= 0.5: required_skill = random.choice(parameters.skill_set.required_skills) else: required_skill = random.choice(parameters.skill_set.optional_skills) # Calculate start_slot and duration_slots from calendar datetime info start_datetime = entry.get("start_datetime") end_datetime = entry.get("end_datetime") if start_datetime and end_datetime and base_date: # Calculate actual slot and duration from calendar times start_slot = datetime_to_slot(start_datetime, base_date) duration_slots = calculate_duration_slots(start_datetime, end_datetime) else: # Fallback to default values if datetime info is missing start_slot = entry.get("start_slot", 0) duration_slots = entry.get("duration_slots", 2) # Default 1 hour tasks.append( Task( id=next(ids), description=entry["summary"], duration_slots=duration_slots, start_slot=start_slot, required_skill=required_skill, pinned=True, # Pin calendar tasks to their original times ) ) return tasks def generate_task_ids(): """Generate sequential task IDs starting from 0.""" current_id = 0 while True: yield str(current_id) current_id += 1 # ========================= # UTILITY FUNCTIONS # ========================= def counts(distributions: tuple[CountDistribution, ...]) -> tuple[int, ...]: """ Extracts the count values from a tuple of CountDistribution objects. """ return tuple(distribution.count for distribution in distributions) def weights(distributions: tuple[CountDistribution, ...]) -> tuple[float, ...]: """ Extracts the weight values from a tuple of CountDistribution objects. """ return tuple(distribution.weight for distribution in distributions) def earliest_monday_on_or_after(target_date: date) -> date: """ Returns the earliest Monday on or after the given date. """ days_until_monday = (7 - target_date.weekday()) % 7 return target_date + timedelta(days=days_until_monday) def tasks_from_agent_output(agent_output, parameters, project_id: str = ""): """ Convert task_composer_agent output (list of (description, duration, skill)) to Task objects. """ ids = generate_task_ids() tasks = [] for sequence_num, task_data in enumerate(agent_output): # Handle both old format (description, duration) and new format (description, duration, skill) if len(task_data) == 3: description, duration, required_skill = task_data elif len(task_data) == 2: description, duration = task_data # Fallback to random assignment if no skill provided # Use a new Random instance for compatibility rng = random.Random() if rng.random() >= 0.5: required_skill = rng.choice(parameters.skill_set.required_skills) else: required_skill = rng.choice(parameters.skill_set.optional_skills) else: continue # skip invalid task data try: duration_int = int(duration) except (ValueError, TypeError): continue # skip this task if duration is invalid # Clean up skill name (remove any extra formatting) if required_skill: required_skill = required_skill.strip() # Ensure the skill exists in our skill set all_skills = list(parameters.skill_set.required_skills) + list( parameters.skill_set.optional_skills ) if required_skill not in all_skills: # If skill doesn't match exactly, try to find closest match or fallback to random rng = random.Random() required_skill = rng.choice(parameters.skill_set.required_skills) tasks.append( Task( id=next(ids), description=description, duration_slots=duration_int, start_slot=0, # Will be assigned by solver required_skill=required_skill, project_id=project_id, sequence_number=sequence_num, ) ) return tasks