|
import base64 |
|
import io |
|
import json |
|
import numpy as np |
|
import torch |
|
from fastapi import FastAPI, HTTPException |
|
from huggingface_hub import hf_hub_download |
|
from PIL import Image |
|
from pydantic import BaseModel |
|
from torchvision import transforms |
|
|
|
import timm |
|
import segmentation_models_pytorch as smp |
|
import albumentations as A |
|
from albumentations.pytorch import ToTensorV2 |
|
import os |
|
|
|
|
|
app = FastAPI(title="AI Skin Lesion Analyzer API") |
|
|
|
|
|
DEVICE = "cpu" |
|
segmentation_model = None |
|
classification_model = None |
|
knowledge_base = None |
|
idx_to_class_abbr = {0: 'MEL', 1: 'NV', 2: 'BCC', 3: 'AKIEC', 4: 'BKL', 5: 'DF', 6: 'VASC'} |
|
transform_segment = None |
|
transform_classify = None |
|
|
|
class ImageRequest(BaseModel): |
|
image_base64: str |
|
|
|
@app.on_event("startup") |
|
def load_assets(): |
|
global segmentation_model, classification_model, knowledge_base, transform_segment, transform_classify |
|
|
|
|
|
if segmentation_model is not None and classification_model is not None and knowledge_base is not None: |
|
print("🔁 Models and knowledge base already loaded. Skipping reloading.") |
|
return |
|
|
|
print("--> API starting up: This may take a few minutes...") |
|
|
|
|
|
cache_dir = "/tmp/models_cache" |
|
os.makedirs(cache_dir, exist_ok=True) |
|
|
|
|
|
try: |
|
print(" Downloading UNet segmentation model...") |
|
seg_model_path = hf_hub_download( |
|
repo_id="sheikh987/unet-isic2018", |
|
filename="unet_full_data_best_model.pth", |
|
cache_dir=cache_dir |
|
) |
|
segmentation_model = smp.Unet("resnet34", encoder_weights=None, in_channels=3, classes=1).to(DEVICE) |
|
segmentation_model.load_state_dict(torch.load(seg_model_path, map_location=DEVICE)) |
|
segmentation_model.eval() |
|
print(" ✅ Segmentation model loaded.") |
|
except Exception as e: |
|
print(f"!!! FATAL: Could not load segmentation model: {e}") |
|
|
|
|
|
try: |
|
print(" Downloading EfficientNet classification model...") |
|
class_model_path = hf_hub_download( |
|
repo_id="sheikh987/efficientnet-isic", |
|
filename="efficientnet_augmented_best.pth", |
|
cache_dir=cache_dir |
|
) |
|
classification_model = timm.create_model('efficientnet_b3', pretrained=False, num_classes=7).to(DEVICE) |
|
classification_model.load_state_dict(torch.load(class_model_path, map_location=DEVICE)) |
|
classification_model.eval() |
|
print(" ✅ Classification model loaded.") |
|
except Exception as e: |
|
print(f"!!! FATAL: Could not load classification model: {e}") |
|
|
|
|
|
try: |
|
with open('knowledge_base.json', 'r') as f: |
|
knowledge_base = json.load(f) |
|
print(" ✅ Knowledge base loaded.") |
|
except Exception as e: |
|
print(f"!!! FATAL: Could not load knowledge_base.json: {e}") |
|
|
|
|
|
transform_segment = A.Compose([ |
|
A.Resize(256, 256), |
|
A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255.0), |
|
ToTensorV2() |
|
]) |
|
|
|
transform_classify = transforms.Compose([ |
|
transforms.Resize((300, 300)), |
|
transforms.ToTensor(), |
|
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) |
|
]) |
|
|
|
print("\n--> API is ready to accept requests.") |
|
|
|
@app.post("/analyze/") |
|
async def analyze_image(request: ImageRequest): |
|
if not all([segmentation_model, classification_model, knowledge_base]): |
|
raise HTTPException(status_code=503, detail="Models not loaded yet. Please retry shortly.") |
|
|
|
try: |
|
image_data = base64.b64decode(request.image_base64) |
|
image = Image.open(io.BytesIO(image_data)).convert("RGB") |
|
image_np = np.array(image) |
|
except Exception: |
|
raise HTTPException(status_code=400, detail="Invalid base64 image data provided.") |
|
|
|
|
|
augmented_seg = transform_segment(image=image_np) |
|
seg_input_tensor = augmented_seg['image'].unsqueeze(0).to(DEVICE) |
|
with torch.no_grad(): |
|
seg_logits = segmentation_model(seg_input_tensor) |
|
seg_mask = (torch.sigmoid(seg_logits) > 0.5).float().squeeze().cpu().numpy() |
|
|
|
if seg_mask.sum() < 200: |
|
return {"status": "Failed", "message": "No lesion could be clearly identified in the image."} |
|
|
|
|
|
rows, cols = np.any(seg_mask, axis=1), np.any(seg_mask, axis=0) |
|
rmin, rmax = np.where(rows)[0][[0, -1]] |
|
cmin, cmax = np.where(cols)[0][[0, -1]] |
|
padding = 15 |
|
rmin, rmax = max(0, rmin - padding), min(image_np.shape[0], rmax + padding) |
|
cmin, cmax = max(0, cmin - padding), min(image_np.shape[1], cmax + padding) |
|
cropped_image_pil = Image.fromarray(image_np[rmin:rmax, cmin:cmax]) |
|
|
|
class_input_tensor = transform_classify(cropped_image_pil).unsqueeze(0).to(DEVICE) |
|
with torch.no_grad(): |
|
class_logits = classification_model(class_input_tensor) |
|
probabilities = torch.nn.functional.softmax(class_logits, dim=1) |
|
|
|
confidence, predicted_idx = torch.max(probabilities, 1) |
|
confidence_percent = confidence.item() * 100 |
|
|
|
if confidence_percent < 50.0: |
|
return {"status": "Inconclusive", "message": f"Model confidence ({confidence_percent:.2f}%) is below the 75% threshold."} |
|
|
|
|
|
predicted_abbr = idx_to_class_abbr[predicted_idx.item()] |
|
info = knowledge_base.get(predicted_abbr, {}) |
|
|
|
return { |
|
"status": "Success", |
|
"prediction": info, |
|
"abbreviation": predicted_abbr, |
|
"confidence": f"{confidence_percent:.2f}%" |
|
} |
|
|
|
@app.get("/") |
|
def root(): |
|
return {"message": "AI Skin Lesion Analyzer API is running."} |
|
|