"""Image Uploader: a small FastAPI app to upload, list, view and delete images.

Access is gated by a session cookie that is set after a successful form
login. Uploaded files are stored under ``static/uploads`` and served through
the ``/static`` mount.
"""

import base64
import os
import secrets
import shutil
import uuid
from pathlib import Path
from typing import List, Optional

import uvicorn
from fastapi import (
    Depends,
    FastAPI,
    File,
    Form,
    HTTPException,
    Request,
    UploadFile,
    status,
)
from fastapi.responses import (
    FileResponse,
    HTMLResponse,
    JSONResponse,
    RedirectResponse,
)
from fastapi.security import (
    HTTPBasic,
    HTTPBasicCredentials,
    OAuth2PasswordRequestForm,
)
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware

# Create FastAPI app
app = FastAPI(title="Image Uploader")

# Add session middleware. The signing key can be overridden through the
# environment so that a deployment does not have to ship the placeholder value;
# the default is unchanged for backward compatibility.
app.add_middleware(
    SessionMiddleware,
    secret_key=os.environ.get(
        "IMAGE_UPLOADER_SECRET_KEY",
        "YOUR_SECRET_KEY_CHANGE_THIS_IN_PRODUCTION",
    ),
)

# Create uploads directory if it doesn't exist
UPLOAD_DIR = Path("static/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Mount static directory
app.mount("/static", StaticFiles(directory="static"), name="static")

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")

# Set up security
security = HTTPBasic()

# Credentials (in a real app, use proper hashed passwords in a database).
# Overridable through the environment; the defaults match the original values.
USERNAME = os.environ.get("IMAGE_UPLOADER_USERNAME", "detomo")
PASSWORD = os.environ.get("IMAGE_UPLOADER_PASSWORD", "itweek2025")

# Single source of truth for accepted image types: extension -> MIME type.
MIME_TYPES = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.bmp': 'image/bmp',
    '.webp': 'image/webp',
}
ALLOWED_EXTENSIONS = frozenset(MIME_TYPES)


def get_file_extension(filename: str) -> str:
    """Return the lower-cased extension of *filename*, including the dot."""
    return os.path.splitext(filename)[1].lower()


def is_valid_image(extension: str) -> bool:
    """Return True if *extension* is one of the accepted image extensions."""
    return extension in ALLOWED_EXTENSIONS


def _resolve_upload(file_name: str) -> Optional[Path]:
    """Map a client-supplied name to an existing regular file in UPLOAD_DIR.

    Returns None when *file_name* is not a bare file name (empty, ``.``,
    ``..`` or containing a path separator) or when no such regular file
    exists. This keeps view/delete from operating outside the uploads folder
    or on directories.
    """
    if not file_name or file_name in {".", ".."}:
        return None
    if Path(file_name).name != file_name:
        return None
    candidate = UPLOAD_DIR / file_name
    return candidate if candidate.is_file() else None


def authenticate(request: Request):
    """Return the session's "authenticated" flag (False when absent)."""
    return request.session.get("authenticated", False)


def verify_auth(request: Request):
    """Return True for an authenticated session.

    Raises:
        HTTPException: 401 when the session is not authenticated.
    """
    if not authenticate(request):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Basic"},
        )
    return True


@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login page, or redirect home if already authenticated."""
    if authenticate(request):
        return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)

    return templates.TemplateResponse("login.html", {"request": request})


@app.post("/login")
async def login(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends(),
):
    """Handle login form submission.

    On success the session is marked authenticated and the client is
    redirected to "/"; otherwise the login page is re-rendered with an error.
    """
    # Compare as bytes with a constant-time primitive: compare_digest rejects
    # non-ASCII str input, and "==" can leak how many leading characters match.
    username_ok = secrets.compare_digest(
        form_data.username.encode("utf-8"), USERNAME.encode("utf-8")
    )
    password_ok = secrets.compare_digest(
        form_data.password.encode("utf-8"), PASSWORD.encode("utf-8")
    )

    if username_ok and password_ok:
        request.session["authenticated"] = True
        return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)

    return templates.TemplateResponse(
        "login.html",
        {"request": request, "error": "Invalid username or password"},
    )


@app.get("/logout")
async def logout(request: Request):
    """Clear the session's authenticated flag and redirect to the login page."""
    request.session.pop("authenticated", None)
    return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)


