File size: 6,306 Bytes
8daf03a 05f583d 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a 05f583d 8daf03a 05f583d 8daf03a 05f583d a8da4e0 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a a8da4e0 8daf03a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
import numpy as np
import base64
import logging
import sys
import traceback
import io
from PIL import Image
import json
from faceforge_core.latent_explorer import LatentSpaceExplorer
from faceforge_core.attribute_directions import LatentDirectionFinder
from faceforge_core.custom_loss import attribute_preserving_loss
# Logging setup: ask the root logger for DEBUG level and a stdout handler;
# each record is rendered as "time - logger name - level - message".
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)

# Named logger used by the endpoints in this module.
logger = logging.getLogger("faceforge_api")
# --- Models for API ---
class PointIn(BaseModel):
    """A text label with an optional encoding vector and an optional xy position."""

    text: str
    # Both optional fields default to None when the client omits them.
    encoding: Optional[List[float]] = None
    xy_pos: Optional[List[float]] = None
class GenerateRequest(BaseModel):
    """Request body for POST /generate."""

    prompts: List[str]
    # positions[i] is paired with prompts[i]; prompts without a matching
    # entry are added to the explorer with no position.
    positions: Optional[List[List[float]]] = None
    # Passed through unchanged as the `mode` argument of the explorer's
    # sample_encoding call.
    mode: str = "distance"
    # The /generate handler substitutes [0.0, 0.0] when this is omitted.
    player_pos: Optional[List[float]] = None
class ManipulateRequest(BaseModel):
    """Request body for POST /manipulate.

    The handler computes ``encoding + alpha * direction`` element-wise.
    """

    encoding: List[float]
    direction: List[float]
    alpha: float
class AttributeDirectionRequest(BaseModel):
    """Request body for POST /attribute_direction."""

    latents: List[List[float]]
    # When labels are supplied the handler uses classifier-based direction
    # finding; when omitted it falls back to PCA.
    labels: Optional[List[int]] = None
    # Only read on the PCA path.
    n_components: Optional[int] = 10
# --- FastAPI app ---
app = FastAPI()

# CORS: wildcard origins, methods and headers, with credentials allowed.
# NOTE(review): FastAPI's CORS documentation advises against combining the
# "*" wildcards with allow_credentials=True. Confirm whether credentials are
# actually needed; if they are, list the permitted origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)

# Single module-level explorer instance; the /generate handler clears and
# refills its points on every request.
explorer = LatentSpaceExplorer()
# Error handling middleware
@app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
    """Pass the request on and convert any escaping exception into a JSON 500.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request and yields the response.

    Returns:
        The downstream response, or a ``JSONResponse`` with status 500 whose
        body carries a generic ``detail`` plus the exception text in ``error``.
    """
    try:
        response = await call_next(request)
    except Exception as exc:
        message = str(exc)
        logger.error(f"Unhandled exception: {message}")
        # The full traceback is only emitted at DEBUG level.
        logger.debug(traceback.format_exc())
        body = {"detail": "Internal server error", "error": message}
        return JSONResponse(status_code=500, content=body)
    return response
@app.get("/")
def read_root():
    """Report that the API is up."""
    logger.debug("Root endpoint called")
    status = {"message": "FaceForge API is running"}
    return status
