ciyidogan commited on
Commit
2c5f4bb
·
verified ·
1 Parent(s): fc5668f

Update utils/utils.py

Browse files
Files changed (1) hide show
  1. utils/utils.py +161 -161
utils/utils.py CHANGED
@@ -1,162 +1,162 @@
1
-
2
- import os
3
- from typing import Optional
4
- from fastapi import HTTPException, Depends
5
- from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
6
- from datetime import datetime, timedelta, timezone
7
- import jwt
8
- from utils.logger import log_info, log_warning
9
-
10
- security = HTTPBearer()
11
-
12
- # ===================== Rate Limiting =====================
13
- class RateLimiter:
14
- """Simple in-memory rate limiter"""
15
- def __init__(self):
16
- self.requests = {} # {key: [(timestamp, count)]}
17
- self.lock = threading.Lock()
18
-
19
- def is_allowed(self, key: str, max_requests: int, window_seconds: int) -> bool:
20
- """Check if request is allowed"""
21
- with self.lock:
22
- now = datetime.now(timezone.utc)
23
-
24
- if key not in self.requests:
25
- self.requests[key] = []
26
-
27
- # Remove old entries
28
- cutoff = now.timestamp() - window_seconds
29
- self.requests[key] = [
30
- (ts, count) for ts, count in self.requests[key]
31
- if ts > cutoff
32
- ]
33
-
34
- # Count requests in window
35
- total = sum(count for _, count in self.requests[key])
36
-
37
- if total >= max_requests:
38
- return False
39
-
40
- # Add this request
41
- self.requests[key].append((now.timestamp(), 1))
42
- return True
43
-
44
- def reset(self, key: str):
45
- """Reset rate limit for key"""
46
- with self.lock:
47
- if key in self.requests:
48
- del self.requests[key]
49
-
50
- # Create global rate limiter instance
51
- import threading
52
- rate_limiter = RateLimiter()
53
-
54
- # ===================== JWT Config =====================
55
- def get_jwt_config():
56
- """Get JWT configuration based on environment"""
57
- # Check if we're in HuggingFace Space
58
- if os.getenv("SPACE_ID"):
59
- # Cloud mode - use secrets from environment
60
- jwt_secret = os.getenv("JWT_SECRET")
61
- if not jwt_secret:
62
- log_warning("⚠️ WARNING: JWT_SECRET not found in environment, using fallback")
63
- jwt_secret = "flare-admin-secret-key-change-in-production" # Fallback
64
- else:
65
- # On-premise mode - use .env file
66
- from dotenv import load_dotenv
67
- load_dotenv()
68
- jwt_secret = os.getenv("JWT_SECRET", "flare-admin-secret-key-change-in-production")
69
-
70
- return {
71
- "secret": jwt_secret,
72
- "algorithm": os.getenv("JWT_ALGORITHM", "HS256"),
73
- "expiration_hours": int(os.getenv("JWT_EXPIRATION_HOURS", "24"))
74
- }
75
-
76
- # ===================== Auth Helpers =====================
77
- def create_token(username: str) -> str:
78
- """Create JWT token for user"""
79
- config = get_jwt_config()
80
- expiry = datetime.now(timezone.utc) + timedelta(hours=config["expiration_hours"])
81
-
82
- payload = {
83
- "sub": username,
84
- "exp": expiry,
85
- "iat": datetime.now(timezone.utc)
86
- }
87
-
88
- return jwt.encode(payload, config["secret"], algorithm=config["algorithm"])
89
-
90
- def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
91
- """Verify JWT token and return username"""
92
- token = credentials.credentials
93
- config = get_jwt_config()
94
-
95
- try:
96
- payload = jwt.decode(token, config["secret"], algorithms=[config["algorithm"]])
97
- return payload["sub"]
98
- except jwt.ExpiredSignatureError:
99
- raise HTTPException(status_code=401, detail="Token expired")
100
- except jwt.InvalidTokenError:
101
- raise HTTPException(status_code=401, detail="Invalid token")
102
-
103
- # ===================== Utility Functions =====================
104
-
105
- def truncate_string(text: str, max_length: int = 100, suffix: str = "...") -> str:
106
- """Truncate string to max length"""
107
- if len(text) <= max_length:
108
- return text
109
- return text[:max_length - len(suffix)] + suffix
110
-
111
- def format_file_size(size_bytes: int) -> str:
112
- """Format file size in human readable format"""
113
- for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
114
- if size_bytes < 1024.0:
115
- return f"{size_bytes:.2f} {unit}"
116
- size_bytes /= 1024.0
117
- return f"{size_bytes:.2f} PB"
118
-
119
- def is_safe_path(path: str, base_path: str) -> bool:
120
- """Check if path is safe (no directory traversal)"""
121
- import os
122
- # Resolve to absolute paths
123
- base = os.path.abspath(base_path)
124
- target = os.path.abspath(os.path.join(base, path))
125
-
126
- # Check if target is under base
127
- return target.startswith(base)
128
-
129
- def get_current_timestamp() -> str:
130
- """
131
- Get current UTC timestamp in ISO format with Z suffix
132
- Returns: "2025-01-10T12:00:00.123Z"
133
- """
134
- return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
135
-
136
- def normalize_timestamp(timestamp: Optional[str]) -> str:
137
- """
138
- Normalize timestamp string for consistent comparison
139
- Handles various formats:
140
- - "2025-01-10T12:00:00Z"
141
- - "2025-01-10T12:00:00.000Z"
142
- - "2025-01-10T12:00:00+00:00"
143
- - "2025-01-10 12:00:00+00:00"
144
- """
145
- if not timestamp:
146
- return ""
147
-
148
- # Normalize various formats
149
- normalized = timestamp.replace(' ', 'T') # Space to T
150
- normalized = normalized.replace('+00:00', 'Z') # UTC timezone
151
-
152
- # Remove milliseconds if present for comparison
153
- if '.' in normalized and normalized.endswith('Z'):
154
- normalized = normalized.split('.')[0] + 'Z'
155
-
156
- return normalized
157
-
158
- def timestamps_equal(ts1: Optional[str], ts2: Optional[str]) -> bool:
159
- """
160
- Compare two timestamps regardless of format differences
161
- """
162
  return normalize_timestamp(ts1) == normalize_timestamp(ts2)
 
