File size: 6,306 Bytes
8daf03a
05f583d
8daf03a
a8da4e0
 
8daf03a
 
 
 
 
 
 
a8da4e0
8daf03a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a8da4e0
 
8daf03a
 
 
a8da4e0
8daf03a
a8da4e0
8daf03a
 
 
 
 
 
 
 
a8da4e0
8daf03a
 
 
05f583d
 
 
8daf03a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
05f583d
 
8daf03a
05f583d
 
 
a8da4e0
8daf03a
a8da4e0
 
 
 
8daf03a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a8da4e0
 
 
 
 
8daf03a
a8da4e0
8daf03a
 
 
 
 
 
 
 
 
a8da4e0
8daf03a
 
 
 
 
 
 
 
 
 
 
 
 
a8da4e0
8daf03a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
import numpy as np
import base64
import logging
import sys
import traceback
import io
from PIL import Image
import json

from faceforge_core.latent_explorer import LatentSpaceExplorer
from faceforge_core.attribute_directions import LatentDirectionFinder
from faceforge_core.custom_loss import attribute_preserving_loss

# Configure logging
# Root logging goes to stdout at DEBUG level; each record carries a timestamp,
# the logger name and the level ahead of the message.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)
# Named logger used by every handler in this module.
logger = logging.getLogger("faceforge_api")

# --- Models for API ---

class PointIn(BaseModel):
    # Pydantic model for a single point: a required text plus two optional
    # float lists. No endpoint in the visible part of this file references it.

    # Required text for the point.
    text: str
    # Optional list of floats; None when omitted.
    encoding: Optional[List[float]] = Field(None)
    # Optional list of floats; None when omitted.
    # NOTE(review): presumably an [x, y] pair, going by the name — confirm.
    xy_pos: Optional[List[float]] = Field(None)

class GenerateRequest(BaseModel):
    # Request body for POST /generate.

    # One explorer point is added per prompt, in order.
    prompts: List[str]
    # Optional positions matched to `prompts` by index. A prompt with no entry
    # at its index (list missing, empty, or too short) is added with position None.
    positions: Optional[List[List[float]]] = Field(None)
    # Forwarded unchanged as `mode=` to explorer.sample_encoding.
    mode: str = "distance"
    # Position to sample at; the handler substitutes [0.0, 0.0] when this is
    # None and converts the value to a tuple before sampling.
    player_pos: Optional[List[float]] = Field(None)

class ManipulateRequest(BaseModel):
    # Request body for POST /manipulate, which computes
    # encoding + alpha * direction.

    # Starting vector.
    encoding: List[float]
    # Vector that is scaled by `alpha` and added to `encoding`.
    direction: List[float]
    # Scalar multiplier applied to `direction`.
    alpha: float

class AttributeDirectionRequest(BaseModel):
    # Request body for POST /attribute_direction.

    # List of float lists; converted with np.array and handed to
    # LatentDirectionFinder.
    latents: List[List[float]]
    # When provided, the handler calls classifier_direction(labels); when None
    # it uses pca_direction instead.
    # NOTE(review): presumably one label per entry in `latents` — confirm.
    labels: Optional[List[int]] = Field(None)
    # Forwarded as `n_components=` to pca_direction; not read when labels are given.
    n_components: Optional[int] = 10

# --- FastAPI app ---

app = FastAPI()

# CORS: accept requests from any origin, with any method and any header.
#
# Credentials are deliberately NOT enabled alongside the wildcard origin. The
# CORS protocol does not allow "Access-Control-Allow-Origin: *" on credentialed
# requests, and with allow_credentials=True the middleware works around that
# by echoing back whatever Origin the caller sent, which lets any website make
# credentialed cross-origin calls to this API. None of the endpoints in this
# module read cookies or auth headers, so credentials are switched off.
# If credentialed requests are ever required, replace the wildcard with an
# explicit list of trusted origins and re-enable allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level explorer shared by all requests; the /generate handler resets
# its `points` list at the start of every call.
explorer = LatentSpaceExplorer()

# Error handling middleware
@app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
    """Convert any exception escaping the downstream handlers into a JSON 500.

    The exception text is logged at ERROR level, the traceback at DEBUG level,
    and the response body carries a fixed "detail" plus the exception text
    under "error".
    """
    try:
        response = await call_next(request)
    except Exception as exc:
        message = str(exc)
        logger.error(f"Unhandled exception: {message}")
        logger.debug(traceback.format_exc())
        payload = {"detail": "Internal server error", "error": message}
        return JSONResponse(status_code=500, content=payload)
    return response

@app.get("/")
def read_root():
    # Root endpoint: logs the hit and replies with a fixed status message.
    logger.debug("Root endpoint called")
    payload = {"message": "FaceForge API is running"}
    return payload

