Vedansh-7's picture
Update app.py
06a1915 verified
raw
history blame
13.6 kB
import torch
import torch.nn as nn
import gradio as gr
from PIL import Image
import numpy as np
import math
import os
from threading import Event
import traceback
# Constants
IMG_SIZE = 128
TIMESTEPS = 300
NUM_CLASSES = 2
# Global Cancellation Flag
cancel_event = Event()
# Device Configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# --- Model Definitions ---
class SinusoidalPositionEmbeddings(nn.Module):
def __init__(self, dim):
super().__init__()
self.dim = dim
half_dim = dim // 2
emb = math.log(10000) / (half_dim - 1)
emb = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -emb)
self.register_buffer('embeddings', emb)
def forward(self, time):
embeddings = self.embeddings.to(time.device)
embeddings = time.float()[:, None] * embeddings[None, :]
return torch.cat([embeddings.sin(), embeddings.cos()], dim=-1)
class UNet(nn.Module):
def __init__(self, in_channels=3, out_channels=3, num_classes=2, time_dim=256):
super().__init__()
self.num_classes = num_classes
self.label_embedding = nn.Embedding(num_classes, time_dim)
self.time_mlp = nn.Sequential(
SinusoidalPositionEmbeddings(time_dim),
nn.Linear(time_dim, time_dim),
nn.ReLU(),
nn.Linear(time_dim, time_dim)
)
self.inc = self.double_conv(in_channels, 64)
self.down1 = self.down(64 + time_dim * 2, 128)
self.down2 = self.down(128 + time_dim * 2, 256)
self.down3 = self.down(256 + time_dim * 2, 512)
self.bottleneck = self.double_conv(512 + time_dim * 2, 1024)
self.up1 = nn.ConvTranspose2d(1024, 256, kernel_size=2, stride=2)
self.upconv1 = self.double_conv(256 + 256 + time_dim * 2, 256)
self.up2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
self.upconv2 = self.double_conv(128 + 128 + time_dim * 2, 128)
self.up3 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
self.upconv3 = self.double_conv(64 + 64 + time_dim * 2, 64)
self.outc = nn.Conv2d(64, out_channels, kernel_size=1)
def double_conv(self, in_channels, out_channels):
return nn.Sequential(
nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
nn.ReLU(inplace=True)
)
def down(self, in_channels, out_channels):
return nn.Sequential(
nn.MaxPool2d(2),
self.double_conv(in_channels, out_channels)
)
def forward(self, x, labels, time):
label_indices = torch.argmax(labels, dim=1)
label_emb = self.label_embedding(label_indices)
t_emb = self.time_mlp(time)
combined_emb = torch.cat([t_emb, label_emb], dim=1)
combined_emb = combined_emb.unsqueeze(-1).unsqueeze(-1)
x1 = self.inc(x)
x1_cat = torch.cat([x1, combined_emb.repeat(1, 1, x1.shape[-2], x1.shape[-1])], dim=1)
x2 = self.down1(x1_cat)
x2_cat = torch.cat([x2, combined_emb.repeat(1, 1, x2.shape[-2], x2.shape[-1])], dim=1)
x3 = self.down2(x2_cat)
x3_cat = torch.cat([x3, combined_emb.repeat(1, 1, x3.shape[-2], x3.shape[-1])], dim=1)
x4 = self.down3(x3_cat)
x4_cat = torch.cat([x4, combined_emb.repeat(1, 1, x4.shape[-2], x4.shape[-1])], dim=1)
x5 = self.bottleneck(x4_cat)
x = self.up1(x5)
x = torch.cat([x, x3], dim=1)
x = torch.cat([x, combined_emb.repeat(1, 1, x.shape[-2], x.shape[-1])], dim=1)
x = self.upconv1(x)
x = self.up2(x)
x = torch.cat([x, x2], dim=1)
x = torch.cat([x, combined_emb.repeat(1, 1, x.shape[-2], x.shape[-1])], dim=1)
x = self.upconv2(x)
x = self.up3(x)
x = torch.cat([x, x1], dim=1)
x = torch.cat([x, combined_emb.repeat(1, 1, x.shape[-2], x.shape[-1])], dim=1)
x = self.upconv3(x)
output = self.outc(x)
return output
class DiffusionModel(nn.Module):
def __init__(self, model, timesteps=TIMESTEPS, time_dim=256):
super().__init__()
self.model = model
self.timesteps = timesteps
self.time_dim = time_dim
# Fix 1: Ensure consistent float32 types
scale = 1000 / timesteps
beta_start = scale * 0.0001
beta_end = scale * 0.02
self.betas = torch.linspace(beta_start, beta_end, timesteps, dtype=torch.float32)
self.alphas = 1. - self.betas
self.register_buffer('alpha_bars', torch.cumprod(self.alphas, dim=0))
@torch.no_grad()
def sample(self, num_images, img_size, num_classes, labels, device, progress_callback=None):
# Initialize with noise
x_t = torch.randn((num_images, 3, img_size, img_size), device=device, dtype=torch.float32)
# Convert labels to proper format
if labels.ndim == 1:
labels_one_hot = torch.zeros(num_images, num_classes, device=device)
labels_one_hot[torch.arange(num_images), labels] = 1
labels = labels_one_hot
else:
labels = labels.to(device)
for i in reversed(range(0, self.timesteps)):
if cancel_event.is_set():
return None
t = torch.full((num_images,), i, device=device, dtype=torch.long)
# Model prediction with type stability
pred_noise = self.model(x_t, labels, t.float())
# Calculate diffusion parameters
beta_t = self.betas[t].view(-1, 1, 1, 1).to(device)
alpha_t = self.alphas[t].view(-1, 1, 1, 1).to(device)
alpha_bar_t = self.alpha_bars[t].view(-1, 1, 1, 1).to(device)
# Improved denoising step (Fix 2)
if i > 0:
noise = torch.randn_like(x_t)
else:
noise = torch.zeros_like(x_t)
x_t = (x_t - (1 - alpha_t)/torch.sqrt(1 - alpha_bar_t) * pred_noise) / torch.sqrt(alpha_t)
x_t += noise * torch.sqrt(beta_t)
if progress_callback:
progress_callback((self.timesteps - i) / self.timesteps)
# Fix 3: Simplified scaling
x_t = torch.clamp(x_t, -1., 1.)
return (x_t + 1) / 2 # Scale to [0,1]
def load_model(model_path, device):
unet = UNet(num_classes=NUM_CLASSES).to(device)
diffusion_model = DiffusionModel(unet).to(device)
if os.path.exists(model_path):
try:
checkpoint = torch.load(model_path, map_location=device)
# Handle both full model and state_dict loading
if 'model_state_dict' in checkpoint:
state_dict = checkpoint['model_state_dict']
else:
state_dict = checkpoint
# Handle both prefixed and non-prefixed state dicts
if all(k.startswith('model.') for k in state_dict.keys()):
state_dict = {k[6:]: v for k, v in state_dict.items()}
unet.load_state_dict(state_dict, strict=False)
print("Model loaded successfully")
# Verify model loading
test_input = torch.randn(1, 3, IMG_SIZE, IMG_SIZE).to(device)
test_labels = torch.zeros(1, NUM_CLASSES).to(device)
test_labels[0, 0] = 1
test_time = torch.tensor([1]).to(device)
output = unet(test_input, test_labels, test_time)
print(f"Model test output shape: {output.shape}")
except Exception as e:
traceback.print_exc()
raise ValueError(f"Error loading model: {str(e)}")
else:
raise FileNotFoundError(f"Model weights not found at {model_path}")
diffusion_model.eval()
return diffusion_model
def cancel_generation():
cancel_event.set()
return "Generation cancelled"
def generate_images(label_str, num_images, progress=gr.Progress()):
global loaded_model
cancel_event.clear()
# Input validation
if num_images < 1 or num_images > 10:
raise gr.Error("Number of images must be between 1 and 10")
label_map = {'Pneumonia': 0, 'Pneumothorax': 1}
if label_str not in label_map:
raise gr.Error("Invalid condition selected")
labels = torch.zeros(num_images, NUM_CLASSES, device=device, dtype=torch.float32)
labels[:, label_map[label_str]] = 1
try:
def progress_callback(progress_val):
progress(progress_val, desc="Generating...")
if cancel_event.is_set():
raise gr.Error("Generation was cancelled by user")
with torch.no_grad():
print(f"Generating {num_images} images for {label_str}")
print(f"Labels shape: {labels.shape}, device: {labels.device}")
images = loaded_model.sample(
num_images=num_images,
img_size=IMG_SIZE,
num_classes=NUM_CLASSES,
labels=labels,
device=device,
progress_callback=progress_callback
)
if images is None:
return None, None
# Diagnostic print
print(f"Generated images range: {images.min().item():.3f}, {images.max().item():.3f}")
processed_images = []
for img in images:
# Fix 3: Improved image conversion
img_np = (img.cpu().numpy().transpose(1, 2, 0) * 255).clip(0, 255).astype(np.uint8)
print(f"Image range after conversion: {img_np.min()}, {img_np.max()}")
if img_np.shape[2] == 1: # Handle grayscale if needed
img_np = img_np.squeeze(-1)
pil_img = Image.fromarray(img_np)
processed_images.append(pil_img)
# Return appropriate outputs based on count
if num_images == 1:
return processed_images[0], processed_images
else:
return None, processed_images
except torch.cuda.OutOfMemoryError:
torch.cuda.empty_cache()
raise gr.Error("Out of GPU memory - try generating fewer images")
except Exception as e:
traceback.print_exc()
if str(e) != "Generation was cancelled by user":
raise gr.Error(f"Generation failed: {str(e)}")
return None, None
finally:
torch.cuda.empty_cache()
# Load model
MODEL_DIR = "models"
MODEL_NAME = "diffusion_unet_xray.pth"
model_path = os.path.join(MODEL_DIR, MODEL_NAME)
print("Loading model...")
loaded_model = load_model(model_path, device)
print("Model loaded successfully!")
# Gradio UI
with gr.Blocks(theme=gr.themes.Soft(
primary_hue="violet",
neutral_hue="slate",
font=[gr.themes.GoogleFont("Poppins")],
text_size="md"
)) as demo:
gr.Markdown("""
<center>
<h1>Synthetic X-ray Generator</h1>
<p><em>Generate synthetic chest X-rays conditioned on pathology</em></p>
</center>
""")
with gr.Row():
with gr.Column(scale=1):
condition = gr.Dropdown(
["Pneumonia", "Pneumothorax"],
label="Select Condition",
value="Pneumonia",
interactive=True
)
num_images = gr.Slider(
1, 10, value=1, step=1,
label="Number of Images",
interactive=True
)
with gr.Row():
submit_btn = gr.Button("Generate", variant="primary")
cancel_btn = gr.Button("Cancel", variant="stop")
gr.Markdown("""
<div style="text-align: center; margin-top: 10px;">
<small>Note: Generation may take several seconds per image</small>
</div>
""")
with gr.Column(scale=2):
with gr.Tabs():
with gr.TabItem("Output", id="output_tab"):
single_image = gr.Image(
label="Generated X-ray",
height=400,
visible=True
)
gallery = gr.Gallery(
label="Generated X-rays",
columns=3,
height="auto",
object_fit="contain",
visible=False
)
def update_ui_based_on_count(num_images):
if num_images == 1:
return {
single_image: gr.update(visible=True),
gallery: gr.update(visible=False)
}
else:
return {
single_image: gr.update(visible=False),
gallery: gr.update(visible=True)
}
num_images.change(
fn=update_ui_based_on_count,
inputs=num_images,
outputs=[single_image, gallery]
)
submit_btn.click(
fn=generate_images,
inputs=[condition, num_images],
outputs=[single_image, gallery]
)
cancel_btn.click(
fn=cancel_generation,
outputs=None
)
demo.css = """
.gradio-container {
background: linear-gradient(135deg, #f5f7fa 0%, #e4e8f0 100%);
}
.gallery-container {
background-color: white !important;
}
"""
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=7860)