# Source file: yuga-planner/src/factory/data/generators.py (12.5 kB)
# Last commit e3a1efe by blackopsrepl:
#   "feat!: add task pinning system and refactor existing systems"
from datetime import date, timedelta
import random
from random import Random
from itertools import product
from factory.data.models import *
from constraint_solvers.timetable.domain import *
from utils.extract_calendar import datetime_to_slot, calculate_duration_slots
### EMPLOYEES ###
# Name pools: generate_employees pairs every first name with every last name
# to build the "First Last" employee names.
FIRST_NAMES = (
    "Amy",
    "Beth",
    "Carl",
    "Dan",
    "Elsa",
    "Flo",
    "Gus",
    "Hugo",
    "Ivy",
    "Jay",
)
LAST_NAMES = ("Cole", "Fox", "Green", "Jones", "King", "Li", "Poe", "Rye", "Smith", "Watt")
def _sample_employee_skills(
    parameters: TimeTableDataParameters,
    random: Random,
    required_first: bool,
) -> list[str]:
    """
    Draws one employee's random skills: exactly one required skill plus a
    weighted-random number of optional skills.

    `required_first` selects which pool is sampled first. The two call sites
    historically drew in different orders; keeping each order means a seeded
    Random still yields the same skills as before.
    """
    (optional_count,) = random.choices(
        population=counts(parameters.optional_skill_distribution),
        weights=weights(parameters.optional_skill_distribution),
    )
    # Never ask for more optional skills than the pool contains
    optional_count = min(optional_count, len(parameters.skill_set.optional_skills))

    if required_first:
        picked = random.sample(parameters.skill_set.required_skills, 1)
        picked += random.sample(parameters.skill_set.optional_skills, optional_count)
    else:
        picked = random.sample(parameters.skill_set.optional_skills, optional_count)
        picked += random.sample(parameters.skill_set.required_skills, 1)
    return picked


def generate_employees(
    parameters: TimeTableDataParameters,
    random: Random,
    required_skills_needed: set[str] = None,
) -> list[Employee]:
    """
    Generates a list of Employee objects with random names and skills.

    Every employee gets one random required skill and a weighted-random number
    of optional skills. When required_skills_needed is given, those skills are
    additionally spread over the employees so that collectively the employees
    cover all of them (provided at least one employee is generated).

    Args:
        parameters: Supplies employee_count, skill_set and optional_skill_distribution.
        random: Random source used for the name shuffle and all skill draws.
        required_skills_needed: Skills that must be covered by the team; None or
            empty means plain random generation.

    Returns:
        A list of parameters.employee_count employees.
    """
    name_permutations = [
        f"{first_name} {last_name}"
        for first_name, last_name in product(FIRST_NAMES, LAST_NAMES)
    ]
    random.shuffle(name_permutations)

    def employee_name(index: int) -> str:
        # The pool holds a fixed number of distinct names; beyond that, reuse
        # them with a numeric suffix instead of failing with an IndexError.
        base_name = name_permutations[index % len(name_permutations)]
        cycle = index // len(name_permutations)
        return base_name if cycle == 0 else f"{base_name} {cycle + 1}"

    if not required_skills_needed:
        # Original random generation when no specific skills are needed
        return [
            Employee(
                name=employee_name(i),
                skills=set(
                    _sample_employee_skills(parameters, random, required_first=False)
                ),
            )
            for i in range(parameters.employee_count)
        ]

    # Sorted so the assignment below does not depend on set iteration order,
    # which keeps seeded runs reproducible.
    skills_needed = sorted(set(required_skills_needed))

    # For single employee (MCP case), give them every available skill plus the
    # needed ones so they can handle any task
    if parameters.employee_count == 1:
        all_available_skills = set(parameters.skill_set.required_skills) | set(
            parameters.skill_set.optional_skills
        )
        return [
            Employee(
                name=employee_name(0),
                skills=all_available_skills | set(skills_needed),
            )
        ]

    # For multiple employees, add random skills and distribute the needed ones
    employees = []
    for i in range(parameters.employee_count):
        skills = _sample_employee_skills(parameters, random, required_first=True)
        # Round-robin slice: employee i takes needed skills i, i + count, ...
        # so every needed skill is assigned even when there are more needed
        # skills than employees.
        skills.extend(skills_needed[i :: parameters.employee_count])
        employees.append(Employee(name=employee_name(i), skills=set(skills)))
    return employees
