|
import jwt |
|
from aiohttp import web |
|
|
|
SECRET_KEY = "your_secret_key" |
|
|
|
def create_jwt(user_id: str): |
|
"""Create a JWT token.""" |
|
payload = {"sub": user_id, "role": "api_user"} |
|
return jwt.encode(payload, SECRET_KEY, algorithm="HS256") |
|
|
|
@web.middleware |
|
async def jwt_middleware(request, handler): |
|
"""JWT Middleware for /api endpoints.""" |
|
if request.path.startswith("/api"): |
|
token = request.headers.get("Authorization") |
|
if not token or not token.startswith("Bearer "): |
|
raise web.HTTPUnauthorized(reason="Authorization header missing or malformed") |
|
token = token.split("Bearer ")[1] |
|
try: |
|
decoded = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) |
|
request["user"] = decoded |
|
except jwt.ExpiredSignatureError: |
|
raise web.HTTPUnauthorized(reason="Token has expired") |
|
except jwt.InvalidTokenError: |
|
raise web.HTTPUnauthorized(reason="Invalid token") |
|
return await handler(request) |
|
|