|
import logging
import mimetypes
import os
import stat
import uuid
from pathlib import Path
from typing import Optional

import requests
from fastapi import FastAPI, File, Form, HTTPException, Query, Response, UploadFile
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
|
|
|
|
|
# Configure the root logger so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
# Application instance; the middleware, static mount and routes in this file
# are all registered on it.
app = FastAPI()

# CORS policy: any origin may call the API with any method and any header.
# Credentials stay disabled: a wildcard origin combined with
# ``allow_credentials=True`` is not a supported configuration (the middleware
# would reflect every caller's Origin on cookie-bearing requests, letting any
# site make credentialed calls). To support cookies, list the trusted origins
# explicitly instead of "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
# Directory where uploads are stored; created on import if it is missing.
STATIC_DIR = Path("/home/user/app/static")
STATIC_DIR.mkdir(parents=True, exist_ok=True)

# Best effort: owner/group get rwx, others get r-x. A failure is only logged
# so the application can still start.
try:
    STATIC_DIR.chmod(stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH)
except Exception as exc:
    logger.error(f"Failed to set permissions for {STATIC_DIR}: {str(exc)}")

# Serve everything in STATIC_DIR under the /static URL prefix.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
|
|
|
|
|
|
|
|
|
|
|
|
|
# Request model carrying the two text fields of a try-on call. The camelCase
# field name matches the "garmentDesc" key used in the payload that try_on
# forwards upstream.
# NOTE(review): no endpoint visible in this file uses this model - confirm
# whether it is still needed.
class TryOnRequest(BaseModel):
    # Free-text description of the garment.
    garmentDesc: str
    # Garment category; try_on defaults its own parameter to "upper_body".
    category: str
|
|
|
|
|
# Lower-case file suffixes (leading dot included) accepted for uploads.
ALLOWED_EXTENSIONS = {
    ".jpg",
    ".jpeg",
    ".png",
}
|
|
|
|
|
async def save_file_and_get_url(file: UploadFile) -> str:
    """Store an uploaded image in ``STATIC_DIR`` and return its public URL.

    The file is saved under a random UUID-based name that keeps only the
    lower-cased extension of the client-supplied filename. After saving, the
    public URL is probed with a HEAD request; the probe result is only logged
    and never fails the upload.

    Args:
        file: The uploaded file. Its filename must end in one of
            ``ALLOWED_EXTENSIONS``.

    Returns:
        The ``https://<space>.hf.space/static/<name>`` URL of the stored file.

    Raises:
        HTTPException: 400 if the filename is missing or its extension is not
            allowed; 500 if reading or storing the file fails.
    """
    try:
        # ``filename`` can be None/empty on malformed multipart parts, and a
        # name without a dot (e.g. "jpg") has no extension at all.
        file_extension = Path(file.filename or "").suffix.lower()
        if file_extension not in ALLOWED_EXTENSIONS:
            logger.error(f"Invalid file extension for {file.filename}: {file_extension}")
            shown_extension = file_extension or "(none)"
            raise HTTPException(
                status_code=400,
                detail=f"File extension {shown_extension} not allowed. Use JPG or PNG.",
            )

        unique_filename = f"{uuid.uuid4()}{file_extension}"
        file_path = STATIC_DIR / unique_filename

        # Read the upload before touching the destination so a failed read
        # does not leave an empty file behind.
        content = await file.read()
        logger.info(f"Saving file to {file_path}")
        file_path.write_bytes(content)

        if not file_path.exists():
            logger.error(f"File {file_path} was not saved correctly")
            raise HTTPException(status_code=500, detail="Failed to save file")

        # Best effort: a permission failure is logged but does not abort.
        # NOTE(review): this mode also sets the execute bits for owner/group
        # on an uploaded image - confirm that is intended.
        try:
            os.chmod(file_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH)
            logger.info(f"Set permissions for {file_path}")
        except OSError as e:
            logger.error(f"Failed to set permissions for {file_path}: {str(e)}")

        space_id = "tejani-tryapi"
        public_url = f"https://{space_id}.hf.space/static/{unique_filename}"
        logger.info(f"Generated public URL: {public_url}")

        # The probe targets this very service. Run the blocking call in a
        # worker thread so the event loop stays free to answer it; calling
        # requests.head() directly here would stall a single-worker server
        # until the timeout expires.
        try:
            response = await run_in_threadpool(requests.head, public_url, timeout=20)
            if response.status_code != 200:
                logger.warning(f"Public URL {public_url} returned status {response.status_code}")
            else:
                logger.info(f"Public URL {public_url} is accessible")
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to access public URL {public_url}: {str(e)}")

        return public_url
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 above) intact instead of
        # rewrapping them as a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error in save_file_and_get_url: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing file: {str(e)}") from e
