Spaces:
Running
Running
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel, HttpUrl | |
from transformers import SegformerImageProcessor, AutoModelForSemanticSegmentation | |
from PIL import Image, ImageEnhance, ImageFilter | |
import torch.nn as nn | |
import torch | |
import cv2 | |
import numpy as np | |
import os | |
import requests | |
import io | |
from datetime import datetime | |
from scipy import ndimage | |
import json | |
import tempfile | |
import shutil | |
from typing import List, Dict, Optional | |
import uuid | |
import asyncio | |
from concurrent.futures import ThreadPoolExecutor | |
import logging | |
import cloudinary | |
import cloudinary.uploader | |
from cloudinary.utils import cloudinary_url | |
import os | |
from dotenv import load_dotenv | |
# Logging is configured once at import time at INFO level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The FastAPI application object handed to uvicorn in the __main__ guard.
app = FastAPI(title="Fashion Segmentation API", version="1.0.0")
# Request/Response models
class SegmentationRequest(BaseModel):
    """Request payload: the image to segment plus optional processing settings."""

    # URL of the image to download and segment.
    image_url: HttpUrl
    # Free-form options; of these keys, the code visible in this file only
    # reads "padding".
    settings: Optional[Dict] = dict(
        padding=15,
        background="white",
        quality="high",
        outline="grey_2px",
    )
# Sample image URL kept at module level; nothing in the visible code reads it.
# NOTE(review): looks like a debugging leftover - confirm before removing.
image_url = "https://res.cloudinary.com/dyvuvklpk/image/upload/v1751009512/MEN-Denim-id_00000089-46_7_additional_ow2h0l.png"
class SegmentInfo(BaseModel):
    """Description of one extracted segment as returned to the client."""

    # Numeric class index and its label name.
    class_id: int
    class_name: str
    # Name of the PNG file generated for this segment.
    filename: str
    # Category name used for grouping the uploaded segment.
    category: str
    # Size of the segment in pixels and as a percentage of the whole image.
    pixel_count: int
    coverage_percent: float
    # Delivery URL and Cloudinary public ID of the uploaded segment.
    cloudinary_url: str
    public_id: str
class SegmentationResponse(BaseModel):
    """Envelope returned by the segmentation endpoint."""

    # False when processing failed; details are then placed in `metadata`.
    success: bool
    # Wall-clock duration of the request in seconds.
    processing_time: float
    # Number of entries in `segments`.
    total_segments: int
    segments: List[SegmentInfo]
    # Free-form details about the run (or the error on failure).
    metadata: Dict
# Global model storage
# Filled by load_model() with the "processor" and "model" entries.
model_cache: Dict[str, object] = {}

# Shared worker pool used by the request handler to run blocking work.
executor = ThreadPoolExecutor(max_workers=4)
# Constants
# Class index -> label name; the list position is the class index.
SEGFORMER_LABELS = dict(enumerate([
    "Background",
    "Hat",
    "Hair",
    "Sunglasses",
    "Upper-clothes",
    "Skirt",
    "Pants",
    "Dress",
    "Belt",
    "Left-shoe",
    "Right-shoe",
    "Face",
    "Left-leg",
    "Right-leg",
    "Left-arm",
    "Right-arm",
    "Bag",
    "Scarf",
]))

# Class indices grouped by category.
CLOTHING_ITEMS = {
    4,   # Upper-clothes
    5,   # Skirt
    6,   # Pants
    7,   # Dress
    8,   # Belt
    17,  # Scarf
}
ACCESSORIES = {
    1,   # Hat
    3,   # Sunglasses
    9,   # Left-shoe
    10,  # Right-shoe
    16,  # Bag
}
BODY_PARTS = {
    2,   # Hair
    11,  # Face
    12,  # Left-leg
    13,  # Right-leg
    14,  # Left-arm
    15,  # Right-arm
}
# Load variables from a .env file (when present) before reading credentials.
load_dotenv()

