tgs / bot /services /analytics.py
AZILS's picture
Upload 119 files
054900e verified
from __future__ import annotations
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, TypeVar
from aiogram.types import CallbackQuery, Message
from bot.analytics.amplitude import AmplitudeTelegramLogger
from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent, EventProperties, EventType, UserProperties
from bot.core.config import settings
from bot.utils.singleton import SingletonMeta
if TYPE_CHECKING:
from collections.abc import Awaitable
_Func = TypeVar("_Func")
class AnalyticsService(metaclass=SingletonMeta):
def __init__(self, logger: AbstractAnalyticsLogger | None) -> None:
self.logger = logger
async def _track_error(self, user_id: int, error_text: str) -> None:
if not self.logger:
return
await self.logger.log_event(
BaseEvent(
user_id=user_id,
event_type="Error",
event_properties=EventProperties(text=error_text),
),
)
def track_event(
self,
event_name: EventType,
) -> Callable[[Callable[..., Awaitable[_Func]]], Callable[..., Awaitable[_Func]]]:
"""Decorator for tracking events in Amplitude, Google Analytics or Posthog."""
def decorator(
handler: Callable[[Message | CallbackQuery, dict[str, Any]], Awaitable[_Func]],
) -> Callable[..., Awaitable[_Func]]:
@wraps(handler)
async def wrapper(update: Message | CallbackQuery, *args: Any) -> Any:
if not self.logger:
return await handler(update, *args)
if (isinstance(update, (Message, CallbackQuery))) and update.from_user:
user_id = update.from_user.id
first_name = update.from_user.first_name
last_name = update.from_user.last_name
username = update.from_user.username
url = update.from_user.url
language = update.from_user.language_code
else:
return None
chat_id: int | None
chat_type: str | None
if isinstance(update, Message):
chat_id = update.chat.id
chat_type = update.chat.type
text = update.text
command = update.text if update.text and update.text.startswith("/") else None
elif isinstance(update, CallbackQuery):
chat_id = update.message.chat.id if update.message else None
chat_type = update.message.chat.type if update.message else None
text = update.data
command = None
await self.logger.log_event(
BaseEvent(
user_id=user_id,
event_type=event_name,
user_properties=UserProperties(
first_name=first_name,
last_name=last_name,
username=username,
url=url,
),
event_properties=EventProperties(
chat_id=chat_id,
chat_type=chat_type,
text=text,
command=command,
),
language=language,
),
)
try:
result = await handler(update, *args)
except Exception as e:
await self._track_error(user_id, str(e))
raise
return result
return wrapper
return decorator
logger = AmplitudeTelegramLogger(api_token=settings.AMPLITUDE_API_KEY) if settings.AMPLITUDE_API_KEY else None
analytics = AnalyticsService(logger)