Spaces:
Running
Running
from datetime import datetime, timedelta | |
from typing import Optional | |
from jose import jwt, JWTError | |
from passlib.context import CryptContext | |
from fastapi import Depends, HTTPException, status | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from sqlalchemy.orm import Session | |
from .config import settings | |
from ..database import get_db | |
from ..models import User | |
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
auth_scheme = HTTPBearer() | |
def hash_password(p: str) -> str: | |
return pwd_context.hash(p) | |
def verify_password(p: str, hashed: str) -> bool: | |
return pwd_context.verify(p, hashed) | |
def create_access_token(sub: str, expires_minutes: int = None) -> str: | |
expire = datetime.utcnow() + timedelta(minutes=expires_minutes or settings.JWT_EXPIRES_MINUTES) | |
to_encode = {"sub": sub, "exp": expire} | |
return jwt.encode(to_encode, settings.JWT_SECRET, algorithm="HS256") | |
def decode_token(token: str) -> Optional[str]: | |
try: | |
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"]) | |
return payload.get("sub") | |
except JWTError: | |
return None | |
def get_current_user(creds: HTTPAuthorizationCredentials = Depends(auth_scheme), db: Session = Depends(get_db)) -> User: | |
email = decode_token(creds.credentials) | |
if not email: | |
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") | |
user = db.query(User).filter(User.email == email).first() | |
if not user: | |
raise HTTPException(status_code=401, detail="User not found") | |
return user | |