Spaces:
Running
Running
from fastapi import FastAPI, File, UploadFile, Request, HTTPException, Form, Depends, status | |
from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
import shutil | |
import os | |
import uuid | |
import base64 | |
from pathlib import Path | |
import uvicorn | |
from typing import List, Optional | |
import secrets | |
from starlette.middleware.sessions import SessionMiddleware | |
from fastapi.security import OAuth2PasswordRequestForm | |
from fastapi.responses import JSONResponse | |
# Create FastAPI app | |
app = FastAPI(title="Image Uploader") | |
# Add session middleware | |
app.add_middleware( | |
SessionMiddleware, | |
secret_key="YOUR_SECRET_KEY_CHANGE_THIS_IN_PRODUCTION" | |
) | |
# Create uploads directory if it doesn't exist | |
UPLOAD_DIR = Path("static/uploads") | |
UPLOAD_DIR.mkdir(parents=True, exist_ok=True) | |
# Mount static directory | |
app.mount("/static", StaticFiles(directory="static"), name="static") | |
# Set up Jinja2 templates | |
templates = Jinja2Templates(directory="templates") | |
# Set up security | |
security = HTTPBasic() | |
# Hardcoded credentials (in a real app, use proper hashed passwords in a database) | |
USERNAME = "detomo" | |
PASSWORD = "itweek2025" | |
def get_file_extension(filename: str) -> str: | |
"""Get the file extension from a filename.""" | |
return os.path.splitext(filename)[1].lower() | |
def is_valid_image(extension: str) -> bool: | |
"""Check if the file extension is a valid image type.""" | |
return extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'] | |
def authenticate(request: Request): | |
"""Check if user is authenticated.""" | |
is_authenticated = request.session.get("authenticated", False) | |
return is_authenticated | |
def verify_auth(request: Request): | |
"""Verify authentication.""" | |
if not authenticate(request): | |
raise HTTPException( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
detail="Not authenticated", | |
headers={"WWW-Authenticate": "Basic"}, | |
) | |
return True | |
async def login_page(request: Request): | |
"""Render the login page.""" | |
# If already authenticated, redirect to home | |
if authenticate(request): | |
return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) | |
return templates.TemplateResponse( | |
"login.html", | |
{"request": request} | |
) | |
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()): | |
"""Handle login form submission.""" | |
if form_data.username == USERNAME and form_data.password == PASSWORD: | |
request.session["authenticated"] = True | |
return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) | |
else: | |
return templates.TemplateResponse( | |
"login.html", | |
{"request": request, "error": "Invalid username or password"} | |
) | |
async def logout(request: Request): | |
"""Handle logout.""" | |
request.session.pop("authenticated", None) | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
async def home(request: Request): | |
"""Render the home page with authentication check.""" | |
# Check if user is authenticated | |
if not authenticate(request): | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
# Get all uploaded images | |
uploaded_images = [] | |
if UPLOAD_DIR.exists(): | |
for file in UPLOAD_DIR.iterdir(): | |
if is_valid_image(get_file_extension(file.name)): | |
image_url = f"/static/uploads/{file.name}" | |
uploaded_images.append({ | |
"name": file.name, | |
"url": image_url, | |
"embed_url": f"{request.base_url}static/uploads/{file.name}" | |
}) | |
return templates.TemplateResponse( | |
"index.html", | |
{"request": request, "uploaded_images": uploaded_images} | |
) | |
async def upload_image(request: Request, file: UploadFile = File(...)): | |
"""Handle image upload with authentication check.""" | |
# Check if user is authenticated | |
if not authenticate(request): | |
return JSONResponse( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
content={"detail": "Not authenticated"} | |
) | |
# Check if the file is an image | |
extension = get_file_extension(file.filename) | |
if not is_valid_image(extension): | |
raise HTTPException(status_code=400, detail="Only image files are allowed") | |
# Generate a unique filename to prevent overwrites | |
unique_filename = f"{uuid.uuid4()}{extension}" | |
file_path = UPLOAD_DIR / unique_filename | |
# Save the file | |
with file_path.open("wb") as buffer: | |
shutil.copyfileobj(file.file, buffer) | |
# Return the file URL and embed code | |
file_url = f"/static/uploads/{unique_filename}" | |
# For base64 encoding | |
file.file.seek(0) # Reset file pointer to beginning | |
contents = await file.read() | |
base64_encoded = base64.b64encode(contents).decode("utf-8") | |
# Determine MIME type | |
mime_type = { | |
'.jpg': 'image/jpeg', | |
'.jpeg': 'image/jpeg', | |
'.png': 'image/png', | |
'.gif': 'image/gif', | |
'.bmp': 'image/bmp', | |
'.webp': 'image/webp' | |
}.get(extension, 'application/octet-stream') | |
return { | |
"success": True, | |
"file_name": unique_filename, | |
"file_url": file_url, | |
"full_url": f"{request.base_url}static/uploads/{unique_filename}", | |
"embed_html": f'<img src="{request.base_url}static/uploads/{unique_filename}" alt="Uploaded Image" />', | |
"base64_data": f"data:{mime_type};base64,{base64_encoded[:20]}...{base64_encoded[-20:]}", | |
"base64_embed": f'<img src="data:{mime_type};base64,{base64_encoded}" alt="Embedded Image" />' | |
} | |
async def view_image(request: Request, file_name: str): | |
"""View a specific image with authentication check.""" | |
# Check if user is authenticated | |
if not authenticate(request): | |
return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
file_path = UPLOAD_DIR / file_name | |
if not file_path.exists(): | |
raise HTTPException(status_code=404, detail="Image not found") | |
image_url = f"/static/uploads/{file_name}" | |
embed_url = f"{request.base_url}static/uploads/{file_name}" | |
return templates.TemplateResponse( | |
"view.html", | |
{ | |
"request": request, | |
"image_url": image_url, | |
"file_name": file_name, | |
"embed_url": embed_url | |
} | |
) | |
async def delete_image(request: Request, file_name: str): | |
"""Delete an image with authentication check.""" | |
# Check if user is authenticated | |
if not authenticate(request): | |
return JSONResponse( | |
status_code=status.HTTP_401_UNAUTHORIZED, | |
content={"detail": "Not authenticated"} | |
) | |
file_path = UPLOAD_DIR / file_name | |
if not file_path.exists(): | |
raise HTTPException(status_code=404, detail="Image not found") | |
os.remove(file_path) | |
return {"success": True, "message": f"Image {file_name} has been deleted"} | |
# Health check endpoint for Hugging Face Spaces | |
async def health_check(): | |
return {"status": "ok"} | |
if __name__ == "__main__": | |
# For local development | |
uvicorn.run("app:app", host="127.0.0.1", port=8000, reload=True) | |
# For production/Hugging Face (uncomment when deploying) | |
# uvicorn.run("app:app", host="0.0.0.0", port=7860) |