# Cloudinary Configuration
# Credentials come from the environment; `secure=True` is passed to the SDK.
cloudinary.config(
    cloud_name=os.getenv("CLOUDINARY_CLOUD_NAME"),
    api_key=os.getenv("CLOUDINARY_API_KEY"),
    api_secret=os.getenv("CLOUDINARY_API_SECRET"),
    secure=True,
)
async def load_model():
    """Return the cached SegFormer processor and model, loading them on first use.

    The blocking ``from_pretrained`` calls run on the shared thread pool so the
    event loop is not stalled while the checkpoint is being loaded.

    Returns:
        Tuple ``(processor, model)`` taken from ``model_cache``.

    Raises:
        HTTPException: status 500 when the processor or model cannot be loaded.
    """
    if "model" not in model_cache:
        logger.info("Loading SegFormer model...")
        checkpoint = "mattmdjaga/segformer_b2_clothes"

        def _load():
            # Executed in a worker thread: both calls block until finished.
            return (
                SegformerImageProcessor.from_pretrained(checkpoint),
                AutoModelForSemanticSegmentation.from_pretrained(checkpoint),
            )

        try:
            loop = asyncio.get_running_loop()
            processor, model = await loop.run_in_executor(executor, _load)
        except Exception as e:
            logger.error(f"Model loading failed: {e}")
            raise HTTPException(status_code=500, detail=f"Model loading failed: {e}") from e

        # "model" is stored last because it is the key the cache check looks at.
        model_cache["processor"] = processor
        model_cache["model"] = model
        logger.info("Model loaded successfully!")
    return model_cache["processor"], model_cache["model"]
def download_image(url: str) -> Image.Image:
    """Download an image over HTTP and return it as an RGB PIL image.

    Args:
        url: Address of the image. It is passed through ``str()`` so URL
            objects are accepted as well as plain strings.

    Returns:
        The opened image, converted to RGB when it is in any other mode.

    Raises:
        HTTPException: status 400 when the request fails, the server answers
            with an error status, or the payload cannot be opened as an image.
    """
    try:
        response = requests.get(str(url), timeout=30)
        response.raise_for_status()
        image = Image.open(io.BytesIO(response.content))
        if image.mode != 'RGB':
            image = image.convert('RGB')
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to download image: {e}") from e

    # The URL is passed as a %-style argument; a message without a placeholder
    # makes the logging module report a formatting error instead of the URL.
    logger.info("Image downloaded successfully: %s", url)
    return image
def enhance_image_quality(image):
    """Sharpen an image and mildly boost its contrast and colour.

    Accepts either a NumPy array or a PIL image. A 3-channel array goes through
    a BGR->RGB conversion before being wrapped in a PIL image; any other array
    is wrapped as-is, and PIL input is used as given. The result is a NumPy
    array produced by an RGB->BGR conversion of the enhanced image.
    """
    if not isinstance(image, np.ndarray):
        working = image
    else:
        is_three_channel = len(image.shape) == 3 and image.shape[2] == 3
        rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) if is_three_channel else image
        working = Image.fromarray(rgb)

    # High quality enhancement: unsharp mask first, then three enhancement
    # passes applied in this fixed order.
    working = working.filter(ImageFilter.UnsharpMask(radius=2, percent=150, threshold=3))
    enhancement_steps = (
        (ImageEnhance.Sharpness, 1.3),
        (ImageEnhance.Contrast, 1.15),
        (ImageEnhance.Color, 1.1),
    )
    for enhancer_cls, factor in enhancement_steps:
        working = enhancer_cls(working).enhance(factor)

    return cv2.cvtColor(np.array(working), cv2.COLOR_RGB2BGR)
def get_category_folder(class_id):
    """Return the category folder name for a segmentation class index.

    Args:
        class_id: Class index as used in ``SEGFORMER_LABELS``.

    Returns:
        ``"clothing"``, ``"accessories"`` or ``"body_parts"`` for indices in
        the corresponding constant sets, and ``"other"`` for anything else.
        A string is always returned, because the result is embedded in folder
        paths and public IDs and stored in a ``str`` response field.
    """
    if class_id in CLOTHING_ITEMS:
        return "clothing"
    if class_id in ACCESSORIES:
        return "accessories"
    if class_id in BODY_PARTS:
        return "body_parts"
    return "other"
