|
|
from fastapi import APIRouter, HTTPException, status |
|
|
from .Schemas import ( |
|
|
RegisterUserRequest, ResetPasswordRequest, LoginUserRequest, |
|
|
AccessTokenResponse, BaseResponse, ForgotPasswordRequest, VerifyResetTokenRequest |
|
|
) |
|
|
from .Model import User |
|
|
from jose import jwt |
|
|
from datetime import datetime, timedelta,timezone |
|
|
import random |
|
|
from passlib.context import CryptContext |
|
|
|
|
|
|
|
|
SECRET_KEY = "your_secret_key_here" |
|
|
ALGORITHM = "HS256" |
|
|
ACCESS_TOKEN_EXPIRE_MINUTES = 30 |
|
|
PASSWORD_RESET_EXPIRE_MINUTES = 15 |
|
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") |
|
|
user_router = APIRouter(tags=["User"]) |
|
|
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: timedelta = None): |
|
|
to_encode = data.copy() |
|
|
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) |
|
|
to_encode.update({"exp": expire}) |
|
|
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) |
|
|
|
|
|
|
|
|
@user_router.post("/user/register", response_model=BaseResponse, status_code=status.HTTP_201_CREATED) |
|
|
async def register_user(user: RegisterUserRequest): |
|
|
existing_user = await User.filter(phoneNumber=user.phoneNumber).first() |
|
|
if existing_user: |
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="User already exists.") |
|
|
|
|
|
user.hash_password() |
|
|
new_user = await User.create(**user.dict()) |
|
|
return BaseResponse(code=200, message="User created successfully", payload={"user_id": new_user.id}) |
|
|
|
|
|
|
|
|
@user_router.post("/user/login", response_model=AccessTokenResponse, status_code=status.HTTP_200_OK) |
|
|
async def login_user(user: LoginUserRequest): |
|
|
db_user = await User.filter(phoneNumber=user.phoneNumber).first() |
|
|
if db_user and db_user.verify_password(user.password): |
|
|
access_token = create_access_token(data={"sub": db_user.phoneNumber}) |
|
|
return AccessTokenResponse(access_token=access_token, token_type="bearer") |
|
|
|
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid credentials", |
|
|
headers={"WWW-Authenticate": "Bearer"}, |
|
|
) |
|
|
|
|
|
|
|
|
@user_router.post("/user/forgot-password", response_model=BaseResponse, status_code=status.HTTP_200_OK) |
|
|
async def forgot_password(request: ForgotPasswordRequest): |
|
|
user = await User.filter(phoneNumber=request.phoneNumber).first() |
|
|
if not user: |
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found.") |
|
|
|
|
|
|
|
|
reset_token = f"{random.randint(100000, 999999)}" |
|
|
reset_token_expiration = datetime.utcnow() + timedelta(minutes=PASSWORD_RESET_EXPIRE_MINUTES) |
|
|
|
|
|
|
|
|
user.reset_token = reset_token |
|
|
user.reset_token_expiration = reset_token_expiration |
|
|
await user.save() |
|
|
|
|
|
|
|
|
print(f"Password reset token for {request.phoneNumber}: {reset_token}") |
|
|
|
|
|
return BaseResponse(code=200, message="Password reset token sent. Check your phone for further instructions.") |
|
|
|
|
|
|
|
|
@user_router.post("/user/verify-reset-token", response_model=BaseResponse, status_code=status.HTTP_200_OK) |
|
|
async def verify_reset_token(request: VerifyResetTokenRequest): |
|
|
user = await User.filter(phoneNumber=request.phoneNumber, reset_token=request.reset_token).first() |
|
|
if not user: |
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid token or phone number.") |
|
|
|
|
|
|
|
|
if datetime.utcnow().replace(tzinfo=timezone.utc) > user.reset_token_expiration: |
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Reset token has expired.") |
|
|
|
|
|
return BaseResponse(code=200, message="Token verified. You may now reset your password.") |
|
|
|
|
|
|
|
|
@user_router.post("/user/reset-password", response_model=BaseResponse, status_code=status.HTTP_200_OK) |
|
|
async def reset_password(request: ResetPasswordRequest): |
|
|
user = await User.filter(phoneNumber=request.phoneNumber).first() |
|
|
if not user: |
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found.") |
|
|
|
|
|
|
|
|
|
|
|
if not user.reset_token or datetime.utcnow().replace(tzinfo=timezone.utc) > user.reset_token_expiration: |
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Reset token invalid or expired.") |
|
|
|
|
|
|
|
|
user.password = pwd_context.hash(request.new_password) |
|
|
user.reset_token = None |
|
|
user.reset_token_expiration = None |
|
|
await user.save() |
|
|
|
|
|
return BaseResponse(code=200, message="Password has been reset successfully.") |
|
|
|