from __future__ import annotations import orjson from aiohttp import ClientSession, ClientTimeout from loguru import logger from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent AMPLITUDE_ENDPOINT = "https://api2.amplitude.com/2/httpapi" class AmplitudeTelegramLogger(AbstractAnalyticsLogger): def __init__(self, api_token: str, base_url: str = AMPLITUDE_ENDPOINT) -> None: self._api_token: str = api_token self._base_url: str = base_url self._headers = {"Content-Type": "application/json", "Accept": "*/*"} self._timeout = ClientTimeout(total=15) self.SUCCESS_STATUS_CODE = 200 async def _send_request( self, event: BaseEvent, ) -> None: """Implementation of interaction with Amplitude API.""" data = {"api_key": self._api_token, "events": [event.to_dict()]} async with ( ClientSession() as session, session.post( self._base_url, headers=self._headers, data=orjson.dumps(data), timeout=self._timeout, ) as response, ): json_response = await response.json(content_type="application/json") self._validate_response(json_response) def _validate_response(self, response: dict[str, str | int]) -> None: """Validate response.""" if response.get("code") != self.SUCCESS_STATUS_CODE: error = response.get("error") code = response.get("code") logger.error(f"get error from amplitude api | error: {error} | code: {code}") msg = f"Error in amplitude api call | error: {error} | code: {code}" raise ValueError(msg) logger.info(f"successfully send to Amplitude | server_upload_time: {response['server_upload_time']}") async def log_event( self, event: BaseEvent, ) -> None: """Use this method to sends event to Amplitude.""" await self._send_request(event)