def upload_to_cloudinary(file_path: str, public_id: str, folder: str = "fashion_segments") -> Dict:
    """Upload an image file to Cloudinary and return its URLs and basic metadata.

    Args:
        file_path: Path of the local file to upload.
        public_id: Identifier of the asset inside ``folder``.
        folder: Folder path placed in front of ``public_id``.

    Returns:
        Dict with the keys ``url``, ``optimized_url``, ``public_id``,
        ``version``, ``format``, ``width``, ``height`` and ``bytes``.

    Raises:
        HTTPException: status 500 when the upload or URL generation fails.
    """
    try:
        # The folder is spelled out exactly once, as part of the public ID.
        # Passing it a second time through the `folder` option would repeat
        # the folder path in the stored public ID.
        upload_result = cloudinary.uploader.upload(
            file_path,
            public_id=f"{folder}/{public_id}",
            resource_type="image",
            format="png",
            # NOTE(review): `quality` / `fetch_format` are normally delivery
            # options - confirm they have the intended effect at upload time.
            quality="auto:best",
            fetch_format="auto"
        )

        # Build the delivery URL from the public ID Cloudinary reported back.
        optimized_url, _ = cloudinary_url(
            upload_result['public_id'],
            format="png",
            quality="auto:best",
            fetch_format="auto"
        )

        return {
            'url': upload_result.get('secure_url', upload_result.get('url')),
            'optimized_url': optimized_url,
            'public_id': upload_result['public_id'],
            'version': upload_result.get('version'),
            'format': upload_result.get('format'),
            'width': upload_result.get('width'),
            'height': upload_result.get('height'),
            'bytes': upload_result.get('bytes')
        }
    except Exception as e:
        logger.error(f"Cloudinary upload failed: {e}")
        raise HTTPException(status_code=500, detail=f"Upload failed: {e}") from e
def process_segmentation(image: Image.Image, processor, model, settings: Dict) -> tuple:
    """Run the segmentation model and collect per-class statistics.

    Args:
        image: Source image; its ``size`` (width, height) sets the output size.
        processor: Callable that turns the image into model inputs.
        model: Callable returning an object with a ``logits`` attribute.
        settings: Accepted for interface compatibility; not read here.

    Returns:
        Tuple ``(pred_seg, segment_data)``. ``pred_seg`` is a 2-D tensor of
        class indices with the image's height and width. ``segment_data`` maps
        each class index present to a dict with ``bbox`` as
        ``(min_x, min_y, max_x, max_y)``, ``pixel_count`` and
        ``coverage_percent``.
    """
    inputs = processor(images=image, return_tensors="pt")

    # Inference only: disabling autograd avoids building a graph that would
    # never be used and only costs memory.
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits.cpu()

        # Resize the logits to the original image size before taking the argmax.
        upsampled_logits = nn.functional.interpolate(
            logits,
            size=image.size[::-1],  # PIL size is (width, height)
            mode="bilinear",
            align_corners=False,
        )
        pred_seg = upsampled_logits.argmax(dim=1)[0]

    total_pixels = pred_seg.numel()
    segment_data = {}
    for class_id in torch.unique(pred_seg).tolist():
        # torch.where yields (row indices, column indices) of matching pixels.
        rows, cols = torch.where(pred_seg == class_id)
        pixel_count = int(rows.numel())
        segment_data[class_id] = {
            'bbox': (int(cols.min()), int(rows.min()), int(cols.max()), int(rows.max())),
            'pixel_count': pixel_count,
            'coverage_percent': (pixel_count / total_pixels) * 100
        }
    return pred_seg, segment_data
