File size: 5,270 Bytes
6842903
1
{"cells": [{"cell_type": "markdown", "id": "302934307671667531413257853548643485645", "metadata": {}, "source": ["# Gradio Demo: rate_limit"]}, {"cell_type": "code", "execution_count": null, "id": "272996653310673477252411125948039410165", "metadata": {}, "outputs": [], "source": ["!pip install -q gradio "]}, {"cell_type": "code", "execution_count": null, "id": "288918539441861185822528903084949547379", "metadata": {}, "outputs": [], "source": ["import gradio as gr\n", "from datetime import datetime, timedelta\n", "from collections import defaultdict\n", "import threading\n", "\n", "rate_limit_data = defaultdict(list)\n", "lock = threading.Lock()\n", "\n", "UNAUTH_RATE_LIMIT = 3\n", "AUTH_RATE_LIMIT = 30\n", "RATE_LIMIT_WINDOW = 60\n", "\n", "def clean_old_entries(user_id):\n", "    \"\"\"Remove entries older than the rate limit window\"\"\"\n", "    current_time = datetime.now()\n", "    cutoff_time = current_time - timedelta(seconds=RATE_LIMIT_WINDOW)\n", "    rate_limit_data[user_id] = [\n", "        timestamp for timestamp in rate_limit_data[user_id]\n", "        if timestamp > cutoff_time\n", "    ]\n", "\n", "def get_user_identifier(profile: gr.OAuthProfile | None, request: gr.Request) -> tuple[str, bool]:\n", "    \"\"\"Get user identifier and whether they're authenticated\"\"\"\n", "    if profile is not None:\n", "        return profile.username, True\n", "    else:\n", "        if request:\n", "            return f\"ip_{request.client.host}\", False\n", "        return \"ip_unknown\", False\n", "\n", "def check_rate_limit(user_id: str, is_authenticated: bool) -> tuple[bool, int, int]:\n", "    \"\"\"\n", "    Check if user has exceeded rate limit\n", "    Returns: (can_proceed, clicks_used, max_clicks)\n", "    \"\"\"\n", "    with lock:\n", "        clean_old_entries(user_id)\n", "        \n", "        max_clicks = AUTH_RATE_LIMIT if is_authenticated else UNAUTH_RATE_LIMIT\n", "        clicks_used = len(rate_limit_data[user_id])\n", "        \n", "        can_proceed = clicks_used < max_clicks\n", "        \n", "        return can_proceed, clicks_used, max_clicks\n", "\n", "def add_click(user_id: str):\n", "    \"\"\"Add a click timestamp for the user\"\"\"\n", "    with lock:\n", "        rate_limit_data[user_id].append(datetime.now())\n", "\n", "def update_status(profile: gr.OAuthProfile | None, request: gr.Request) -> str:\n", "    \"\"\"Update the status message showing current rate limit info\"\"\"\n", "    user_id, is_authenticated = get_user_identifier(profile, request)\n", "    _, clicks_used, max_clicks = check_rate_limit(user_id, is_authenticated)\n", "    \n", "    if is_authenticated:\n", "        return f\"\u2705 You are logged in as '{profile.username}'. You have clicked {clicks_used} times this minute. You have {max_clicks} total clicks per minute.\"  # type: ignore\n", "    else:\n", "        return f\"\u26a0\ufe0f You are not logged in. You have clicked {clicks_used} times this minute. You have {max_clicks} total clicks per minute.\"\n", "\n", "def run_action(profile: gr.OAuthProfile | None, request: gr.Request) -> tuple[str, str]:\n", "    \"\"\"Handle the run button click with rate limiting\"\"\"\n", "    user_id, is_authenticated = get_user_identifier(profile, request)\n", "    can_proceed, clicks_used, max_clicks = check_rate_limit(user_id, is_authenticated)\n", "    \n", "    if not can_proceed:\n", "        result = f\"\u274c Rate limit exceeded! You've used all {max_clicks} clicks for this minute. Please wait before trying again.\"\n", "        status = update_status(profile, request)\n", "        return result, status\n", "    \n", "    add_click(user_id)\n", "    \n", "    _, new_clicks_used, _ = check_rate_limit(user_id, is_authenticated)\n", "    \n", "    result = f\"\u2705 Action executed successfully! (Click #{new_clicks_used})\"\n", "    status = update_status(profile, request)\n", "    \n", "    return result, status\n", "\n", "with gr.Blocks(title=\"Rate Limiting Demo\") as demo:\n", "    gr.Markdown(\"# Rate Limiting Demo App\")\n", "    gr.Markdown(\"This app demonstrates rate limiting based on authentication status.\")\n", "    \n", "    gr.LoginButton()\n", "    \n", "    status_text = gr.Markdown(\"Loading status...\")\n", "    \n", "    with gr.Row():\n", "        run_btn = gr.Button(\"\ud83d\ude80 Run Action\", variant=\"primary\", scale=1)\n", "    \n", "    result_text = gr.Markdown(\"\")\n", "    \n", "    demo.load(update_status, inputs=None, outputs=status_text)\n", "    \n", "    run_btn.click(\n", "        run_action,\n", "        inputs=None,\n", "        outputs=[result_text, status_text]\n", "    )\n", "    \n", "    gr.Markdown(\"---\")\n", "    gr.Markdown(\"\"\"\n", "    ### Rate Limits:\n", "    - **Not logged in:** 3 clicks per minute (based on IP address)\n", "    - **Logged in:** 30 clicks per minute (based on HF username)\n", "    \n", "    ### How it works:\n", "    - Click the **Login** button to authenticate with Hugging Face\n", "    - Click the **Run Action** button to test the rate limiting\n", "    - The system tracks your clicks over a rolling 1-minute window\n", "    \"\"\")\n", "\n", "if __name__ == \"__main__\":\n", "    demo.launch()\n"]}], "metadata": {}, "nbformat": 4, "nbformat_minor": 5}