File size: 2,017 Bytes
054900e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from __future__ import annotations

import orjson
from aiohttp import ClientSession, ClientTimeout
from loguru import logger

from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent

AMPLITUDE_ENDPOINT = "https://api2.amplitude.com/2/httpapi"


class AmplitudeTelegramLogger(AbstractAnalyticsLogger):
    def __init__(self, api_token: str, base_url: str = AMPLITUDE_ENDPOINT) -> None:
        self._api_token: str = api_token
        self._base_url: str = base_url
        self._headers = {"Content-Type": "application/json", "Accept": "*/*"}
        self._timeout = ClientTimeout(total=15)
        self.SUCCESS_STATUS_CODE = 200

    async def _send_request(
        self,
        event: BaseEvent,
    ) -> None:
        """Implementation of interaction with Amplitude API."""
        data = {"api_key": self._api_token, "events": [event.to_dict()]}

        async with (
            ClientSession() as session,
            session.post(
                self._base_url,
                headers=self._headers,
                data=orjson.dumps(data),
                timeout=self._timeout,
            ) as response,
        ):
            json_response = await response.json(content_type="application/json")

        self._validate_response(json_response)

    def _validate_response(self, response: dict[str, str | int]) -> None:
        """Validate response."""
        if response.get("code") != self.SUCCESS_STATUS_CODE:
            error = response.get("error")
            code = response.get("code")

            logger.error(f"get error from amplitude api | error: {error} | code: {code}")
            msg = f"Error in amplitude api call | error: {error} | code: {code}"
            raise ValueError(msg)

        logger.info(f"successfully send to Amplitude | server_upload_time: {response['server_upload_time']}")

    async def log_event(
        self,
        event: BaseEvent,
    ) -> None:
        """Use this method to sends event to Amplitude."""
        await self._send_request(event)