@app.post("/generate")
async def generate_image(req: GenerateRequest):
    """Register the prompts as explorer points and return a base64 PNG.

    The explorer's points are replaced on every call. Encodings and the
    returned image are currently random placeholders.

    Args:
        req: Prompts, optional per-prompt positions, sampling mode and
            optional player position.

    Returns:
        ``{"status": "success", "image": <base64-encoded PNG>}``.

    Raises:
        HTTPException: Status 500, with the exception text as detail, when
            any step fails.
    """
    try:
        logger.debug(f"Generate image request: {json.dumps(req.dict(), default=str)}")
        # Log request schema for debugging
        logger.debug(f"Request schema: {GenerateRequest.schema_json()}")

        # Every request starts from an empty set of points.
        explorer.points = []

        # Treat a missing positions list like an empty one so the per-prompt
        # lookup below needs only a bounds check.
        positions = req.positions or []
        n_positions = len(positions)
        for idx, text in enumerate(req.prompts):
            logger.debug(f"Processing prompt {idx}: {text}")
            # Stub: random 512-value encoding, to be replaced by a real model.
            latent = np.random.randn(512)
            if idx < n_positions:
                where = positions[idx]
            else:
                where = None
            logger.debug(f"Position for prompt {idx}: {where}")
            explorer.add_point(text, latent, where)

        # Fall back to the origin when the client sent no player position.
        player_pos = [0.0, 0.0] if req.player_pos is None else req.player_pos
        logger.debug(f"Player position: {player_pos}")

        logger.debug(f"Sampling with mode: {req.mode}")
        # The sampled encoding is not consumed yet; the image below is a mock.
        sampled = explorer.sample_encoding(tuple(player_pos), mode=req.mode)

        # Mock image: random 256x256 RGB pixels.
        pixels = (np.random.rand(256, 256, 3) * 255).astype(np.uint8)

        logger.debug("Converting image to base64")
        with io.BytesIO() as png_buf:
            Image.fromarray(pixels).save(png_buf, format="PNG")
            img_b64 = base64.b64encode(png_buf.getvalue()).decode("utf-8")

        payload = {"status": "success", "image": img_b64}
        logger.debug(f"Response structure: {list(payload.keys())}")
        logger.debug(f"Image base64 length: {len(img_b64)}")
        logger.debug("Image generated successfully")
        return payload
    except Exception as exc:
        logger.error(f"Error in generate_image: {str(exc)}")
        logger.debug(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/manipulate")
def manipulate(req: ManipulateRequest):
    """Shift an encoding along a direction: ``encoding + alpha * direction``.

    Args:
        req: Request body carrying ``encoding``, ``direction`` and ``alpha``.

    Returns:
        ``{"manipulated_encoding": [...]}`` with one value per input element.

    Raises:
        HTTPException: Status 400 when ``encoding`` and ``direction`` differ
            in length; status 500, with the exception text as detail, for any
            unexpected failure.
    """
    try:
        logger.debug(f"Manipulate request: {json.dumps(req.dict(), default=str)}")
        encoding = np.array(req.encoding)
        direction = np.array(req.direction)
        # Reject mismatched lengths up front. Left to NumPy, a length-1 vector
        # would be silently broadcast against the other one (wrong result),
        # and any other mismatch would surface as an opaque 500.
        if encoding.shape != direction.shape:
            raise HTTPException(
                status_code=400,
                detail=(
                    "encoding and direction must have the same length "
                    f"(got {len(req.encoding)} and {len(req.direction)})"
                ),
            )
        manipulated = encoding + req.alpha * direction
        logger.debug("Manipulation successful")
        return {"manipulated_encoding": manipulated.tolist()}
    except HTTPException:
        # Deliberate client-error responses must not be rewrapped as 500.
        raise
    except Exception as e:
        logger.error(f"Error in manipulate: {str(e)}")
        logger.debug(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/attribute_direction")
def attribute_direction(req: AttributeDirectionRequest):
    """Find attribute directions for a set of latents.

    With ``labels`` the classifier-based direction is returned; without them
    PCA components and their explained variance are returned.

    Args:
        req: Latent vectors, optional labels and the PCA component count.

    Returns:
        ``{"direction": [...]}`` when labels are given, otherwise
        ``{"components": [...], "explained_variance": [...]}``.

    Raises:
        HTTPException: Status 500, with the exception text as detail, when
            any step fails.
    """
    try:
        logger.debug(f"Attribute direction request: {json.dumps(req.dict(), default=str)}")
        finder = LatentDirectionFinder(np.array(req.latents))

        # No labels: unsupervised PCA path.
        if req.labels is None:
            logger.debug(f"Using PCA with {req.n_components} components")
            components, explained = finder.pca_direction(n_components=req.n_components)
            logger.debug("PCA completed successfully")
            pca_result = {
                "components": components.tolist(),
                "explained_variance": explained.tolist(),
            }
            return pca_result

        # Labels supplied: classifier-based path.
        logger.debug("Using classifier-based direction finding")
        direction = finder.classifier_direction(req.labels)
        logger.debug("Direction found successfully")
        return {"direction": direction.tolist()}
    except Exception as exc:
        logger.error(f"Error in attribute_direction: {str(exc)}")
        logger.debug(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))