import html
import json
import os
import threading
import time
from datetime import datetime
from typing import Dict, List, Any, Optional, Callable

import requests

from utils.error_handling import handle_exceptions, IntegrationError
from utils.logging import setup_logger
from utils.storage import load_data, save_data

# Module-level logger used by the Telegram integration defined in this file.
logger = setup_logger(__name__)

class TelegramBotIntegration:
    """Telegram Bot integration for mobile notifications.

    Talks to the Telegram Bot HTTP API (``getMe``, ``sendMessage``,
    ``getUpdates``) and can poll for updates on a background daemon thread,
    passing every received update to the registered handlers.
    """

    # NOTE(review): the emoji literals in this class were restored from
    # mis-decoded text. Most are unambiguous, but the bell, refresh, waving-hand
    # and cross-mark emoji (and the second check mark in the /status reply) are
    # best guesses - confirm against the original file.

    def __init__(self, token: Optional[str] = None, chat_id: Optional[str] = None,
                 request_timeout: float = 10.0):
        """Initialize Telegram Bot integration

        Args:
            token: Telegram Bot API token (optional)
            chat_id: Default chat ID to send messages to (optional)
            request_timeout: Seconds to wait for an HTTP response before
                giving up, so a stalled connection cannot block forever
        """
        self.base_url = "https://api.telegram.org/bot"
        self.token = token
        self.chat_id = chat_id
        self.request_timeout = request_timeout
        self.polling_thread: Optional[threading.Thread] = None
        self.polling_active = False
        self.update_handlers: List[Callable[[Dict[str, Any]], None]] = []
        self.last_update_id = 0
        # Stop signal of the current polling worker; start_polling() installs
        # a fresh one for every worker it launches.
        self._stop_event = threading.Event()

    def _api_url(self, method: str) -> str:
        """Build the endpoint URL for a Bot API method using the current token."""
        return f"{self.base_url}{self.token}/{method}"

    def _redact(self, text: str) -> str:
        """Mask the bot token in text that is about to be logged."""
        if not self.token:
            return text
        return text.replace(self.token, "<token>")

    @staticmethod
    def _escape_html(value: Any) -> str:
        """Return value as text with &, < and > escaped for HTML parse mode."""
        return html.escape(str(value), quote=False)

    @handle_exceptions
    def set_token(self, token: str) -> None:
        """Set Telegram Bot API token

        Args:
            token: Telegram Bot API token
        """
        self.token = token

    @handle_exceptions
    def set_chat_id(self, chat_id: str) -> None:
        """Set default chat ID

        Args:
            chat_id: Default chat ID to send messages to
        """
        self.chat_id = chat_id

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test Telegram Bot API connection

        Returns:
            True if connection is successful, False otherwise
        """
        if not self.token:
            logger.error("Telegram Bot token not set")
            return False

        try:
            response = requests.get(
                self._api_url("getMe"),
                timeout=self.request_timeout,
            )
            return response.status_code == 200
        except Exception as e:
            # Deliberately broad: any failure just means "not connected".
            # The request URL (and therefore the token) can appear in the
            # exception text, so it is masked before logging.
            logger.error("Telegram Bot connection test failed: %s", self._redact(str(e)))
            return False

    @handle_exceptions
    def send_message(self, text: str, chat_id: Optional[str] = None,
                     parse_mode: str = "HTML") -> Dict[str, Any]:
        """Send a message to a chat

        Args:
            text: Message text
            chat_id: Chat ID to send message to (optional, uses default if not provided)
            parse_mode: Message parse mode (HTML, Markdown, MarkdownV2)

        Returns:
            The "result" object of the API response (empty dict if absent)

        Raises:
            IntegrationError: If the token or chat ID is missing, or the API
                answers with a non-200 status
        """
        if not self.token:
            raise IntegrationError("Telegram Bot token not set")

        target_chat_id = chat_id or self.chat_id
        if not target_chat_id:
            raise IntegrationError("Chat ID not provided and default not set")

        payload = {
            "chat_id": target_chat_id,
            "text": text,
            "parse_mode": parse_mode,
        }
        response = requests.post(
            self._api_url("sendMessage"),
            json=payload,
            timeout=self.request_timeout,
        )
        if response.status_code != 200:
            raise IntegrationError(f"Failed to send message: {response.text}")

        return response.json().get("result", {})

    @handle_exceptions
    def send_notification(self, title: str, message: str, priority: str = "normal",
                          chat_id: Optional[str] = None) -> Dict[str, Any]:
        """Send a formatted notification message

        ``title`` and ``message`` are inserted verbatim into an HTML-formatted
        message, so callers passing plain text must escape ``<``, ``>`` and
        ``&`` themselves.

        Args:
            title: Notification title
            message: Notification message
            priority: Notification priority (low, normal, high)
            chat_id: Chat ID to send notification to (optional)

        Returns:
            API response data
        """
        # Unknown priorities fall back to the "normal" emoji.
        priority_emoji = {
            "low": "ℹ️",
            "normal": "🔔",
            "high": "🚨",
        }.get(priority.lower(), "🔔")

        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
        formatted_message = (
            f"{priority_emoji} <b>{title}</b>\n\n{message}\n\n"
            f"<i>Sent from MONA at {timestamp}</i>"
        )
        return self.send_message(formatted_message, chat_id)

    @handle_exceptions
    def send_task_notification(self, task: Dict[str, Any],
                               chat_id: Optional[str] = None) -> Dict[str, Any]:
        """Send a notification about a task

        Reads the optional keys ``title``, ``description``, ``status``,
        ``priority`` and ``due_date`` from ``task``. Their values are
        HTML-escaped because the message is sent in HTML parse mode and a
        stray ``<`` or ``&`` would otherwise make Telegram reject it.

        Args:
            task: Task data
            chat_id: Chat ID to send notification to (optional)

        Returns:
            API response data
        """
        title = f"Task: {self._escape_html(task.get('title', 'Untitled'))}"

        message_parts = []

        if task.get("description"):
            message_parts.append(self._escape_html(task["description"]))

        if task.get("status"):
            status = str(task["status"])
            status_emoji = {
                "todo": "⏳",
                "in_progress": "🔄",
                "done": "✅",
            }.get(status.lower(), "⏳")
            message_parts.append(
                f"Status: {status_emoji} {self._escape_html(status.title())}"
            )

        if task.get("priority"):
            priority = str(task["priority"])
            priority_emoji = {
                "low": "🟢",
                "medium": "🟡",
                "high": "🔴",
            }.get(priority.lower(), "⚪")
            message_parts.append(
                f"Priority: {priority_emoji} {self._escape_html(priority.title())}"
            )

        due_raw = task.get("due_date")
        if due_raw:
            try:
                # fromisoformat() on older Pythons rejects a trailing "Z".
                due_date = datetime.fromisoformat(due_raw.replace("Z", "+00:00"))
                due_text = due_date.strftime('%Y-%m-%d %H:%M')
            except (AttributeError, TypeError, ValueError):
                # Not an ISO string (or not a string at all): show it as given.
                due_text = self._escape_html(due_raw)
            message_parts.append(f"Due: 📅 {due_text}")

        message = "\n".join(message_parts)

        # Task priorities use "medium" where notifications use "normal".
        # `or ""` covers a task whose "priority" key is present but None.
        notification_priority = {
            "low": "low",
            "medium": "normal",
            "high": "high",
        }.get(str(task.get("priority") or "").lower(), "normal")

        return self.send_notification(title, message, notification_priority, chat_id)

    @handle_exceptions
    def start_polling(self, interval: int = 5) -> None:
        """Start polling for updates in a separate thread

        Args:
            interval: Polling interval in seconds

        Raises:
            IntegrationError: If the token is not set
        """
        if not self.token:
            raise IntegrationError("Telegram Bot token not set")

        if self.polling_active:
            logger.warning("Polling already active")
            return

        # Each worker gets its own stop signal, so a worker that is still
        # winding down after stop_polling() cannot be revived by this restart.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self.polling_active = True
        self.polling_thread = threading.Thread(
            target=self._polling_worker,
            args=(interval, stop_event),
            daemon=True,
        )
        self.polling_thread.start()

        logger.info(f"Started polling for Telegram updates every {interval} seconds")

    @handle_exceptions
    def stop_polling(self) -> None:
        """Stop polling for updates"""
        if not self.polling_active:
            logger.warning("Polling not active")
            return

        self.polling_active = False
        # Wake the worker if it is waiting between polls.
        self._stop_event.set()

        worker = self.polling_thread
        self.polling_thread = None
        # A handler may call this from the worker thread itself; joining the
        # current thread would raise RuntimeError.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=1.0)

        logger.info("Stopped polling for Telegram updates")

    @handle_exceptions
    def add_update_handler(self, handler: Callable[[Dict[str, Any]], None]) -> None:
        """Add a handler for updates

        Args:
            handler: Function to handle updates
        """
        self.update_handlers.append(handler)

    @handle_exceptions
    def get_updates(self, offset: int = 0, limit: int = 100,
                    timeout: int = 0) -> List[Dict[str, Any]]:
        """Get updates from Telegram Bot API

        Args:
            offset: Update offset
            limit: Maximum number of updates to retrieve
            timeout: Long polling timeout

        Returns:
            List of updates

        Raises:
            IntegrationError: If the token is not set or the API answers with
                a non-200 status
        """
        if not self.token:
            raise IntegrationError("Telegram Bot token not set")

        params = {
            "offset": offset,
            "limit": limit,
            "timeout": timeout,
        }
        response = requests.get(
            self._api_url("getUpdates"),
            params=params,
            # The server may hold the request open for `timeout` seconds, so
            # the HTTP timeout has to be longer than that.
            timeout=self.request_timeout + timeout,
        )
        if response.status_code != 200:
            raise IntegrationError(f"Failed to get updates: {response.text}")

        return response.json().get("result", [])

    def _polling_worker(self, interval: int,
                        stop_event: Optional[threading.Event] = None) -> None:
        """Worker function for polling thread

        Args:
            interval: Polling interval in seconds
            stop_event: Signal that ends this worker when set (optional; without
                it the loop is controlled by ``polling_active`` alone)
        """
        if stop_event is None:
            stop_event = threading.Event()

        while self.polling_active and not stop_event.is_set():
            try:
                # NOTE(review): get_updates is wrapped by handle_exceptions,
                # which presumably may return None on failure - hence `or []`.
                updates = self.get_updates(offset=self.last_update_id + 1) or []

                for update in updates:
                    update_id = update.get("update_id", 0)
                    if update_id > self.last_update_id:
                        self.last_update_id = update_id

                    for handler in self.update_handlers:
                        try:
                            handler(update)
                        except Exception as e:
                            # One failing handler must not stop the others.
                            logger.error("Error in update handler: %s", e)
            except Exception as e:
                logger.error("Error in polling worker: %s", self._redact(str(e)))

            # Interruptible sleep: returns early once stop_polling() sets the event.
            stop_event.wait(interval)

    @handle_exceptions
    def process_command(self, update: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process a command from an update

        Args:
            update: Update data

        Returns:
            Response data if a command was processed, None otherwise
        """
        message = update.get("message") or {}
        text = message.get("text", "")
        chat_id = (message.get("chat") or {}).get("id")

        if not text or not text.startswith("/") or not chat_id:
            return None

        # The first whitespace-separated token is the command itself.
        command = text.split()[0].lower()

        if command == "/start":
            return self.send_message(
                "👋 Welcome to MONA Bot! I'll send you notifications about your tasks and other important updates.",
                chat_id
            )

        if command == "/help":
            help_text = "\n".join((
                "<b>Available commands:</b>",
                "/start - Start the bot",
                "/help - Show this help message",
                "/tasks - Show recent tasks",
                "/status - Show system status",
                "/subscribe - Subscribe to notifications",
                "/unsubscribe - Unsubscribe from notifications",
            ))
            return self.send_message(help_text, chat_id)

        if command == "/tasks":
            return self.send_message("You have no pending tasks.", chat_id)

        if command == "/status":
            status_text = (
                "<b>System Status:</b>\n✅ All systems operational\n✅ Last update: "
                + datetime.now().strftime("%Y-%m-%d %H:%M")
            )
            return self.send_message(status_text, chat_id)

        if command == "/subscribe":
            # Subscribing makes this chat the default notification target.
            self.set_chat_id(str(chat_id))
            return self.send_message("✅ You are now subscribed to notifications!", chat_id)

        if command == "/unsubscribe":
            # Compare as strings: the stored default may be an int or a str.
            if str(chat_id) == str(self.chat_id):
                self.chat_id = None
            return self.send_message("❌ You are now unsubscribed from notifications.", chat_id)

        return self.send_message("Unknown command. Type /help for available commands.", chat_id)