def _render_segment(image_bgr, label_map, class_id, bbox, padding):
    """Cut one class out of the image and composite it on a light background."""
    h, w = label_map.shape
    x1, y1, x2, y2 = bbox

    # Grow the bounding box by `padding`, clamped to the image.
    x1 = max(0, x1 - padding)
    y1 = max(0, y1 - padding)
    x2 = min(w - 1, x2 + padding)
    y2 = min(h - 1, y2 + padding)

    # Binary mask of the class with interior holes filled.
    mask = (label_map == class_id).astype(np.uint8)
    mask_filled = ndimage.binary_fill_holes(mask).astype(np.uint8)

    # Kernel side derived from the segment area, kept between 3 and 7.
    segment_area = np.sum(mask_filled)
    kernel_size = max(3, min(7, int(np.sqrt(segment_area) / 100)))
    kernel = np.ones((kernel_size, kernel_size), np.uint8)
    mask_cleaned = cv2.morphologyEx(mask_filled, cv2.MORPH_CLOSE, kernel, iterations=2)
    mask_cleaned = cv2.morphologyEx(mask_cleaned, cv2.MORPH_OPEN, kernel, iterations=1)

    # Blurring the mask turns the hard edge into a soft blend weight.
    mask_smooth = cv2.GaussianBlur(mask_cleaned.astype(np.float32), (3, 3), 1.0)

    cropped_mask = mask_smooth[y1:y2 + 1, x1:x2 + 1]
    cropped_image = image_bgr[y1:y2 + 1, x1:x2 + 1]

    # Blend the object over a uniform background of value 248.
    background = np.full(cropped_image.shape, 248, dtype=np.uint8)
    mask_3d = np.stack([cropped_mask] * 3, axis=2)
    composite = (cropped_image * mask_3d + background * (1 - mask_3d)).astype(np.uint8)

    return enhance_image_quality(composite)


def extract_segments(image: Image.Image, pred_seg, segment_data: Dict, settings: Dict) -> List[Dict]:
    """Extract every non-background segment and upload it to Cloudinary.

    Args:
        image: Source image (converted RGB->BGR internally for OpenCV).
        pred_seg: Tensor of per-pixel class indices for ``image``.
        segment_data: Mapping of class index to a dict holding ``bbox``,
            ``pixel_count`` and ``coverage_percent``.
        settings: Options dict; only ``padding`` (default 15) is read. ``None``
            is treated as an empty dict.

    Returns:
        One dict per uploaded segment with the keys ``class_id``,
        ``class_name``, ``filename``, ``category``, ``pixel_count``,
        ``coverage_percent``, ``cloudinary_url`` and ``public_id``.

    Raises:
        OSError: when a segment image cannot be written to the temp directory.
        HTTPException: propagated from ``upload_to_cloudinary`` on failure.
    """
    settings = settings or {}
    padding = settings.get("padding", 15)

    image_bgr = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    label_map = pred_seg.numpy().astype(np.uint8)

    extracted_segments = []
    session_id = str(uuid.uuid4())[:8]  # short per-call identifier
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Files are only needed until they are uploaded; the directory is removed
    # on every exit path, best-effort.
    temp_dir = tempfile.mkdtemp()
    try:
        for class_id, info in segment_data.items():
            if class_id == 0:  # Skip background
                continue

            final_image = _render_segment(image_bgr, label_map, class_id, info['bbox'], padding)

            class_name = SEGFORMER_LABELS.get(class_id, f"Class_{class_id}")
            safe_name = class_name.replace(' ', '_')
            # The category ends up in IDs, folder paths and a `str` response
            # field, so a missing category falls back to a fixed name.
            category_folder = get_category_folder(class_id) or "other"

            filename = f"{class_id:02d}_{safe_name}_{info['pixel_count']}px.png"
            temp_filepath = os.path.join(temp_dir, filename)
            # cv2.imwrite signals failure through its return value only.
            if not cv2.imwrite(temp_filepath, final_image):
                raise OSError(f"Could not write segment image: {temp_filepath}")

            public_id = f"{timestamp}_{session_id}_{category_folder}_{class_id:02d}_{safe_name}"
            cloudinary_result = upload_to_cloudinary(
                temp_filepath,
                public_id,
                folder=f"fashion_segments/{category_folder}"
            )

            extracted_segments.append({
                'class_id': class_id,
                'class_name': class_name,
                'filename': filename,
                'category': category_folder,
                'pixel_count': info['pixel_count'],
                'coverage_percent': info['coverage_percent'],
                'cloudinary_url': cloudinary_result['optimized_url'],
                'public_id': cloudinary_result['public_id']
            })
            logger.info(f"Extracted and uploaded: {class_name} ({info['pixel_count']:,} pixels, {info['coverage_percent']:.1f}% coverage)")
    finally:
        # Cleanup temporary directory
        shutil.rmtree(temp_dir, ignore_errors=True)
    return extracted_segments
