File size: 5,981 Bytes
35756cf
 
 
8bc9e59
35756cf
 
 
 
 
8bc9e59
 
42c5014
35756cf
 
42c5014
35756cf
 
 
 
 
 
 
 
42c5014
35756cf
 
 
 
 
 
 
8bc9e59
35756cf
 
 
 
 
 
 
8bc9e59
35756cf
 
 
 
 
8bc9e59
 
 
 
 
35756cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8bc9e59
35756cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8bc9e59
35756cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8bc9e59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import hashlib
import os
import secrets
from urllib.parse import quote_plus

import requests
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, HTTPException, Header, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, func
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

# Load environment variables from .env file
load_dotenv()


def _require_env(name: str) -> str:
    """Return environment variable *name*, failing fast when it is missing.

    Raises:
        RuntimeError: if the variable is not set at all. An empty string is
            accepted, so a blank MySQL password still works.
    """
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(f"Required environment variable {name!r} is not set")
    return value


# Environment variables for MySQL and main API settings.
# The MySQL settings are mandatory: without them the URL below would contain
# the literal text "None" and only fail later with an obscure connection error.
MYSQL_USER = _require_env("MYSQL_USER")
MYSQL_PASSWORD = _require_env("MYSQL_PASSWORD")
MYSQL_HOST = _require_env("MYSQL_HOST")
MYSQL_DB = _require_env("MYSQL_DB")
MAIN_API_KEY = os.getenv("MAIN_API_KEY")
MAIN_API_URL = os.getenv("MAIN_API_URL", "https://api.typegpt.net/v1/chat/completions")
MODEL_NAME = os.getenv("MODEL_NAME", "Image-Generator")

# User name and password are percent-encoded so that characters with a meaning
# in URLs ("@", ":", "/", "%", ...) cannot corrupt the connection string.
DATABASE_URL = (
    f"mysql+pymysql://{quote_plus(MYSQL_USER)}:{quote_plus(MYSQL_PASSWORD)}"
    f"@{MYSQL_HOST}/{MYSQL_DB}"
)

# SQLAlchemy setup
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Updated User model with credits field
class User(Base):
    """ORM mapping for one row of the ``users`` table (one platform account)."""

    __tablename__ = "users"
    # Surrogate primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Login name; unique and at most 50 characters.
    username = Column(String(50), unique=True, index=True, nullable=False)
    # Password credential as persisted by the /register endpoint (max 128 chars).
    hashed_password = Column(String(128), nullable=False)
    # Per-user secret matched against the API-key request header; unique.
    api_key = Column(String(64), unique=True, index=True, nullable=False)
    # Gates the /admin/* endpoints; defaults to False for rows inserted via the ORM.
    is_admin = Column(Boolean, default=False)
    # Credit balance, adjusted by the admin add-credit endpoint; ORM default is 0.
    credits = Column(Integer, default=0)
    # Creation timestamp, filled in by the database through a server-side default.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

# Create tables
# Runs at import time against the configured engine, so the database has to be
# reachable when this module is loaded.
Base.metadata.create_all(bind=engine)

# The ASGI application object that every route below is registered on.
app = FastAPI(title="API Key Platform")

