| from tortoise import fields, models | |
| from passlib.context import CryptContext | |
| import datetime | |
| import uuid | |
| import random | |
| import string | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| def generate_short_uuid() -> str: | |
| return "".join(random.choices(string.ascii_letters + string.digits, k=5)) | |
| class User(models.Model): | |
| id = fields.CharField(primary_key=True, max_length=5, default=generate_short_uuid) | |
| name = fields.CharField(max_length=100) | |
| password = fields.CharField(max_length=100) | |
| phoneNumber = fields.CharField(max_length=15, unique=True) | |
| balance = fields.DecimalField(max_digits=10, decimal_places=2, default=0.00) | |
| mac_address = fields.CharField(max_length=17) | |
| createdAt = fields.DatetimeField(auto_now_add=True) | |
| updatedAt = fields.DatetimeField(auto_now=True) | |
| lastLogin = fields.DatetimeField(default=datetime.datetime.now) | |
| failed_attempts = fields.IntField(default=0) | |
| account_locked = fields.BooleanField(default=False) | |
| reset_token = fields.CharField(max_length=6, null=True, unique=True) | |
| reset_token_expiration = fields.DatetimeField(null=True) | |
| class Meta: | |
| table = "users" | |
| def hash_password(self, plain_password: str) -> str: | |
| return pwd_context.hash(plain_password) | |
| def verify_password(self, plain_password: str) -> bool: | |
| if ( | |
| self.account_locked | |
| and datetime.datetime.now() | |
| < self.lastLogin + datetime.timedelta(minutes=15) | |
| ): | |
| print("Account is locked due to too many failed attempts. Try again later.") | |
| return False | |
| if pwd_context.verify(plain_password, self.password): | |
| self.failed_attempts = 0 | |
| self.account_locked = False | |
| self.lastLogin = datetime.datetime.now() | |
| self.save() | |
| return True | |
| else: | |
| self.failed_attempts += 1 | |
| if self.failed_attempts >= 5: | |
| self.account_locked = True | |
| self.save() | |
| return False | |
| async def initiate_password_reset(self): | |
| self.reset_token = f"{random.randint(100000, 999999)}" | |
| self.reset_token_expiration = datetime.datetime.now() + datetime.timedelta( | |
| minutes=15 | |
| ) | |
| await self.save() | |
| async def reset_password(self, reset_token: str, new_password: str): | |
| if ( | |
| self.reset_token != reset_token | |
| or datetime.datetime.now() > self.reset_token_expiration | |
| ): | |
| return False | |
| self.password = self.hash_password(new_password) | |
| self.reset_token = None | |
| self.reset_token_expiration = None | |
| await self.save() | |
| return True | |