hotspot / App /Users /UserRoutes.py
Mbonea's picture
new age with tortole user works but not tests
b856986
raw
history blame
4.98 kB
from fastapi import APIRouter, HTTPException, status
from .Schemas import (
RegisterUserRequest, ResetPasswordRequest, LoginUserRequest,
AccessTokenResponse, BaseResponse, ForgotPasswordRequest, VerifyResetTokenRequest
)
from .Model import User
from jose import jwt
from datetime import datetime, timedelta,timezone
import random
from passlib.context import CryptContext
# Configurations for JWT and Password Reset
SECRET_KEY = "your_secret_key_here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
PASSWORD_RESET_EXPIRE_MINUTES = 15 # Reset token valid for 15 minutes
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
user_router = APIRouter(tags=["User"])
# Utility function to create JWT token
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# Register route
@user_router.post("/user/register", response_model=BaseResponse, status_code=status.HTTP_201_CREATED)
async def register_user(user: RegisterUserRequest):
existing_user = await User.filter(phoneNumber=user.phoneNumber).first()
if existing_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User already exists.")
user.hash_password()
new_user = await User.create(**user.dict())
return BaseResponse(code=200, message="User created successfully", payload={"user_id": new_user.id})
# Login route (using phone number)
@user_router.post("/user/login", response_model=AccessTokenResponse, status_code=status.HTTP_200_OK)
async def login_user(user: LoginUserRequest):
db_user = await User.filter(phoneNumber=user.phoneNumber).first()
if db_user and db_user.verify_password(user.password):
access_token = create_access_token(data={"sub": db_user.phoneNumber})
return AccessTokenResponse(access_token=access_token, token_type="bearer")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Forgot Password route (using phone number only)
@user_router.post("/user/forgot-password", response_model=BaseResponse, status_code=status.HTTP_200_OK)
async def forgot_password(request: ForgotPasswordRequest):
user = await User.filter(phoneNumber=request.phoneNumber).first()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found.")
# Generate a short reset token (6-digit code)
reset_token = f"{random.randint(100000, 999999)}"
reset_token_expiration = datetime.utcnow() + timedelta(minutes=PASSWORD_RESET_EXPIRE_MINUTES)
# Store reset token and expiration in the user record
user.reset_token = reset_token
user.reset_token_expiration = reset_token_expiration
await user.save()
# In production, send this token via SMS to the user's phone number
print(f"Password reset token for {request.phoneNumber}: {reset_token}")
return BaseResponse(code=200, message="Password reset token sent. Check your phone for further instructions.")
# Verify Reset Token route
@user_router.post("/user/verify-reset-token", response_model=BaseResponse, status_code=status.HTTP_200_OK)
async def verify_reset_token(request: VerifyResetTokenRequest):
user = await User.filter(phoneNumber=request.phoneNumber, reset_token=request.reset_token).first()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid token or phone number.")
# Check if the reset token has expired
if datetime.utcnow().replace(tzinfo=timezone.utc) > user.reset_token_expiration:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Reset token has expired.")
return BaseResponse(code=200, message="Token verified. You may now reset your password.")
# Reset Password route (After Token Verification)
@user_router.post("/user/reset-password", response_model=BaseResponse, status_code=status.HTTP_200_OK)
async def reset_password(request: ResetPasswordRequest):
user = await User.filter(phoneNumber=request.phoneNumber).first()
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found.")
# Check if the reset token is present and not expired
if not user.reset_token or datetime.utcnow().replace(tzinfo=timezone.utc) > user.reset_token_expiration:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Reset token invalid or expired.")
# Update the user's password and clear the reset token
user.password = pwd_context.hash(request.new_password)
user.reset_token = None
user.reset_token_expiration = None
await user.save()
return BaseResponse(code=200, message="Password has been reset successfully.")