yuga-planner / src /factory /data /generators.py
blackopsrepl's picture
feat!: add task pinning system and refactor existing systems
e3a1efe
from datetime import date, timedelta
import random
from random import Random
from itertools import product
from factory.data.models import *
from constraint_solvers.timetable.domain import *
from utils.extract_calendar import datetime_to_slot, calculate_duration_slots
### EMPLOYEES ###
FIRST_NAMES = ("Amy", "Beth", "Carl", "Dan", "Elsa", "Flo", "Gus", "Hugo", "Ivy", "Jay")
LAST_NAMES = (
"Cole",
"Fox",
"Green",
"Jones",
"King",
"Li",
"Poe",
"Rye",
"Smith",
"Watt",
)
def generate_employees(
parameters: TimeTableDataParameters,
random: Random,
required_skills_needed: set[str] = None,
) -> list[Employee]:
"""
Generates a list of Employee objects with random names and skills.
Ensures that collectively the employees have all required_skills_needed.
"""
name_permutations = [
f"{first_name} {last_name}"
for first_name, last_name in product(FIRST_NAMES, LAST_NAMES)
]
random.shuffle(name_permutations)
employees = []
# If specific skills are needed, ensure they're covered
if required_skills_needed:
skills_needed = set(required_skills_needed)
# For single employee (MCP case), give them all needed skills plus some random ones
if parameters.employee_count == 1:
all_available_skills = list(parameters.skill_set.required_skills) + list(
parameters.skill_set.optional_skills
)
# Give all available skills to the single employee to handle any task
employees.append(
Employee(name=name_permutations[0], skills=set(all_available_skills))
)
return employees
# For multiple employees, distribute needed skills and add random skills
for i in range(parameters.employee_count):
(count,) = random.choices(
population=counts(parameters.optional_skill_distribution),
weights=weights(parameters.optional_skill_distribution),
)
count = min(count, len(parameters.skill_set.optional_skills))
skills = []
# Ensure each employee gets at least one required skill
skills += random.sample(parameters.skill_set.required_skills, 1)
# Add random optional skills
skills += random.sample(parameters.skill_set.optional_skills, count)
# If there are still skills needed and this is one of the first employees,
# ensure they get some of the needed skills
if skills_needed and i < len(skills_needed):
needed_skill = skills_needed.pop()
if needed_skill not in skills:
skills.append(needed_skill)
employees.append(Employee(name=name_permutations[i], skills=set(skills)))
else:
# Original random generation when no specific skills are needed
for i in range(parameters.employee_count):
(count,) = random.choices(
population=counts(parameters.optional_skill_distribution),
weights=weights(parameters.optional_skill_distribution),
)
count = min(count, len(parameters.skill_set.optional_skills))
skills = []
skills += random.sample(parameters.skill_set.optional_skills, count)
skills += random.sample(parameters.skill_set.required_skills, 1)
employees.append(Employee(name=name_permutations[i], skills=set(skills)))
return employees
def generate_employee_availability(
employees: list[Employee],
parameters: TimeTableDataParameters,
start_date: date,
random: Random,
) -> None:
"""
Sets up random availability preferences for employees proportional to schedule length.
For 365 days:
- Max 21 unavailable days per employee
- Max 0-12 undesired days per employee
- Desired dates remain flexible (0-12 days)
Scales proportionally for different schedule lengths.
"""
days_in_schedule = parameters.days_in_schedule
# Calculate proportional limits based on 365-day baseline
max_unavailable_per_employee = round((21 / 365) * days_in_schedule)
max_undesired_per_employee = round((12 / 365) * days_in_schedule)
max_desired_per_employee = round((12 / 365) * days_in_schedule)
# Ensure minimum reasonable values
max_unavailable_per_employee = max(1, max_unavailable_per_employee)
max_undesired_per_employee = max(0, max_undesired_per_employee)
max_desired_per_employee = max(0, max_desired_per_employee)
# Generate all possible dates in the schedule
all_dates = [start_date + timedelta(days=i) for i in range(days_in_schedule)]
for employee in employees:
# Randomly assign unavailable dates (1 to max_unavailable_per_employee)
num_unavailable = random.randint(1, max_unavailable_per_employee)
unavailable_dates = random.sample(
all_dates, min(num_unavailable, len(all_dates))
)
employee.unavailable_dates.update(unavailable_dates)
# Remove unavailable dates from remaining pool for other preferences
remaining_dates = [d for d in all_dates if d not in employee.unavailable_dates]
# Randomly assign undesired dates (0 to max_undesired_per_employee)
if max_undesired_per_employee > 0 and remaining_dates:
num_undesired = random.randint(
0, min(max_undesired_per_employee, len(remaining_dates))
)
if num_undesired > 0:
undesired_dates = random.sample(remaining_dates, num_undesired)
employee.undesired_dates.update(undesired_dates)
remaining_dates = [
d for d in remaining_dates if d not in employee.undesired_dates
]
# Randomly assign desired dates (0 to max_desired_per_employee)
if max_desired_per_employee > 0 and remaining_dates:
num_desired = random.randint(
0, min(max_desired_per_employee, len(remaining_dates))
)
if num_desired > 0:
desired_dates = random.sample(remaining_dates, num_desired)
employee.desired_dates.update(desired_dates)
def generate_employee_availability_mcp(
employees: list[Employee],
) -> None:
"""
For MCP data generator: does not set any unavailable, desired, or undesired days for employees.
All availability sets remain empty.
"""
for employee in employees:
employee.unavailable_dates.clear()
employee.undesired_dates.clear()
employee.desired_dates.clear()
def generate_tasks(
parameters: TimeTableDataParameters,
random: Random,
task_tuples: list[tuple[str, int]],
) -> list[Task]:
"""
Given a list of (description, duration) tuples, generate Task objects with randomized required_skill.
"""
tasks: list[Task] = []
ids = generate_task_ids()
for description, duration in task_tuples:
if random.random() >= 0.5:
required_skill = random.choice(parameters.skill_set.required_skills)
else:
required_skill = random.choice(parameters.skill_set.optional_skills)
tasks.append(
Task(
id=next(ids),
description=description,
duration_slots=duration,
start_slot=0, # This will be assigned by the solver
required_skill=required_skill,
)
)
return tasks
def generate_tasks_from_calendar(
parameters: TimeTableDataParameters,
random: Random,
calendar_entries: list[dict],
base_date: date = None,
) -> list[Task]:
"""
Generate Task objects from calendar entries with Skills.
Calendar tasks are pinned to their original datetime slots.
"""
tasks: list[Task] = []
ids = generate_task_ids()
for entry in calendar_entries:
# Get skill from entry or randomly assign
required_skill = entry.get("skill")
if not required_skill:
if random.random() >= 0.5:
required_skill = random.choice(parameters.skill_set.required_skills)
else:
required_skill = random.choice(parameters.skill_set.optional_skills)
# Calculate start_slot and duration_slots from calendar datetime info
start_datetime = entry.get("start_datetime")
end_datetime = entry.get("end_datetime")
if start_datetime and end_datetime and base_date:
# Calculate actual slot and duration from calendar times
start_slot = datetime_to_slot(start_datetime, base_date)
duration_slots = calculate_duration_slots(start_datetime, end_datetime)
else:
# Fallback to default values if datetime info is missing
start_slot = entry.get("start_slot", 0)
duration_slots = entry.get("duration_slots", 2) # Default 1 hour
tasks.append(
Task(
id=next(ids),
description=entry["summary"],
duration_slots=duration_slots,
start_slot=start_slot,
required_skill=required_skill,
pinned=True, # Pin calendar tasks to their original times
)
)
return tasks
def generate_task_ids():
"""Generate sequential task IDs starting from 0."""
current_id = 0
while True:
yield str(current_id)
current_id += 1
# =========================
# UTILITY FUNCTIONS
# =========================
def counts(distributions: tuple[CountDistribution, ...]) -> tuple[int, ...]:
"""
Extracts the count values from a tuple of CountDistribution objects.
"""
return tuple(distribution.count for distribution in distributions)
def weights(distributions: tuple[CountDistribution, ...]) -> tuple[float, ...]:
"""
Extracts the weight values from a tuple of CountDistribution objects.
"""
return tuple(distribution.weight for distribution in distributions)
def earliest_monday_on_or_after(target_date: date) -> date:
"""
Returns the earliest Monday on or after the given date.
"""
days_until_monday = (7 - target_date.weekday()) % 7
return target_date + timedelta(days=days_until_monday)
def tasks_from_agent_output(agent_output, parameters, project_id: str = ""):
"""
Convert task_composer_agent output (list of (description, duration, skill)) to Task objects.
"""
ids = generate_task_ids()
tasks = []
for sequence_num, task_data in enumerate(agent_output):
# Handle both old format (description, duration) and new format (description, duration, skill)
if len(task_data) == 3:
description, duration, required_skill = task_data
elif len(task_data) == 2:
description, duration = task_data
# Fallback to random assignment if no skill provided
# Use a new Random instance for compatibility
rng = random.Random()
if rng.random() >= 0.5:
required_skill = rng.choice(parameters.skill_set.required_skills)
else:
required_skill = rng.choice(parameters.skill_set.optional_skills)
else:
continue # skip invalid task data
try:
duration_int = int(duration)
except (ValueError, TypeError):
continue # skip this task if duration is invalid
# Clean up skill name (remove any extra formatting)
if required_skill:
required_skill = required_skill.strip()
# Ensure the skill exists in our skill set
all_skills = list(parameters.skill_set.required_skills) + list(
parameters.skill_set.optional_skills
)
if required_skill not in all_skills:
# If skill doesn't match exactly, try to find closest match or fallback to random
rng = random.Random()
required_skill = rng.choice(parameters.skill_set.required_skills)
tasks.append(
Task(
id=next(ids),
description=description,
duration_slots=duration_int,
start_slot=0, # Will be assigned by solver
required_skill=required_skill,
project_id=project_id,
sequence_number=sequence_num,
)
)
return tasks