def generate_employee_availability(
    employees: list[Employee],
    parameters: TimeTableDataParameters,
    start_date: date,
    random: Random,
) -> None:
    """
    Randomly fills each employee's unavailable, undesired and desired date sets.

    The per-employee caps are scaled from a 365-day baseline
    (21 unavailable, 12 undesired, 12 desired days) to the schedule length:
    - unavailable: between 1 and the cap (the cap itself is at least 1)
    - undesired: between 0 and the cap
    - desired: between 0 and the cap
    Dates drawn in this call for one category are removed from the pool before
    the next category is drawn. The employees are mutated in place.
    """
    schedule_days = parameters.days_in_schedule

    def scaled_cap(days_per_year: int, floor: int) -> int:
        # Proportional share of the 365-day baseline, never below `floor`
        return max(floor, round((days_per_year / 365) * schedule_days))

    unavailable_cap = scaled_cap(21, 1)
    undesired_cap = scaled_cap(12, 0)
    desired_cap = scaled_cap(12, 0)

    # Every date covered by the schedule, in chronological order
    calendar = [start_date + timedelta(days=offset) for offset in range(schedule_days)]

    for employee in employees:
        # Unavailable dates: at least one, capped by the schedule length
        blocked_count = random.randint(1, unavailable_cap)
        employee.unavailable_dates.update(
            random.sample(calendar, min(blocked_count, len(calendar)))
        )
        # Only dates that are not unavailable may receive a preference
        open_dates = [day for day in calendar if day not in employee.unavailable_dates]

        # Undesired dates
        if undesired_cap > 0 and open_dates:
            undesired_count = random.randint(0, min(undesired_cap, len(open_dates)))
            if undesired_count > 0:
                employee.undesired_dates.update(
                    random.sample(open_dates, undesired_count)
                )
                open_dates = [
                    day for day in open_dates if day not in employee.undesired_dates
                ]

        # Desired dates, drawn from whatever is still open
        if desired_cap > 0 and open_dates:
            desired_count = random.randint(0, min(desired_cap, len(open_dates)))
            if desired_count > 0:
                employee.desired_dates.update(random.sample(open_dates, desired_count))
def generate_employee_availability_mcp(
    employees: list[Employee],
) -> None:
    """
    MCP data generator variant: leaves every employee without availability
    preferences by emptying their unavailable, undesired and desired date sets.
    """
    availability_fields = ("unavailable_dates", "undesired_dates", "desired_dates")
    for employee in employees:
        for field_name in availability_fields:
            getattr(employee, field_name).clear()
def generate_tasks(
    parameters: TimeTableDataParameters,
    random: Random,
    task_tuples: list[tuple[str, int]],
) -> list[Task]:
    """
    Builds one Task per (description, duration) tuple.

    Each task's required_skill is drawn at random: with equal odds from the
    required skills or from the optional skills of parameters.skill_set.
    IDs come from generate_task_ids(), so they restart at "0" on every call.
    """
    id_source = generate_task_ids()
    generated: list[Task] = []
    for summary, slot_count in task_tuples:
        # Coin flip decides which skill pool this task draws from
        if random.random() < 0.5:
            skill_pool = parameters.skill_set.optional_skills
        else:
            skill_pool = parameters.skill_set.required_skills
        chosen_skill = random.choice(skill_pool)

        new_task = Task(
            id=next(id_source),
            description=summary,
            duration_slots=slot_count,
            start_slot=0,  # This will be assigned by the solver
            required_skill=chosen_skill,
        )
        generated.append(new_task)
    return generated
def generate_tasks_from_calendar(
    parameters: TimeTableDataParameters,
    random: Random,
    calendar_entries: list[dict],
    base_date: date = None,
) -> list[Task]:
    """
    Turns calendar entries into pinned Task objects.

    For each entry:
    - the skill is taken from entry["skill"], or drawn at random (equal odds
      between required and optional skills) when missing or empty;
    - start slot and duration are derived from the entry's start/end datetimes
      when both are present and base_date is given; otherwise the entry's
      "start_slot" / "duration_slots" values (defaults 0 and 2) are used;
    - the task description is entry["summary"].
    Every task is created with pinned=True.
    """
    id_source = generate_task_ids()
    pinned_tasks: list[Task] = []
    for entry in calendar_entries:
        # Prefer the skill stored on the entry; otherwise pick one at random
        skill = entry.get("skill")
        if not skill:
            if random.random() < 0.5:
                skill = random.choice(parameters.skill_set.optional_skills)
            else:
                skill = random.choice(parameters.skill_set.required_skills)

        begins_at = entry.get("start_datetime")
        ends_at = entry.get("end_datetime")
        if not (begins_at and ends_at and base_date):
            # Datetime info is incomplete: fall back to explicit or default slots
            slot = entry.get("start_slot", 0)
            length = entry.get("duration_slots", 2)  # Default 1 hour
        else:
            # Derive the slot position and length from the calendar times
            slot = datetime_to_slot(begins_at, base_date)
            length = calculate_duration_slots(begins_at, ends_at)

        calendar_task = Task(
            id=next(id_source),
            description=entry["summary"],
            duration_slots=length,
            start_slot=slot,
            required_skill=skill,
            pinned=True,  # Pin calendar tasks to their original times
        )
        pinned_tasks.append(calendar_task)
    return pinned_tasks
