from __future__ import annotations

from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, TypeVar

from aiogram.types import CallbackQuery, Message

from bot.analytics.amplitude import AmplitudeTelegramLogger
from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent, EventProperties, EventType, UserProperties
from bot.core.config import settings
from bot.utils.singleton import SingletonMeta

if TYPE_CHECKING:
    from collections.abc import Awaitable

_Func = TypeVar("_Func")


class AnalyticsService(metaclass=SingletonMeta):
    """Facade that reports bot events to an optional analytics logger.

    When ``logger`` is ``None`` every tracking call is a no-op and decorated
    handlers run untouched.
    """

    def __init__(self, logger: AbstractAnalyticsLogger | None) -> None:
        """Store the analytics backend; ``None`` disables tracking."""
        self.logger = logger

    async def _track_error(self, user_id: int, error_text: str) -> None:
        """Log an ``"Error"`` event carrying ``error_text`` for ``user_id``.

        Does nothing when no logger is configured.
        """
        if not self.logger:
            return

        await self.logger.log_event(
            BaseEvent(
                user_id=user_id,
                event_type="Error",
                event_properties=EventProperties(text=error_text),
            ),
        )

    def track_event(
        self,
        event_name: EventType,
    ) -> Callable[[Callable[..., Awaitable[_Func]]], Callable[..., Awaitable[_Func]]]:
        """Return a decorator that logs ``event_name`` before running a handler.

        The decorated handler must take the update (``Message`` or
        ``CallbackQuery``) as its first positional argument. Any further
        positional and keyword arguments are forwarded unchanged.

        Tracking is skipped, but the handler is still invoked, when no logger
        is configured or when the update carries no ``from_user``. If the
        handler raises, an ``"Error"`` event with ``str(exception)`` is logged
        and the exception is re-raised.
        """

        def decorator(
            handler: Callable[..., Awaitable[_Func]],
        ) -> Callable[..., Awaitable[_Func]]:
            @wraps(handler)
            async def wrapper(update: Message | CallbackQuery, *args: Any, **kwargs: Any) -> _Func:
                user = update.from_user if isinstance(update, (Message, CallbackQuery)) else None

                # Analytics must never gate the handler: without a logger or an
                # identifiable user there is nothing to track, so just run it.
                if not self.logger or user is None:
                    return await handler(update, *args, **kwargs)

                chat_id: int | None
                chat_type: str | None

                if isinstance(update, Message):
                    chat_id = update.chat.id
                    chat_type = update.chat.type
                    text = update.text
                    # Only texts starting with "/" are reported as commands.
                    command = update.text if update.text and update.text.startswith("/") else None
                else:
                    # CallbackQuery: the originating message may be absent.
                    chat_id = update.message.chat.id if update.message else None
                    chat_type = update.message.chat.type if update.message else None
                    text = update.data
                    command = None

                await self.logger.log_event(
                    BaseEvent(
                        user_id=user.id,
                        event_type=event_name,
                        user_properties=UserProperties(
                            first_name=user.first_name,
                            last_name=user.last_name,
                            username=user.username,
                            url=user.url,
                        ),
                        event_properties=EventProperties(
                            chat_id=chat_id,
                            chat_type=chat_type,
                            text=text,
                            command=command,
                        ),
                        language=user.language_code,
                    ),
                )

                try:
                    return await handler(update, *args, **kwargs)
                except Exception as e:
                    # Report the failure, then let the caller see the original error.
                    await self._track_error(user.id, str(e))
                    raise

            return wrapper

        return decorator


# Tracking is enabled only when an Amplitude API key is configured.
logger = AmplitudeTelegramLogger(api_token=settings.AMPLITUDE_API_KEY) if settings.AMPLITUDE_API_KEY else None
analytics = AnalyticsService(logger)