|
|
import asyncio |
|
|
import uuid |
|
|
from tortoise import fields, Tortoise |
|
|
from tortoise.models import Model |
|
|
from passlib.context import CryptContext |
|
|
import datetime |
|
|
import random |
|
|
import string |
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") |
|
|
|
|
|
def generate_short_uuid() -> str: |
|
|
"""Generate a random 5-character alphanumeric string.""" |
|
|
return ''.join(random.choices(string.ascii_letters + string.digits, k=5)) |
|
|
|
|
|
|
|
|
class User(Model): |
|
|
id = fields.CharField(primary_key=True, max_length=5, default=generate_short_uuid) |
|
|
name = fields.CharField(max_length=100, db_index=True) |
|
|
email = fields.CharField(max_length=100, db_index=True, unique=True) |
|
|
password = fields.CharField(max_length=100) |
|
|
phoneNumber = fields.CharField(max_length=100, null=True) |
|
|
account_type = fields.IntField(default=1) |
|
|
balance = fields.DecimalField(max_digits=10, decimal_places=2, default=0.00) |
|
|
ip_address = fields.CharField(max_length=45, null=True) |
|
|
mac_address = fields.CharField(max_length=17, null=True) |
|
|
createdAt = fields.DatetimeField(default=datetime.datetime.now) |
|
|
updatedAt = fields.DatetimeField(default=datetime.datetime.now) |
|
|
lastLogin = fields.DatetimeField(default=datetime.datetime.now) |
|
|
failed_attempts = fields.IntField(default=0) |
|
|
account_locked = fields.BooleanField(default=False) |
|
|
reset_token = fields.CharField(max_length=100, null=True, unique=True) |
|
|
reset_token_expiration = fields.DatetimeField(null=True) |
|
|
|
|
|
class Meta: |
|
|
table = "users" |
|
|
|
|
|
def verify_password(self, plain_password: str) -> bool: |
|
|
if self.account_locked: |
|
|
print("Account is locked due to too many failed attempts.") |
|
|
return False |
|
|
|
|
|
if pwd_context.verify(plain_password, self.password): |
|
|
self.failed_attempts = 0 |
|
|
self.account_locked = False |
|
|
self.save() |
|
|
return True |
|
|
else: |
|
|
self.failed_attempts += 1 |
|
|
if self.failed_attempts >= 5: |
|
|
self.account_locked = True |
|
|
print("Account locked due to too many failed attempts.") |
|
|
self.save() |
|
|
return False |
|
|
|
|
|
async def initiate_password_reset(self): |
|
|
|
|
|
self.reset_token = str(uuid.uuid4()) |
|
|
self.reset_token_expiration = datetime.datetime.now() + datetime.timedelta(hours=1) |
|
|
await self.save() |
|
|
|
|
|
|
|
|
print(f"Password reset token for {self.email}: {self.reset_token}") |
|
|
|
|
|
async def reset_password(self, reset_token, new_password): |
|
|
|
|
|
if self.reset_token != reset_token or datetime.datetime.now() > self.reset_token_expiration: |
|
|
print("Invalid or expired reset token.") |
|
|
return False |
|
|
|
|
|
|
|
|
self.password = pwd_context.hash(new_password) |
|
|
self.reset_token = None |
|
|
self.reset_token_expiration = None |
|
|
await self.save() |
|
|
print("Password has been reset successfully.") |
|
|
return True |