Spaces:
Sleeping
Sleeping
File size: 4,242 Bytes
b7791c2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
from sqlalchemy.orm import Session
from .database import Base, KeyCategory, APIKey # Import models directly from database module
# from . import schemas # Schemas will be created later
# --- CRUD for KeyCategory ---
def create_key_category(db: Session, name: str, type: str, tags: list = None, metadata: dict = None):
    """Insert a new KeyCategory row and return the persisted instance.

    Args:
        db: SQLAlchemy session used for the insert.
        name: Category name.
        type: Category type. The parameter name shadows the builtin; it is
            kept for caller compatibility.
        tags: Optional tags value, passed to the model unchanged.
        metadata: Optional metadata, stored on the model's ``metadata_``
            attribute.

    Returns:
        The new KeyCategory, refreshed from the database after the commit.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised after
            the session has been rolled back.
    """
    db_category = KeyCategory(name=name, type=type, tags=tags, metadata_=metadata)
    db.add(db_category)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in a failed-transaction state;
        # roll back so the caller can keep using ``db`` afterwards.
        db.rollback()
        raise
    db.refresh(db_category)
    return db_category
def get_key_category(db: Session, category_id: int):
    """Look up a single KeyCategory by its ``id``.

    Returns the first matching row, or ``None`` when nothing matches.
    """
    id_matches = KeyCategory.id == category_id
    category_query = db.query(KeyCategory).filter(id_matches)
    return category_query.first()
def get_key_category_by_name(db: Session, name: str):
    """Look up a single KeyCategory by its ``name``.

    Returns the first matching row, or ``None`` when nothing matches.
    """
    name_matches = KeyCategory.name == name
    category_query = db.query(KeyCategory).filter(name_matches)
    return category_query.first()
