import requests
import json
import os
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
import time
import threading
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data
# Initialize logger
logger = setup_logger(__name__)
class TelegramBotIntegration:
"""Telegram Bot integration for mobile notifications"""
def __init__(self, token: Optional[str] = None, chat_id: Optional[str] = None):
"""Initialize Telegram Bot integration
Args:
token: Telegram Bot API token (optional)
chat_id: Default chat ID to send messages to (optional)
"""
self.base_url = "https://api.telegram.org/bot"
self.token = token
self.chat_id = chat_id
self.polling_thread = None
self.polling_active = False
self.update_handlers = []
self.last_update_id = 0
@handle_exceptions
def set_token(self, token: str) -> None:
"""Set Telegram Bot API token
Args:
token: Telegram Bot API token
"""
self.token = token
@handle_exceptions
def set_chat_id(self, chat_id: str) -> None:
"""Set default chat ID
Args:
chat_id: Default chat ID to send messages to
"""
self.chat_id = chat_id
@handle_exceptions
def test_connection(self) -> bool:
"""Test Telegram Bot API connection
Returns:
True if connection is successful, False otherwise
"""
if not self.token:
logger.error("Telegram Bot token not set")
return False
try:
response = requests.get(f"{self.base_url}{self.token}/getMe")
return response.status_code == 200
except Exception as e:
logger.error(f"Telegram Bot connection test failed: {str(e)}")
return False
@handle_exceptions
def send_message(self, text: str, chat_id: Optional[str] = None,
parse_mode: str = "HTML") -> Dict[str, Any]:
"""Send a message to a chat
Args:
text: Message text
chat_id: Chat ID to send message to (optional, uses default if not provided)
parse_mode: Message parse mode (HTML, Markdown, MarkdownV2)
Returns:
API response data
"""
if not self.token:
raise IntegrationError("Telegram Bot token not set")
if not chat_id and not self.chat_id:
raise IntegrationError("Chat ID not provided and default not set")
target_chat_id = chat_id or self.chat_id
data = {
"chat_id": target_chat_id,
"text": text,
"parse_mode": parse_mode
}
response = requests.post(
f"{self.base_url}{self.token}/sendMessage",
json=data
)
if response.status_code != 200:
raise IntegrationError(f"Failed to send message: {response.text}")
return response.json().get("result", {})
@handle_exceptions
def send_notification(self, title: str, message: str, priority: str = "normal",
chat_id: Optional[str] = None) -> Dict[str, Any]:
"""Send a formatted notification message
Args:
title: Notification title
message: Notification message
priority: Notification priority (low, normal, high)
chat_id: Chat ID to send notification to (optional)
Returns:
API response data
"""
# Format priority emoji
priority_emoji = {
"low": "ā¹ļø",
"normal": "š",
"high": "šØ"
}.get(priority.lower(), "š")
# Format message with HTML
formatted_message = f"{priority_emoji} {title}\n\n{message}\n\nSent from MONA at {datetime.now().strftime('%Y-%m-%d %H:%M')}"
return self.send_message(formatted_message, chat_id)
@handle_exceptions
def send_task_notification(self, task: Dict[str, Any], chat_id: Optional[str] = None) -> Dict[str, Any]:
"""Send a notification about a task
Args:
task: Task data
chat_id: Chat ID to send notification to (optional)
Returns:
API response data
"""
title = f"Task: {task.get('title', 'Untitled')}"
# Format message content
message_parts = []
if task.get("description"):
message_parts.append(task["description"])
if task.get("status"):
status_emoji = {
"todo": "ā³",
"in_progress": "š",
"done": "ā
"
}.get(task["status"].lower(), "ā³")
message_parts.append(f"Status: {status_emoji} {task['status'].title()}")
if task.get("priority"):
priority_emoji = {
"low": "š¢",
"medium": "š”",
"high": "š“"
}.get(task["priority"].lower(), "āŖ")
message_parts.append(f"Priority: {priority_emoji} {task['priority'].title()}")
if task.get("due_date"):
try:
due_date = datetime.fromisoformat(task["due_date"].replace("Z", "+00:00"))
message_parts.append(f"Due: š
{due_date.strftime('%Y-%m-%d %H:%M')}")
except:
message_parts.append(f"Due: š
{task['due_date']}")
message = "\n".join(message_parts)
# Determine priority based on task priority
notification_priority = {
"low": "low",
"medium": "normal",
"high": "high"
}.get(task.get("priority", "").lower(), "normal")
return self.send_notification(title, message, notification_priority, chat_id)
@handle_exceptions
def start_polling(self, interval: int = 5) -> None:
"""Start polling for updates in a separate thread
Args:
interval: Polling interval in seconds
"""
if not self.token:
raise IntegrationError("Telegram Bot token not set")
if self.polling_active:
logger.warning("Polling already active")
return
self.polling_active = True
self.polling_thread = threading.Thread(target=self._polling_worker, args=(interval,))
self.polling_thread.daemon = True
self.polling_thread.start()
logger.info(f"Started polling for Telegram updates every {interval} seconds")
@handle_exceptions
def stop_polling(self) -> None:
"""Stop polling for updates"""
if not self.polling_active:
logger.warning("Polling not active")
return
self.polling_active = False
if self.polling_thread:
self.polling_thread.join(timeout=1.0)
self.polling_thread = None
logger.info("Stopped polling for Telegram updates")
@handle_exceptions
def add_update_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None:
"""Add a handler for updates
Args:
handler: Function to handle updates
"""
self.update_handlers.append(handler)
@handle_exceptions
def get_updates(self, offset: int = 0, limit: int = 100, timeout: int = 0) -> List[Dict[str, Any]]:
"""Get updates from Telegram Bot API
Args:
offset: Update offset
limit: Maximum number of updates to retrieve
timeout: Long polling timeout
Returns:
List of updates
"""
if not self.token:
raise IntegrationError("Telegram Bot token not set")
params = {
"offset": offset,
"limit": limit,
"timeout": timeout
}
response = requests.get(
f"{self.base_url}{self.token}/getUpdates",
params=params
)
if response.status_code != 200:
raise IntegrationError(f"Failed to get updates: {response.text}")
return response.json().get("result", [])
def _polling_worker(self, interval: int) -> None:
"""Worker function for polling thread
Args:
interval: Polling interval in seconds
"""
while self.polling_active:
try:
updates = self.get_updates(offset=self.last_update_id + 1)
for update in updates:
# Process update
update_id = update.get("update_id", 0)
if update_id > self.last_update_id:
self.last_update_id = update_id
# Call handlers
for handler in self.update_handlers:
try:
handler(update)
except Exception as e:
logger.error(f"Error in update handler: {str(e)}")
time.sleep(interval)
except Exception as e:
logger.error(f"Error in polling worker: {str(e)}")
time.sleep(interval)
@handle_exceptions
def process_command(self, update: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Process a command from an update
Args:
update: Update data
Returns:
Response data if a command was processed, None otherwise
"""
message = update.get("message", {})
text = message.get("text", "")
chat_id = message.get("chat", {}).get("id")
if not text or not text.startswith("/") or not chat_id:
return None
# Extract command and arguments
parts = text.split()
command = parts[0].lower()
args = parts[1:]
# Process commands
if command == "/start":
return self.send_message(
"š Welcome to MONA Bot! I'll send you notifications about your tasks and other important updates.",
chat_id
)
elif command == "/help":
help_text = """Available commands:
/start - Start the bot
/help - Show this help message
/tasks - Show recent tasks
/status - Show system status
/subscribe - Subscribe to notifications
/unsubscribe - Unsubscribe from notifications"""
return self.send_message(help_text, chat_id)
elif command == "/tasks":
# In a real implementation, this would fetch tasks from the database
return self.send_message("You have no pending tasks.", chat_id)
elif command == "/status":
status_text = "System Status:\nā
All systems operational\nā
Last update: " + datetime.now().strftime("%Y-%m-%d %H:%M")
return self.send_message(status_text, chat_id)
elif command == "/subscribe":
# In a real implementation, this would add the chat_id to subscribers
self.set_chat_id(str(chat_id))
return self.send_message("ā
You are now subscribed to notifications!", chat_id)
elif command == "/unsubscribe":
# In a real implementation, this would remove the chat_id from subscribers
if str(chat_id) == self.chat_id:
self.chat_id = None
return self.send_message("ā You are now unsubscribed from notifications.", chat_id)
else:
return self.send_message("Unknown command. Type /help for available commands.", chat_id)