Spaces:
Paused
Paused
File size: 7,249 Bytes
3b9a6b5 e3a1efe 3b9a6b5 918bdb4 3b9a6b5 918bdb4 3b9a6b5 918bdb4 3b9a6b5 918bdb4 3b9a6b5 e3a1efe 918bdb4 3b9a6b5 918bdb4 3b9a6b5 e3a1efe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
from icalendar import Calendar
from datetime import datetime, date, timezone, timedelta
from typing import Optional, Tuple, List, Dict, Any
from constraint_solvers.timetable.working_hours import (
SLOTS_PER_WORKING_DAY,
MORNING_SLOTS,
)
def extract_ical_entries(file_bytes):
try:
cal = Calendar.from_ical(file_bytes)
entries = []
for component in cal.walk():
if component.name == "VEVENT":
summary = str(component.get("summary", ""))
dtstart = component.get("dtstart", "")
dtend = component.get("dtend", "")
def to_iso(val):
if hasattr(val, "dt"):
dt = val.dt
if hasattr(dt, "isoformat"):
return dt.isoformat()
return str(dt)
return str(val)
def to_datetime(val):
"""Convert icalendar datetime to Python datetime object, normalized to current timezone."""
if hasattr(val, "dt"):
dt = val.dt
if isinstance(dt, datetime):
# If timezone-aware, convert to current timezone, then make naive
if dt.tzinfo is not None:
# Convert to local timezone then strip timezone info
local_dt = dt.astimezone()
return local_dt.replace(tzinfo=None)
else:
# Already naive, return as-is
return dt
elif isinstance(dt, date):
# Convert date to datetime at 9 AM (naive)
return datetime.combine(
dt, datetime.min.time().replace(hour=9)
)
return None
# Parse datetime objects for slot calculation (now normalized to current timezone)
start_datetime = to_datetime(dtstart)
end_datetime = to_datetime(dtend)
entry = {
"summary": summary,
"dtstart": to_iso(dtstart),
"dtend": to_iso(dtend),
}
# Add datetime objects for slot calculation
if start_datetime:
entry["start_datetime"] = start_datetime
if end_datetime:
entry["end_datetime"] = end_datetime
entries.append(entry)
return entries, None
except Exception as e:
return None, str(e)
def get_earliest_calendar_date(
calendar_entries: List[Dict[str, Any]]
) -> Optional[date]:
"""
Find the earliest date from calendar entries to use as base_date for scheduling.
Args:
calendar_entries: List of calendar entry dictionaries
Returns:
The earliest date found, or None if no valid dates found
"""
earliest_date = None
for entry in calendar_entries:
start_datetime = entry.get("start_datetime")
if start_datetime and isinstance(start_datetime, datetime):
entry_date = start_datetime.date()
if earliest_date is None or entry_date < earliest_date:
earliest_date = entry_date
return earliest_date
def validate_calendar_working_hours(
calendar_entries: List[Dict[str, Any]]
) -> Tuple[bool, str]:
"""
Validate that all calendar entries fall within standard working hours (9:00-18:00) and don't span lunch break (13:00-14:00).
Args:
calendar_entries: List of calendar entry dictionaries
Returns:
Tuple of (is_valid, error_message)
"""
if not calendar_entries:
return True, ""
violations = []
for entry in calendar_entries:
summary = entry.get("summary", "Unknown Event")
start_datetime = entry.get("start_datetime")
end_datetime = entry.get("end_datetime")
if start_datetime and isinstance(start_datetime, datetime):
if start_datetime.hour < 9:
violations.append(
f"'{summary}' starts at {start_datetime.hour:02d}:{start_datetime.minute:02d} (before 9:00)"
)
if end_datetime and isinstance(end_datetime, datetime):
if end_datetime.hour > 18 or (
end_datetime.hour == 18 and end_datetime.minute > 0
):
violations.append(
f"'{summary}' ends at {end_datetime.hour:02d}:{end_datetime.minute:02d} (after 18:00)"
)
# Check for lunch break spanning (13:00-14:00)
if (
start_datetime
and end_datetime
and isinstance(start_datetime, datetime)
and isinstance(end_datetime, datetime)
):
start_hour_min = start_datetime.hour + start_datetime.minute / 60.0
end_hour_min = end_datetime.hour + end_datetime.minute / 60.0
# Check if task spans across lunch break (13:00-14:00)
if start_hour_min < 14.0 and end_hour_min > 13.0:
violations.append(
f"'{summary}' ({start_datetime.hour:02d}:{start_datetime.minute:02d}-{end_datetime.hour:02d}:{end_datetime.minute:02d}) spans lunch break (13:00-14:00)"
)
if violations:
error_msg = "Calendar entries violate working constraints:\n" + "\n".join(
violations
)
return False, error_msg
return True, ""
def datetime_to_slot(dt: datetime, base_date: date) -> int:
"""
Convert a datetime to a 30-minute slot index within working days.
Args:
dt: The datetime to convert (should be naive local time)
base_date: The base date (slot 0 = base_date at 9:00 AM local time)
Returns:
The slot index (each slot = 30 minutes within working hours)
"""
# Calculate which working day this datetime falls on
days_from_base = (dt.date() - base_date).days
# Calculate time within the working day (minutes from 9:00 AM)
minutes_from_9am = (dt.hour - 9) * 60 + dt.minute
# Convert to slot within the day (each slot = 30 minutes)
slot_within_day = round(minutes_from_9am / 30)
# Calculate total slot index
total_slot = days_from_base * SLOTS_PER_WORKING_DAY + slot_within_day
# Ensure non-negative slot
return max(0, total_slot)
def calculate_duration_slots(start_dt: datetime, end_dt: datetime) -> int:
"""
Calculate duration in 30-minute slots between two datetimes (naive local time).
Args:
start_dt: Start datetime (naive local time)
end_dt: End datetime (naive local time)
Returns:
Duration in 30-minute slots (minimum 1 slot)
"""
# Calculate difference in minutes (both should be naive local time)
time_diff = end_dt - start_dt
total_minutes = time_diff.total_seconds() / 60
# Convert to 30-minute slots, rounding up to ensure task duration is preserved
duration_slots = max(1, round(total_minutes / 30))
return duration_slots
|