Spaces:
Paused
Paused
| from datetime import date, timedelta | |
| import random | |
| from random import Random | |
| from itertools import product | |
| from factory.data.models import * | |
| from constraint_solvers.timetable.domain import * | |
| from utils.extract_calendar import datetime_to_slot, calculate_duration_slots | |
| ### EMPLOYEES ### | |
| FIRST_NAMES = ("Amy", "Beth", "Carl", "Dan", "Elsa", "Flo", "Gus", "Hugo", "Ivy", "Jay") | |
| LAST_NAMES = ( | |
| "Cole", | |
| "Fox", | |
| "Green", | |
| "Jones", | |
| "King", | |
| "Li", | |
| "Poe", | |
| "Rye", | |
| "Smith", | |
| "Watt", | |
| ) | |
| def generate_employees( | |
| parameters: TimeTableDataParameters, | |
| random: Random, | |
| required_skills_needed: set[str] = None, | |
| ) -> list[Employee]: | |
| """ | |
| Generates a list of Employee objects with random names and skills. | |
| Ensures that collectively the employees have all required_skills_needed. | |
| """ | |
| name_permutations = [ | |
| f"{first_name} {last_name}" | |
| for first_name, last_name in product(FIRST_NAMES, LAST_NAMES) | |
| ] | |
| random.shuffle(name_permutations) | |
| employees = [] | |
| # If specific skills are needed, ensure they're covered | |
| if required_skills_needed: | |
| skills_needed = set(required_skills_needed) | |
| # For single employee (MCP case), give them all needed skills plus some random ones | |
| if parameters.employee_count == 1: | |
| all_available_skills = list(parameters.skill_set.required_skills) + list( | |
| parameters.skill_set.optional_skills | |
| ) | |
| # Give all available skills to the single employee to handle any task | |
| employees.append( | |
| Employee(name=name_permutations[0], skills=set(all_available_skills)) | |
| ) | |
| return employees | |
| # For multiple employees, distribute needed skills and add random skills | |
| for i in range(parameters.employee_count): | |
| (count,) = random.choices( | |
| population=counts(parameters.optional_skill_distribution), | |
| weights=weights(parameters.optional_skill_distribution), | |
| ) | |
| count = min(count, len(parameters.skill_set.optional_skills)) | |
| skills = [] | |
| # Ensure each employee gets at least one required skill | |
| skills += random.sample(parameters.skill_set.required_skills, 1) | |
| # Add random optional skills | |
| skills += random.sample(parameters.skill_set.optional_skills, count) | |
| # If there are still skills needed and this is one of the first employees, | |
| # ensure they get some of the needed skills | |
| if skills_needed and i < len(skills_needed): | |
| needed_skill = skills_needed.pop() | |
| if needed_skill not in skills: | |
| skills.append(needed_skill) | |
| employees.append(Employee(name=name_permutations[i], skills=set(skills))) | |
| else: | |
| # Original random generation when no specific skills are needed | |
| for i in range(parameters.employee_count): | |
| (count,) = random.choices( | |
| population=counts(parameters.optional_skill_distribution), | |
| weights=weights(parameters.optional_skill_distribution), | |
| ) | |
| count = min(count, len(parameters.skill_set.optional_skills)) | |
| skills = [] | |
| skills += random.sample(parameters.skill_set.optional_skills, count) | |
| skills += random.sample(parameters.skill_set.required_skills, 1) | |
| employees.append(Employee(name=name_permutations[i], skills=set(skills))) | |
| return employees | |
| def generate_employee_availability( | |
| employees: list[Employee], | |
| parameters: TimeTableDataParameters, | |
| start_date: date, | |
| random: Random, | |
| ) -> None: | |
| """ | |
| Sets up random availability preferences for employees proportional to schedule length. | |
| For 365 days: | |
| - Max 21 unavailable days per employee | |
| - Max 0-12 undesired days per employee | |
| - Desired dates remain flexible (0-12 days) | |
| Scales proportionally for different schedule lengths. | |
| """ | |
| days_in_schedule = parameters.days_in_schedule | |
| # Calculate proportional limits based on 365-day baseline | |
| max_unavailable_per_employee = round((21 / 365) * days_in_schedule) | |
| max_undesired_per_employee = round((12 / 365) * days_in_schedule) | |
| max_desired_per_employee = round((12 / 365) * days_in_schedule) | |
| # Ensure minimum reasonable values | |
| max_unavailable_per_employee = max(1, max_unavailable_per_employee) | |
| max_undesired_per_employee = max(0, max_undesired_per_employee) | |
| max_desired_per_employee = max(0, max_desired_per_employee) | |
| # Generate all possible dates in the schedule | |
| all_dates = [start_date + timedelta(days=i) for i in range(days_in_schedule)] | |
| for employee in employees: | |
| # Randomly assign unavailable dates (1 to max_unavailable_per_employee) | |
| num_unavailable = random.randint(1, max_unavailable_per_employee) | |
| unavailable_dates = random.sample( | |
| all_dates, min(num_unavailable, len(all_dates)) | |
| ) | |
| employee.unavailable_dates.update(unavailable_dates) | |
| # Remove unavailable dates from remaining pool for other preferences | |
| remaining_dates = [d for d in all_dates if d not in employee.unavailable_dates] | |
| # Randomly assign undesired dates (0 to max_undesired_per_employee) | |
| if max_undesired_per_employee > 0 and remaining_dates: | |
| num_undesired = random.randint( | |
| 0, min(max_undesired_per_employee, len(remaining_dates)) | |
| ) | |
| if num_undesired > 0: | |
| undesired_dates = random.sample(remaining_dates, num_undesired) | |
| employee.undesired_dates.update(undesired_dates) | |
| remaining_dates = [ | |
| d for d in remaining_dates if d not in employee.undesired_dates | |
| ] | |
| # Randomly assign desired dates (0 to max_desired_per_employee) | |
| if max_desired_per_employee > 0 and remaining_dates: | |
| num_desired = random.randint( | |
| 0, min(max_desired_per_employee, len(remaining_dates)) | |
| ) | |
| if num_desired > 0: | |
| desired_dates = random.sample(remaining_dates, num_desired) | |
| employee.desired_dates.update(desired_dates) | |
| def generate_employee_availability_mcp( | |
| employees: list[Employee], | |
| ) -> None: | |
| """ | |
| For MCP data generator: does not set any unavailable, desired, or undesired days for employees. | |
| All availability sets remain empty. | |
| """ | |
| for employee in employees: | |
| employee.unavailable_dates.clear() | |
| employee.undesired_dates.clear() | |
| employee.desired_dates.clear() | |
| def generate_tasks( | |
| parameters: TimeTableDataParameters, | |
| random: Random, | |
| task_tuples: list[tuple[str, int]], | |
| ) -> list[Task]: | |
| """ | |
| Given a list of (description, duration) tuples, generate Task objects with randomized required_skill. | |
| """ | |
| tasks: list[Task] = [] | |
| ids = generate_task_ids() | |
| for description, duration in task_tuples: | |
| if random.random() >= 0.5: | |
| required_skill = random.choice(parameters.skill_set.required_skills) | |
| else: | |
| required_skill = random.choice(parameters.skill_set.optional_skills) | |
| tasks.append( | |
| Task( | |
| id=next(ids), | |
| description=description, | |
| duration_slots=duration, | |
| start_slot=0, # This will be assigned by the solver | |
| required_skill=required_skill, | |
| ) | |
| ) | |
| return tasks | |
| def generate_tasks_from_calendar( | |
| parameters: TimeTableDataParameters, | |
| random: Random, | |
| calendar_entries: list[dict], | |
| base_date: date = None, | |
| ) -> list[Task]: | |
| """ | |
| Generate Task objects from calendar entries with Skills. | |
| Calendar tasks are pinned to their original datetime slots. | |
| """ | |
| tasks: list[Task] = [] | |
| ids = generate_task_ids() | |
| for entry in calendar_entries: | |
| # Get skill from entry or randomly assign | |
| required_skill = entry.get("skill") | |
| if not required_skill: | |
| if random.random() >= 0.5: | |
| required_skill = random.choice(parameters.skill_set.required_skills) | |
| else: | |
| required_skill = random.choice(parameters.skill_set.optional_skills) | |
| # Calculate start_slot and duration_slots from calendar datetime info | |
| start_datetime = entry.get("start_datetime") | |
| end_datetime = entry.get("end_datetime") | |
| if start_datetime and end_datetime and base_date: | |
| # Calculate actual slot and duration from calendar times | |
| start_slot = datetime_to_slot(start_datetime, base_date) | |
| duration_slots = calculate_duration_slots(start_datetime, end_datetime) | |
| else: | |
| # Fallback to default values if datetime info is missing | |
| start_slot = entry.get("start_slot", 0) | |
| duration_slots = entry.get("duration_slots", 2) # Default 1 hour | |
| tasks.append( | |
| Task( | |
| id=next(ids), | |
| description=entry["summary"], | |
| duration_slots=duration_slots, | |
| start_slot=start_slot, | |
| required_skill=required_skill, | |
| pinned=True, # Pin calendar tasks to their original times | |
| ) | |
| ) | |
| return tasks | |
| def generate_task_ids(): | |
| """Generate sequential task IDs starting from 0.""" | |
| current_id = 0 | |
| while True: | |
| yield str(current_id) | |
| current_id += 1 | |
| # ========================= | |
| # UTILITY FUNCTIONS | |
| # ========================= | |
| def counts(distributions: tuple[CountDistribution, ...]) -> tuple[int, ...]: | |
| """ | |
| Extracts the count values from a tuple of CountDistribution objects. | |
| """ | |
| return tuple(distribution.count for distribution in distributions) | |
| def weights(distributions: tuple[CountDistribution, ...]) -> tuple[float, ...]: | |
| """ | |
| Extracts the weight values from a tuple of CountDistribution objects. | |
| """ | |
| return tuple(distribution.weight for distribution in distributions) | |
| def earliest_monday_on_or_after(target_date: date) -> date: | |
| """ | |
| Returns the earliest Monday on or after the given date. | |
| """ | |
| days_until_monday = (7 - target_date.weekday()) % 7 | |
| return target_date + timedelta(days=days_until_monday) | |
| def tasks_from_agent_output(agent_output, parameters, project_id: str = ""): | |
| """ | |
| Convert task_composer_agent output (list of (description, duration, skill)) to Task objects. | |
| """ | |
| ids = generate_task_ids() | |
| tasks = [] | |
| for sequence_num, task_data in enumerate(agent_output): | |
| # Handle both old format (description, duration) and new format (description, duration, skill) | |
| if len(task_data) == 3: | |
| description, duration, required_skill = task_data | |
| elif len(task_data) == 2: | |
| description, duration = task_data | |
| # Fallback to random assignment if no skill provided | |
| # Use a new Random instance for compatibility | |
| rng = random.Random() | |
| if rng.random() >= 0.5: | |
| required_skill = rng.choice(parameters.skill_set.required_skills) | |
| else: | |
| required_skill = rng.choice(parameters.skill_set.optional_skills) | |
| else: | |
| continue # skip invalid task data | |
| try: | |
| duration_int = int(duration) | |
| except (ValueError, TypeError): | |
| continue # skip this task if duration is invalid | |
| # Clean up skill name (remove any extra formatting) | |
| if required_skill: | |
| required_skill = required_skill.strip() | |
| # Ensure the skill exists in our skill set | |
| all_skills = list(parameters.skill_set.required_skills) + list( | |
| parameters.skill_set.optional_skills | |
| ) | |
| if required_skill not in all_skills: | |
| # If skill doesn't match exactly, try to find closest match or fallback to random | |
| rng = random.Random() | |
| required_skill = rng.choice(parameters.skill_set.required_skills) | |
| tasks.append( | |
| Task( | |
| id=next(ids), | |
| description=description, | |
| duration_slots=duration_int, | |
| start_slot=0, # Will be assigned by solver | |
| required_skill=required_skill, | |
| project_id=project_id, | |
| sequence_number=sequence_num, | |
| ) | |
| ) | |
| return tasks | |