File size: 2,834 Bytes
293ab16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from datetime import datetime
from collections import defaultdict, Counter
from typing import List, Dict, Any, Optional
import json
import os

# === File persistence ===
# Path of the JSON file the store is loaded from and saved to. It is a
# relative path, so it is resolved against the process's current working
# directory. Every mutating function in this module calls save_analytics(),
# so the file is rewritten on each change.
ANALYTICS_FILE = "analytics_data.json"

# === In-memory analytics store ===
# Maps a user key to that user's list of event dicts. Being a defaultdict,
# indexing a missing user (analytics_store[user]) creates an empty list for
# it; use .get() for read-only lookups that must not add entries.
analytics_store: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

# === Load existing analytics if present ===
def load_analytics():
    """Populate the in-memory store from ANALYTICS_FILE when that file exists.

    Each top-level key of the JSON document is treated as a user and its
    value replaces whatever the store currently holds for that user. Nothing
    happens when the file is absent. Errors raised while opening or parsing
    the file are not caught here.
    """
    if not os.path.exists(ANALYTICS_FILE):
        return

    with open(ANALYTICS_FILE, "r") as fh:
        persisted = json.load(fh)

    for user, events in persisted.items():
        analytics_store[user] = events

# === Save current analytics to file ===
def save_analytics():
    """Write the whole in-memory store to ANALYTICS_FILE as indented JSON.

    The store is serialized to a string *before* the file is opened.
    Opening with mode "w" truncates the file immediately, so streaming
    json.dump() straight into it would leave a truncated or partial file
    behind whenever serialization fails midway (for example when an event
    holds a value that is not JSON-serializable). Serializing first means
    such a failure leaves the previously saved file untouched.

    Raises:
        TypeError / ValueError: propagated from json.dumps when the store
            cannot be serialized.
        OSError: propagated from opening or writing the file.
    """
    payload = json.dumps(analytics_store, indent=2)
    with open(ANALYTICS_FILE, "w") as f:
        f.write(payload)

# === Main tracking function ===
def track_event(user: str, action: str, ip: Optional[str] = None, user_agent: Optional[str] = None):
    """Append one event to *user*'s history and persist the store.

    Args:
        user: Key under which the event is stored.
        action: Name of the action being recorded.
        ip: Optional client IP address stored with the event.
        user_agent: Optional client user-agent string stored with the event.
    """
    # NOTE(review): datetime.utcnow() yields a naive timestamp and is
    # deprecated since Python 3.12. It is kept as-is because a timezone-aware
    # value would change the stored ISO string format.
    recorded_at = datetime.utcnow().isoformat()
    analytics_store[user].append(
        dict(action=action, timestamp=recorded_at, ip=ip, user_agent=user_agent)
    )
    save_analytics()

# === Get raw event list for a user ===
def get_user_analytics(user: str) -> List[Dict[str, Any]]:
    """Return the stored event list for *user*, or a new empty list.

    For a known user the list object held by the store itself is returned
    (not a copy). Looking up an unknown user does not add it to the store.
    """
    if user in analytics_store:
        return analytics_store[user]
    return []

# === Summary for one user ===
def get_user_summary(user: str) -> Dict[str, Any]:
    """Summarise the recorded events of a single user.

    Args:
        user: Key of the user in the in-memory analytics store.

    Returns:
        A dict with the keys ``user``, ``total_events`` (number of events),
        ``last_seen`` (the lexicographically greatest timestamp string, or
        ``None`` when the user has no events) and ``actions`` (action name
        mapped to its occurrence count). The summary of a user without
        events additionally carries the legacy ``total`` key.
    """
    events = analytics_store.get(user, [])
    if not events:
        # The empty summary must expose "total_events" just like the
        # populated one: get_most_active_users sorts on that key and would
        # raise KeyError for a user with no events otherwise. "total" is
        # retained only for callers that still read the old key.
        return {
            "user": user,
            "total": 0,
            "total_events": 0,
            "last_seen": None,
            "actions": {},
        }

    action_counts = Counter(event["action"] for event in events)
    # Timestamps are stored as strings, so max() compares them as text.
    last_seen = max(event["timestamp"] for event in events)
    return {
        "user": user,
        "total_events": len(events),
        "last_seen": last_seen,
        "actions": dict(action_counts),
    }

# === Summary for all users ===
def get_all_users_summary() -> List[Dict[str, Any]]:
    """Return one get_user_summary() dict per user, in store iteration order."""
    summaries = []
    for user in analytics_store:
        summaries.append(get_user_summary(user))
    return summaries

# === Top active users ===
def get_most_active_users(top_n: int = 10) -> List[Dict[str, Any]]:
    """Return the summaries of the users with the most events.

    Args:
        top_n: Upper bound of the slice taken from the ranked summaries.

    Returns:
        User summaries ordered by descending ``total_events``; users with
        equal counts keep the order in which the summaries were produced.
    """
    ranking = get_all_users_summary()
    # The list is freshly built by get_all_users_summary, so sorting it in
    # place does not disturb any shared state.
    ranking.sort(key=lambda summary: summary["total_events"], reverse=True)
    return ranking[:top_n]

# === Daily activity trends ===
def get_daily_activity(user: Optional[str] = None) -> Dict[str, int]:
    """Count events per calendar day for one user or for every user.

    Args:
        user: User to restrict the count to. Any falsy value (``None`` or an
            empty string) selects all users in the store.

    Returns:
        Mapping of day string (the part of each timestamp before ``"T"``)
        to the number of events recorded on that day.
    """
    if user:
        selected = [user]
    else:
        selected = list(analytics_store.keys())

    per_day = Counter(
        event["timestamp"].split("T")[0]
        for name in selected
        for event in analytics_store.get(name, [])
    )
    return dict(per_day)

# === Clear individual or all user analytics ===
def clear_user_analytics(user: str):
    """Drop all events of *user* and persist the store.

    Does nothing (and does not rewrite the file) when the user is unknown.
    """
    if user not in analytics_store:
        return
    analytics_store.pop(user)
    save_analytics()

def clear_all_analytics():
    """Remove every user from the store, then persist the now-empty store."""
    # Clearing in place keeps the same store object that the rest of the
    # module references.
    analytics_store.clear()
    save_analytics()

# === Load data at module import ===
# Import-time side effect: importing this module reads ANALYTICS_FILE (when
# it exists) into analytics_store, so errors from reading or parsing that
# file surface at import.
load_analytics()