tanbushi's picture
Sun Jun 8 15:02:12 CST 2025
b7791c2
from sqlalchemy.orm import Session
from .database import Base, KeyCategory, APIKey # Import models directly from database module
# from . import schemas # Schemas will be created later
# --- CRUD for KeyCategory ---
def create_key_category(db: Session, name: str, type: str, tags: list = None, metadata: dict = None):
db_category = KeyCategory(name=name, type=type, tags=tags, metadata_=metadata)
db.add(db_category)
db.commit()
db.refresh(db_category)
return db_category
def get_key_category(db: Session, category_id: int):
return db.query(KeyCategory).filter(KeyCategory.id == category_id).first()
def get_key_category_by_name(db: Session, name: str):
return db.query(KeyCategory).filter(KeyCategory.name == name).first()
def get_key_categories(db: Session, skip: int = 0, limit: int = 100):
return db.query(KeyCategory).offset(skip).limit(limit).all()
def update_key_category(db: Session, category_id: int, name: str = None, type: str = None, tags: list = None, metadata: dict = None):
db_category = get_key_category(db, category_id)
if db_category:
if name is not None:
db_category.name = name
if type is not None:
db_category.type = type
if tags is not None:
db_category.tags = tags
if metadata is not None:
db_category.metadata_ = metadata
db.commit()
db.refresh(db_category)
return db_category
def delete_key_category(db: Session, category_id: int):
db_category = get_key_category(db, category_id)
if db_category:
db.delete(db_category)
db.commit()
return db_category
# --- CRUD for APIKey ---
def create_api_key(db: Session, value: str, category_id: int, status: str = "active", metadata: dict = None):
db_key = APIKey(value=value, category_id=category_id, status=status, metadata_=metadata)
db.add(db_key)
db.commit()
db.refresh(db_key)
return db_key
def get_api_key(db: Session, key_id: int):
return db.query(APIKey).filter(APIKey.id == key_id).first()
def get_api_keys(db: Session, category_id: int = None, status: str = None, skip: int = 0, limit: int = 100):
query = db.query(APIKey)
if category_id is not None:
query = query.filter(APIKey.category_id == category_id)
if status is not None:
query = query.filter(APIKey.status == status)
return query.offset(skip).limit(limit).all()
def update_api_key(db: Session, key_id: int, value: str = None, category_id: int = None, status: str = None, metadata: dict = None):
db_key = get_api_key(db, key_id)
if db_key:
if value is not None:
db_key.value = value
if category_id is not None:
db_key.category_id = category_id
if status is not None:
db_key.status = status
if metadata is not None:
db_key.metadata_ = metadata
db.commit()
db.refresh(db_key)
return db_key
def delete_api_key(db: Session, key_id: int):
db_key = get_api_key(db, key_id)
if db_key:
db.delete(db_key)
db.commit()
return db_key
# --- Key Selection Logic (Basic Placeholder) ---
# This will be expanded later based on selection strategy (round-robin, etc.)
def get_available_keys_for_category(db: Session, category_id: int):
"""Get all active keys for a given category."""
return db.query(APIKey).filter(
APIKey.category_id == category_id,
APIKey.status == "active"
).all()
# Placeholder for selection strategy - will need to implement round-robin, etc.
def select_key_from_pool(db: Session, category_id: int):
"""Select one key from the available pool for a category."""
available_keys = get_available_keys_for_category(db, category_id)
if not available_keys:
return None # No keys available
# Basic selection: just return the first active key for now
# TODO: Implement round-robin or other selection strategy
selected_key = available_keys[0]
# Optional: Update usage stats (e.g., increment usage_count, update last_used)
# selected_key.usage_count += 1
# selected_key.last_used = func.now()
# db.commit()
# db.refresh(selected_key)
return selected_key