1
+
2
+ import os
3
+ from typing import Optional
4
+ from fastapi import HTTPException, Depends
5
+ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
6
+ from datetime import datetime, timedelta, timezone
7
+ import jwt
8
+ from .logger import log_info, log_warning
9
+
10
+ security = HTTPBearer()
11
+
12
+ # ===================== Rate Limiting =====================
13
+ class RateLimiter:
14
+ """Simple in-memory rate limiter"""
15
+ def __init__(self):
16
+ self.requests = {} # {key: [(timestamp, count)]}
17
+ self.lock = threading.Lock()
18
+
19
+ def is_allowed(self, key: str, max_requests: int, window_seconds: int) -> bool:
20
+ """Check if request is allowed"""
21
+ with self.lock:
22
+ now = datetime.now(timezone.utc)
23
+
24
+ if key not in self.requests:
25
+ self.requests[key] = []
26
+
27
+ # Remove old entries
28
+ cutoff = now.timestamp() - window_seconds
29
+ self.requests[key] = [
30
+ (ts, count) for ts, count in self.requests[key]
31
+ if ts > cutoff
32
+ ]
33
+
34
+ # Count requests in window
35
+ total = sum(count for _, count in self.requests[key])
36
+
37
+ if total >= max_requests:
38
+ return False
39
+
40
+ # Add this request
41
+ self.requests[key].append((now.timestamp(), 1))
42
+ return True
43
+
44
+ def reset(self, key: str):
45
+ """Reset rate limit for key"""
46
+ with self.lock:
47
+ if key in self.requests:
48
+ del self.requests[key]
49
+
50
+ # Create global rate limiter instance
51
+ import threading
52
+ rate_limiter = RateLimiter()
53
+
54
+ # ===================== JWT Config =====================
55
+ def get_jwt_config():
56
+ """Get JWT configuration based on environment"""
57
+ # Check if we're in HuggingFace Space
58
+ if os.getenv("SPACE_ID"):
59
+ # Cloud mode - use secrets from environment
60
+ jwt_secret = os.getenv("JWT_SECRET")
61
+ if not jwt_secret:
62
+ log_warning("⚠️ WARNING: JWT_SECRET not found in environment, using fallback")
63
+ jwt_secret = "flare-admin-secret-key-change-in-production" # Fallback
64
+ else:
65
+ # On-premise mode - use .env file
66
+ from dotenv import load_dotenv
67
+ load_dotenv()
68
+ jwt_secret = os.getenv("JWT_SECRET", "flare-admin-secret-key-change-in-production")
69
+
70
+ return {
71
+ "secret": jwt_secret,
72
+ "algorithm": os.getenv("JWT_ALGORITHM", "HS256"),
73
+ "expiration_hours": int(os.getenv("JWT_EXPIRATION_HOURS", "24"))
74
+ }
75
+
76
+ # ===================== Auth Helpers =====================
77
+ def create_token(username: str) -> str:
78
+ """Create JWT token for user"""
79
+ config = get_jwt_config()
80
+ expiry = datetime.now(timezone.utc) + timedelta(hours=config["expiration_hours"])
81
+
82
+ payload = {
83
+ "sub": username,
84
+ "exp": expiry,
85
+ "iat": datetime.now(timezone.utc)
86
+ }
87
+
88
+ return jwt.encode(payload, config["secret"], algorithm=config["algorithm"])
89
+
90
+ def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
91
+ """Verify JWT token and return username"""
92
+ token = credentials.credentials
93
+ config = get_jwt_config()
94
+
95
+ try:
96
+ payload = jwt.decode(token, config["secret"], algorithms=[config["algorithm"]])
97
+ return payload["sub"]
98
+ except jwt.ExpiredSignatureError:
99
+ raise HTTPException(status_code=401, detail="Token expired")
100
+ except jwt.InvalidTokenError:
101
+ raise HTTPException(status_code=401, detail="Invalid token")
102
+
103
+ # ===================== Utility Functions =====================
104
+
105
+ def truncate_string(text: str, max_length: int = 100, suffix: str = "...") -> str:
106
+ """Truncate string to max length"""
107
+ if len(text) <= max_length:
108
+ return text
109
+ return text[:max_length - len(suffix)] + suffix
110
+
111
+ def format_file_size(size_bytes: int) -> str:
112
+ """Format file size in human readable format"""
113
+ for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
114
+ if size_bytes < 1024.0:
115
+ return f"{size_bytes:.2f} {unit}"
116
+ size_bytes /= 1024.0
117
+ return f"{size_bytes:.2f} PB"
118
+
119
+ def is_safe_path(path: str, base_path: str) -> bool:
120
+ """Check if path is safe (no directory traversal)"""
121
+ import os
122
+ # Resolve to absolute paths
123
+ base = os.path.abspath(base_path)
124
+ target = os.path.abspath(os.path.join(base, path))
125
+
126
+ # Check if target is under base
127
+ return target.startswith(base)
128
+
129
+ def get_current_timestamp() -> str:
130
+ """
131
+ Get current UTC timestamp in ISO format with Z suffix
132
+ Returns: "2025-01-10T12:00:00.123Z"
133
+ """
134
+ return datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
135
+
136
+ def normalize_timestamp(timestamp: Optional[str]) -> str:
137
+ """
138
+ Normalize timestamp string for consistent comparison
139
+ Handles various formats:
140
+ - "2025-01-10T12:00:00Z"
141
+ - "2025-01-10T12:00:00.000Z"
142
+ - "2025-01-10T12:00:00+00:00"
143
+ - "2025-01-10 12:00:00+00:00"
144
+ """
145
+ if not timestamp:
146
+ return ""
147
+
148
+ # Normalize various formats
149
+ normalized = timestamp.replace(' ', 'T') # Space to T
150
+ normalized = normalized.replace('+00:00', 'Z') # UTC timezone
151
+
152
+ # Remove milliseconds if present for comparison
153
+ if '.' in normalized and normalized.endswith('Z'):
154
+ normalized = normalized.split('.')[0] + 'Z'
155
+
156
+ return normalized
157
+
158
+ def timestamps_equal(ts1: Optional[str], ts2: Optional[str]) -> bool:
159
+ """
160
+ Compare two timestamps regardless of format differences
161
+ """
162
  return normalize_timestamp(ts1) == normalize_timestamp(ts2)