@app.post("/generate")
async def generate_image(req: GenerateRequest):
    # Rebuilds the shared explorer's points from the request, samples an
    # encoding at the player position, and returns a base64-encoded PNG as
    # {"status": "success", "image": <base64 str>}.
    #
    # Both the per-prompt encodings and the image pixels are random
    # placeholders; the sampled encoding is computed but not used to produce
    # the image. Any failure is logged and re-raised as an HTTP 500 whose
    # detail is the exception text.
    try:
        logger.debug(f"Generate image request: {json.dumps(req.dict(), default=str)}")

        # Dump the request schema as a debugging aid.
        logger.debug(f"Request schema: {GenerateRequest.schema_json()}")

        # Every request starts from an empty point set.
        explorer.points = []

        # A missing or empty positions list means no prompt has a position.
        positions = req.positions or []
        for idx, prompt in enumerate(req.prompts):
            logger.debug(f"Processing prompt {idx}: {prompt}")

            # Stub: a random 512-element vector stands in for a real encoding.
            latent = np.random.randn(512)

            # Prompts past the end of `positions` are added without a position.
            xy_pos = positions[idx] if idx < len(positions) else None
            logger.debug(f"Position for prompt {idx}: {xy_pos}")

            explorer.add_point(prompt, latent, xy_pos)

        # Fall back to the origin when the caller gave no player position.
        player_pos = [0.0, 0.0] if req.player_pos is None else req.player_pos
        logger.debug(f"Player position: {player_pos}")

        logger.debug(f"Sampling with mode: {req.mode}")
        # Result is currently unused; the image below is random noise.
        sampled = explorer.sample_encoding(tuple(player_pos), mode=req.mode)

        # Stub: 256x256 RGB noise in place of a decoded image.
        pixels = (np.random.rand(256, 256, 3) * 255).astype(np.uint8)

        # Encode the pixels as PNG in memory, then as base64 text.
        logger.debug("Converting image to base64")
        png_buffer = io.BytesIO()
        Image.fromarray(pixels).save(png_buffer, format="PNG")
        img_b64 = base64.b64encode(png_buffer.getvalue()).decode("utf-8")

        response = {"status": "success", "image": img_b64}
        logger.debug(f"Response structure: {list(response.keys())}")
        logger.debug(f"Image base64 length: {len(img_b64)}")

        logger.debug("Image generated successfully")
        return response

    except Exception as exc:
        logger.error(f"Error in generate_image: {str(exc)}")
        logger.debug(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(exc))

# POST /manipulate: move an encoding along a direction.
#
# Computes encoding + alpha * direction and returns it as
# {"manipulated_encoding": [...]}.
#
# Errors:
#   422 when `encoding` and `direction` do not have the same length. Without
#       this check NumPy would either broadcast silently (when one side has
#       length 1, yielding a vector of unexpected length) or raise, which
#       would be reported as a server error although the request is at fault.
#   500 for any other failure, with the exception text as detail.
@app.post("/manipulate")
def manipulate(req: ManipulateRequest):
    try:
        logger.debug(f"Manipulate request: {json.dumps(req.dict(), default=str)}")
        encoding = np.asarray(req.encoding, dtype=float)
        direction = np.asarray(req.direction, dtype=float)

        if encoding.shape != direction.shape:
            raise HTTPException(
                status_code=422,
                detail=(
                    "encoding and direction must have the same length "
                    f"(got {encoding.size} and {direction.size})"
                ),
            )

        manipulated = encoding + req.alpha * direction
        logger.debug("Manipulation successful")
        return {"manipulated_encoding": manipulated.tolist()}
    except HTTPException:
        # Deliberate client-error responses must not be rewrapped as 500s.
        raise
    except Exception as e:
        logger.error(f"Error in manipulate: {str(e)}")
        logger.debug(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))

# POST /attribute_direction: find direction(s) in a set of latent vectors.
#
# With `labels`: calls LatentDirectionFinder.classifier_direction(labels) and
#   returns {"direction": [...]}.
# Without `labels`: calls pca_direction(n_components=...) and returns
#   {"components": [...], "explained_variance": [...]}.
#
# Errors:
#   422 when `latents` is empty, when its rows differ in length, or when
#       `labels` is given but its length differs from the number of latents.
#       These are request mistakes that would otherwise fail inside NumPy or
#       the finder and be reported as server errors.
#   500 for any other failure, with the exception text as detail.
@app.post("/attribute_direction")
def attribute_direction(req: AttributeDirectionRequest):
    try:
        logger.debug(f"Attribute direction request: {json.dumps(req.dict(), default=str)}")

        # Validate the request shape before building the array.
        if not req.latents:
            raise HTTPException(
                status_code=422,
                detail="latents must contain at least one vector",
            )
        width = len(req.latents[0])
        if any(len(row) != width for row in req.latents):
            raise HTTPException(
                status_code=422,
                detail="all latent vectors must have the same length",
            )
        if req.labels is not None and len(req.labels) != len(req.latents):
            raise HTTPException(
                status_code=422,
                detail=(
                    "labels must have one entry per latent vector "
                    f"(got {len(req.labels)} labels for {len(req.latents)} latents)"
                ),
            )

        latents = np.array(req.latents)
        finder = LatentDirectionFinder(latents)

        if req.labels is not None:
            logger.debug("Using classifier-based direction finding")
            direction = finder.classifier_direction(req.labels)
            logger.debug("Direction found successfully")
            return {"direction": direction.tolist()}

        logger.debug(f"Using PCA with {req.n_components} components")
        components, explained = finder.pca_direction(n_components=req.n_components)
        logger.debug("PCA completed successfully")
        return {"components": components.tolist(), "explained_variance": explained.tolist()}
    except HTTPException:
        # Deliberate client-error responses must not be rewrapped as 500s.
        raise
    except Exception as e:
        logger.error(f"Error in attribute_direction: {str(e)}")
        logger.debug(traceback.format_exc())
        raise HTTPException(status_code=500, detail=str(e))