neural-os / utils.py
da03
.
8043e65
import torch
from omegaconf import OmegaConf
from ldm.util import instantiate_from_config
from ldm.models.diffusion.ddpm import LatentDiffusion, DDIMSampler
import numpy as np
from PIL import Image
from huggingface_hub import hf_hub_download
import json
import os
import time
DEBUG = False
def load_model_from_config(config_path, model_name, device='cuda', load=True):
# Load the config file
config = OmegaConf.load(config_path)
# Instantiate the model
model = instantiate_from_config(config.model)
# Download the model file from Hugging Face
if load:
model_file = hf_hub_download(repo_id=model_name, filename="model.safetensors", token=os.getenv('HF_TOKEN'))
print(f"Loading model from {model_name}")
# Load the state dict
state_dict = torch.load(model_file, map_location='cpu')
model.load_state_dict(state_dict, strict=False)
model.to(device)
model.eval()
return model
def sample_frame(model: LatentDiffusion, prompt: str, image_sequence: torch.Tensor, pos_maps=None, leftclick_maps=None):
sampler = DDIMSampler(model)
with torch.no_grad():
#u_dict = {'c_crossattn': "", 'c_concat': image_sequence}
#uc = model.get_learned_conditioning(u_dict)
#uc = model.enc_concat_seq(uc, u_dict, 'c_concat')
#c_dict = {'c_crossattn': prompt, 'c_concat': image_sequence}
model.eval()
#c = model.get_learned_conditioning(c_dict)
#print (c['c_crossattn'].shape)
#print (c['c_crossattn'][0])
print (prompt)
# reshape(B, L * C, H, W)
#height, width, channels = image_sequence.shape
# use einsum to reshape
image_sequence = torch.einsum('hwc->chw', image_sequence).unsqueeze(0)
c = {'c_concat': image_sequence}
print (image_sequence.shape, c['c_concat'].shape)
#c = model.enc_concat_seq(c, c_dict, 'c_concat')
# Zero out the corresponding subtensors in c_concat for padding images
#padding_mask = torch.isclose(image_sequence, torch.tensor(-1.0), rtol=1e-5, atol=1e-5).all(dim=(1, 2, 3)).unsqueeze(0)
#print (padding_mask)
#padding_mask = padding_mask.repeat(1, 4) # Repeat mask 4 times for each projected channel
#print (image_sequence.shape, padding_mask.shape, c['c_concat'].shape)
#c['c_concat'] = c['c_concat'] * (~padding_mask.unsqueeze(-1).unsqueeze(-1)) # Zero out the corresponding features
if pos_maps is not None:
pos_map = pos_maps[0]
leftclick_map = torch.cat(leftclick_maps, dim=0)
print (pos_maps[0].shape, c['c_concat'].shape, leftclick_map.shape)
if False and DEBUG:
c['c_concat'] = c['c_concat']*0
c['c_concat'] = torch.cat([c['c_concat'][:, :, :, :], pos_maps[0].to(c['c_concat'].device).unsqueeze(0), leftclick_map.to(c['c_concat'].device).unsqueeze(0)], dim=1)
print ('sleeping')
#time.sleep(120)
print ('finished sleeping')
DDPM = False
DDPM = True
DDPM = False
if DEBUG:
#c['c_concat'] = c['c_concat']*0
print ('utils prompt', prompt, c['c_concat'].shape, c.keys())
print (c['c_concat'].nonzero())
#print (c['c_concat'][0, 0, :, :])
if DDPM:
samples_ddim = model.p_sample_loop(cond=c, shape=[1, 4, 48, 64], return_intermediates=False, verbose=True)
else:
samples_ddim, _ = sampler.sample(S=16,
conditioning=c,
batch_size=1,
shape=[4, 48, 64],
verbose=False)
# unconditional_guidance_scale=5.0,
# unconditional_conditioning=uc,
# eta=0)
print ('dfsf1')
if False and DEBUG:
print ('samples_ddim.shape', samples_ddim.shape)
x_samples_ddim = samples_ddim[:, :3]
# upsample to 512 x 384
x_samples_ddim = torch.nn.functional.interpolate(x_samples_ddim, size=(384, 512), mode='bilinear')
# create a 512 x 384 image and paste the samples_ddim into the center
#x_samples_ddim = torch.zeros((1, 3, 384, 512))
#x_samples_ddim[:, :, 128:128+48, 160:160+64] = samples_ddim[:, :3]
else:
print ('dfsf2')
data_mean = -0.54
data_std = 6.78
data_min = -27.681446075439453
data_max = 30.854148864746094
x_samples_ddim = samples_ddim
x_samples_ddim_feedback = x_samples_ddim
x_samples_ddim = x_samples_ddim * data_std + data_mean
x_samples_ddim = model.decode_first_stage(x_samples_ddim)
print ('dfsf3')
#x_samples_ddim = pos_map.to(c['c_concat'].device).unsqueeze(0).expand(-1, 3, -1, -1)
#x_samples_ddim = model.decode_first_stage(x_samples_ddim)
#x_samples_ddim = torch.clamp((x_samples_ddim + 1.0) / 2.0, min=0.0, max=1.0)
x_samples_ddim = torch.clamp(x_samples_ddim, min=-1.0, max=1.0)
return x_samples_ddim.squeeze(0).cpu().numpy(), x_samples_ddim_feedback.squeeze(0)
# Global variables for model and device
#model = None
#device = None
def initialize_model(config_path, model_name):
#global model, device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = load_model_from_config(config_path, model_name, device)
return model