File size: 4,102 Bytes
6842903
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import gradio as gr
from datetime import datetime, timedelta
from collections import defaultdict
import threading

rate_limit_data = defaultdict(list)
lock = threading.Lock()

UNAUTH_RATE_LIMIT = 3
AUTH_RATE_LIMIT = 30
RATE_LIMIT_WINDOW = 60

def clean_old_entries(user_id):
    """Remove entries older than the rate limit window"""
    current_time = datetime.now()
    cutoff_time = current_time - timedelta(seconds=RATE_LIMIT_WINDOW)
    rate_limit_data[user_id] = [
        timestamp for timestamp in rate_limit_data[user_id]
        if timestamp > cutoff_time
    ]

def get_user_identifier(profile: gr.OAuthProfile | None, request: gr.Request) -> tuple[str, bool]:
    """Get user identifier and whether they're authenticated"""
    if profile is not None:
        return profile.username, True
    else:
        if request:
            return f"ip_{request.client.host}", False
        return "ip_unknown", False

def check_rate_limit(user_id: str, is_authenticated: bool) -> tuple[bool, int, int]:
    """
    Check if user has exceeded rate limit
    Returns: (can_proceed, clicks_used, max_clicks)
    """
    with lock:
        clean_old_entries(user_id)
        
        max_clicks = AUTH_RATE_LIMIT if is_authenticated else UNAUTH_RATE_LIMIT
        clicks_used = len(rate_limit_data[user_id])
        
        can_proceed = clicks_used < max_clicks
        
        return can_proceed, clicks_used, max_clicks

def add_click(user_id: str):
    """Add a click timestamp for the user"""
    with lock:
        rate_limit_data[user_id].append(datetime.now())

def update_status(profile: gr.OAuthProfile | None, request: gr.Request) -> str:
    """Update the status message showing current rate limit info"""
    user_id, is_authenticated = get_user_identifier(profile, request)
    _, clicks_used, max_clicks = check_rate_limit(user_id, is_authenticated)
    
    if is_authenticated:
        return f"βœ… You are logged in as '{profile.username}'. You have clicked {clicks_used} times this minute. You have {max_clicks} total clicks per minute."  # type: ignore
    else:
        return f"⚠️ You are not logged in. You have clicked {clicks_used} times this minute. You have {max_clicks} total clicks per minute."

def run_action(profile: gr.OAuthProfile | None, request: gr.Request) -> tuple[str, str]:
    """Handle the run button click with rate limiting"""
    user_id, is_authenticated = get_user_identifier(profile, request)
    can_proceed, clicks_used, max_clicks = check_rate_limit(user_id, is_authenticated)
    
    if not can_proceed:
        result = f"❌ Rate limit exceeded! You've used all {max_clicks} clicks for this minute. Please wait before trying again."
        status = update_status(profile, request)
        return result, status
    
    add_click(user_id)
    
    _, new_clicks_used, _ = check_rate_limit(user_id, is_authenticated)
    
    result = f"βœ… Action executed successfully! (Click #{new_clicks_used})"
    status = update_status(profile, request)
    
    return result, status

with gr.Blocks(title="Rate Limiting Demo") as demo:
    gr.Markdown("# Rate Limiting Demo App")
    gr.Markdown("This app demonstrates rate limiting based on authentication status.")
    
    gr.LoginButton()
    
    status_text = gr.Markdown("Loading status...")
    
    with gr.Row():
        run_btn = gr.Button("πŸš€ Run Action", variant="primary", scale=1)
    
    result_text = gr.Markdown("")
    
    demo.load(update_status, inputs=None, outputs=status_text)
    
    run_btn.click(
        run_action,
        inputs=None,
        outputs=[result_text, status_text]
    )
    
    gr.Markdown("---")
    gr.Markdown("""
    ### Rate Limits:
    - **Not logged in:** 3 clicks per minute (based on IP address)
    - **Logged in:** 30 clicks per minute (based on HF username)
    
    ### How it works:
    - Click the **Login** button to authenticate with Hugging Face
    - Click the **Run Action** button to test the rate limiting
    - The system tracks your clicks over a rolling 1-minute window
    """)

if __name__ == "__main__":
    demo.launch()