Spaces:
Running
on
A100
Running
on
A100
# Copyright (c) 2025 NVIDIA CORPORATION. | |
# Licensed under the MIT license. | |
# Adapted from https://github.com/NVlabs/VILA/tree/main under the Apache 2.0 license. | |
# LICENSE is in incl_licenses directory. | |
# Copyright 2024 NVIDIA CORPORATION & AFFILIATES | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
# SPDX-License-Identifier: Apache-2.0 | |
import re | |
import torch | |
import torch.nn as nn | |
from transformers import AutoConfig, AutoModel, PretrainedConfig, PreTrainedModel | |
class IdentityMap(nn.Module):
    """Pass-through projector: returns its input unchanged."""

    def __init__(self):
        super().__init__()

    def forward(self, x, *args, **kwargs):
        """Return ``x`` as-is; any extra positional/keyword arguments are ignored."""
        return x

    def config(self):
        """Return the minimal config dict describing this projector."""
        projector_cfg = {"mm_projector_type": "identity"}
        return projector_cfg
class SimpleResBlock(nn.Module):
    """LayerNorm followed by a two-layer GELU MLP with a skip connection.

    Note that the skip connection adds the *normalized* input (the output of
    ``pre_norm``), not the raw input, to the MLP output.
    """

    def __init__(self, channels):
        super().__init__()
        self.pre_norm = nn.LayerNorm(channels)
        # Width-preserving MLP: Linear -> GELU -> Linear.
        mlp_layers = [
            nn.Linear(channels, channels),
            nn.GELU(),
            nn.Linear(channels, channels),
        ]
        self.proj = nn.Sequential(*mlp_layers)

    def forward(self, x):
        """Normalize ``x`` and return ``normed + proj(normed)``."""
        normed = self.pre_norm(x)
        residual = self.proj(normed)
        return normed + residual
