Yadav122 commited on
Commit
5b27bce
·
verified ·
1 Parent(s): 6f25760

Upload auth.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. auth.py +118 -0
auth.py ADDED
@@ -0,0 +1,118 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import hashlib
3
+ import secrets
4
+ from typing import Dict, Optional
5
+ from datetime import datetime, timedelta
6
+ import logging
7
+
8
+ logger = logging.getLogger(__name__)
9
+
10
+ class APIKeyManager:
11
+ """Manages API key authentication and rate limiting"""
12
+
13
+ def __init__(self):
14
+ self.api_keys = {
15
+ os.getenv("API_KEY_1", "your-secure-api-key-1"): {
16
+ "user": "user1",
17
+ "created": datetime.now(),
18
+ "last_used": None,
19
+ "request_count": 0
20
+ },
21
+ os.getenv("API_KEY_2", "your-secure-api-key-2"): {
22
+ "user": "user2",
23
+ "created": datetime.now(),
24
+ "last_used": None,
25
+ "request_count": 0
26
+ }
27
+ }
28
+ self.rate_limits = {} # {api_key: {minute: count}}
29
+ self.max_requests_per_minute = int(os.getenv("RATE_LIMIT", "10"))
30
+
31
+ def validate_api_key(self, api_key: str) -> Optional[str]:
32
+ """Validate API key and return user info"""
33
+ if api_key in self.api_keys:
34
+ self.api_keys[api_key]["last_used"] = datetime.now()
35
+ self.api_keys[api_key]["request_count"] += 1
36
+ return self.api_keys[api_key]["user"]
37
+ return None
38
+
39
+ def check_rate_limit(self, api_key: str) -> bool:
40
+ """Check if API key has exceeded rate limit"""
41
+ current_minute = datetime.now().strftime("%Y-%m-%d-%H-%M")
42
+
43
+ if api_key not in self.rate_limits:
44
+ self.rate_limits[api_key] = {}
45
+
46
+ # Clean old entries (keep only last 5 minutes)
47
+ cutoff_time = datetime.now() - timedelta(minutes=5)
48
+ keys_to_remove = []
49
+ for minute_key in self.rate_limits[api_key]:
50
+ try:
51
+ minute_time = datetime.strptime(minute_key, "%Y-%m-%d-%H-%M")
52
+ if minute_time < cutoff_time:
53
+ keys_to_remove.append(minute_key)
54
+ except ValueError:
55
+ keys_to_remove.append(minute_key)
56
+
57
+ for key in keys_to_remove:
58
+ del self.rate_limits[api_key][key]
59
+
60
+ # Check current minute
61
+ current_count = self.rate_limits[api_key].get(current_minute, 0)
62
+ if current_count >= self.max_requests_per_minute:
63
+ return False
64
+
65
+ # Increment counter
66
+ self.rate_limits[api_key][current_minute] = current_count + 1
67
+ return True
68
+
69
+ def get_api_key_stats(self, api_key: str) -> Optional[Dict]:
70
+ """Get statistics for an API key"""
71
+ if api_key in self.api_keys:
72
+ stats = self.api_keys[api_key].copy()
73
+ current_minute = datetime.now().strftime("%Y-%m-%d-%H-%M")
74
+ stats["current_minute_requests"] = self.rate_limits.get(api_key, {}).get(current_minute, 0)
75
+ stats["rate_limit"] = self.max_requests_per_minute
76
+ return stats
77
+ return None
78
+
79
+ def generate_new_api_key(self, user: str) -> str:
80
+ """Generate a new secure API key"""
81
+ api_key = secrets.token_urlsafe(32)
82
+ self.api_keys[api_key] = {
83
+ "user": user,
84
+ "created": datetime.now(),
85
+ "last_used": None,
86
+ "request_count": 0
87
+ }
88
+ return api_key
89
+
90
+ def revoke_api_key(self, api_key: str) -> bool:
91
+ """Revoke an API key"""
92
+ if api_key in self.api_keys:
93
+ del self.api_keys[api_key]
94
+ if api_key in self.rate_limits:
95
+ del self.rate_limits[api_key]
96
+ return True
97
+ return False
98
+
99
+ def list_api_keys(self) -> Dict:
100
+ """List all API keys with their stats (without revealing the keys)"""
101
+ result = {}
102
+ for api_key, info in self.api_keys.items():
103
+ # Hash the API key for identification without revealing it
104
+ key_hash = hashlib.sha256(api_key.encode()).hexdigest()[:8]
105
+ result[key_hash] = {
106
+ "user": info["user"],
107
+ "created": info["created"].isoformat(),
108
+ "last_used": info["last_used"].isoformat() if info["last_used"] else None,
109
+ "request_count": info["request_count"]
110
+ }
111
+ return result
112
+
113
+ # Global instance
114
+ api_key_manager = APIKeyManager()
115
+
116
+ def get_api_key_manager() -> APIKeyManager:
117
+ """Get the global API key manager instance"""
118
+ return api_key_manager