"""Google Calendar REST integration: calendar/event CRUD and task synchronization."""

import json
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote

import pytz
import requests

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)


class GoogleCalendarIntegration:
    """Google Calendar API integration for event synchronization"""

    # Seconds to wait on each HTTP call. `requests` applies no timeout unless
    # one is passed explicitly, so a stalled connection would block forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, credentials: Optional[Dict[str, Any]] = None):
        """Initialize Google Calendar integration

        Args:
            credentials: Google API credentials (optional). The keys read are
                "access_token", "refresh_token" and "expiry".
        """
        self.base_url = "https://www.googleapis.com/calendar/v3"
        self.headers: Dict[str, str] = {}
        self._apply_credentials(credentials)

    def _apply_credentials(self, credentials: Optional[Dict[str, Any]]) -> None:
        """Store credentials and keep the Authorization header in sync with them."""
        values = credentials or {}
        self.credentials = credentials
        self.token = values.get("access_token")
        self.refresh_token = values.get("refresh_token")
        self.token_expiry = values.get("expiry")

        if self.token:
            self.headers["Authorization"] = f"Bearer {self.token}"
        else:
            # Drop any header left over from previous credentials so a stale
            # token is never sent alongside the new (token-less) credentials.
            self.headers.pop("Authorization", None)

    @handle_exceptions
    def set_credentials(self, credentials: Dict[str, Any]) -> None:
        """Set Google API credentials

        Args:
            credentials: Google API credentials
        """
        self._apply_credentials(credentials)

    @handle_exceptions
    def refresh_access_token(self) -> bool:
        """Refresh the access token using the refresh token

        Returns:
            True if token refresh is successful, False otherwise
        """
        # In a real implementation, this would use the refresh token to get a new access token
        # For demo purposes, we'll just return False
        logger.warning("Token refresh not implemented in demo")
        return False

    def _is_token_expired(self) -> bool:
        """Return True when a stored expiry timestamp lies in the past."""
        if not self.token_expiry:
            return False

        expiry = self.token_expiry
        if isinstance(expiry, str):
            # datetime.fromisoformat() only accepts a trailing "Z" on Python 3.11+
            expiry = datetime.fromisoformat(expiry.replace("Z", "+00:00"))

        # Compare like with like: naive and aware datetimes cannot be ordered.
        if expiry.tzinfo is None or expiry.utcoffset() is None:
            return expiry < datetime.now()
        return expiry < datetime.now(pytz.UTC)

    @staticmethod
    def _to_rfc3339(moment: datetime) -> str:
        """Render a datetime as an RFC 3339 UTC timestamp ending in "Z".

        Aware datetimes are converted to UTC first. Naive datetimes are
        formatted as-is, i.e. they are taken to already be in UTC.
        """
        if moment.tzinfo is not None and moment.utcoffset() is not None:
            moment = moment.astimezone(pytz.UTC).replace(tzinfo=None)
        return moment.isoformat() + "Z"

    def _events_url(self, calendar_id: str, event_id: Optional[str] = None) -> str:
        """Build the events endpoint URL with path segments percent-encoded."""
        # Calendar IDs may contain characters such as "#" that would otherwise
        # be interpreted as URL syntax and truncate the path.
        calendar_segment = quote(str(calendar_id), safe="")
        url = f"{self.base_url}/calendars/{calendar_segment}/events"
        if event_id is not None:
            event_segment = quote(str(event_id), safe="")
            url = f"{url}/{event_segment}"
        return url

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test Google Calendar API connection

        Returns:
            True if connection is successful, False otherwise
        """
        try:
            # An expired token that cannot be refreshed makes the call pointless
            if self._is_token_expired() and not self.refresh_access_token():
                return False

            response = requests.get(
                f"{self.base_url}/users/me/calendarList",
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            return response.status_code == 200
        except Exception as e:
            # Deliberately broad: this is a yes/no probe, any failure means "no"
            logger.error(f"Google Calendar connection test failed: {str(e)}")
            return False

    @handle_exceptions
    def get_calendars(self) -> List[Dict[str, Any]]:
        """Get user calendars

        Returns:
            List of calendars, each with the keys "id", "name", "description",
            "timezone", "primary" and "color"

        Raises:
            IntegrationError: If the API does not answer with HTTP 200
        """
        response = requests.get(
            f"{self.base_url}/users/me/calendarList",
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise IntegrationError(f"Failed to get calendars: {response.text}")

        calendars = response.json().get("items", [])

        return [
            {
                "id": calendar.get("id"),
                "name": calendar.get("summary"),
                "description": calendar.get("description"),
                "timezone": calendar.get("timeZone"),
                "primary": calendar.get("primary", False),
                "color": calendar.get("backgroundColor"),
            }
            for calendar in calendars
        ]

    @handle_exceptions
    def get_events(self, calendar_id: str, start_time: Optional[datetime] = None,
                   end_time: Optional[datetime] = None,
                   max_results: int = 100) -> List[Dict[str, Any]]:
        """Get calendar events

        Args:
            calendar_id: Calendar ID
            start_time: Start time for events (optional)
            end_time: End time for events (optional)
            max_results: Maximum number of events to return

        Returns:
            List of events in the shape produced by _format_event

        Raises:
            IntegrationError: If the API does not answer with HTTP 200
        """
        params: Dict[str, Any] = {
            "maxResults": max_results,
            "singleEvents": True,
            "orderBy": "startTime",
        }

        # Appending "Z" to an aware datetime's isoformat() would produce
        # "...+00:00Z", so the conversion goes through _to_rfc3339 instead.
        if start_time is not None:
            params["timeMin"] = self._to_rfc3339(start_time)
        if end_time is not None:
            params["timeMax"] = self._to_rfc3339(end_time)

        response = requests.get(
            self._events_url(calendar_id),
            headers=self.headers,
            params=params,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise IntegrationError(f"Failed to get events: {response.text}")

        events = response.json().get("items", [])

        return [self._format_event(event) for event in events]

    @handle_exceptions
    def create_event(self, calendar_id: str, summary: str, start: Dict[str, Any],
                     end: Dict[str, Any], description: Optional[str] = None,
                     location: Optional[str] = None,
                     attendees: Optional[List[Dict[str, str]]] = None) -> Dict[str, Any]:
        """Create a new calendar event

        Args:
            calendar_id: Calendar ID
            summary: Event summary/title
            start: Event start time (dict with 'dateTime' or 'date')
            end: Event end time (dict with 'dateTime' or 'date')
            description: Event description (optional)
            location: Event location (optional)
            attendees: List of attendees (optional)

        Returns:
            Created event data

        Raises:
            IntegrationError: If the API does not answer with HTTP 200 or 201
        """
        data: Dict[str, Any] = {
            "summary": summary,
            "start": start,
            "end": end,
        }

        # Empty optional fields are left out of the request body entirely
        if description:
            data["description"] = description
        if location:
            data["location"] = location
        if attendees:
            data["attendees"] = attendees

        response = requests.post(
            self._events_url(calendar_id),
            headers=self.headers,
            json=data,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code not in (200, 201):
            raise IntegrationError(f"Failed to create event: {response.text}")

        return self._format_event(response.json())

    @handle_exceptions
    def update_event(self, calendar_id: str, event_id: str, summary: Optional[str] = None,
                     start: Optional[Dict[str, Any]] = None,
                     end: Optional[Dict[str, Any]] = None,
                     description: Optional[str] = None,
                     location: Optional[str] = None) -> Dict[str, Any]:
        """Update an existing calendar event

        Only arguments that are not None are sent, so an empty string can be
        used to clear a text field.

        Args:
            calendar_id: Calendar ID
            event_id: Event ID
            summary: Event summary/title (optional)
            start: Event start time (optional)
            end: Event end time (optional)
            description: Event description (optional)
            location: Event location (optional)

        Returns:
            Updated event data

        Raises:
            IntegrationError: If the API does not answer with HTTP 200
        """
        candidates = {
            "summary": summary,
            "start": start,
            "end": end,
            "description": description,
            "location": location,
        }
        data = {field: value for field, value in candidates.items() if value is not None}

        response = requests.patch(
            self._events_url(calendar_id, event_id),
            headers=self.headers,
            json=data,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise IntegrationError(f"Failed to update event: {response.text}")

        return self._format_event(response.json())

    @handle_exceptions
    def delete_event(self, calendar_id: str, event_id: str) -> bool:
        """Delete a calendar event

        Args:
            calendar_id: Calendar ID
            event_id: Event ID

        Returns:
            True if deletion is successful

        Raises:
            IntegrationError: If the API does not answer with HTTP 200 or 204
        """
        response = requests.delete(
            self._events_url(calendar_id, event_id),
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code not in (200, 204):
            raise IntegrationError(f"Failed to delete event: {response.text}")

        return True

    @handle_exceptions
    def convert_events_to_tasks(self, calendar_id: str, days: int = 7) -> List[Dict[str, Any]]:
        """Convert Google Calendar events to MONA tasks

        All-day events, and events whose start time is missing or cannot be
        parsed, are skipped.

        Args:
            calendar_id: Calendar ID
            days: Number of days to look ahead

        Returns:
            List of tasks
        """
        # Get events for the next 'days' days
        window_start = datetime.now(pytz.UTC)
        window_end = window_start + timedelta(days=days)

        events = self.get_events(calendar_id, window_start, window_end)

        tasks = []
        for event in events:
            # _format_event always sets "start", possibly to None
            start = event.get("start") or {}

            # Skip all-day events (they carry "date" rather than "dateTime")
            if "date" in start:
                continue

            raw_start = start.get("dateTime")
            if not raw_start:
                logger.warning(f"Skipping event without start time: {event.get('id')}")
                continue

            try:
                event_start = datetime.fromisoformat(raw_start.replace("Z", "+00:00"))
            except ValueError:
                logger.warning(f"Skipping event with unparseable start time: {event.get('id')}")
                continue

            # The formatted event always contains these keys, so a
            # dict.get(key, default) fallback would never apply; use `or`.
            now_iso = datetime.now(pytz.UTC).isoformat()

            tasks.append({
                "id": f"gcal-event-{event['id']}",
                "title": event.get("summary"),
                "description": event.get("description") or "",
                "status": "todo",
                "priority": "medium",  # Default priority
                "created_at": event.get("created") or now_iso,
                "updated_at": event.get("updated") or now_iso,
                "due_date": event_start.isoformat(),
                "source": "google_calendar",
                "source_id": event["id"],
                "source_url": event.get("htmlLink") or "",
                "metadata": {
                    "calendar_id": calendar_id,
                    "location": event.get("location") or "",
                    "start": event.get("start"),
                    "end": event.get("end"),
                    "attendees": event.get("attendees") or [],
                },
            })

        return tasks

    @handle_exceptions
    def sync_tasks_to_events(self, tasks: List[Dict[str, Any]], calendar_id: str) -> List[Dict[str, Any]]:
        """Sync MONA tasks to Google Calendar events

        Tasks already sourced from Google Calendar and tasks without a due
        date are passed through untouched. Every other task gets a one-hour
        event starting at its due date and is updated in place with the
        event's identifiers.

        Args:
            tasks: List of tasks to sync
            calendar_id: Calendar ID

        Returns:
            List of synced tasks with updated metadata
        """
        synced_tasks = []

        for task in tasks:
            already_synced = task.get("source") == "google_calendar"
            if already_synced or not task.get("due_date"):
                synced_tasks.append(task)
                continue

            # Create event from task
            due_date = datetime.fromisoformat(task["due_date"].replace("Z", "+00:00"))
            end_date = due_date + timedelta(hours=1)  # Default 1-hour event

            event = self.create_event(
                calendar_id=calendar_id,
                summary=task["title"],
                start={"dateTime": due_date.isoformat(), "timeZone": "UTC"},
                end={"dateTime": end_date.isoformat(), "timeZone": "UTC"},
                description=task.get("description", ""),
            )

            # Merge into existing metadata rather than replacing it, so keys
            # the task already carried are not lost by syncing.
            metadata = dict(task.get("metadata") or {})
            metadata.update({
                "calendar_id": calendar_id,
                "start": event["start"],
                "end": event["end"],
            })

            # Update task with Google Calendar metadata
            task.update({
                "source": "google_calendar",
                "source_id": event["id"],
                "source_url": event.get("htmlLink", ""),
                "metadata": metadata,
            })

            synced_tasks.append(task)

        return synced_tasks

    def _format_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """Format event data for consistent output

        Every key below is always present in the result; fields missing from
        the raw event come back as None (or an empty list for "attendees").

        Args:
            event: Raw event data from API

        Returns:
            Formatted event data
        """
        return {
            "id": event.get("id"),
            "summary": event.get("summary"),
            "description": event.get("description"),
            "location": event.get("location"),
            "start": event.get("start"),
            "end": event.get("end"),
            "created": event.get("created"),
            "updated": event.get("updated"),
            "status": event.get("status"),
            "htmlLink": event.get("htmlLink"),
            "attendees": event.get("attendees", []),
            "organizer": event.get("organizer"),
        }