@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the home page listing every uploaded image.

    Unauthenticated sessions are redirected to the login page.
    """
    if not authenticate(request):
        return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)

    uploaded_images = []
    if UPLOAD_DIR.exists():
        # iterdir() order is arbitrary; sort so the listing is stable.
        for file in sorted(UPLOAD_DIR.iterdir()):
            if not file.is_file():
                continue
            if not is_valid_image(get_file_extension(file.name)):
                continue
            uploaded_images.append({
                "name": file.name,
                "url": f"/static/uploads/{file.name}",
                "embed_url": f"{request.base_url}static/uploads/{file.name}",
            })

    return templates.TemplateResponse(
        "index.html",
        {"request": request, "uploaded_images": uploaded_images},
    )


@app.post("/upload/")
async def upload_image(request: Request, file: UploadFile = File(...)):
    """Store an uploaded image under a fresh UUID-based name.

    Returns a JSON body with the stored name, its URLs and a shortened
    base64 preview. Responds 401 when not authenticated.

    Raises:
        HTTPException: 400 when the upload's extension is not an accepted
            image extension (including uploads without a file name).
    """
    if not authenticate(request):
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": "Not authenticated"},
        )

    # filename can be None for some multipart bodies; treat that as "no
    # extension" so the client gets a 400 instead of an unhandled TypeError.
    extension = get_file_extension(file.filename or "")
    if not is_valid_image(extension):
        raise HTTPException(status_code=400, detail="Only image files are allowed")

    # Generate a unique filename to prevent overwrites
    unique_filename = f"{uuid.uuid4()}{extension}"
    file_path = UPLOAD_DIR / unique_filename

    # Read the upload once and reuse the bytes for both the on-disk copy and
    # the base64 preview (previously the stream was copied, rewound, re-read).
    contents = await file.read()
    file_path.write_bytes(contents)

    file_url = f"/static/uploads/{unique_filename}"
    base64_encoded = base64.b64encode(contents).decode("utf-8")
    mime_type = MIME_TYPES.get(extension, 'application/octet-stream')

    # NOTE(review): "embed_html" and "base64_embed" are plain labels here; they
    # look like HTML snippets whose markup was lost -- confirm against the
    # front-end before changing them.
    return {
        "success": True,
        "file_name": unique_filename,
        "file_url": file_url,
        "full_url": f"{request.base_url}static/uploads/{unique_filename}",
        "embed_html": 'Uploaded Image',
        # Only the first and last 20 base64 characters are returned.
        "base64_data": f"data:{mime_type};base64,{base64_encoded[:20]}...{base64_encoded[-20:]}",
        "base64_embed": 'Embedded Image',
    }


@app.get("/view/{file_name}")
async def view_image(request: Request, file_name: str):
    """Render the detail page for one uploaded image.

    Unauthenticated sessions are redirected to the login page.

    Raises:
        HTTPException: 404 when *file_name* is not an existing file directly
            inside the uploads directory.
    """
    if not authenticate(request):
        return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)

    if _resolve_upload(file_name) is None:
        raise HTTPException(status_code=404, detail="Image not found")

    return templates.TemplateResponse(
        "view.html",
        {
            "request": request,
            "image_url": f"/static/uploads/{file_name}",
            "file_name": file_name,
            "embed_url": f"{request.base_url}static/uploads/{file_name}",
        },
    )


@app.delete("/delete/{file_name}")
async def delete_image(request: Request, file_name: str):
    """Delete one uploaded image. Responds 401 when not authenticated.

    Raises:
        HTTPException: 404 when *file_name* is not an existing file directly
            inside the uploads directory.
    """
    if not authenticate(request):
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"detail": "Not authenticated"},
        )

    file_path = _resolve_upload(file_name)
    if file_path is None:
        raise HTTPException(status_code=404, detail="Image not found")

    try:
        file_path.unlink()
    except FileNotFoundError:
        # The file vanished between the check and the unlink (e.g. a
        # concurrent delete); report it the same way as a missing file.
        raise HTTPException(status_code=404, detail="Image not found") from None

    return {"success": True, "message": f"Image {file_name} has been deleted"}


# Health check endpoint for Hugging Face Spaces
@app.get("/healthz")
async def health_check():
    """Return a static payload signalling that the app is up."""
    return {"status": "ok"}


if __name__ == "__main__":
    # For local development
    uvicorn.run("app:app", host="127.0.0.1", port=8000, reload=True)

    # NOTE: for production/Hugging Face, run on host "0.0.0.0", port 7860
    # without reload instead of the call above.