class DownSampleBlock(nn.Module):
    """Merge every 2x2 neighbourhood of a square token grid into one token.

    The token axis (dim 1) is reshaped into a ``side x side`` grid with
    ``side = int(num_tokens ** 0.5)``, so the reshape only succeeds when the
    token count is a perfect square. The output has a quarter of the tokens
    (after zero-padding odd sides up to even) and four times the channels.
    """

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Downsample ``x`` of shape ``(batch, tokens, channels)``.

        Returns:
            Tensor of shape ``(batch, ceil(side / 2) ** 2, 4 * channels)``.
        """
        side = int(x.shape[1] ** 0.5)
        grid = x.reshape(x.shape[0], side, side, -1)
        merged = self.flat_square(grid)
        return merged.reshape(merged.shape[0], -1, merged.shape[-1])

    def flat_square(self, x: torch.Tensor) -> torch.Tensor:
        """Fold each 2x2 patch of a ``(n, w, h, c)`` grid into the channel axis.

        Odd ``w`` / ``h`` are zero-padded by one row / column at the end.
        For output cell ``(i, j)`` the channels are the concatenation of input
        cells ``(2i, 2j)``, ``(2i, 2j+1)``, ``(2i+1, 2j)``, ``(2i+1, 2j+1)``.

        Returns:
            Tensor of shape ``(n, w_padded // 2, h_padded // 2, 4 * c)``.
        """
        n, w, h, c = x.size()
        # new_zeros allocates the padding with x's dtype *and* device directly,
        # avoiding a default-device allocation followed by a device copy.
        if w % 2 == 1:
            x = torch.cat([x, x.new_zeros((n, 1, h, c))], dim=1)
            w += 1
        if h % 2 == 1:
            x = torch.cat([x, x.new_zeros((n, w, 1, c))], dim=2)
            h += 1
        # view() needs contiguous memory; the input may be a non-contiguous view.
        x = x.contiguous()
        # Pair neighbours along dim 2, then swap axes and pair along dim 1.
        x = x.view(n, w, h // 2, c * 2)
        x = x.permute(0, 2, 1, 3).contiguous()
        x = x.view(n, h // 2, w // 2, c * 4)
        return x.permute(0, 2, 1, 3).contiguous()
class DownSample2x2BlockFix(nn.Module):
    """Token downsampler that delegates the 2x2 merge to ``flat_square_2x2``."""

    def forward(self, x):
        """Reshape tokens into a square grid, merge 2x2 patches, flatten back.

        The grid side is ``int(num_tokens ** 0.5)``, so the reshape only
        succeeds when the token count (dim 1) is a perfect square.
        """
        batch = x.shape[0]
        side = int(x.shape[1] ** 0.5)
        grid = x.reshape(batch, side, side, -1)
        merged = flat_square_2x2(grid)
        # Collapse the two spatial axes back into a single token axis.
        return merged.reshape(merged.shape[0], -1, merged.shape[-1])
def flat_square_2x2(x):
    """Fold each 2x2 patch of a ``(n, w, h, c)`` grid into the channel axis.

    Odd ``w`` / ``h`` are zero-padded by one row / column at the end. For
    output cell ``(i, j)`` the channels are the concatenation of input cells
    ``(2i, 2j)``, ``(2i, 2j+1)``, ``(2i+1, 2j)``, ``(2i+1, 2j+1)``.

    Args:
        x: 4-D tensor of shape ``(n, w, h, c)``.

    Returns:
        Contiguous tensor of shape ``(n, w_padded // 2, h_padded // 2, 4 * c)``
        with the same dtype and device as ``x``.
    """
    n, w, h, c = x.size()
    # new_zeros allocates the padding with x's dtype *and* device directly,
    # avoiding a default-device allocation followed by a device copy.
    if w % 2 == 1:
        x = torch.cat([x, x.new_zeros((n, 1, h, c))], dim=1)
        w += 1
    if h % 2 == 1:
        x = torch.cat([x, x.new_zeros((n, w, 1, c))], dim=2)
        h += 1
    # view() needs contiguous memory; the input may be a non-contiguous view.
    x = x.contiguous()
    # Pair neighbours along dim 2, then swap axes and pair along dim 1.
    x = x.view(n, w, h // 2, c * 2)
    x = x.permute(0, 2, 1, 3).contiguous()
    x = x.view(n, h // 2, w // 2, c * 4)
    return x.permute(0, 2, 1, 3).contiguous()
class DownSample3x3BlockFix(nn.Module):
    """Token downsampler that delegates the 3x3 merge to ``flat_square_3x3``."""

    def forward(self, x):
        """Reshape tokens into a square grid, merge 3x3 patches, flatten back.

        The grid side is ``int(num_tokens ** 0.5)``, so the reshape only
        succeeds when the token count (dim 1) is a perfect square.
        """
        batch = x.shape[0]
        side = int(x.shape[1] ** 0.5)
        grid = x.reshape(batch, side, side, -1)
        merged = flat_square_3x3(grid)
        # Collapse the two spatial axes back into a single token axis.
        return merged.reshape(merged.shape[0], -1, merged.shape[-1])
def flat_square_3x3(x):
    """Fold each 3x3 patch of a ``(n, w, h, c)`` grid into the channel axis.

    ``w`` and ``h`` are zero-padded at the end up to the next multiple of 3.
    For output cell ``(i, j)`` the channels are the row-major concatenation
    of input cells ``(3i + a, 3j + b)`` for ``a, b`` in ``0..2``.

    Args:
        x: 4-D tensor of shape ``(n, w, h, c)``.

    Returns:
        Contiguous tensor of shape ``(n, w_padded // 3, h_padded // 3, 9 * c)``
        with the same dtype and device as ``x``.
    """
    n, w, h, c = x.size()
    # (-w) % 3 is the number of rows missing to reach a multiple of 3 (0, 1 or 2).
    pad_w = (-w) % 3
    if pad_w:
        # new_zeros allocates with x's dtype *and* device directly, avoiding a
        # default-device allocation followed by a device copy.
        x = torch.cat([x, x.new_zeros((n, pad_w, h, c))], dim=1)
        w += pad_w
    pad_h = (-h) % 3
    if pad_h:
        x = torch.cat([x, x.new_zeros((n, w, pad_h, c))], dim=2)
        h += pad_h
    # view() needs contiguous memory; the input may be a non-contiguous view.
    x = x.contiguous()
    # Group triples along dim 2, then swap axes and group triples along dim 1.
    x = x.view(n, w, h // 3, c * 3)
    x = x.permute(0, 2, 1, 3).contiguous()
    x = x.view(n, h // 3, w // 3, c * 9)
    return x.permute(0, 2, 1, 3).contiguous()
class MultimodalProjectorConfig(PretrainedConfig):
    """Configuration for ``MultimodalProjector``.

    Args:
        mm_projector_type: Name of the projector architecture to build, e.g.
            ``"identity"``, ``"linear"``, ``"mlp_downsample"`` or a string
            matching ``mlp<N>x_gelu``.
        **kwargs: Standard ``PretrainedConfig`` fields, forwarded to the base
            class.
    """

    model_type = "v2l_projector"

    def __init__(self, mm_projector_type: str = None, **kwargs):
        # Forward kwargs to the base class: custom configs are expected to pass
        # them on so base-config fields loaded from a saved config are kept
        # instead of being silently dropped.
        super().__init__(**kwargs)
        self.mm_projector_type = mm_projector_type
class MultimodalProjector(PreTrainedModel):
    """Projector mapping ``config.mm_hidden_size`` features to ``config.hidden_size``.

    The architecture is chosen by ``mm_projector_cfg.mm_projector_type``:

    * ``"identity"``: pass-through (``IdentityMap``).
    * ``"linear"``: a single ``nn.Linear``.
    * ``"mlp_downsample"`` / ``"mlp_downsample_2x2_fix"``: 2x2 token merge
      followed by LayerNorm and a two-layer GELU MLP.
    * ``"mlp_downsample_3x3_fix"``, ``"mlp_downsample_3x3_s2"``,
      ``"mlp_downsample_3x3_s2_new"``: 3x3 token merge followed by
      progressively narrower LayerNorm/Linear/GELU stacks.
    * ``"mlp<N>x_gelu"``: ``N`` Linear layers with GELU in between.

    Attributes:
        layers: The constructed projector module.
        downsample_rate: 1 by default; 2 for the 2x2 variants and 3 for
            ``"mlp_downsample_3x3_fix"``.

    Raises:
        ValueError: If ``mm_projector_type`` is not a recognised name
            (including ``None``).
    """

    config_class = MultimodalProjectorConfig

    def __init__(self, mm_projector_cfg: MultimodalProjectorConfig, config: PretrainedConfig):
        """Build the projector.

        Args:
            mm_projector_cfg: Projector config; selects the architecture.
            config: Config providing ``mm_hidden_size`` (input width) and
                ``hidden_size`` (output width). Not read for ``"identity"``.
        """
        super().__init__(mm_projector_cfg)
        mm_projector_type = mm_projector_cfg.mm_projector_type
        self.downsample_rate = 1

        # NOTE: the order of modules inside each nn.Sequential determines the
        # parameter names in the state dict; keep it stable.
        if mm_projector_type == "identity":
            self.layers = IdentityMap()
        elif mm_projector_type == "linear":
            self.layers = nn.Linear(config.mm_hidden_size, config.hidden_size)
        elif mm_projector_type == "mlp_downsample":
            self.layers = nn.Sequential(
                DownSampleBlock(),
                nn.LayerNorm(config.mm_hidden_size * 4),
                nn.Linear(config.mm_hidden_size * 4, config.hidden_size),
                nn.GELU(),
                nn.Linear(config.hidden_size, config.hidden_size),
            )
            self.downsample_rate = 2
        elif mm_projector_type == "mlp_downsample_2x2_fix":
            self.layers = nn.Sequential(
                DownSample2x2BlockFix(),
                nn.LayerNorm(config.mm_hidden_size * 4),
                nn.Linear(config.mm_hidden_size * 4, config.hidden_size),
                nn.GELU(),
                nn.Linear(config.hidden_size, config.hidden_size),
            )
            self.downsample_rate = 2
        elif mm_projector_type == "mlp_downsample_3x3_fix":
            self.layers = nn.Sequential(
                DownSample3x3BlockFix(),
                nn.LayerNorm(config.mm_hidden_size * 9),
                nn.Linear(config.mm_hidden_size * 9, config.mm_hidden_size * 3),
                nn.GELU(),
                nn.LayerNorm(config.mm_hidden_size * 3),
                nn.Linear(config.mm_hidden_size * 3, config.hidden_size),
                nn.GELU(),
                nn.Linear(config.hidden_size, config.hidden_size),
            )
            self.downsample_rate = 3
        elif mm_projector_type == "mlp_downsample_3x3_s2":
            # NOTE(review): downsample_rate stays 1 here although a 3x3 merge
            # is used -- confirm whether callers rely on that.
            self.layers = nn.Sequential(
                DownSample3x3BlockFix(),
                nn.LayerNorm(config.mm_hidden_size * 9),
                nn.Linear(config.mm_hidden_size * 9, config.mm_hidden_size * 3),
                nn.GELU(),
                nn.LayerNorm(config.mm_hidden_size * 3),
                nn.Linear(config.mm_hidden_size * 3, config.mm_hidden_size),
                nn.GELU(),
                nn.LayerNorm(config.mm_hidden_size),
                nn.Linear(config.mm_hidden_size, config.mm_hidden_size // 3),
                nn.GELU(),
                nn.LayerNorm(config.mm_hidden_size // 3),
                nn.Linear(config.mm_hidden_size // 3, config.hidden_size),
                nn.GELU(),
                nn.Linear(config.hidden_size, config.hidden_size),
            )
        elif mm_projector_type == "mlp_downsample_3x3_s2_new":
            # NOTE(review): downsample_rate stays 1 here as well -- confirm.
            self.layers = nn.Sequential(
                DownSample3x3BlockFix(),
                nn.LayerNorm(config.mm_hidden_size * 9),
                nn.Linear(config.mm_hidden_size * 9, config.mm_hidden_size * 4),
                nn.GELU(),
                nn.LayerNorm(config.mm_hidden_size * 4),
                nn.Linear(config.mm_hidden_size * 4, config.mm_hidden_size * 2),
                nn.GELU(),
                nn.LayerNorm(config.mm_hidden_size * 2),
                nn.Linear(config.mm_hidden_size * 2, config.mm_hidden_size),
                nn.GELU(),
                nn.LayerNorm(config.mm_hidden_size),
                nn.Linear(config.mm_hidden_size, config.mm_hidden_size // 3),
                nn.GELU(),
                nn.LayerNorm(config.mm_hidden_size // 3),
                nn.Linear(config.mm_hidden_size // 3, config.hidden_size),
                nn.GELU(),
                nn.Linear(config.hidden_size, config.hidden_size),
            )
        else:
            # re.match raises TypeError on non-string input (e.g. the config
            # default of None); guard so such values reach the ValueError below.
            mlp_gelu_match = None
            if isinstance(mm_projector_type, str):
                mlp_gelu_match = re.match(r"^mlp(\d+)x_gelu$", mm_projector_type)
            if mlp_gelu_match is None:
                raise ValueError(f"Unknown projector type: {mm_projector_type}")

            mlp_depth = int(mlp_gelu_match.group(1))
            # First layer changes width; each extra layer is GELU + Linear.
            modules = [nn.Linear(config.mm_hidden_size, config.hidden_size)]
            for _ in range(1, mlp_depth):
                modules.append(nn.GELU())
                modules.append(nn.Linear(config.hidden_size, config.hidden_size))
            self.layers = nn.Sequential(*modules)

    def forward(self, x, *args, **kwargs):
        """Apply the projector to ``x``; extra arguments are ignored."""
        return self.layers(x)
# Register the projector with the Auto* factories: AutoConfig maps the
# "v2l_projector" model_type string to MultimodalProjectorConfig, and AutoModel
# maps that config class to MultimodalProjector. Runs at import time.
AutoConfig.register("v2l_projector", MultimodalProjectorConfig)
AutoModel.register(MultimodalProjectorConfig, MultimodalProjector)