import json
import os
import threading
import time
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import requests

from utils.error_handling import IntegrationError, handle_exceptions
from utils.logging import setup_logger
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)


class TelegramBotIntegration:
    """Telegram Bot integration for mobile notifications

    Wraps the HTTP calls ``getMe``, ``sendMessage`` and ``getUpdates`` and
    offers a background polling thread that forwards incoming updates to
    registered handler callables.
    """

    # Seconds to wait for the Telegram API before giving up on a request.
    # Without a timeout ``requests`` may block indefinitely.
    REQUEST_TIMEOUT = 10.0

    def __init__(self, token: Optional[str] = None, chat_id: Optional[str] = None):
        """Initialize Telegram Bot integration

        Args:
            token: Telegram Bot API token (optional)
            chat_id: Default chat ID to send messages to (optional)
        """
        self.base_url = "https://api.telegram.org/bot"
        self.token = token
        self.chat_id = chat_id
        self.polling_thread = None
        self.polling_active = False
        self.update_handlers = []
        self.last_update_id = 0
        # Stop signal of the current polling worker; start_polling() replaces
        # it with a fresh event for every worker it launches.
        self._stop_event = threading.Event()

    def _redact(self, text: str) -> str:
        """Mask the bot token inside ``text``

        Exception messages raised by ``requests`` can contain the request
        URL, and the URL built by this class embeds the token.

        Args:
            text: Text that is about to be logged

        Returns:
            ``text`` with every occurrence of the token replaced by ``***``
        """
        if self.token:
            return text.replace(self.token, "***")
        return text

    @handle_exceptions
    def set_token(self, token: str) -> None:
        """Set Telegram Bot API token

        Args:
            token: Telegram Bot API token
        """
        self.token = token

    @handle_exceptions
    def set_chat_id(self, chat_id: str) -> None:
        """Set default chat ID

        Args:
            chat_id: Default chat ID to send messages to
        """
        self.chat_id = chat_id

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test Telegram Bot API connection

        Returns:
            True if ``getMe`` answers with HTTP 200, False if the token is
            missing, the status differs, or the request raises
        """
        if not self.token:
            logger.error("Telegram Bot token not set")
            return False

        try:
            response = requests.get(
                f"{self.base_url}{self.token}/getMe",
                timeout=self.REQUEST_TIMEOUT,
            )
            return response.status_code == 200
        except Exception as e:
            # Best-effort probe: any failure is reported as "not connected".
            logger.error(
                f"Telegram Bot connection test failed: {self._redact(str(e))}"
            )
            return False

    @handle_exceptions
    def send_message(self, text: str, chat_id: Optional[str] = None,
                     parse_mode: str = "HTML") -> Dict[str, Any]:
        """Send a message to a chat

        Args:
            text: Message text
            chat_id: Chat ID to send message to (optional, uses default if
                not provided)
            parse_mode: Message parse mode (HTML, Markdown, MarkdownV2)

        Returns:
            The ``result`` object of the API response, or an empty dict if
            the response has none

        Raises:
            IntegrationError: If the token or chat ID is missing, or the API
                does not answer with HTTP 200
        """
        if not self.token:
            raise IntegrationError("Telegram Bot token not set")

        if not chat_id and not self.chat_id:
            raise IntegrationError("Chat ID not provided and default not set")

        target_chat_id = chat_id or self.chat_id

        data = {
            "chat_id": target_chat_id,
            "text": text,
            "parse_mode": parse_mode,
        }

        response = requests.post(
            f"{self.base_url}{self.token}/sendMessage",
            json=data,
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise IntegrationError(f"Failed to send message: {response.text}")

        return response.json().get("result", {})

    @handle_exceptions
    def send_notification(self, title: str, message: str, priority: str = "normal",
                          chat_id: Optional[str] = None) -> Dict[str, Any]:
        """Send a formatted notification message

        Args:
            title: Notification title
            message: Notification message
            priority: Notification priority (low, normal, high); unknown
                values are shown with the "normal" emoji
            chat_id: Chat ID to send notification to (optional)

        Returns:
            API response data
        """
        # Format priority emoji
        priority_emoji = {
            "low": "ℹ️",
            "normal": "🔔",
            "high": "🚨",
        }.get((priority or "normal").lower(), "🔔")

        # The text is sent with send_message()'s default HTML parse mode and
        # title/message are inserted verbatim.
        # NOTE(review): callers presumably need to escape <, > and & in
        # user-supplied text themselves — confirm against the Bot API docs.
        formatted_message = (
            f"{priority_emoji} {title}\n\n{message}\n\n"
            f"Sent from MONA at {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        )

        return self.send_message(formatted_message, chat_id)

    @handle_exceptions
    def send_task_notification(self, task: Dict[str, Any],
                               chat_id: Optional[str] = None) -> Dict[str, Any]:
        """Send a notification about a task

        Reads the optional keys ``title``, ``description``, ``status``,
        ``priority`` and ``due_date`` from ``task``.

        Args:
            task: Task data
            chat_id: Chat ID to send notification to (optional)

        Returns:
            API response data
        """
        title = f"Task: {task.get('title', 'Untitled')}"

        # Format message content
        message_parts = []

        if task.get("description"):
            message_parts.append(task["description"])

        if task.get("status"):
            status_emoji = {
                "todo": "⏳",
                "in_progress": "🔄",
                "done": "✅",
            }.get(task["status"].lower(), "⏳")
            message_parts.append(f"Status: {status_emoji} {task['status'].title()}")

        if task.get("priority"):
            priority_emoji = {
                "low": "🟢",
                "medium": "🟡",
                "high": "🔴",
            }.get(task["priority"].lower(), "⚪")
            message_parts.append(
                f"Priority: {priority_emoji} {task['priority'].title()}"
            )

        if task.get("due_date"):
            try:
                # A trailing "Z" is rewritten to an explicit UTC offset
                # before the value is handed to fromisoformat().
                due_date = datetime.fromisoformat(
                    task["due_date"].replace("Z", "+00:00")
                )
                message_parts.append(f"Due: 📅 {due_date.strftime('%Y-%m-%d %H:%M')}")
            except (ValueError, TypeError, AttributeError):
                # Not an ISO-8601 string (or not a string at all): show the
                # raw value rather than dropping the due date.
                message_parts.append(f"Due: 📅 {task['due_date']}")

        message = "\n".join(message_parts)

        # Determine priority based on task priority; ``or ""`` covers a
        # "priority" key that is present but set to None.
        notification_priority = {
            "low": "low",
            "medium": "normal",
            "high": "high",
        }.get((task.get("priority") or "").lower(), "normal")

        return self.send_notification(title, message, notification_priority, chat_id)

    @handle_exceptions
    def start_polling(self, interval: int = 5) -> None:
        """Start polling for updates in a separate thread

        Args:
            interval: Polling interval in seconds

        Raises:
            IntegrationError: If the token is not set
        """
        if not self.token:
            raise IntegrationError("Telegram Bot token not set")

        if self.polling_active:
            logger.warning("Polling already active")
            return

        # Every worker gets its own stop event, so a worker that is still
        # finishing after stop_polling() cannot be revived by this restart.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.polling_active = True
        self.polling_thread = threading.Thread(
            target=self._polling_worker,
            args=(interval, stop_event),
            daemon=True,
        )
        self.polling_thread.start()

        logger.info(f"Started polling for Telegram updates every {interval} seconds")

    @handle_exceptions
    def stop_polling(self) -> None:
        """Stop polling for updates

        Signals the worker to stop and waits up to one second for it to
        finish. A worker that is still inside an HTTP request exits on its
        own once that request returns.
        """
        if not self.polling_active:
            logger.warning("Polling not active")
            return

        self.polling_active = False
        # Wake the worker immediately instead of waiting out its interval.
        self._stop_event.set()

        worker = self.polling_thread
        self.polling_thread = None
        # A handler may call stop_polling() from the worker thread itself;
        # joining the current thread would raise RuntimeError.
        if worker and worker is not threading.current_thread():
            worker.join(timeout=1.0)

        logger.info("Stopped polling for Telegram updates")

    @handle_exceptions
    def add_update_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None:
        """Add a handler for updates

        Args:
            handler: Function to handle updates; it is called with each
                update dict received by the polling worker
        """
        self.update_handlers.append(handler)

    @handle_exceptions
    def get_updates(self, offset: int = 0, limit: int = 100,
                    timeout: int = 0) -> List[Dict[str, Any]]:
        """Get updates from Telegram Bot API

        Args:
            offset: Update offset
            limit: Maximum number of updates to retrieve
            timeout: Long polling timeout

        Returns:
            List of updates

        Raises:
            IntegrationError: If the token is not set or the API does not
                answer with HTTP 200
        """
        if not self.token:
            raise IntegrationError("Telegram Bot token not set")

        params = {
            "offset": offset,
            "limit": limit,
            "timeout": timeout,
        }

        response = requests.get(
            f"{self.base_url}{self.token}/getUpdates",
            params=params,
            # The HTTP timeout must outlast the server-side long poll.
            timeout=timeout + self.REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            raise IntegrationError(f"Failed to get updates: {response.text}")

        return response.json().get("result", [])

    def _polling_worker(self, interval: int,
                        stop_event: Optional[threading.Event] = None) -> None:
        """Worker function for polling thread

        Fetches updates, advances ``last_update_id`` and passes each update
        to every registered handler until polling is stopped.

        Args:
            interval: Polling interval in seconds
            stop_event: Event that ends this worker when set (optional,
                defaults to the instance's current stop event)
        """
        if stop_event is None:
            stop_event = self._stop_event

        while self.polling_active and not stop_event.is_set():
            try:
                # ``or []``: tolerate a None result in case the
                # handle_exceptions decorator swallows an error.
                updates = self.get_updates(offset=self.last_update_id + 1) or []

                for update in updates:
                    # Process update
                    update_id = update.get("update_id", 0)
                    if update_id > self.last_update_id:
                        self.last_update_id = update_id

                    # Call handlers on a snapshot so that handlers added
                    # meanwhile do not disturb the iteration.
                    for handler in list(self.update_handlers):
                        try:
                            handler(update)
                        except Exception as e:
                            # One failing handler must not stop the others.
                            logger.error(f"Error in update handler: {str(e)}")
            except Exception as e:
                logger.error(f"Error in polling worker: {self._redact(str(e))}")

            # Interruptible pause: returns as soon as stop_polling() sets
            # the event instead of sleeping for the full interval.
            stop_event.wait(interval)

    @handle_exceptions
    def process_command(self, update: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process a command from an update

        Args:
            update: Update data

        Returns:
            Response data if a command was processed, None otherwise
        """
        message = update.get("message") or {}
        text = message.get("text") or ""
        chat_id = (message.get("chat") or {}).get("id")

        if not text or not text.startswith("/") or not chat_id:
            return None

        # Extract command and arguments
        parts = text.split()
        # A command may arrive as "/command@BotName"; keep only the part
        # before "@" so it matches the names below.
        command = parts[0].lower().partition("@")[0]
        args = parts[1:]  # not used by any of the commands below yet

        # Process commands
        if command == "/start":
            return self.send_message(
                "👋 Welcome to MONA Bot! I'll send you notifications about "
                "your tasks and other important updates.",
                chat_id,
            )

        elif command == "/help":
            help_text = (
                "Available commands:\n"
                "/start - Start the bot\n"
                "/help - Show this help message\n"
                "/tasks - Show recent tasks\n"
                "/status - Show system status\n"
                "/subscribe - Subscribe to notifications\n"
                "/unsubscribe - Unsubscribe from notifications"
            )
            return self.send_message(help_text, chat_id)

        elif command == "/tasks":
            # In a real implementation, this would fetch tasks from the database
            return self.send_message("You have no pending tasks.", chat_id)

        elif command == "/status":
            status_text = (
                "System Status:\n✅ All systems operational\n✅ Last update: "
                + datetime.now().strftime("%Y-%m-%d %H:%M")
            )
            return self.send_message(status_text, chat_id)

        elif command == "/subscribe":
            # In a real implementation, this would add the chat_id to subscribers
            self.set_chat_id(str(chat_id))
            return self.send_message(
                "✅ You are now subscribed to notifications!", chat_id
            )

        elif command == "/unsubscribe":
            # In a real implementation, this would remove the chat_id from
            # subscribers. Both sides are compared as strings because the
            # default chat ID may have been supplied as an int.
            if self.chat_id is not None and str(chat_id) == str(self.chat_id):
                self.chat_id = None
            return self.send_message(
                "❌ You are now unsubscribed from notifications.", chat_id
            )

        else:
            return self.send_message(
                "Unknown command. Type /help for available commands.", chat_id
            )