Spaces:
Runtime error
Runtime error
File size: 2,631 Bytes
71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc 71a3948 93cf1dc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
from sqlmodel import Session, select
from typing import List, Optional
from src.models import User, UserCreate, UserUpdate, RFIDTag
from src.crud.utils import hash_password
# --- Read Operations ---
def get_user_by_id(db: Session, user_id: int) -> Optional[User]:
    """Fetch a single user by primary key.

    Args:
        db: Session used for the lookup.
        user_id: Primary key value of the wanted user.

    Returns:
        Whatever ``db.get`` yields for ``User`` and *user_id*: the matching
        ``User``, or ``None`` when there is no such row.
    """
    found = db.get(User, user_id)
    return found
def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Look up a user whose ``username`` column equals *username*.

    Args:
        db: Session the query is executed on.
        username: Exact username to match.

    Returns:
        The first matching ``User``, or ``None`` if the query has no rows.
    """
    statement = select(User).where(User.username == username)
    result = db.exec(statement)
    return result.first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Look up a user whose ``email`` column equals *email*.

    Args:
        db: Session the query is executed on.
        email: Exact email address to match.

    Returns:
        The first matching ``User``, or ``None`` if the query has no rows.
    """
    statement = select(User).where(User.email == email)
    result = db.exec(statement)
    return result.first()
def get_user_by_tag_id(db: Session, tag_id: str) -> Optional[User]:
    """Return the user that the RFID tag *tag_id* is assigned to.

    Args:
        db: Session used for both lookups.
        tag_id: Value matched against ``RFIDTag.tag_id``.

    Returns:
        The ``User`` referenced by the tag's ``user_id``, or ``None`` when no
        tag has that ``tag_id``, the tag has no ``user_id``, or no user with
        that id exists.
    """
    # RFIDTag comes from the module-level import; no local re-import needed.
    tag = db.exec(select(RFIDTag).where(RFIDTag.tag_id == tag_id)).first()
    # Compare against None explicitly so only a missing owner is skipped,
    # not an id that merely happens to be falsy.
    if tag is None or tag.user_id is None:
        return None
    # Same primary-key lookup that get_user_by_id and delete_user use.
    return db.get(User, tag.user_id)
def get_all_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:
    """Return one page of users, ordered by id.

    Args:
        db: Session the query is executed on.
        skip: Number of rows to skip from the start of the ordered result.
        limit: Maximum number of rows to return.

    Returns:
        The users in the requested page; empty when the offset is past the
        last row.
    """
    # OFFSET/LIMIT without ORDER BY leaves row order up to the database, so
    # consecutive pages could overlap or miss rows. Ordering by id pins it.
    statement = select(User).order_by(User.id).offset(skip).limit(limit)
    return db.exec(statement).all()
def create_user(db: Session, user: UserCreate) -> User:
    """Persist a new ``User`` built from *user*.

    The plain ``user.password`` is passed through ``hash_password`` and only
    the result is stored, in ``hashed_password``.

    Args:
        db: Session the new row is added to and committed on.
        user: Source of the username, password, email, full name, role and
            department for the new row.

    Returns:
        The newly created ``User`` after commit and refresh.
    """
    password_hash = hash_password(user.password)
    record = User(
        username=user.username,
        hashed_password=password_hash,
        email=user.email,
        full_name=user.full_name,
        role=user.role,
        department=user.department,
    )

    db.add(record)
    db.commit()
    # Reload the row after the commit before handing it back.
    db.refresh(record)
    return record
def update_user(db: Session, user: User, updates: UserUpdate) -> Optional[User]:
    """Apply a partial update to an existing user and persist it.

    The user is re-read from ``db`` by ``user.id`` and the update is applied
    to that instance. Only fields explicitly set on ``updates`` are written.
    A supplied ``password`` is never stored as-is: it is replaced by a
    ``hashed_password`` entry computed with ``hash_password``.

    Args:
        db: Session used to load, update and commit the user.
        user: The user to update; only its ``id`` is read.
        updates: Fields to change; unset fields are left untouched.

    Returns:
        The refreshed ``User``, or ``None`` if no user with that id exists.
    """
    db_user = get_user_by_id(db, user_id=user.id)
    if db_user is None:
        return None

    update_data = updates.model_dump(exclude_unset=True)
    if "password" in update_data:
        new_password = update_data.pop("password")
        # A password explicitly set to None carries nothing to hash, so it is
        # dropped rather than passed to hash_password.
        if new_password is not None:
            update_data["hashed_password"] = hash_password(new_password)

    db_user.sqlmodel_update(update_data)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
def delete_user(db: Session, user_id: int) -> User | None:
    """Remove the user with primary key *user_id*, if one exists.

    Args:
        db: Session used to load the user, delete it and commit.
        user_id: Primary key of the user to remove.

    Returns:
        The in-memory ``User`` that was deleted, or ``None`` when no user
        with that id was found (nothing is committed in that case).
    """
    target = db.get(User, user_id)
    if target:
        db.delete(target)
        db.commit()
        # The row is gone after the commit; the in-memory object is returned
        # so the caller can still see what was removed.
        # NOTE(review): confirm callers only read already-loaded attributes.
        return target
    return None
|