from __future__ import annotations

from aiohttp import ClientSession, ClientTimeout
from loguru import logger

from bot.analytics.types import AbstractAnalyticsLogger, BaseEvent

POSTHOG_ENDPOINT = "https://app.posthog.com"


class PosthogTelegramLogger(AbstractAnalyticsLogger):
    """Analytics logger that posts events to a Posthog HTTP endpoint."""

    def __init__(self, api_token: str, base_url: str = POSTHOG_ENDPOINT) -> None:
        """Store credentials and prepare the static request headers.

        Args:
            api_token: Posthog API key used to authenticate requests.
            base_url: Root URL of the Posthog instance.
        """
        self._api_token: str = api_token
        self._base_url: str = base_url
        self._headers = {
            # The token must be interpolated here; a "${...}" placeholder is
            # never expanded by Python and would be sent verbatim.
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
            "Accept": "*/*",
        }
        # Total request timeout, in seconds.
        self._timeout = 15

    async def _send_request(
        self,
        event: BaseEvent,
    ) -> dict:
        """POST a single event and return the validated JSON response.

        Args:
            event: Event to send; it is converted with ``dict(event)`` and
                used as the JSON request body.

        Returns:
            The decoded JSON response, after ``_validate_response`` accepts it.

        Raises:
            ValueError: If the response is not marked as successful.
        """
        # NOTE(review): the token is also kept in the query string for backward
        # compatibility; URLs tend to end up in access logs, so consider relying
        # on the Authorization header alone once confirmed against the API.
        url = f"{self._base_url}/api/event/?personal_api_key={self._api_token}"
        payload = dict(event)
        # aiohttp deprecates bare numeric timeouts in favour of ClientTimeout.
        timeout = ClientTimeout(total=self._timeout)

        async with (
            ClientSession() as session,
            session.post(
                url,
                headers=self._headers,
                json=payload,
                timeout=timeout,
            ) as response,
        ):
            json_response = await response.json(content_type="application/json")

        logger.info("Send record to Posthog")
        logger.info(f"{json_response=}")
        return self._validate_response(json_response)

    @staticmethod
    def _validate_response(response: dict) -> dict:
        """Validate a decoded response and return it unchanged on success.

        A response is treated as successful when its ``"ok"`` value is truthy.
        NOTE(review): this ``ok``/``error``/``result`` envelope is what the
        validator expects; confirm it matches the endpoint's actual schema.

        Args:
            response: Decoded JSON body of the HTTP response.

        Returns:
            The same ``response`` dict.

        Raises:
            ValueError: If ``"ok"`` is missing or falsy.
        """
        if not response.get("ok"):
            # Tolerate a missing or malformed "error" object so the caller
            # always gets the intended ValueError rather than a KeyError.
            error = response.get("error")
            if not isinstance(error, dict):
                error = {}
            name = error.get("name")
            code = error.get("code")
            logger.error(f"get error from Posthog api | name: {name} | code: {code}")
            msg = f"Error in Posthog API call | name: {name} | code: {code}"
            raise ValueError(msg)

        logger.info(
            f"got response | ok: {response['ok']} | result: {response.get('result')}",
        )
        return response

    async def log_event(
        self,
        event: BaseEvent,
    ) -> None:
        """Send an event to Posthog.

        Args:
            event: Event to send.

        Raises:
            ValueError: If the response is not marked as successful.
        """
        await self._send_request(event)