import torch
from torch import nn
from comfy.ldm.flux.layers import (
    DoubleStreamBlock,
    LastLayer,
    MLPEmbedder,
    SingleStreamBlock,
    timestep_embedding,
)

class Hunyuan3Dv2(nn.Module):
    """Flux-style DiT used by Hunyuan3D v2: latent tokens and conditioning
    tokens run through double-stream blocks, then through a merged single
    stream, and are finally projected back to the latent channel count."""

    def __init__(
        self,
        in_channels=64,
        context_in_dim=1536,
        hidden_size=1024,
        mlp_ratio=4.0,
        num_heads=16,
        depth=16,
        depth_single_blocks=32,
        qkv_bias=True,
        guidance_embed=False,
        image_model=None,
        dtype=None,
        device=None,
        operations=None
    ):
        super().__init__()
        self.dtype = dtype

        if hidden_size % num_heads != 0:
            raise ValueError(
                f"Hidden size {hidden_size} must be divisible by num_heads {num_heads}"
            )

        # max_period of the sinusoidal timestep embedding used in forward()
        self.max_period = 1000
        # project latent tokens and conditioning tokens into the model width
        self.latent_in = operations.Linear(in_channels, hidden_size, bias=True, dtype=dtype, device=device)
        self.time_in = MLPEmbedder(in_dim=256, hidden_dim=hidden_size, dtype=dtype, device=device, operations=operations)
        self.guidance_in = (
            MLPEmbedder(in_dim=256, hidden_dim=hidden_size, dtype=dtype, device=device, operations=operations) if guidance_embed else None
        )
        self.cond_in = operations.Linear(context_in_dim, hidden_size, dtype=dtype, device=device)
        self.double_blocks = nn.ModuleList(
            [
                DoubleStreamBlock(
                    hidden_size,
                    num_heads,
                    mlp_ratio=mlp_ratio,
                    qkv_bias=qkv_bias,
                    dtype=dtype, device=device, operations=operations
                )
                for _ in range(depth)
            ]
        )
        self.single_blocks = nn.ModuleList(
            [
                SingleStreamBlock(
                    hidden_size,
                    num_heads,
                    mlp_ratio=mlp_ratio,
                    dtype=dtype, device=device, operations=operations
                )
                for _ in range(depth_single_blocks)
            ]
        )
        self.final_layer = LastLayer(hidden_size, 1, in_channels, dtype=dtype, device=device, operations=operations)

    def forward(self, x, timestep, context, guidance=None, transformer_options={}, **kwargs):
        x = x.movedim(-1, -2)  # (B, C, N) -> (B, N, C): put tokens on dim 1
        timestep = 1.0 - timestep  # this model parameterizes time in the opposite direction; the output is negated below to match
        txt = context
        img = self.latent_in(x)

        vec = self.time_in(timestep_embedding(timestep, 256, self.max_period).to(dtype=img.dtype))
        if self.guidance_in is not None:
            if guidance is not None:
                vec = vec + self.guidance_in(timestep_embedding(guidance, 256, self.max_period).to(img.dtype))

        txt = self.cond_in(txt)
        pe = None  # no positional embedding is applied
        attn_mask = None

        patches_replace = transformer_options.get("patches_replace", {})
        blocks_replace = patches_replace.get("dit", {})
        for i, block in enumerate(self.double_blocks):
            if ("double_block", i) in blocks_replace:
                def block_wrap(args):
                    out = {}
                    out["img"], out["txt"] = block(img=args["img"],
                                                   txt=args["txt"],
                                                   vec=args["vec"],
                                                   pe=args["pe"],
                                                   attn_mask=args.get("attn_mask"))
                    return out

                out = blocks_replace[("double_block", i)]({"img": img,
                                                           "txt": txt,
                                                           "vec": vec,
                                                           "pe": pe,
                                                           "attn_mask": attn_mask},
                                                          {"original_block": block_wrap})
                txt = out["txt"]
                img = out["img"]
            else:
                img, txt = block(img=img,
                                 txt=txt,
                                 vec=vec,
                                 pe=pe,
                                 attn_mask=attn_mask)

        # merge conditioning and latent tokens for the single-stream blocks
        img = torch.cat((txt, img), 1)

        for i, block in enumerate(self.single_blocks):
            if ("single_block", i) in blocks_replace:
                def block_wrap(args):
                    out = {}
                    out["img"] = block(args["img"],
                                       vec=args["vec"],
                                       pe=args["pe"],
                                       attn_mask=args.get("attn_mask"))
                    return out

                out = blocks_replace[("single_block", i)]({"img": img,
                                                           "vec": vec,
                                                           "pe": pe,
                                                           "attn_mask": attn_mask},
                                                          {"original_block": block_wrap})
                img = out["img"]
            else:
                img = block(img, vec=vec, pe=pe, attn_mask=attn_mask)

        img = img[:, txt.shape[1]:, ...]  # drop the conditioning tokens
        img = self.final_layer(img, vec)
        return img.movedim(-2, -1) * (-1.0)  # back to (B, C, N); negated to match the mirrored timestep above
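

# Minimal usage sketch (not part of the original module). Assumptions are
# marked explicitly: `operations` is passed as plain `torch.nn` here, relying
# on it exposing the same Linear/LayerNorm interface ComfyUI's ops classes
# provide, and all tensor shapes are illustrative, with x shaped
# (batch, in_channels, num_latent_tokens) and context shaped
# (batch, num_cond_tokens, context_in_dim).
if __name__ == "__main__":
    model = Hunyuan3Dv2(operations=nn)
    x = torch.randn(1, 64, 256)         # latent tokens, channels first
    timestep = torch.tensor([0.5])
    context = torch.randn(1, 16, 1536)  # e.g. image-encoder tokens
    out = model(x, timestep, context)
    print(out.shape)                    # expected: torch.Size([1, 64, 256])

    # Hypothetical "patches_replace" hook, following the calling convention in
    # forward() above: a patch receives the block's inputs plus a dict holding
    # the original block, and returns the same keys the wrapper produces.
    def passthrough_patch(args, extra):
        out = extra["original_block"](args)  # identity; a real patch would edit out["img"]/out["txt"]
        return out

    opts = {"patches_replace": {"dit": {("double_block", 0): passthrough_patch}}}
    out_patched = model(x, timestep, context, transformer_options=opts)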