def get_key_categories(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of KeyCategory rows.

    Args:
        db: SQLAlchemy session used for the query.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        A list of KeyCategory instances ordered by ``id``.
    """
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows between calls; order by the primary key.
    return (
        db.query(KeyCategory)
        .order_by(KeyCategory.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
def update_key_category(db: Session, category_id: int, name: str = None, type: str = None, tags: list = None, metadata: dict = None):
    """Apply a partial update to an existing KeyCategory.

    Only arguments that are not ``None`` are written, so this function
    cannot be used to reset a field to ``None``.

    Args:
        db: SQLAlchemy session used for the lookup and the update.
        category_id: ``id`` of the category to update.
        name: New name, or ``None`` to leave it unchanged.
        type: New type, or ``None`` to leave it unchanged.
        tags: New tags, or ``None`` to leave them unchanged.
        metadata: New metadata (stored on ``metadata_``), or ``None`` to
            leave it unchanged.

    Returns:
        The updated and refreshed KeyCategory, or the falsy lookup result
        (``None``) when no category has that id.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised after
            the session has been rolled back.
    """
    db_category = get_key_category(db, category_id)
    if not db_category:
        # Nothing to update; no commit is issued in this case.
        return db_category

    requested_changes = {
        "name": name,
        "type": type,
        "tags": tags,
        "metadata_": metadata,
    }
    for attribute, new_value in requested_changes.items():
        if new_value is not None:
            setattr(db_category, attribute, new_value)

    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in a failed-transaction state;
        # roll back so the caller can keep using ``db`` afterwards.
        db.rollback()
        raise
    db.refresh(db_category)
    return db_category
def delete_key_category(db: Session, category_id: int):
    """Delete the KeyCategory with the given id, if it exists.

    Args:
        db: SQLAlchemy session used for the lookup and the delete.
        category_id: ``id`` of the category to delete.

    Returns:
        The deleted KeyCategory instance, or the falsy lookup result
        (``None``) when no category has that id.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised after
            the session has been rolled back.
    """
    db_category = get_key_category(db, category_id)
    if not db_category:
        return db_category

    db.delete(db_category)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in a failed-transaction state;
        # roll back so the caller can keep using ``db`` afterwards.
        db.rollback()
        raise
    return db_category
# --- CRUD for APIKey ---
def create_api_key(db: Session, value: str, category_id: int, status: str = "active", metadata: dict = None):
    """Insert a new APIKey row and return the persisted instance.

    Args:
        db: SQLAlchemy session used for the insert.
        value: The key value to store.
        category_id: ``id`` of the category the key is assigned to.
        status: Initial status; defaults to ``"active"``.
        metadata: Optional metadata, stored on the model's ``metadata_``
            attribute.

    Returns:
        The new APIKey, refreshed from the database after the commit.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised after
            the session has been rolled back.
    """
    db_key = APIKey(value=value, category_id=category_id, status=status, metadata_=metadata)
    db.add(db_key)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in a failed-transaction state;
        # roll back so the caller can keep using ``db`` afterwards.
        db.rollback()
        raise
    db.refresh(db_key)
    return db_key
def get_api_key(db: Session, key_id: int):
    """Look up a single APIKey by its ``id``.

    Returns the first matching row, or ``None`` when nothing matches.
    """
    id_matches = APIKey.id == key_id
    key_query = db.query(APIKey).filter(id_matches)
    return key_query.first()
def get_api_keys(db: Session, category_id: int = None, status: str = None, skip: int = 0, limit: int = 100):
    """Return one page of APIKey rows, optionally filtered.

    Args:
        db: SQLAlchemy session used for the query.
        category_id: When not ``None``, only keys with this category id
            are returned.
        status: When not ``None``, only keys with this status are returned.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        A list of APIKey instances ordered by ``id``.
    """
    query = db.query(APIKey)
    if category_id is not None:
        query = query.filter(APIKey.category_id == category_id)
    if status is not None:
        query = query.filter(APIKey.status == status)
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows between calls; order by the primary key.
    return query.order_by(APIKey.id).offset(skip).limit(limit).all()
def update_api_key(db: Session, key_id: int, value: str = None, category_id: int = None, status: str = None, metadata: dict = None):
    """Apply a partial update to an existing APIKey.

    Only arguments that are not ``None`` are written, so this function
    cannot be used to reset a field to ``None``.

    Args:
        db: SQLAlchemy session used for the lookup and the update.
        key_id: ``id`` of the key to update.
        value: New key value, or ``None`` to leave it unchanged.
        category_id: New category id, or ``None`` to leave it unchanged.
        status: New status, or ``None`` to leave it unchanged.
        metadata: New metadata (stored on ``metadata_``), or ``None`` to
            leave it unchanged.

    Returns:
        The updated and refreshed APIKey, or the falsy lookup result
        (``None``) when no key has that id.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised after
            the session has been rolled back.
    """
    db_key = get_api_key(db, key_id)
    if not db_key:
        # Nothing to update; no commit is issued in this case.
        return db_key

    requested_changes = {
        "value": value,
        "category_id": category_id,
        "status": status,
        "metadata_": metadata,
    }
    for attribute, new_value in requested_changes.items():
        if new_value is not None:
            setattr(db_key, attribute, new_value)

    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in a failed-transaction state;
        # roll back so the caller can keep using ``db`` afterwards.
        db.rollback()
        raise
    db.refresh(db_key)
    return db_key
def delete_api_key(db: Session, key_id: int):
    """Delete the APIKey with the given id, if it exists.

    Args:
        db: SQLAlchemy session used for the lookup and the delete.
        key_id: ``id`` of the key to delete.

    Returns:
        The deleted APIKey instance, or the falsy lookup result (``None``)
        when no key has that id.

    Raises:
        Exception: Any error raised by ``db.commit()`` is re-raised after
            the session has been rolled back.
    """
    db_key = get_api_key(db, key_id)
    if not db_key:
        return db_key

    db.delete(db_key)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in a failed-transaction state;
        # roll back so the caller can keep using ``db`` afterwards.
        db.rollback()
        raise
    return db_key
# --- Key Selection Logic (Basic Placeholder) ---
# This will be expanded later based on selection strategy (round-robin, etc.)
def get_available_keys_for_category(db: Session, category_id: int):
    """Return every APIKey in the category whose status is ``"active"``.

    The result is a list; it is empty when the category has no active keys.
    """
    in_category = APIKey.category_id == category_id
    is_active = APIKey.status == "active"
    active_keys_query = db.query(APIKey).filter(in_category, is_active)
    return active_keys_query.all()
# Placeholder for selection strategy - will need to implement round-robin, etc.
def select_key_from_pool(db: Session, category_id: int):
    """Pick one active key for the category, or ``None`` if there are none.

    The current strategy is a placeholder: it always returns the first key
    of the list produced by ``get_available_keys_for_category``. No usage
    statistics (usage count, last-used time) are recorded here.
    """
    pool = get_available_keys_for_category(db, category_id)
    # TODO: Implement round-robin or other selection strategy; until then
    # the first active key is always chosen.
    return pool[0] if pool else None
|