async def startup_event():
    """Populate the model cache by awaiting ``load_model()``."""
    # NOTE(review): no startup-hook decorator is visible above this function in
    # this view - confirm it is registered on `app`.
    await load_model()
async def health_check():
    """Report basic service health.

    Returns:
        Dict with ``status``, ``model_loaded`` (whether a model is in
        ``model_cache``) and ``cloudinary_configured`` (whether the Cloudinary
        SDK currently has a cloud name set).
    """
    # NOTE(review): no route decorator is visible above this handler in this
    # view - confirm it is registered on `app`.
    # The cloud name is read from the SDK's active configuration, which is
    # what the module-level `cloudinary.config(...)` call populates.
    return {
        "status": "healthy",
        "model_loaded": "model" in model_cache,
        "cloudinary_configured": bool(cloudinary.config().cloud_name)
    }
async def segment_fashion_items(request: SegmentationRequest):
    """Segment fashion items from an image URL and return Cloudinary URLs.

    Downloading, segmentation and extraction/upload are blocking steps and are
    each run on the shared thread pool so the event loop stays free.

    Args:
        request: Image URL plus optional settings (``None`` is treated as an
            empty settings dict).

    Returns:
        ``SegmentationResponse``. Failures are not raised: they are reported
        with ``success=False`` and the error text in ``metadata["error"]``.
    """
    # NOTE(review): no route decorator is visible above this handler in this
    # view - confirm it is registered on `app`.
    start_time = datetime.now()
    try:
        processor, model = await load_model()

        # `settings` is Optional on the request model; downstream code calls
        # `.get` on it, so None is normalised to an empty dict.
        settings = request.settings or {}
        loop = asyncio.get_running_loop()

        logger.info(f"Downloading image from: {request.image_url}")
        image = await loop.run_in_executor(executor, download_image, request.image_url)
        original_size = image.size

        pred_seg, segment_data = await loop.run_in_executor(
            executor, process_segmentation, image, processor, model, settings
        )
        extracted_segments = await loop.run_in_executor(
            executor, extract_segments, image, pred_seg, segment_data, settings
        )

        processing_time = (datetime.now() - start_time).total_seconds()

        segments = [SegmentInfo(**segment) for segment in extracted_segments]
        metadata = {
            'processing_time': processing_time,
            'image_size': original_size,
            'total_segments': len(segments),
            'settings': request.settings,
            'timestamp': datetime.now().isoformat(),
            'storage_provider': 'cloudinary'
        }
        logger.info(f"Processing complete: {len(segments)} segments extracted and uploaded in {processing_time:.2f}s")
        return SegmentationResponse(
            success=True,
            processing_time=processing_time,
            total_segments=len(segments),
            segments=segments,
            metadata=metadata
        )
    except Exception as e:
        # Deliberate best-effort: the caller always receives a response body.
        logger.exception("Processing failed: %s", e)
        return SegmentationResponse(
            success=False,
            processing_time=(datetime.now() - start_time).total_seconds(),
            total_segments=0,
            segments=[],
            metadata={"error": str(e), "storage_provider": "cloudinary"}
        )
