# ruff: noqa: N815, TC003
from __future__ import annotations
from abc import ABC, abstractmethod
from decimal import Decimal
from typing import Any, Literal

from pydantic import BaseModel, IPvAnyAddress

EventType = Literal[
    "Start Session",
    "End Session",
    "Revenue",
    "Revenue (Verified)",
    "Revenue (Unverified)",
    "Sign Up",
    "Select Item",
    "View Item",
    "Complete Purchase",
    "Error",
]
PaymentMethod = Literal["Stripe", "PayPal", "Square", "Crypto"]


class UserProperties(BaseModel):
    first_name: str | None = None
    last_name: str | None = None
    username: str | None = None
    is_premium: str | None = None
    url: str | None = None


class EventProperties(BaseModel):
    chat_id: int | None = None
    chat_type: str | None = None
    text: str | None = None
    command: str | None = None
    payment_method: PaymentMethod | None = None


class Plan(BaseModel):
    branch: str | None = None
    source: str | None = None
    version: str | None = None


class BaseEvent(BaseModel):
    user_id: int
    event_type: EventType
    time: int | None = None  # Unix epoch in milliseconds, e.g. int(datetime.now().timestamp() * 1000)
    user_properties: UserProperties | None = None
    event_properties: EventProperties | None = None
    app_version: str | None = None
    platform: str | None = None
    carrier: str | None = None
    country: str | None = None
    region: str | None = None
    city: str | None = None
    dma: str | None = None
    language: str | None = None
    price: float | Decimal | None = None
    quantity: int | None = None
    revenue: float | Decimal | None = None
    productId: str | None = None
    revenueType: str | None = None
    location_lat: float | None = None
    location_lng: float | None = None
    ip: IPvAnyAddress | None = None
    plan: Plan | None = None

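    def to_dict(self) -> dict[str, Any]:
        """Return the event as a dict without unset or falsy fields.

        ``exclude_none=True`` drops fields left as ``None``; the ``if value``
        filter then also drops empty nested models (``{}``), empty strings,
        and zero values such as ``quantity=0``.
        """
        return {key: value for key, value in self.model_dump(exclude_none=True).items() if value}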


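class AbstractAnalyticsLogger(ABC):
    """Interface for analytics backends that accept ``BaseEvent`` objects."""

    @abstractmethod
    async def log_event(self, event: BaseEvent) -> None:
        """Send a single event to the analytics backend."""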
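

# Illustrative sketch, not part of the original module: the comment on
# ``BaseEvent.time`` suggests the field holds a millisecond Unix timestamp.
# The helper names ``epoch_ms`` and ``current_time_ms`` are hypothetical and
# only show one way such a value could be produced.
from datetime import datetime, timezone  # noqa: E402


def epoch_ms(moment: datetime) -> int:
    """Convert a datetime to whole milliseconds since the Unix epoch."""
    return int(moment.timestamp() * 1000)


def current_time_ms() -> int:
    """Return the current time in milliseconds, using an explicit UTC clock."""
    return epoch_ms(datetime.now(timezone.utc))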