def generate_task_ids():
    """Endless generator of task IDs: the strings "0", "1", "2", ..."""
    task_number = -1
    while True:
        task_number += 1
        yield str(task_number)
# =========================
# UTILITY FUNCTIONS
# =========================
def counts(distributions: tuple[CountDistribution, ...]) -> tuple[int, ...]:
    """
    Returns the `count` attribute of every distribution, in input order.
    """
    count_values = [entry.count for entry in distributions]
    return tuple(count_values)
def weights(distributions: tuple[CountDistribution, ...]) -> tuple[float, ...]:
    """
    Returns the `weight` attribute of every distribution, in input order.
    """
    weight_values = [entry.weight for entry in distributions]
    return tuple(weight_values)
def earliest_monday_on_or_after(target_date: date) -> date:
    """
    Finds the first Monday that is not before target_date
    (target_date itself when it already is a Monday).
    """
    # weekday() is 0 on Monday, so its negation modulo 7 is the number of
    # days still to go until the next Monday (0 when already there).
    offset_days = (-target_date.weekday()) % 7
    return target_date + timedelta(days=offset_days)
def tasks_from_agent_output(agent_output, parameters, project_id: str = ""):
    """
    Convert task_composer_agent output to Task objects.

    Each record is either (description, duration, skill) or the older
    (description, duration) format. Records of any other size, records that
    have no length, and records whose duration cannot be converted to int
    are skipped.

    Skill resolution, in order:
    1. the stripped skill name if it is exactly one of the known skills;
    2. a known skill that matches ignoring case;
    3. a random required skill.
    Records without a skill get a random skill (equal odds between the
    required and the optional pool) before that resolution.

    Args:
        agent_output: Iterable of task records as described above.
        parameters: Object exposing skill_set.required_skills and
            skill_set.optional_skills.
        project_id: Stored on every created task.

    Returns:
        A list of Task objects. IDs are consecutive over the created tasks,
        while sequence_number is the record's position in agent_output, so
        skipped records leave gaps in the sequence numbers.
    """
    ids = generate_task_ids()
    tasks = []

    # Loop-invariant setup: one RNG and one skill lookup for the whole call
    rng = random.Random()
    required_skills = parameters.skill_set.required_skills
    optional_skills = parameters.skill_set.optional_skills
    known_skills = [*required_skills, *optional_skills]
    # Case-insensitive index; setdefault keeps the first spelling, so a
    # required skill wins over an optional one that differs only by case
    skills_by_folded_name = {}
    for skill in known_skills:
        if isinstance(skill, str):
            skills_by_folded_name.setdefault(skill.strip().casefold(), skill)

    for sequence_num, task_data in enumerate(agent_output):
        try:
            field_count = len(task_data)
        except TypeError:
            continue  # skip records that are not sized sequences

        # Handle both old format (description, duration) and new format (description, duration, skill)
        if field_count == 3:
            description, duration, required_skill = task_data
        elif field_count == 2:
            description, duration = task_data
            # Fallback to random assignment if no skill provided
            if rng.random() >= 0.5:
                required_skill = rng.choice(required_skills)
            else:
                required_skill = rng.choice(optional_skills)
        else:
            continue  # skip invalid task data

        try:
            duration_int = int(duration)
        except (ValueError, TypeError):
            continue  # skip this task if duration is invalid

        # Map the (possibly sloppy) skill name onto a known skill
        matched_skill = None
        if isinstance(required_skill, str):
            cleaned = required_skill.strip()
            if cleaned in known_skills:
                matched_skill = cleaned
            else:
                matched_skill = skills_by_folded_name.get(cleaned.casefold())
        if matched_skill is None:
            # Unknown, empty or non-string skill: fall back to a random required skill
            matched_skill = rng.choice(required_skills)

        tasks.append(
            Task(
                id=next(ids),
                description=description,
                duration_slots=duration_int,
                start_slot=0,  # Will be assigned by solver
                required_skill=matched_skill,
                project_id=project_id,
                sequence_number=sequence_num,
            )
        )
    return tasks