|
|
|
|
|
@app.post("/try-on")
async def try_on(
    human_img: UploadFile = File(...),
    garment: UploadFile = File(...),
    garment_desc: Optional[str] = Form(None),
    category: Optional[str] = Form(None),
    garment_desc_query: str = Query("", alias="garment_desc"),
    category_query: str = Query("upper_body", alias="category"),
):
    """Store both uploaded images and forward a try-on request upstream.

    ``garment_desc`` and ``category`` are accepted as multipart form fields
    (what the ``/test-upload`` page sends) and, for backward compatibility,
    as query-string parameters of the same name. A form value takes
    precedence; the defaults are ``""`` and ``"upper_body"``.

    Args:
        human_img: Image of the person (JPG/PNG).
        garment: Image of the garment (JPG/PNG).
        garment_desc: Garment description sent as a form field.
        category: Garment category sent as a form field.
        garment_desc_query: Query-string fallback for ``garment_desc``.
        category_query: Query-string fallback for ``category``.

    Returns:
        A dict with the upstream ``status_code``, the upstream ``response``
        (parsed JSON when the upstream declares JSON, raw text otherwise) and
        the public URLs of both stored images.

    Raises:
        HTTPException: whatever ``save_file_and_get_url`` raises (e.g. 400 for
            a bad extension); 500 if the upstream call fails or any other
            error occurs.
    """
    # Form values win; the query string keeps older clients working.
    if garment_desc is None:
        garment_desc = garment_desc_query
    if category is None:
        category = category_query

    # Upper bound in seconds for the upstream call; without a timeout a hung
    # upstream would block this request forever.
    upstream_timeout = 120

    try:
        human_img_url = await save_file_and_get_url(human_img)
        garment_url = await save_file_and_get_url(garment)

        url = "https://changeclothesai.online/api/try-on/"
        headers = {
            "accept": "*/*",
            # NOTE(review): hard-coded header value that looks like an access
            # token - consider loading it from configuration.
            "f": "sdfdsfsKaVgUoxa5j1jzcFtziPx",
        }
        data = {
            "humanImg": human_img_url,
            "garment": garment_url,
            "garmentDesc": garment_desc,
            "category": category,
        }

        logger.info(f"Forwarding request to {url} with data: {data}")

        # requests is blocking; run it in a worker thread so the event loop
        # can keep serving other requests (including the upstream fetching
        # the images from /static).
        response = await run_in_threadpool(
            requests.post, url, headers=headers, data=data, timeout=upstream_timeout
        )
        response.raise_for_status()

        # Compare only the media type so "application/json; charset=utf-8"
        # is still recognised as JSON.
        content_type = response.headers.get("content-type", "")
        is_json = content_type.split(";", 1)[0].strip().lower() == "application/json"
        payload = response.text
        if is_json:
            try:
                payload = response.json()
            except ValueError:
                logger.warning("Upstream declared JSON but the body could not be parsed; returning raw text")

        return {
            "status_code": response.status_code,
            "response": payload,
            "human_img_url": human_img_url,
            "garment_url": garment_url,
        }
    except HTTPException:
        # Preserve the status chosen by save_file_and_get_url.
        raise
    except requests.exceptions.RequestException as e:
        logger.error(f"Error forwarding request: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error forwarding request: {str(e)}") from e
    except Exception as e:
        logger.error(f"Error in try_on endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}") from e
