blackopsrepl's picture
feat!: add task pinning system and refactor existing systems
e3a1efe
import os
import pandas as pd
from datetime import date
from random import Random
pd.set_option("display.max_columns", None)
from factory.data.formatters import schedule_to_dataframe
from factory.data.generators import *
from factory.data.models import *
from factory.agents.task_composer_agent import TaskComposerAgent
from constraint_solvers.timetable.domain import *
from utils.logging_config import setup_logging, get_logger
from utils.extract_calendar import (
get_earliest_calendar_date,
datetime_to_slot,
validate_calendar_working_hours,
)
# Initialize logging
setup_logging()
logger = get_logger(__name__)
# =========================
# CONSTANTS
# =========================
# Import working hours configuration
from constraint_solvers.timetable.working_hours import SLOTS_PER_WORKING_DAY
# =========================
# DEMO PARAMS
# =========================
SKILL_SET = SkillSet(
required_skills=("Frontend Engineer", "Backend Engineer", "Cloud Engineer"),
optional_skills=(
"Security Expert",
"DevOps Engineer",
"Data Engineer",
"Network Engineer",
"AI Engineer",
),
)
DATA_PARAMS = TimeTableDataParameters(
skill_set=SKILL_SET,
days_in_schedule=365,
employee_count=12,
optional_skill_distribution=(
CountDistribution(count=1, weight=3),
CountDistribution(count=2, weight=1),
),
availability_count_distribution=(
CountDistribution(count=5, weight=4),
CountDistribution(count=10, weight=3),
CountDistribution(count=15, weight=2),
CountDistribution(count=20, weight=1),
),
random_seed=37,
)
MCP_PARAMS = TimeTableDataParameters(
skill_set=SKILL_SET,
days_in_schedule=365,
# In this case, we only have one user
employee_count=1,
optional_skill_distribution=(
CountDistribution(count=len(SKILL_SET.optional_skills), weight=1),
),
availability_count_distribution=(
# Full availability for one user
CountDistribution(count=20, weight=1),
),
random_seed=37,
)
# =========================
# AGENT DATA
# =========================
async def generate_agent_data(
file, project_id: str = "", employee_count: int = None, days_in_schedule: int = None
) -> EmployeeSchedule:
# Use DATA_PARAMS, but allow override
parameters = DATA_PARAMS
if employee_count is not None or days_in_schedule is not None:
parameters = TimeTableDataParameters(
skill_set=parameters.skill_set,
days_in_schedule=days_in_schedule
if days_in_schedule is not None
else parameters.days_in_schedule,
employee_count=employee_count
if employee_count is not None
else parameters.employee_count,
optional_skill_distribution=parameters.optional_skill_distribution,
availability_count_distribution=parameters.availability_count_distribution,
random_seed=parameters.random_seed,
)
start_date: date = earliest_monday_on_or_after(date.today())
randomizer: Random = Random(parameters.random_seed)
employees: list[Employee] = generate_employees(parameters, randomizer)
total_slots: int = parameters.days_in_schedule * SLOTS_PER_WORKING_DAY
logger.debug("Processing file object: %s (type: %s)", file, type(file))
match file:
case file if hasattr(file, "read"):
input_str = file.read()
case bytes():
input_str = file.decode("utf-8")
case str() if os.path.exists(file):
with open(file, "r", encoding="utf-8") as f:
input_str = f.read()
case str():
input_str = file
case _:
raise ValueError(f"Unsupported file type: {type(file)}")
agent_output = await run_task_composer_agent(input_str, parameters)
tasks = tasks_from_agent_output(agent_output, parameters, project_id)
generate_employee_availability(employees, parameters, start_date, randomizer)
return EmployeeSchedule(
employees=employees,
tasks=tasks,
schedule_info=ScheduleInfo(total_slots=total_slots),
)
async def generate_mcp_data(
calendar_entries,
user_message: str,
project_id: str = "PROJECT",
employee_count: int = None,
days_in_schedule: int = None,
):
parameters = MCP_PARAMS
# --- DETERMINE START DATE AND REQUIRED SCHEDULE LENGTH FROM CALENDAR ---
# Validate calendar entries are within working hours first
if calendar_entries:
is_valid, error_msg = validate_calendar_working_hours(calendar_entries)
if not is_valid:
logger.error(f"❌ Calendar validation failed: {error_msg}")
raise ValueError(
f"Calendar entries violate working hours constraints:\n{error_msg}"
)
else:
logger.info(
f"βœ… All {len(calendar_entries)} calendar entries are within working hours (8:00-18:00)"
)
# Use earliest calendar date as the base, or fall back to next Monday if no calendar
earliest_calendar_date = (
get_earliest_calendar_date(calendar_entries) if calendar_entries else None
)
if earliest_calendar_date:
start_date: date = earliest_calendar_date
# Calculate required schedule length to accommodate all calendar entries
if calendar_entries and days_in_schedule is None:
# Find the latest calendar date to determine required schedule length
latest_date = earliest_calendar_date
for entry in calendar_entries:
end_dt = entry.get("end_datetime")
if end_dt and end_dt.date() > latest_date:
latest_date = end_dt.date()
# Calculate days needed plus buffer for LLM tasks
calendar_days_span = (latest_date - earliest_calendar_date).days + 1
min_required_days = (
calendar_days_span + 30
) # Add 30 days buffer for LLM tasks
# Use the larger of user-specified or calculated requirement
calculated_days = max(min_required_days, parameters.days_in_schedule)
logger.info(
f"πŸ“Š Calendar span: {calendar_days_span} days, using {calculated_days} total schedule days"
)
else:
calculated_days = (
days_in_schedule if days_in_schedule else parameters.days_in_schedule
)
else:
start_date: date = earliest_monday_on_or_after(date.today())
calculated_days = (
days_in_schedule if days_in_schedule else parameters.days_in_schedule
)
# Update parameters with calculated values
if employee_count is not None or calculated_days != parameters.days_in_schedule:
parameters = TimeTableDataParameters(
skill_set=parameters.skill_set,
days_in_schedule=calculated_days,
employee_count=employee_count
if employee_count is not None
else parameters.employee_count,
optional_skill_distribution=parameters.optional_skill_distribution,
availability_count_distribution=parameters.availability_count_distribution,
random_seed=parameters.random_seed,
)
randomizer: Random = Random(parameters.random_seed)
total_slots: int = parameters.days_in_schedule * SLOTS_PER_WORKING_DAY
# --- CALENDAR TASKS ---
calendar_tasks = generate_tasks_from_calendar(
parameters, randomizer, calendar_entries, base_date=start_date
)
# Validate that all calendar tasks have valid slot assignments
for task in calendar_tasks:
if task.start_slot >= total_slots:
logger.error(
f"Calendar task '{task.description}' has slot {task.start_slot} >= {total_slots}"
)
raise ValueError(
f"Calendar task slot {task.start_slot} exceeds schedule length {total_slots}. "
f"Increase days_in_schedule or check calendar dates."
)
# Assign project_id 'EXISTING' to all calendar tasks
for t in calendar_tasks:
t.sequence_number = 0 # will be overwritten later
t.project_id = "EXISTING"
# --- LLM TASKS ---
llm_tasks = []
if user_message:
agent_output = await run_task_composer_agent(user_message, parameters)
llm_tasks = tasks_from_agent_output(agent_output, parameters, "PROJECT")
for t in llm_tasks:
t.sequence_number = 0 # will be overwritten later
t.project_id = "PROJECT"
# --- ANALYZE REQUIRED SKILLS ---
all_tasks = calendar_tasks + llm_tasks
required_skills_needed = set()
for task in all_tasks:
if hasattr(task, "required_skill") and task.required_skill:
required_skills_needed.add(task.required_skill)
# --- GENERATE EMPLOYEES WITH REQUIRED SKILLS ---
employees: list[Employee] = generate_employees(
parameters, randomizer, required_skills_needed
)
# Set the single employee's name to 'Chatbot User'
if len(employees) == 1:
employees[0].name = "Chatbot User"
else:
raise ValueError("MCP data provider only supports one employee")
# Ensure all date sets are empty
for emp in employees:
emp.unavailable_dates.clear()
emp.undesired_dates.clear()
emp.desired_dates.clear()
# --- ASSIGN EMPLOYEES TO TASKS ---
for t in all_tasks:
t.employee = employees[0]
# Create DataFrames for debugging
calendar_df = pd.DataFrame(
[
{
"id": t.id,
"description": t.description,
"duration_slots": t.duration_slots,
"start_slot": t.start_slot,
"required_skill": t.required_skill,
"sequence_number": t.sequence_number,
"employee": t.employee.name if hasattr(t.employee, "name") else None,
"project_id": t.project_id,
}
for t in calendar_tasks
]
)
logger.debug("Generated calendar tasks DataFrame:\n%s", calendar_df)
llm_df = pd.DataFrame(
[
{
"id": t.id,
"description": t.description,
"duration_slots": t.duration_slots,
"start_slot": t.start_slot,
"required_skill": t.required_skill,
"sequence_number": t.sequence_number,
"employee": t.employee.name if hasattr(t.employee, "name") else None,
"project_id": t.project_id,
}
for t in llm_tasks
]
)
logger.debug("Generated LLM tasks DataFrame:\n%s", llm_df)
# --- ASSIGN SEQUENCE NUMBERS ---
existing_seq = 0
project_seq = 0
for t in all_tasks:
if t.project_id == "EXISTING":
t.sequence_number = existing_seq
existing_seq += 1
elif t.project_id == "PROJECT":
t.sequence_number = project_seq
project_seq += 1
schedule = EmployeeSchedule(
employees=employees,
tasks=all_tasks,
schedule_info=ScheduleInfo(
total_slots=total_slots, base_date=start_date, base_timezone=None
),
)
final_df = schedule_to_dataframe(schedule)
logger.debug("Final schedule DataFrame (MCP-aligned):\n%s", final_df)
return final_df
async def run_task_composer_agent(
input_str: str, parameters: TimeTableDataParameters
) -> list:
"""Runs the task composition agent with the given input and parameters."""
try:
# Initialize the agent
agent = TaskComposerAgent()
# Run the agent with the input
output = await agent.compose_tasks(input_str, parameters)
logger.debug("Agent output: %s", output)
return output
except Exception as e:
logger.error("Error running task composer agent: %s", e)
# Return empty list on error
return []