Spaces:
Build error
Build error
""" | |
AI Tool Hub - Main Application | |
Integrates all components into a single FastAPI application | |
""" | |
import os | |
import json | |
import uuid | |
import secrets | |
import logging | |
from datetime import datetime, timedelta | |
from typing import Dict, Any, List, Optional, Union | |
# Load environment variables from .env file | |
from dotenv import load_dotenv | |
load_dotenv() | |
from fastapi import FastAPI, Request, Response, Depends, HTTPException, Form, Cookie, status | |
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from fastapi.security import APIKeyHeader, HTTPBearer, HTTPAuthorizationCredentials | |
from fastapi.middleware.cors import CORSMiddleware | |
from pydantic import BaseModel, Field | |
from sqlalchemy import create_engine | |
from sqlalchemy.orm import sessionmaker | |
# Import provider modules | |
from providers import get_provider, HuggingFaceProvider, OpenAIProvider, DeepSeekProvider, OpenRouterProvider | |
# Import prompt modules | |
from prompts import PromptTemplate, PromptTemplateManager, PromptMarketplace | |
# Import ads module | |
from ads import GoogleAdsManager | |
# Import additional modules | |
import random | |
import time | |
import hashlib | |
# Import the Base from models.py | |
from models import Base | |
# Import the TOOLS from tools.py | |
from tools import TOOLS | |
# Setup logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger("app") | |
# Database setup | |
DATABASE_URL = "sqlite:///./magicAi.db" # Path to your SQLite database | |
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) | |
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
# Create the database tables | |
Base.metadata.create_all(bind=engine) | |
# Create the FastAPI app | |
app = FastAPI( | |
title="AI Tool Hub", | |
description="A platform for using AI models with various tools and prompt templates", | |
version="1.0.0" | |
) | |
# Add CORS middleware | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Mount static files | |
app.mount("/static", StaticFiles(directory="static"), name="static") | |
# Set up Jinja2 templates | |
templates = Jinja2Templates(directory="templates") | |
# Initialize components | |
template_manager = PromptTemplateManager() | |
marketplace = PromptMarketplace() | |
ads_manager = GoogleAdsManager() | |
# Security | |
API_KEY_NAME = "X-API-Key" | |
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) | |
security = HTTPBearer(auto_error=False) | |
# In-memory store of API keys for demo purpose | |
# In production, these would be stored in a database | |
API_KEYS = {} | |
# In-memory session store for demo purpose | |
# In production, these would be stored in a database or Redis | |
SESSIONS = {} | |
# Add new models for ad rewards | |
class AdRewardRequest(BaseModel): | |
ad_type: str # 'daily' or 'special' | |
impression_id: Optional[str] = None | |
class DailyReward(BaseModel): | |
user_id: str | |
date: str | |
count: int = 0 | |
last_reward: Optional[str] = None | |
class SpecialReward(BaseModel): | |
user_id: str | |
date: str | |
claimed: bool = False | |
claimed_at: Optional[str] = None | |
# In-memory stores for rewards (in production, use a database) | |
DAILY_REWARDS = {} | |
SPECIAL_REWARDS = {} | |
# Models | |
class CreditUpdateRequest(BaseModel): | |
user_id: str | |
amount: float | |
reason: str = "manual" | |
class GenerateRequest(BaseModel): | |
prompt: str | |
model: str | |
provider: str | |
system_message: Optional[str] = None | |
max_tokens: Optional[int] = 1000 | |
temperature: Optional[float] = 0.7 | |
prompt_template_id: Optional[str] = None | |
template_variables: Optional[Dict[str, Any]] = None | |
class ImageGenerateRequest(BaseModel): | |
prompt: str | |
provider: str = "openai" | |
model: Optional[str] = None | |
size: Optional[str] = "1024x1024" | |
prompt_template_id: Optional[str] = None | |
template_variables: Optional[Dict[str, Any]] = None | |
class UserInfo(BaseModel): | |
id: str | |
username: str | |
email: str | |
credits: float = 10.0 # New users start with some free credits | |
created_at: str = Field(default_factory=lambda: datetime.now().isoformat()) | |
last_login: str = Field(default_factory=lambda: datetime.now().isoformat()) | |
role: str = "user" | |
# Models for prompt system | |
class Prompt(BaseModel): | |
id: str | |
tool_id: str | |
title: str | |
content: str | |
description: Optional[str] = None | |
creator_id: Optional[str] = None | |
is_default: bool = False | |
usage_count: int = 0 | |
avg_rating: float = 0.0 | |
created_at: str = None | |
updated_at: str = None | |
tags: List[str] = [] | |
class PromptRating(BaseModel): | |
prompt_id: str | |
user_id: str | |
rating: int # 1-5 | |
created_at: str | |
class UserPromptHistory(BaseModel): | |
user_id: str | |
prompt_id: str | |
tool_id: str | |
used_at: str | |
was_modified: bool = False | |
modifications: Optional[str] = None | |
# Mock database (replace with actual DynamoDB integration) | |
PROMPTS = {} | |
PROMPT_RATINGS = [] | |
USER_PROMPT_HISTORY = [] | |
# Initialize default prompts | |
def init_default_prompts(): | |
default_prompts = [ | |
{ | |
"id": "text-summary-default", | |
"tool_id": "text-summary", | |
"title": "Concise Text Summary", | |
"content": "Summarize the following text in 3-5 bullet points highlighting the main ideas: {{input}}", | |
"description": "Creates a bulleted summary of any text", | |
"is_default": True, | |
"tags": ["summary", "concise", "bullets"] | |
}, | |
{ | |
"id": "image-portrait-default", | |
"tool_id": "image-generator", | |
"title": "Professional Portrait", | |
"content": "Create a professional portrait photo of {{subject}}, high quality, studio lighting, detailed features, professional attire", | |
"description": "Generates professional-looking portrait images", | |
"is_default": True, | |
"tags": ["portrait", "professional", "photo"] | |
}, | |
{ | |
"id": "text-blog-default", | |
"tool_id": "text-generator", | |
"title": "Blog Post Generator", | |
"content": "Write a comprehensive blog post about {{topic}}. Include an introduction, 3-5 main sections with subheadings, and a conclusion. Use a conversational tone and include practical examples where appropriate.", | |
"description": "Creates complete blog posts with proper structure", | |
"is_default": True, | |
"tags": ["blog", "writing", "content"] | |
}, | |
{ | |
"id": "code-python-default", | |
"tool_id": "code-generator", | |
"title": "Python Function", | |
"content": "Write a Python function that {{task}}. Include docstrings, type hints, error handling, and comments explaining your logic. Provide a small example of how to use the function.", | |
"description": "Generates well-structured Python functions", | |
"is_default": True, | |
"tags": ["python", "function", "code"] | |
} | |
] | |
for prompt in default_prompts: | |
prompt_obj = Prompt( | |
id=prompt["id"], | |
tool_id=prompt["tool_id"], | |
title=prompt["title"], | |
content=prompt["content"], | |
description=prompt["description"], | |
is_default=prompt["is_default"], | |
tags=prompt["tags"], | |
created_at=datetime.now().isoformat(), | |
updated_at=datetime.now().isoformat() | |
) | |
PROMPTS[prompt["id"]] = prompt_obj | |
# Call initialization at startup | |
init_default_prompts() | |
# Helper functions for prompt system | |
def get_prompts_for_tool(tool_id: str) -> List[Prompt]: | |
"""Get all prompts for a specific tool""" | |
return [p for p in PROMPTS.values() if p.tool_id == tool_id] | |
def get_default_prompts_for_tool(tool_id: str) -> List[Prompt]: | |
"""Get default prompts for a specific tool""" | |
return [p for p in PROMPTS.values() if p.tool_id == tool_id and p.is_default] | |
def get_trending_prompts(limit: int = 5) -> List[Prompt]: | |
"""Get trending prompts based on usage count and ratings""" | |
sorted_prompts = sorted( | |
PROMPTS.values(), | |
key=lambda p: (p.usage_count * 0.7 + p.avg_rating * 0.3), | |
reverse=True | |
) | |
return sorted_prompts[:limit] | |
def get_personalized_prompts(user_id: str, tool_id: str, limit: int = 3) -> List[Prompt]: | |
"""Get personalized prompt recommendations for a user and tool""" | |
# Get user history for this tool | |
user_tool_history = [h for h in USER_PROMPT_HISTORY | |
if h.user_id == user_id and h.tool_id == tool_id] | |
if not user_tool_history: | |
# If no history, return default prompts | |
return get_default_prompts_for_tool(tool_id) | |
# Count prompt usage | |
prompt_usage = {} | |
for history in user_tool_history: | |
prompt_usage[history.prompt_id] = prompt_usage.get(history.prompt_id, 0) + 1 | |
# Get most used prompts | |
most_used_prompt_ids = sorted(prompt_usage.items(), key=lambda x: x[1], reverse=True) | |
most_used_prompts = [PROMPTS.get(pid) for pid, _ in most_used_prompt_ids[:limit]] | |
# If we don't have enough, add some default or trending ones | |
if len(most_used_prompts) < limit: | |
defaults = get_default_prompts_for_tool(tool_id) | |
for default in defaults: | |
if default not in most_used_prompts and len(most_used_prompts) < limit: | |
most_used_prompts.append(default) | |
return most_used_prompts | |
def record_prompt_usage(user_id: str, prompt_id: str, tool_id: str, was_modified: bool = False, modifications: str = None): | |
"""Record that a user used a particular prompt""" | |
# Update prompt usage count | |
if prompt_id in PROMPTS: | |
PROMPTS[prompt_id].usage_count += 1 | |
PROMPTS[prompt_id].updated_at = datetime.now().isoformat() | |
# Add to history | |
history_entry = UserPromptHistory( | |
user_id=user_id, | |
prompt_id=prompt_id, | |
tool_id=tool_id, | |
used_at=datetime.now().isoformat(), | |
was_modified=was_modified, | |
modifications=modifications | |
) | |
USER_PROMPT_HISTORY.append(history_entry) | |
def rate_prompt(user_id: str, prompt_id: str, rating: int): | |
"""Add or update a rating for a prompt""" | |
# Check if user already rated this prompt | |
existing_rating = next((r for r in PROMPT_RATINGS if r.user_id == user_id and r.prompt_id == prompt_id), None) | |
if existing_rating: | |
# Update existing rating | |
existing_rating.rating = rating | |
existing_rating.created_at = datetime.now().isoformat() | |
else: | |
# Add new rating | |
rating_obj = PromptRating( | |
prompt_id=prompt_id, | |
user_id=user_id, | |
rating=rating, | |
created_at=datetime.now().isoformat() | |
) | |
PROMPT_RATINGS.append(rating_obj) | |
# Update average rating on prompt | |
if prompt_id in PROMPTS: | |
prompt_ratings = [r.rating for r in PROMPT_RATINGS if r.prompt_id == prompt_id] | |
if prompt_ratings: | |
PROMPTS[prompt_id].avg_rating = sum(prompt_ratings) / len(prompt_ratings) | |
PROMPTS[prompt_id].updated_at = datetime.now().isoformat() | |
# Dependency to get the database session | |
def get_db(): | |
db = SessionLocal() | |
try: | |
yield db | |
finally: | |
db.close() | |
# User authentication and session management | |
def get_session_user(session_id: Optional[str] = Cookie(None)) -> Optional[UserInfo]: | |
"""Get the user from the session""" | |
if not session_id or session_id not in SESSIONS: | |
return None | |
session = SESSIONS[session_id] | |
if datetime.now() > session["expires"]: | |
# Session expired | |
del SESSIONS[session_id] | |
return None | |
# Update session expiration | |
session["expires"] = datetime.now() + timedelta(hours=24) | |
return session["user"] | |
def require_session_user(admin_required: bool = False, session_user: Optional[UserInfo] = Depends(get_session_user)): | |
"""Require a logged-in user, optionally requiring admin role""" | |
if not session_user: | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Authentication required", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
if admin_required and getattr(session_user, "role", "") != "admin": | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, | |
detail="Admin access required", | |
) | |
return session_user | |
def verify_api_key(api_key: str = Depends(api_key_header), | |
credentials: HTTPAuthorizationCredentials = Depends(security)) -> UserInfo: | |
"""Verify API key from header or Bearer token""" | |
# Try to get the API key from the header | |
key = api_key | |
# If not found, try to get it from the Bearer token | |
if not key and credentials: | |
key = credentials.credentials | |
# Check if the key is valid | |
if not key or key not in API_KEYS: | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Invalid API key", | |
headers={"WWW-Authenticate": "Bearer"}, | |
) | |
return API_KEYS[key]["user"] | |
# Credit packages configuration | |
CREDIT_PACKAGES = [ | |
{ | |
"id": "starter", | |
"name": "Starter Pack", | |
"description": "Perfect for trying out our AI tools", | |
"credits": 100, | |
"price": 49.99 | |
}, | |
{ | |
"id": "pro", | |
"name": "Pro Pack", | |
"description": "Most popular choice for regular users", | |
"credits": 500, | |
"price": 199.99 | |
}, | |
{ | |
"id": "enterprise", | |
"name": "Enterprise Pack", | |
"description": "Best value for power users", | |
"credits": 2000, | |
"price": 690.99 | |
} | |
] | |
# Mock user data (replace with database in production) | |
users = {} | |
# Routes | |
async def home(request: Request, session_user: Optional[UserInfo] = Depends(get_session_user)): | |
"""Home page with tools list""" | |
return templates.TemplateResponse( | |
"index.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"tools": TOOLS, | |
"session_user": session_user, | |
"user": session_user, # Add user for consistency with other templates | |
"user_credits": session_user.credits if session_user else 0 | |
} | |
) | |
async def tool_page( | |
tool_id: str, | |
request: Request, | |
session_user: Optional[UserInfo] = Depends(get_session_user) | |
): | |
"""Tool-specific page to execute a specific AI tool""" | |
# Get the tool | |
tool = next((t for t in TOOLS if t.id == tool_id), None) | |
if not tool: | |
return templates.TemplateResponse( | |
"error.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error_title": "Tool Not Found", | |
"error_description": "The requested tool does not exist.", | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0, | |
"tools": TOOLS # Include for consistent sidebar navigation | |
} | |
) | |
# Get example prompts for this tool | |
suggestions = tool.example_prompts if hasattr(tool, 'example_prompts') else [] | |
# Check if the user has enough credits | |
has_enough_credits = session_user and session_user.credits >= tool.cost if session_user else False | |
# Get available providers directly from the tool | |
providers = tool.providers | |
return templates.TemplateResponse( | |
"tool.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"tool": tool, | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0, | |
"has_enough_credits": has_enough_credits, | |
"suggestions": suggestions, | |
"providers": providers, | |
"tools": TOOLS # Include all tools for sidebar navigation | |
} | |
) | |
async def result_page( | |
request: Request, | |
result_id: str, | |
session_user: Optional[UserInfo] = Depends(get_session_user) | |
): | |
"""Result display page""" | |
# In a real app, you'd fetch the result from a database | |
# Here, we'll use a mock result for demonstration | |
result = { | |
"id": result_id, | |
"type": "text", # or "image" or "code" | |
"provider": "openai", | |
"model": "gpt-4", | |
"prompt": "Write a short poem about AI", | |
"result": """ | |
Silicon dreams in neural space, | |
Learning patterns, quickening pace. | |
Algorithms dance with grace divine, | |
In zeros and ones, intelligence shine. | |
Human-made yet growing beyond, | |
Of our creations, we grow fond. | |
Partners in progress, AI and we, | |
Crafting together what's yet to be. | |
""", | |
"created_at": datetime.now().isoformat(), | |
"response_time": 2.3 # seconds | |
} | |
return templates.TemplateResponse( | |
"result.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"result": result, | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0 | |
} | |
) | |
async def marketplace_page( | |
request: Request, | |
category: Optional[str] = None, | |
sort_by: str = "popular", | |
session_user: Optional[UserInfo] = Depends(get_session_user) | |
): | |
"""Prompt marketplace page""" | |
prompts = marketplace.list_marketplace_prompts(category=category, sort_by=sort_by) | |
return templates.TemplateResponse( | |
"marketplace.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"prompts": prompts, | |
"category": category, | |
"sort_by": sort_by, | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0 | |
} | |
) | |
async def ad_page( | |
request: Request, | |
session_user: Optional[UserInfo] = Depends(get_session_user) | |
): | |
"""Ad reward page""" | |
return templates.TemplateResponse( | |
"ad_reward.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0 | |
} | |
) | |
async def login_page(request: Request): | |
"""Login page""" | |
return templates.TemplateResponse( | |
"login.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub" | |
} | |
) | |
async def login( | |
request: Request, | |
username: str = Form(...), | |
password: str = Form(...) | |
): | |
"""Process login""" | |
# Check if admin login | |
if username == "admin": | |
# Check admin credentials | |
admin_key = API_KEYS.get("admin_key") | |
if not admin_key or admin_key.get("password") != password: | |
# Admin login failure | |
return templates.TemplateResponse( | |
"login.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error": "Invalid admin credentials" | |
}, | |
status_code=400 | |
) | |
# Admin login success | |
user = admin_key["user"] | |
else: | |
# For demo purposes, any username/password combo works | |
# In production, you'd verify against a database | |
# Create or update user | |
user_id = f"user_{username.lower().replace(' ', '_')}" | |
user = UserInfo( | |
id=user_id, | |
username=username, | |
email=f"{username.lower().replace(' ', '.')}@example.com", | |
last_login=datetime.now().isoformat() | |
) | |
# Create session | |
session_id = secrets.token_urlsafe(32) | |
SESSIONS[session_id] = { | |
"user": user, | |
"expires": datetime.now() + timedelta(hours=24) | |
} | |
# Create redirect response | |
response = RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) | |
# Set session cookie | |
response.set_cookie( | |
key="session_id", | |
value=session_id, | |
max_age=86400, # 24 hours | |
httponly=True, | |
samesite="lax" | |
) | |
return response | |
async def logout( | |
response: Response, | |
session_id: Optional[str] = Cookie(None) | |
): | |
"""Process logout""" | |
if session_id and session_id in SESSIONS: | |
del SESSIONS[session_id] | |
response.delete_cookie(key="session_id") | |
return RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) | |
async def credits_page(request: Request, session_user: Optional[UserInfo] = Depends(get_session_user)): | |
"""Credits management page""" | |
if not session_user: | |
return RedirectResponse(url="/login") | |
# Get credit history (mock data - replace with database in production) | |
credit_history = [ | |
{ | |
"date": "2024-03-20 14:30", | |
"type": "Daily Reward", | |
"amount": 5, | |
"details": "Watched daily ad" | |
}, | |
{ | |
"date": "2024-03-19 15:45", | |
"type": "Tool Usage", | |
"amount": -5, | |
"details": "Used Image Generation" | |
} | |
] | |
return templates.TemplateResponse( | |
"credits.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"user": session_user, | |
"user_credits": session_user.credits, | |
"credit_packages": CREDIT_PACKAGES, | |
"credit_history": credit_history, | |
"referral_link": f"{request.base_url}?ref={session_user.id}" | |
} | |
) | |
async def get_ads_status(request: Request, session_user: Optional[UserInfo] = Depends(get_session_user)): | |
"""Get user's ad reward status""" | |
if not session_user: | |
return JSONResponse( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
content={"success": False, "error": "Authentication required"} | |
) | |
# Get daily reward status | |
daily_reward = DAILY_REWARDS.get(f"{session_user.id}_{datetime.now().date().isoformat()}") | |
can_claim_daily = True | |
cooldown_remaining = 0 | |
if daily_reward and daily_reward.last_reward: | |
last_reward_time = datetime.fromisoformat(daily_reward.last_reward) | |
if (datetime.now() - last_reward_time).total_seconds() < 3600: # 1 hour cooldown | |
can_claim_daily = False | |
cooldown_remaining = 3600 - (datetime.now() - last_reward_time).total_seconds() | |
# Get special reward status | |
special_reward = SPECIAL_REWARDS.get(f"{session_user.id}_{datetime.now().date().isoformat()}") | |
can_claim_special = True | |
if special_reward and special_reward.claimed: | |
can_claim_special = False | |
return JSONResponse( | |
content={ | |
"success": True, | |
"daily_rewards": { | |
"count": daily_reward.count if daily_reward else 0, | |
"can_claim": can_claim_daily and (not daily_reward or daily_reward.count < 3), | |
"cooldown_remaining": int(cooldown_remaining) | |
}, | |
"special_reward": { | |
"claimed": special_reward.claimed if special_reward else False, | |
"can_claim": can_claim_special | |
} | |
} | |
) | |
async def claim_ad_reward( | |
request: Request, | |
session_user: Optional[UserInfo] = Depends(get_session_user) | |
): | |
"""Process ad reward claim""" | |
if not session_user: | |
return JSONResponse( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
content={"success": False, "error": "Authentication required"} | |
) | |
data = await request.json() | |
reward_type = data.get("type") | |
if reward_type not in ["daily", "special"]: | |
return JSONResponse( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
content={"success": False, "error": "Invalid reward type"} | |
) | |
today = datetime.now().date().isoformat() | |
if reward_type == "daily": | |
# Get or create daily reward record | |
daily_key = f"{session_user.id}_{today}" | |
if daily_key not in DAILY_REWARDS: | |
DAILY_REWARDS[daily_key] = DailyReward( | |
user_id=session_user.id, | |
date=today, | |
count=0 | |
) | |
daily_reward = DAILY_REWARDS[daily_key] | |
# Check if user has already claimed maximum daily rewards | |
if daily_reward.count >= 3: | |
return JSONResponse( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
content={"success": False, "error": "Daily reward limit reached"} | |
) | |
# Check cooldown period | |
if daily_reward.last_reward: | |
last_reward_time = datetime.fromisoformat(daily_reward.last_reward) | |
if (datetime.now() - last_reward_time).total_seconds() < 3600: | |
return JSONResponse( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
content={"success": False, "error": "Please wait 1 hour between rewards"} | |
) | |
# Award credits | |
reward_amount = 10 | |
session_user.credits += reward_amount | |
daily_reward.count += 1 | |
daily_reward.last_reward = datetime.now().isoformat() | |
return JSONResponse( | |
content={ | |
"success": True, | |
"credits_earned": reward_amount, | |
"current_credits": session_user.credits, | |
"daily_progress": daily_reward.count | |
} | |
) | |
else: # special reward | |
# Get or create special reward record | |
special_key = f"{session_user.id}_{today}" | |
if special_key not in SPECIAL_REWARDS: | |
SPECIAL_REWARDS[special_key] = SpecialReward( | |
user_id=session_user.id, | |
date=today, | |
claimed=False | |
) | |
special_reward = SPECIAL_REWARDS[special_key] | |
# Check if user has already claimed special reward | |
if special_reward.claimed: | |
return JSONResponse( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
content={"success": False, "error": "Special reward already claimed"} | |
) | |
# Award credits | |
reward_amount = 25 | |
session_user.credits += reward_amount | |
special_reward.claimed = True | |
special_reward.claimed_at = datetime.now().isoformat() | |
return JSONResponse( | |
content={ | |
"success": True, | |
"credits_earned": reward_amount, | |
"current_credits": session_user.credits | |
} | |
) | |
async def create_admin( | |
request: Request, | |
response: Response, | |
): | |
"""Create an admin user (only for development)""" | |
# Check if admin already exists | |
admin_exists = False | |
admin_user = None | |
for session in SESSIONS.values(): | |
if session.get("user") and session["user"].username == "admin": | |
admin_exists = True | |
admin_user = session["user"] | |
break | |
if admin_exists: | |
# Update admin credits | |
admin_user.credits = 10000 | |
admin_user.role = "admin" | |
return JSONResponse( | |
content={ | |
"success": True, | |
"message": "Admin user updated with 10000 credits", | |
"user_id": admin_user.id | |
} | |
) | |
# Create admin user | |
user_id = "admin" | |
user = UserInfo( | |
id=user_id, | |
username="admin", | |
email="[email protected]", | |
credits=10000, | |
role="admin", | |
last_login=datetime.now().isoformat() | |
) | |
# In production, you would hash the password | |
# For simplicity, we'll store it in a way that the login function can use it | |
# This is for development only, never do this in production! | |
admin_info = { | |
"user": user, | |
"password": "admin123", # Plaintext for demo only | |
"created_at": datetime.now().isoformat() | |
} | |
# Store admin info for login | |
API_KEYS["admin_key"] = admin_info | |
return JSONResponse( | |
content={ | |
"success": True, | |
"message": "Admin user created with 10000 credits", | |
"user_id": user_id, | |
"username": "admin", | |
"password": "admin123" | |
} | |
) | |
async def admin_dashboard( | |
request: Request, | |
user: UserInfo = Depends(lambda: require_session_user(admin_required=True)) | |
): | |
"""Admin dashboard""" | |
# Get all users | |
users = [] | |
for session_id, session in SESSIONS.items(): | |
if "user" in session: | |
users.append({ | |
"id": session["user"].id, | |
"username": session["user"].username, | |
"email": session["user"].email, | |
"credits": session["user"].credits, | |
"role": getattr(session["user"], "role", "user"), | |
"session_id": session_id, | |
"expires": session["expires"].isoformat() if "expires" in session else None | |
}) | |
# Sort users by credits (descending) | |
users.sort(key=lambda x: x["credits"], reverse=True) | |
return templates.TemplateResponse( | |
"admin_dashboard.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"user": user, | |
"user_credits": user.credits, | |
"users": users, | |
"total_users": len(users), | |
"total_credits": sum(u["credits"] for u in users), | |
"admin_count": sum(1 for u in users if u["role"] == "admin") | |
} | |
) | |
async def admin_update_credits( | |
request: Request, | |
user: UserInfo = Depends(lambda: require_session_user(admin_required=True)) | |
): | |
"""Admin endpoint to update user credits""" | |
data = await request.json() | |
user_id = data.get("user_id") | |
new_credits = data.get("credits") | |
if not user_id or new_credits is None: | |
return JSONResponse( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
content={"success": False, "error": "User ID and credits are required"} | |
) | |
# Find the user | |
target_user = None | |
for session in SESSIONS.values(): | |
if "user" in session and session["user"].id == user_id: | |
target_user = session["user"] | |
break | |
if not target_user: | |
return JSONResponse( | |
status_code=status.HTTP_404_NOT_FOUND, | |
content={"success": False, "error": "User not found"} | |
) | |
# Update credits | |
try: | |
target_user.credits = float(new_credits) | |
except ValueError: | |
return JSONResponse( | |
status_code=status.HTTP_400_BAD_REQUEST, | |
content={"success": False, "error": "Invalid credit amount"} | |
) | |
return JSONResponse( | |
content={ | |
"success": True, | |
"user_id": user_id, | |
"new_credits": target_user.credits, | |
"message": f"Credits updated for user {target_user.username}" | |
} | |
) | |
async def process_request( | |
request: Request, | |
tool_id: str = Form(...), | |
prompt: str = Form(...), | |
provider: str = Form("openai"), | |
model: str = Form("default"), | |
session_user: Optional[UserInfo] = Depends(get_session_user) | |
): | |
"""Process a tool request from a form""" | |
# Get the tool | |
tool = next((t for t in TOOLS if t.id == tool_id), None) | |
if not tool: | |
return templates.TemplateResponse( | |
"error.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error_title": "Tool Not Found", | |
"error_description": "The requested tool does not exist.", | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0, | |
"tools": TOOLS # Include for consistent sidebar navigation | |
} | |
) | |
# Confirm that user is logged in | |
if not session_user: | |
return templates.TemplateResponse( | |
"error.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error_title": "Login Required", | |
"error_description": "You must be logged in to use AI tools.", | |
"user": None, | |
"user_credits": 0, | |
"tools": TOOLS # Include for consistent sidebar navigation | |
} | |
) | |
# Check if the user has enough credits | |
if session_user.credits < tool.cost: | |
return templates.TemplateResponse( | |
"ad_reward.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"user": session_user, | |
"user_credits": session_user.credits, | |
"tool": tool, | |
"reward_ad": ads_manager.get_ad_code("reward_video"), | |
"sidebar_ad": ads_manager.get_ad_code("sidebar"), | |
"return_url": f"/tool/{tool_id}", | |
"tools": TOOLS # Include for consistent sidebar navigation | |
} | |
) | |
# Get the provider | |
try: | |
provider_instance = get_provider(provider) | |
if not provider_instance: | |
return templates.TemplateResponse( | |
"error.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error_title": "Provider Not Supported", | |
"error_description": f"The provider '{provider}' is not supported.", | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0, | |
"tools": TOOLS # Include for consistent sidebar navigation | |
} | |
) | |
result = None | |
result_type = "text" # Default type | |
# Determine result type based on tool ID | |
if "image" in tool.id or "logo" in tool.id or "avatar" in tool.id: | |
result_type = "image" | |
elif "code" in tool.id or "debugging" in tool.id: | |
result_type = "code" | |
# Process the request based on tool type | |
try: | |
if result_type == "text": | |
# Text generation tool | |
result = provider_instance.generate_text( | |
prompt=prompt, | |
model=model if model != "default" else "mistralai/Mistral-7B-Instruct-v0.2", | |
max_tokens=1000, | |
temperature=0.7 | |
) | |
elif result_type == "image": | |
# Image generation tool | |
try: | |
result = provider_instance.generate_image( | |
prompt=prompt, | |
model=model if model != "default" else "stabilityai/stable-diffusion-xl-base-1.0" | |
) | |
except AttributeError: | |
# Provider doesn't support image generation | |
return templates.TemplateResponse( | |
"error.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error_title": "Provider Not Supported", | |
"error_description": f"The provider '{provider}' does not support image generation.", | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0, | |
"tools": TOOLS # Include for consistent sidebar navigation | |
} | |
) | |
elif result_type == "code": | |
# Code generation tool | |
result = provider_instance.generate_text( | |
prompt=prompt, | |
model=model if model != "default" else "gpt2", | |
max_tokens=1500, | |
temperature=0.5 | |
) | |
else: | |
return templates.TemplateResponse( | |
"error.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error_title": "Tool Type Not Supported", | |
"error_description": f"The tool type '{result_type}' is not supported.", | |
"user": session_user, | |
"user_credits": session_user.credits if session_user else 0, | |
"tools": TOOLS # Include for consistent sidebar navigation | |
} | |
) | |
# Deduct credits | |
session_user.credits -= tool.cost | |
# Return the result | |
return templates.TemplateResponse( | |
"result.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"user": session_user, | |
"user_credits": session_user.credits, | |
"tool": tool, | |
"prompt": prompt, | |
"result": result, | |
"result_type": result_type, | |
"tools": TOOLS, # Include all tools for sidebar navigation | |
"json_data": result.get("raw_response", {}) if isinstance(result, dict) else {} | |
} | |
) | |
except Exception as generate_error: | |
return templates.TemplateResponse( | |
"error.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error_title": "Generation Error", | |
"error_description": f"Error generating content: {str(generate_error)}", | |
"user": session_user, | |
"user_credits": session_user.credits, | |
"tools": TOOLS # Include all tools for sidebar navigation | |
} | |
) | |
except Exception as provider_error: | |
return templates.TemplateResponse( | |
"error.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error_title": "Provider Error", | |
"error_description": f"Error with provider: {str(provider_error)}", | |
"user": session_user, | |
"user_credits": session_user.credits, | |
"tools": TOOLS # Include all tools for sidebar navigation | |
} | |
) | |
async def register( | |
request: Request, | |
response: Response, | |
username: str = Form(...), | |
email: str = Form(...), | |
password: str = Form(...), | |
session_id: Optional[str] = Cookie(None) | |
): | |
"""Process registration""" | |
# Check if username or email already exists (in production use a database) | |
for session in SESSIONS.values(): | |
if session.get("user"): | |
if session["user"].username.lower() == username.lower(): | |
return templates.TemplateResponse( | |
"register.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error": "Username already exists", | |
"form_data": {"username": username, "email": email} | |
}, | |
status_code=400 | |
) | |
if session["user"].email.lower() == email.lower(): | |
return templates.TemplateResponse( | |
"register.html", | |
{ | |
"request": request, | |
"app_name": "AI Tool Hub", | |
"error": "Email already exists", | |
"form_data": {"username": username, "email": email} | |
}, | |
status_code=400 | |
) | |
# Create user | |
user_id = f"user_{username.lower().replace(' ', '_')}" | |
# Check if there's an existing temporary session to convert | |
is_converting_temp = False | |
temp_credits = 0 | |
if session_id and session_id in SESSIONS: | |
session = SESSIONS[session_id] | |
if session.get("is_temporary", False): | |
# Convert temporary session to permanent | |
is_converting_temp = True | |
temp_credits = session["user"].credits | |
# Delete the temporary session | |
del SESSIONS[session_id] | |
# Create new user with starting credits (more if converting from temp) | |
start_credits = 10.0 + temp_credits | |
user = UserInfo( | |
id=user_id, | |
username=username, | |
email=email, | |
credits=start_credits, | |
last_login=datetime.now().isoformat() | |
) | |
# In production, you would hash the password and store in database | |
# Create new session | |
new_session_id = secrets.token_urlsafe(32) | |
SESSIONS[new_session_id] = { | |
"user": user, | |
"expires": datetime.now() + timedelta(hours=24) | |
} | |
# Set session cookie | |
response = RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) | |
response.set_cookie( | |
key="session_id", | |
value=new_session_id, | |
max_age=86400, # 24 hoursgj | |
httponly=True, | |
samesite="lax" | |
) | |
return response | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=8006) |