|
|
|
|
|
@app.get("/")
async def root():
    """Return a fixed message confirming that the service is running."""
    return dict(message="FastAPI proxy for try-on API with file upload is running")
|
|
|
|
|
@app.get("/list-files")
async def list_files():
    """List the regular files stored directly inside ``STATIC_DIR``.

    Returns:
        ``{"files": [...]}`` where each entry is the file's path as a string.

    Raises:
        HTTPException: 500 if the directory cannot be read.
    """
    try:
        files = []
        for entry in STATIC_DIR.glob("*"):
            # Sub-directories and other non-file entries are skipped.
            if entry.is_file():
                files.append(str(entry))
        logger.info(f"Files in {STATIC_DIR}: {files}")
        return {"files": files}
    except Exception as exc:
        logger.error(f"Error listing files: {str(exc)}")
        raise HTTPException(status_code=500, detail=f"Error listing files: {str(exc)}")
|
|
|
|
|
@app.get("/get-file/{filename}")
async def get_file(filename: str):
    """Return the raw bytes of a file stored in ``STATIC_DIR``.

    Args:
        filename: Plain name of a file directly inside ``STATIC_DIR``.

    Returns:
        A ``Response`` carrying the file content. The media type is guessed
        from the file extension, falling back to ``image/jpeg`` when it
        cannot be determined.

    Raises:
        HTTPException: 404 if ``filename`` is not a plain name of an existing
            regular file; 500 if reading the file fails.
    """
    try:
        file_path = STATIC_DIR / filename
        # Serve only regular files that live directly in STATIC_DIR: a name
        # with path components, or one that resolves to a directory, is
        # reported as missing.
        if Path(filename).name != filename or not file_path.is_file():
            logger.error(f"File {file_path} not found")
            raise HTTPException(status_code=404, detail="File not found")

        content = file_path.read_bytes()
        # PNG uploads must not be labelled as JPEG.
        media_type = mimetypes.guess_type(file_path.name)[0] or "image/jpeg"
        return Response(content=content, media_type=media_type)
    except HTTPException:
        # Let the 404 through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        logger.error(f"Error serving file {filename}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error serving file: {str(e)}") from e
|
|
|
|
|
@app.get("/check-file/{filename}")
async def check_file(filename: str):
    """Report whether a file exists in ``STATIC_DIR`` and its permission bits.

    Args:
        filename: Name of the file, relative to ``STATIC_DIR``.

    Returns:
        A dict with the file path (``file``), ``exists`` set to True, and
        ``permissions``: the last three octal digits of the file mode.

    Raises:
        HTTPException: 404 if the path does not exist; 500 if it cannot be
            inspected.
    """
    try:
        file_path = STATIC_DIR / filename
        if not file_path.exists():
            logger.error(f"File {file_path} not found")
            raise HTTPException(status_code=404, detail="File not found")

        stats = os.stat(file_path)
        # The last three octal digits are the user/group/other permissions.
        permissions = oct(stats.st_mode)[-3:]
        logger.info(f"File {file_path} exists with permissions {permissions}")
        return {
            "file": str(file_path),
            "exists": True,
            "permissions": permissions,
        }
    except HTTPException:
        # Let the 404 through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        logger.error(f"Error checking file {filename}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error checking file: {str(e)}") from e
|
|
|
|
|
@app.get("/clean-files")
async def clean_files():
    """Delete every regular file stored directly inside ``STATIC_DIR``.

    Returns:
        ``{"message": "Files cleaned"}`` once all files have been removed.

    Raises:
        HTTPException: 500 if listing or deleting a file fails.
    """
    try:
        # Lazy filter: each entry is checked and removed in turn;
        # sub-directories are left untouched.
        stored_files = (entry for entry in STATIC_DIR.glob("*") if entry.is_file())
        for stored_file in stored_files:
            stored_file.unlink()
        logger.info(f"Cleaned all files in {STATIC_DIR}")
        return {"message": "Files cleaned"}
    except Exception as exc:
        logger.error(f"Error cleaning files: {str(exc)}")
        raise HTTPException(status_code=500, detail=f"Error cleaning files: {str(exc)}")
