Spaces:
Running
Running
File size: 5,981 Bytes
35756cf 8bc9e59 35756cf 8bc9e59 42c5014 35756cf 42c5014 35756cf 42c5014 35756cf 8bc9e59 35756cf 8bc9e59 35756cf 8bc9e59 35756cf 8bc9e59 35756cf 8bc9e59 35756cf 8bc9e59 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
import os
import secrets
import requests
from fastapi import FastAPI, Depends, HTTPException, Header, Request
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
from dotenv import load_dotenv
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
# Load environment variables from .env file
load_dotenv()
# Environment variables for MySQL and main API settings
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_DB = os.getenv("MYSQL_DB")
MAIN_API_KEY = os.getenv("MAIN_API_KEY")
MAIN_API_URL = os.getenv("MAIN_API_URL", "https://api.typegpt.net/v1/chat/completions")
MODEL_NAME = os.getenv("MODEL_NAME", "Image-Generator")
DATABASE_URL = f"mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{MYSQL_HOST}/{MYSQL_DB}"
# SQLAlchemy setup
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Updated User model with credits field
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True, nullable=False)
hashed_password = Column(String(128), nullable=False)
api_key = Column(String(64), unique=True, index=True, nullable=False)
is_admin = Column(Boolean, default=False)
credits = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Create tables
Base.metadata.create_all(bind=engine)
app = FastAPI(title="API Key Platform")
# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# Dependency: Database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Utility: Generate a unique API key
def generate_api_key() -> str:
return secrets.token_hex(16)
# Pydantic models
class UserCreate(BaseModel):
username: str
password: str
class UserOut(BaseModel):
id: int
username: str
api_key: str
is_admin: bool
credits: int
class Config:
orm_mode = True
# Dependency: Validate API key from header and return current user
def get_current_user(x_api_key: str = Header(...), db: Session = Depends(get_db)) -> User:
user = db.query(User).filter(User.api_key == x_api_key).first()
if not user:
raise HTTPException(status_code=401, detail="Invalid API Key")
return user
# --- Endpoints ---
# 1. User Registration (generates a unique API key)
@app.post("/register", response_model=UserOut)
def register(user: UserCreate, db: Session = Depends(get_db)):
if db.query(User).filter(User.username == user.username).first():
raise HTTPException(status_code=400, detail="Username already exists")
new_api_key = generate_api_key()
# In production, use proper password hashing
new_user = User(username=user.username, hashed_password=user.password, api_key=new_api_key, is_admin=False)
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user
# 2. User Panel: Get current user info
@app.get("/user/me", response_model=UserOut)
def read_user_me(current_user: User = Depends(get_current_user)):
return current_user
# 3. Admin Panel: List all users (admin-only)
@app.get("/admin/users", response_model=list[UserOut])
def list_users(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
users = db.query(User).all()
return users
# 4. Proxy Endpoint to access the main API
class RequestPayload(BaseModel):
prompt: str
@app.post("/generate")
def generate_image(payload: RequestPayload, current_user: User = Depends(get_current_user)):
headers = {
"Authorization": f"Bearer {MAIN_API_KEY}",
"Content-Type": "application/json"
}
data = {
"model": MODEL_NAME,
"prompt": payload.prompt
}
response = requests.post(MAIN_API_URL, json=data, headers=headers)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Error from main API")
return response.json()
# 5. New endpoint for users to test their API key
@app.get("/user/test_api")
def test_api(current_user: User = Depends(get_current_user)):
return {"message": "API is working", "username": current_user.username, "credits": current_user.credits}
# 6. New endpoint for admin to add credits to a user account
class CreditPayload(BaseModel):
username: str
credits: int
@app.post("/admin/add_credit")
def add_credit(payload: CreditPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
user = db.query(User).filter(User.username == payload.username).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.credits += payload.credits
db.commit()
db.refresh(user)
return {"message": f"Added {payload.credits} credits to user {user.username}. Total credits: {user.credits}"}
# 7. Render Admin Panel UI
@app.get("/admin/ui")
def admin_ui(request: Request, current_user: User = Depends(get_current_user)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not authorized")
return templates.TemplateResponse("admin.html", {"request": request})
# 8. Render User Panel UI
@app.get("/user/ui")
def user_ui(request: Request, current_user: User = Depends(get_current_user)):
return templates.TemplateResponse("user.html", {"request": request})
|