Spaces:
Running
Running
File size: 4,362 Bytes
5b27bce |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
import os
import hashlib
import secrets
from typing import Dict, Optional
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class APIKeyManager:
"""Manages API key authentication and rate limiting"""
def __init__(self):
self.api_keys = {
os.getenv("API_KEY_1", "your-secure-api-key-1"): {
"user": "user1",
"created": datetime.now(),
"last_used": None,
"request_count": 0
},
os.getenv("API_KEY_2", "your-secure-api-key-2"): {
"user": "user2",
"created": datetime.now(),
"last_used": None,
"request_count": 0
}
}
self.rate_limits = {} # {api_key: {minute: count}}
self.max_requests_per_minute = int(os.getenv("RATE_LIMIT", "10"))
def validate_api_key(self, api_key: str) -> Optional[str]:
"""Validate API key and return user info"""
if api_key in self.api_keys:
self.api_keys[api_key]["last_used"] = datetime.now()
self.api_keys[api_key]["request_count"] += 1
return self.api_keys[api_key]["user"]
return None
def check_rate_limit(self, api_key: str) -> bool:
"""Check if API key has exceeded rate limit"""
current_minute = datetime.now().strftime("%Y-%m-%d-%H-%M")
if api_key not in self.rate_limits:
self.rate_limits[api_key] = {}
# Clean old entries (keep only last 5 minutes)
cutoff_time = datetime.now() - timedelta(minutes=5)
keys_to_remove = []
for minute_key in self.rate_limits[api_key]:
try:
minute_time = datetime.strptime(minute_key, "%Y-%m-%d-%H-%M")
if minute_time < cutoff_time:
keys_to_remove.append(minute_key)
except ValueError:
keys_to_remove.append(minute_key)
for key in keys_to_remove:
del self.rate_limits[api_key][key]
# Check current minute
current_count = self.rate_limits[api_key].get(current_minute, 0)
if current_count >= self.max_requests_per_minute:
return False
# Increment counter
self.rate_limits[api_key][current_minute] = current_count + 1
return True
def get_api_key_stats(self, api_key: str) -> Optional[Dict]:
"""Get statistics for an API key"""
if api_key in self.api_keys:
stats = self.api_keys[api_key].copy()
current_minute = datetime.now().strftime("%Y-%m-%d-%H-%M")
stats["current_minute_requests"] = self.rate_limits.get(api_key, {}).get(current_minute, 0)
stats["rate_limit"] = self.max_requests_per_minute
return stats
return None
def generate_new_api_key(self, user: str) -> str:
"""Generate a new secure API key"""
api_key = secrets.token_urlsafe(32)
self.api_keys[api_key] = {
"user": user,
"created": datetime.now(),
"last_used": None,
"request_count": 0
}
return api_key
def revoke_api_key(self, api_key: str) -> bool:
"""Revoke an API key"""
if api_key in self.api_keys:
del self.api_keys[api_key]
if api_key in self.rate_limits:
del self.rate_limits[api_key]
return True
return False
def list_api_keys(self) -> Dict:
"""List all API keys with their stats (without revealing the keys)"""
result = {}
for api_key, info in self.api_keys.items():
# Hash the API key for identification without revealing it
key_hash = hashlib.sha256(api_key.encode()).hexdigest()[:8]
result[key_hash] = {
"user": info["user"],
"created": info["created"].isoformat(),
"last_used": info["last_used"].isoformat() if info["last_used"] else None,
"request_count": info["request_count"]
}
return result
# Global instance
api_key_manager = APIKeyManager()
def get_api_key_manager() -> APIKeyManager:
"""Get the global API key manager instance"""
return api_key_manager
|