|
|
|
|
|
@app.get("/test-upload", response_class=HTMLResponse)
async def test_upload_form():
    """Serve a small HTML page for manually exercising ``POST /try-on``.

    The page posts the form with ``fetch`` and renders the JSON reply. Every
    value taken from the reply is inserted with ``textContent`` / DOM nodes
    rather than ``innerHTML``, so markup in the upstream response or in the
    URLs is shown as text instead of being parsed by the browser. Non-2xx
    replies are shown as an error using the ``detail`` field of the reply.

    Returns:
        An ``HTMLResponse`` containing the static page.
    """
    html_content = """
<html>
<head>
    <title>Test File Upload</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .response { margin-top: 20px; padding: 10px; border: 1px solid #ccc; }
    </style>
</head>
<body>
    <h1>Test File Upload</h1>
    <form action="/try-on" method="post" enctype="multipart/form-data" onsubmit="showResponse(event)">
        <label for="human_img">Human Image (JPG/PNG):</label><br>
        <input type="file" id="human_img" name="human_img" accept=".jpg,.jpeg,.png" required><br><br>
        <label for="garment">Garment Image (JPG/PNG):</label><br>
        <input type="file" id="garment" name="garment" accept=".jpg,.jpeg,.png" required><br><br>
        <label for="garment_desc">Garment Description:</label><br>
        <input type="text" id="garment_desc" name="garment_desc" value=""><br><br>
        <label for="category">Category:</label><br>
        <input type="text" id="category" name="category" value="upper_body"><br><br>
        <input type="submit" value="Upload and Try On">
    </form>
    <div id="response" class="response" style="display: none;"></div>
    <h2>Debug Tools</h2>
    <p><a href="/list-files">List Stored Files</a></p>
    <p><a href="/clean-files">Clean Stored Files</a></p>
    <script>
        // Append "<p><strong>label:</strong> value</p>" to parent.
        function appendField(parent, label, valueNode) {
            const row = document.createElement('p');
            const strong = document.createElement('strong');
            strong.textContent = label + ':';
            row.appendChild(strong);
            row.appendChild(document.createTextNode(' '));
            row.appendChild(valueNode);
            parent.appendChild(row);
        }

        // Build a link whose text is the URL; only http(s) URLs are clickable.
        function linkNode(url) {
            const text = String(url);
            const link = document.createElement('a');
            link.textContent = text;
            if (/^https?:/i.test(text)) {
                link.href = text;
                link.target = '_blank';
                link.rel = 'noopener';
            }
            return link;
        }

        async function showResponse(event) {
            event.preventDefault();
            const responseDiv = document.getElementById('response');
            responseDiv.style.display = 'block';
            responseDiv.textContent = '';
            try {
                const response = await fetch('/try-on', {
                    method: 'POST',
                    body: new FormData(event.target)
                });
                const result = await response.json();
                if (!response.ok) {
                    const detail = typeof result.detail === 'string'
                        ? result.detail
                        : JSON.stringify(result.detail);
                    throw new Error('HTTP ' + response.status + ': ' + detail);
                }

                const heading = document.createElement('h3');
                heading.textContent = 'Response';
                responseDiv.appendChild(heading);

                appendField(responseDiv, 'Status Code', document.createTextNode(String(result.status_code)));
                appendField(responseDiv, 'Human Image URL', linkNode(result.human_img_url));
                appendField(responseDiv, 'Garment URL', linkNode(result.garment_url));

                const apiLabel = document.createElement('p');
                const apiStrong = document.createElement('strong');
                apiStrong.textContent = 'API Response:';
                apiLabel.appendChild(apiStrong);
                responseDiv.appendChild(apiLabel);

                const pre = document.createElement('pre');
                pre.textContent = JSON.stringify(result.response, null, 2);
                responseDiv.appendChild(pre);
            } catch (error) {
                responseDiv.textContent = '';
                appendField(responseDiv, 'Error', document.createTextNode(error.message));
            }
        }
    </script>
</body>
</html>
"""
    return HTMLResponse(content=html_content)