Spaces:
Running
Running
File size: 1,577 Bytes
220d87f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from .config import settings
from ..database import get_db
from ..models import User
# Shared passlib context used by hash_password() and verify_password();
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# HTTP Bearer security scheme; injected into get_current_user() via Depends
# to obtain the request's credentials.
auth_scheme = HTTPBearer()
def hash_password(p: str) -> str:
    """Hash the plaintext password *p* with the module's passlib context.

    Returns the value produced by ``pwd_context.hash`` for *p*.
    """
    digest = pwd_context.hash(p)
    return digest
def verify_password(p: str, hashed: str) -> bool:
    """Check the plaintext password *p* against the stored hash *hashed*.

    Delegates to ``pwd_context.verify`` and returns its result.
    """
    matches = pwd_context.verify(p, hashed)
    return matches
def create_access_token(sub: str, expires_minutes: Optional[int] = None) -> str:
    """Create a signed HS256 JWT carrying *sub* and an expiry claim.

    Args:
        sub: Value stored in the token's ``sub`` claim.
        expires_minutes: Token lifetime in minutes. When ``None`` (the
            default), ``settings.JWT_EXPIRES_MINUTES`` is used.

    Returns:
        The encoded token, signed with ``settings.JWT_SECRET``.
    """
    # Compare against None instead of relying on truthiness so that an
    # explicit 0 is honoured rather than silently replaced by the default.
    if expires_minutes is None:
        expires_minutes = settings.JWT_EXPIRES_MINUTES
    # Use a timezone-aware "now": datetime.utcnow() is deprecated and
    # returns a naive value.
    expire = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
    to_encode = {"sub": sub, "exp": expire}
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm="HS256")
def decode_token(token: str) -> Optional[str]:
    """Decode *token* and return its ``sub`` claim.

    The token is decoded with ``settings.JWT_SECRET`` and the HS256
    algorithm. Returns ``None`` when decoding raises ``JWTError`` or when
    the payload has no ``sub`` entry.
    """
    try:
        claims = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError:
        return None
    return claims.get("sub")
def get_current_user(
    creds: HTTPAuthorizationCredentials = Depends(auth_scheme),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the request's bearer credentials to a ``User`` row.

    Args:
        creds: Credentials supplied by the ``auth_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The first ``User`` whose ``email`` equals the value returned by
        ``decode_token`` for the supplied credentials.

    Raises:
        HTTPException: 401 with detail "Invalid token" when the token does
            not decode to a non-empty value, or 401 with detail
            "User not found" when no matching user exists.
    """
    # A 401 response should carry a WWW-Authenticate challenge naming the
    # scheme the client is expected to use.
    challenge = {"WWW-Authenticate": "Bearer"}

    email = decode_token(creds.credentials)
    if not email:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers=challenge,
        )

    user = db.query(User).filter(User.email == email).first()
    if not user:
        # Same status constant as above instead of a bare 401 literal.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers=challenge,
        )
    return user
|