File size: 3,962 Bytes
054900e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from __future__ import annotations
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, TypeVar

from aiogram.types import CallbackQuery, Message

from bot.analytics.amplitude import AmplitudeTelegramLogger
from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent, EventProperties, EventType, UserProperties
from bot.core.config import settings
from bot.utils.singleton import SingletonMeta

if TYPE_CHECKING:
    from collections.abc import Awaitable

# Result type of a decorated handler's coroutine; used in the ``track_event`` annotations.
_Func = TypeVar("_Func")


class AnalyticsService(metaclass=SingletonMeta):
    """Report bot updates and handler errors to an analytics logger.

    If the service is created with ``logger=None`` it is disabled: decorated
    handlers are called directly and nothing is reported.
    """

    def __init__(self, logger: AbstractAnalyticsLogger | None) -> None:
        """Store the analytics logger; ``None`` disables all tracking."""
        self.logger = logger

    async def _track_error(self, user_id: int, error_text: str) -> None:
        """Log an ``"Error"`` event for *user_id* carrying *error_text*; no-op without a logger."""
        if not self.logger:
            return

        await self.logger.log_event(
            BaseEvent(
                user_id=user_id,
                event_type="Error",
                event_properties=EventProperties(text=error_text),
            ),
        )

    @staticmethod
    def _build_event(update: Any, event_name: EventType) -> BaseEvent | None:
        """Build the event describing *update*, or return ``None`` when it has no sender to attribute it to."""
        if not isinstance(update, (Message, CallbackQuery)) or not update.from_user:
            return None

        user = update.from_user

        chat_id: int | None
        chat_type: str | None
        text: str | None
        command: str | None
        if isinstance(update, Message):
            chat_id = update.chat.id
            chat_type = update.chat.type
            text = update.text
            # Only texts starting with "/" are reported as a command.
            command = text if text and text.startswith("/") else None
        else:
            # CallbackQuery: the originating message may be absent.
            chat_id = update.message.chat.id if update.message else None
            chat_type = update.message.chat.type if update.message else None
            text = update.data
            command = None

        return BaseEvent(
            user_id=user.id,
            event_type=event_name,
            user_properties=UserProperties(
                first_name=user.first_name,
                last_name=user.last_name,
                username=user.username,
                url=user.url,
            ),
            event_properties=EventProperties(
                chat_id=chat_id,
                chat_type=chat_type,
                text=text,
                command=command,
            ),
            language=user.language_code,
        )

    def track_event(
        self,
        event_name: EventType,
    ) -> Callable[[Callable[..., Awaitable[_Func]]], Callable[..., Awaitable[_Func]]]:
        """Decorator for tracking events in Amplitude, Google Analytics or Posthog.

        The decorated handler is always executed. Before it runs, an event
        named *event_name* is logged when a logger is configured and the update
        is a ``Message`` or ``CallbackQuery`` with a known sender. If the
        handler raises, an ``"Error"`` event is logged and the exception is
        re-raised unchanged.

        Args:
            event_name: Event type recorded for every tracked update.

        Returns:
            A decorator that wraps an async handler whose first positional
            argument is the update; all other positional and keyword arguments
            are forwarded to the handler untouched.
        """

        def decorator(
            handler: Callable[..., Awaitable[_Func]],
        ) -> Callable[..., Awaitable[_Func]]:
            @wraps(handler)
            async def wrapper(update: Message | CallbackQuery, *args: Any, **kwargs: Any) -> _Func:
                if not self.logger:
                    return await handler(update, *args, **kwargs)

                event = self._build_event(update, event_name)
                if event is None:
                    # Nothing to attribute the event to: skip tracking, but never skip the handler.
                    return await handler(update, *args, **kwargs)

                await self.logger.log_event(event)
                try:
                    return await handler(update, *args, **kwargs)
                except Exception as e:
                    # _build_event returned an event, so from_user is set here.
                    await self._track_error(update.from_user.id, str(e))
                    raise

            return wrapper

        return decorator


# Tracking is enabled only when an Amplitude API key is configured; otherwise the logger is None.
logger: AmplitudeTelegramLogger | None
if settings.AMPLITUDE_API_KEY:
    logger = AmplitudeTelegramLogger(api_token=settings.AMPLITUDE_API_KEY)
else:
    logger = None

# Module-level service instance built from the logger chosen above.
analytics = AnalyticsService(logger)