Spaces:
Running
Running
File size: 3,452 Bytes
76b9762 8dd0381 76b9762 8dd0381 76b9762 8dd0381 76b9762 8dd0381 76b9762 8dd0381 76b9762 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
from typing import Optional
from fastapi import Header, HTTPException
from app.config.config import settings
from app.log.logger import get_security_logger
logger = get_security_logger()
def verify_auth_token(token: str) -> bool:
    """Return True when *token* equals ``settings.AUTH_TOKEN``.

    The comparison is done with ``hmac.compare_digest`` so the time taken
    does not reveal how many leading characters of a guess were correct.

    An empty or non-string value on either side never matches; this keeps an
    unset/empty ``settings.AUTH_TOKEN`` from being satisfied by an empty
    token.

    Args:
        token: The credential supplied by the caller.

    Returns:
        True if the token is non-empty and identical to the configured
        auth token, otherwise False.
    """
    import hmac

    expected = settings.AUTH_TOKEN
    if not isinstance(token, str) or not isinstance(expected, str):
        return False
    if not token or not expected:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8"))
class SecurityService:
    """Request-authentication checks built on FastAPI ``Header`` parameters.

    Each public method returns the credential it accepted, or raises
    ``HTTPException`` with status 401 after logging the reason. A credential
    is accepted when it equals ``settings.AUTH_TOKEN`` or is one of the
    entries in ``settings.allowed_tokens_list`` (``verify_auth_token`` only
    accepts ``settings.AUTH_TOKEN``).

    Empty credentials are never accepted, even if ``settings.AUTH_TOKEN`` is
    empty or unset.
    """

    # Scheme prefix expected at the start of an Authorization header value.
    _BEARER_PREFIX = "Bearer "

    @staticmethod
    def _same_secret(candidate, expected) -> bool:
        """Constant-time equality check that never matches an empty value.

        Non-string or empty values on either side yield False, so a missing
        credential cannot match a missing/empty configured secret.
        """
        import hmac

        if not isinstance(candidate, str) or not isinstance(expected, str):
            return False
        if not candidate or not expected:
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            candidate.encode("utf-8"), expected.encode("utf-8")
        )

    @staticmethod
    def _is_accepted(credential, *, admin_only: bool = False) -> bool:
        """Return True if *credential* is an accepted token.

        With ``admin_only=True`` only ``settings.AUTH_TOKEN`` is accepted;
        otherwise entries of ``settings.allowed_tokens_list`` are accepted
        as well.
        """
        accepted = SecurityService._same_secret(credential, settings.AUTH_TOKEN)
        if not admin_only:
            for allowed in settings.allowed_tokens_list:
                # Deliberately no short-circuit: every entry is compared so
                # timing does not depend on which entry matched.
                accepted = (
                    SecurityService._same_secret(credential, allowed) or accepted
                )
        return accepted

    @staticmethod
    def _strip_bearer(authorization: str) -> str:
        """Remove a single leading ``"Bearer "`` prefix, if present.

        Only the prefix is removed; occurrences of ``"Bearer "`` later in
        the value are left intact so the token is not silently altered.
        """
        prefix = SecurityService._BEARER_PREFIX
        if authorization.startswith(prefix):
            return authorization[len(prefix):]
        return authorization

    async def verify_key(self, key: str) -> str:
        """Validate a key passed directly (not read from a header).

        Args:
            key: The credential to check.

        Returns:
            The key, when it is an accepted token.

        Raises:
            HTTPException: 401 if the key is not accepted.
        """
        if not SecurityService._is_accepted(key):
            logger.error("Invalid key")
            raise HTTPException(status_code=401, detail="Invalid key")
        return key

    async def verify_authorization(
        self, authorization: Optional[str] = Header(None)
    ) -> str:
        """Validate an ``Authorization: Bearer <token>`` header.

        Args:
            authorization: Raw value of the Authorization header.

        Returns:
            The bearer token, when it is an accepted token.

        Raises:
            HTTPException: 401 if the header is missing, does not start with
                ``"Bearer "``, or carries a token that is not accepted.
        """
        if not authorization:
            logger.error("Missing Authorization header")
            raise HTTPException(status_code=401, detail="Missing Authorization header")
        if not authorization.startswith(SecurityService._BEARER_PREFIX):
            logger.error("Invalid Authorization header format")
            raise HTTPException(
                status_code=401, detail="Invalid Authorization header format"
            )
        token = SecurityService._strip_bearer(authorization)
        if not SecurityService._is_accepted(token):
            logger.error("Invalid token")
            raise HTTPException(status_code=401, detail="Invalid token")
        return token

    async def verify_goog_api_key(
        self, x_goog_api_key: Optional[str] = Header(None)
    ) -> str:
        """Validate the Google API key sent in the ``x-goog-api-key`` header.

        Args:
            x_goog_api_key: Raw value of the x-goog-api-key header.

        Returns:
            The header value, when it is an accepted token.

        Raises:
            HTTPException: 401 if the header is missing or not accepted.
        """
        if not x_goog_api_key:
            logger.error("Missing x-goog-api-key header")
            raise HTTPException(status_code=401, detail="Missing x-goog-api-key header")
        if not SecurityService._is_accepted(x_goog_api_key):
            logger.error("Invalid x-goog-api-key")
            raise HTTPException(status_code=401, detail="Invalid x-goog-api-key")
        return x_goog_api_key

    async def verify_auth_token(
        self, authorization: Optional[str] = Header(None)
    ) -> str:
        """Validate that the Authorization header carries ``settings.AUTH_TOKEN``.

        The ``"Bearer "`` prefix is optional here: the value is accepted with
        or without it. Entries of ``settings.allowed_tokens_list`` are NOT
        accepted by this check.

        Args:
            authorization: Raw value of the Authorization header.

        Returns:
            The token with any leading ``"Bearer "`` prefix removed.

        Raises:
            HTTPException: 401 if the header is missing or the token does
                not equal ``settings.AUTH_TOKEN``.
        """
        if not authorization:
            logger.error("Missing auth_token header")
            raise HTTPException(status_code=401, detail="Missing auth_token header")
        token = SecurityService._strip_bearer(authorization)
        if not SecurityService._is_accepted(token, admin_only=True):
            logger.error("Invalid auth_token")
            raise HTTPException(status_code=401, detail="Invalid auth_token")
        return token

    async def verify_key_or_goog_api_key(
        self, key: Optional[str] = None, x_goog_api_key: Optional[str] = Header(None)
    ) -> str:
        """Validate the ``key`` from the URL or the ``x-goog-api-key`` header.

        Args:
            key: Credential taken from the URL; may be absent.
            x_goog_api_key: Raw value of the x-goog-api-key header.

        Returns:
            ``key`` when it is an accepted token, otherwise the header value
            when that is an accepted token.

        Raises:
            HTTPException: 401 if neither credential is accepted.
        """
        # If the key from the URL is valid, return it directly.
        if SecurityService._is_accepted(key):
            return key
        # Otherwise fall back to the x-goog-api-key request header.
        if not x_goog_api_key:
            logger.error("Invalid key and missing x-goog-api-key header")
            raise HTTPException(status_code=401, detail="Invalid key and missing x-goog-api-key header")
        if not SecurityService._is_accepted(x_goog_api_key):
            logger.error("Invalid key and invalid x-goog-api-key")
            raise HTTPException(status_code=401, detail="Invalid key and invalid x-goog-api-key")
        return x_goog_api_key