# Mount static files and templates
# Both directories are given as relative paths; presumably they are resolved
# against the process working directory — confirm for your deployment.
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# Dependency: Database session
def get_db():
    """Yield a new database session and close it once the consumer is done.

    Written as a generator so it can be used with ``Depends``: the code after
    the ``yield`` runs when the generator is finalized, and the ``finally``
    guarantees the session is closed even if the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

# Utility: Generate a random API key
def generate_api_key() -> str:
    """Return a new API key: 16 random bytes rendered as 32 lowercase hex digits.

    The bytes come from the ``secrets`` module, i.e. a cryptographically
    secure source. Uniqueness is not checked here; it is left to the unique
    constraint on the ``api_key`` column.
    """
    key_size_bytes = 16
    raw_key = secrets.token_bytes(key_size_bytes)
    return raw_key.hex()

# Pydantic models
# Request body for POST /register. Lengths are validated here so that bad
# input is rejected as a validation error instead of reaching the database.
class UserCreate(BaseModel):
    # Bounded by the width of the users.username column (String(50)).
    username: str = Field(..., min_length=1, max_length=50)
    # Non-empty, and capped so an oversized value can neither overflow the
    # 128-character credential column nor make the server process an
    # arbitrarily large secret.
    password: str = Field(..., min_length=1, max_length=128)

# Response schema for a user account, used as ``response_model`` by the
# registration, "me" and admin listing endpoints.
# NOTE: it exposes ``api_key``, so every response built from it contains the
# account's secret key.
class UserOut(BaseModel):
    id: int
    username: str
    api_key: str
    is_admin: bool
    credits: int

    class Config:
        # Lets the schema be populated from attribute-style objects (the
        # SQLAlchemy ``User`` rows the endpoints return) rather than dicts.
        # NOTE(review): ``orm_mode`` is the Pydantic v1 name of this option;
        # Pydantic v2 calls it ``from_attributes`` — confirm the pinned version.
        orm_mode = True

# Dependency: Validate API key from header and return current user
def get_current_user(x_api_key: str = Header(...), db: Session = Depends(get_db)) -> User:
    """Resolve the required API-key request header to its ``User`` row.

    Args:
        x_api_key: API key taken from the request headers (mandatory).
        db: Database session supplied by ``get_db``.

    Returns:
        The user whose ``api_key`` equals the supplied value.

    Raises:
        HTTPException: 401 when no user owns the supplied key.
    """
    key_lookup = db.query(User).filter(User.api_key == x_api_key)
    account = key_lookup.first()
    if account is not None:
        return account
    raise HTTPException(status_code=401, detail="Invalid API Key")

# --- Endpoints ---

def _hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 record for *password*.

    Format: ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``. With a
    16-byte salt and a 32-byte digest the record is 118 characters long, which
    fits the 128-character ``hashed_password`` column. Everything needed to
    verify a password later (algorithm, cost, salt) is stored in the record.
    """
    iterations = 600_000
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"


# 1. User Registration (generates a unique API key)
@app.post("/register", response_model=UserOut)
def register(user: UserCreate, db: Session = Depends(get_db)):
    """Create a non-admin account and return it together with its new API key.

    Args:
        user: Desired username and password.
        db: Database session supplied by ``get_db``.

    Returns:
        The freshly inserted ``User`` row (serialized through ``UserOut``).

    Raises:
        HTTPException: 400 when the username is already taken.
    """
    if db.query(User).filter(User.username == user.username).first():
        raise HTTPException(status_code=400, detail="Username already exists")

    new_user = User(
        username=user.username,
        # Only a salted hash is stored, never the raw password.
        hashed_password=_hash_password(user.password),
        api_key=generate_api_key(),
        is_admin=False,
    )
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # Two concurrent requests can both pass the existence check above; the
        # unique constraint then rejects the second insert. Report it as the
        # same client error instead of an unhandled 500.
        db.rollback()
        raise HTTPException(status_code=400, detail="Username already exists") from exc
    db.refresh(new_user)
    return new_user

# 2. User Panel: Get current user info
@app.get("/user/me", response_model=UserOut)
def read_user_me(current_user: User = Depends(get_current_user)):
    """Return the account that owns the API key sent with the request.

    All the work happens in the ``get_current_user`` dependency; the row it
    resolved is returned as-is and serialized through ``UserOut``.
    """
    return current_user