async def segment_multiple_images(image_urls: List[HttpUrl]):
    """Segment several images one after another.

    Each URL yields one entry holding the URL and either the segmentation
    ``result`` or, when an exception is raised for that URL, its ``error`` text.
    Entries are returned under ``batch_results`` in input order.
    """
    # NOTE(review): no route decorator is visible above this handler in this
    # view - confirm it is registered on `app`.
    batch = []
    for image_url in image_urls:
        try:
            outcome = await segment_fashion_items(SegmentationRequest(image_url=image_url))
            entry = {"url": str(image_url), "result": outcome}
        except Exception as exc:
            entry = {"url": str(image_url), "error": str(exc)}
        batch.append(entry)
    return {"batch_results": batch}
async def delete_segment(public_id: str):
    """Delete a segment from Cloudinary by its public ID.

    Args:
        public_id: Public ID of the asset to remove.

    Returns:
        Dict with ``success`` and the raw Cloudinary ``result``. ``success`` is
        True only when Cloudinary's answer carries ``"result": "ok"``.

    Raises:
        HTTPException: status 500 when the Cloudinary call itself fails.
    """
    # NOTE(review): no route decorator is visible above this handler in this
    # view - confirm it is registered on `app`.
    try:
        result = cloudinary.uploader.destroy(public_id)
    except Exception as e:
        logger.error(f"Failed to delete {public_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Deletion failed: {e}") from e

    # A call that returns without raising has not necessarily deleted anything
    # (e.g. an unknown public ID); the outcome is in the "result" field.
    return {"success": result.get("result") == "ok", "result": result}
async def get_transformed_url(
    public_id: str,
    width: Optional[int] = None,
    height: Optional[int] = None,
    quality: Optional[str] = "auto",
    format: Optional[str] = "auto"
):
    """Build a Cloudinary URL for a segment with the requested transformations.

    ``quality`` and ``format`` (sent as ``fetch_format``) are always included;
    ``width`` and ``height`` are included only when truthy. Returns the public
    ID, the generated URL and the transformations used. Any failure is logged
    and re-raised as an ``HTTPException`` with status 500.
    """
    # NOTE(review): no route decorator is visible above this handler in this
    # view - confirm it is registered on `app`.
    try:
        sizing = {
            key: value
            for key, value in (("width", width), ("height", height))
            if value
        }
        transformations = {"quality": quality, "fetch_format": format, **sizing}

        transformed_url, _ = cloudinary_url(public_id, **transformations)
        return {
            "original_public_id": public_id,
            "transformed_url": transformed_url,
            "transformations": transformations
        }
    except Exception as e:
        logger.error(f"Failed to generate transformed URL: {e}")
        raise HTTPException(status_code=500, detail=f"URL generation failed: {e}")
async def root():
    """Run the segmentation pipeline once on a built-in sample image.

    Returns:
        Dict with ``message`` and ``version``. The message reports success only
        when the pipeline's response says so; otherwise it carries the error.
    """
    # NOTE(review): no route decorator is visible above this handler in this
    # view - confirm it is registered on `app`.
    sample_request = SegmentationRequest(
        image_url="https://res.cloudinary.com/dyvuvklpk/image/upload/v1751009512/MEN-Denim-id_00000089-46_7_additional_ow2h0l.png",
        settings={
            "padding": 15,
            "background": "white",
            "quality": "high",
            "outline": "grey_2px"
        }
    )
    result = await segment_fashion_items(sample_request)

    # segment_fashion_items reports failures in its response instead of
    # raising, so the outcome has to be checked explicitly.
    if not result.success:
        error = result.metadata.get("error", "unknown error")
        return {"message": f"Execution failed: {error}", "version": "1.0.0"}
    return {"message": "Successfully Finished Execution!", "version": "1.0.0"}
if __name__ == "__main__":
    # Imported here so that importing this module does not require uvicorn.
    import uvicorn

    # Serve the app on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)