# 3. Admin Panel: List all users (admin-only)
@app.get("/admin/users", response_model=list[UserOut])
def list_users(current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Return every user account; only callable with an admin's API key.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if current_user.is_admin:
        return db.query(User).all()
    raise HTTPException(status_code=403, detail="Not authorized")

# 4. Proxy Endpoint to access the main API
# Request body for POST /generate: the text that is forwarded as the
# "prompt" field of the upstream request.
class RequestPayload(BaseModel):
    prompt: str

@app.post("/generate")
def generate_image(payload: RequestPayload, current_user: User = Depends(get_current_user)):
    """Forward the caller's prompt to the main API and relay its JSON reply.

    The request is authenticated upstream with the server-side
    ``MAIN_API_KEY``; the caller only needs a valid platform API key.

    Args:
        payload: Body carrying the prompt to send upstream.
        current_user: Authenticated caller (used only to enforce the API key).

    Returns:
        The decoded JSON body of the upstream response.

    Raises:
        HTTPException: 504 when the upstream call times out, 502 when it
            cannot be reached or answers with a non-JSON body, or the
            upstream status code when that is not 200.
    """
    # NOTE(review): credits are neither checked nor deducted here — confirm
    # whether that is intended.
    headers = {
        "Authorization": f"Bearer {MAIN_API_KEY}",
        "Content-Type": "application/json"
    }
    data = {
        "model": MODEL_NAME,
        "prompt": payload.prompt
    }
    try:
        # requests applies no timeout by default, so a stalled upstream would
        # block this worker forever. (connect, read) limits in seconds.
        response = requests.post(MAIN_API_URL, json=data, headers=headers, timeout=(10, 120))
    except requests.Timeout as exc:
        raise HTTPException(status_code=504, detail="Main API timed out") from exc
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail="Could not reach main API") from exc
    if response.status_code != 200:
        raise HTTPException(status_code=response.status_code, detail="Error from main API")
    try:
        return response.json()
    except ValueError as exc:
        # A 200 reply whose body is not valid JSON.
        raise HTTPException(status_code=502, detail="Invalid response from main API") from exc

# 5. New endpoint for users to test their API key
@app.get("/user/test_api")
def test_api(current_user: User = Depends(get_current_user)):
    """Confirm that the supplied API key is valid.

    Returns:
        A fixed status message plus the caller's username and credit balance.
    """
    status_report = {
        "message": "API is working",
        "username": current_user.username,
        "credits": current_user.credits,
    }
    return status_report

# 6. New endpoint for admin to add credits to a user account
# Request body for POST /admin/add_credit.
class CreditPayload(BaseModel):
    # Name of the account whose balance is changed.
    username: str
    # Amount added to the balance. Any integer is accepted, so a negative
    # value lowers the balance.
    credits: int

@app.post("/admin/add_credit")
def add_credit(payload: CreditPayload, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Add credits to a user's balance; only callable with an admin's API key.

    Args:
        payload: Target username and the number of credits to add.
        current_user: Authenticated caller; must be an admin.
        db: Database session supplied by ``get_db``.

    Returns:
        A message stating the amount added and the resulting balance.

    Raises:
        HTTPException: 403 when the caller is not an admin, 404 when the
            target user does not exist.
    """
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Not authorized")
    user = db.query(User).filter(User.username == payload.username).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # Assigning a SQL expression makes the flush emit
    # "SET credits = COALESCE(credits, 0) + :n", so the addition happens in the
    # database. A Python-side "+=" would write back a value computed from a
    # possibly stale read (losing concurrent updates) and would raise a
    # TypeError for rows whose balance is NULL.
    user.credits = func.coalesce(User.credits, 0) + payload.credits
    db.commit()
    # Reload the row so the message reports the balance actually stored.
    db.refresh(user)
    return {"message": f"Added {payload.credits} credits to user {user.username}. Total credits: {user.credits}"}

# 7. Render Admin Panel UI
@app.get("/admin/ui")
def admin_ui(request: Request, current_user: User = Depends(get_current_user)):
    """Render the ``admin.html`` template for an authenticated admin.

    Raises:
        HTTPException: 403 when the caller is not an admin.
    """
    if current_user.is_admin:
        template_context = {"request": request}
        return templates.TemplateResponse("admin.html", template_context)
    raise HTTPException(status_code=403, detail="Not authorized")

# 8. Render User Panel UI
@app.get("/user/ui")
def user_ui(request: Request, current_user: User = Depends(get_current_user)):
    """Render the ``user.html`` template for any authenticated user.

    ``current_user`` is not used in the body; declaring the dependency is what
    enforces a valid API key.
    """
    template_context = {"request": request}
    return templates.TemplateResponse("user.html", template_context)