# Copyright (c) 2025 NVIDIA CORPORATION. # Licensed under the MIT license. # Adapted from https://github.com/NVlabs/VILA/tree/main under the Apache 2.0 license. # LICENSE is in incl_licenses directory. # -------------------------------------------------------- # InternVL # Copyright (c) 2023 OpenGVLab # Licensed under The MIT License [see LICENSE for details] # -------------------------------------------------------- from typing import Optional, Tuple, Union import torch import torch.nn.functional as F import torch.utils.checkpoint from einops import rearrange from torch import nn from transformers.activations import ACT2FN from transformers.modeling_outputs import BaseModelOutput, BaseModelOutputWithPooling from transformers.modeling_utils import PreTrainedModel from transformers.utils import logging from llava.model.multimodal_encoder.intern.configuration_intern_vit import InternVisionConfig from .flash_attention import FlashAttention has_flash_attn = True logger = logging.get_logger(__name__) """ DropBlock, DropPath PyTorch implementations of DropBlock and DropPath (Stochastic Depth) regularization layers. Papers: DropBlock: A regularization method for convolutional networks (https://arxiv.org/abs/1810.12890) Deep Networks with Stochastic Depth (https://arxiv.org/abs/1603.09382) Code: DropBlock impl inspired by two Tensorflow impl that I liked: - https://github.com/tensorflow/tpu/blob/master/models/official/resnet/resnet_model.py#L74 - https://github.com/clovaai/assembled-cnn/blob/master/nets/blocks.py Hacked together by / Copyright 2020 Ross Wightman """ import torch import torch.nn as nn import torch.nn.functional as F def ndgrid(*tensors) -> Tuple[torch.Tensor, ...]: """generate N-D grid in dimension order. The ndgrid function is like meshgrid except that the order of the first two input arguments are switched. That is, the statement [X1,X2,X3] = ndgrid(x1,x2,x3) produces the same result as [X2,X1,X3] = meshgrid(x2,x1,x3) This naming is based on MATLAB, the purpose is to avoid confusion due to torch's change to make torch.meshgrid behaviour move from matching ndgrid ('ij') indexing to numpy meshgrid defaults of ('xy'). """ try: return torch.meshgrid(*tensors, indexing="ij") except TypeError: # old PyTorch < 1.10 will follow this path as it does not have indexing arg, # the old behaviour of meshgrid was 'ij' return torch.meshgrid(*tensors) def drop_block_2d( x, drop_prob: float = 0.1, block_size: int = 7, gamma_scale: float = 1.0, with_noise: bool = False, inplace: bool = False, batchwise: bool = False, ): """DropBlock. See https://arxiv.org/pdf/1810.12890.pdf DropBlock with an experimental gaussian noise option. This layer has been tested on a few training runs with success, but needs further validation and possibly optimization for lower runtime impact. """ B, C, H, W = x.shape total_size = W * H clipped_block_size = min(block_size, min(W, H)) # seed_drop_rate, the gamma parameter gamma = ( gamma_scale * drop_prob * total_size / clipped_block_size**2 / ((W - block_size + 1) * (H - block_size + 1)) ) # Forces the block to be inside the feature map. w_i, h_i = ndgrid(torch.arange(W, device=x.device), torch.arange(H, device=x.device)) valid_block = ((w_i >= clipped_block_size // 2) & (w_i < W - (clipped_block_size - 1) // 2)) & ( (h_i >= clipped_block_size // 2) & (h_i < H - (clipped_block_size - 1) // 2) ) valid_block = torch.reshape(valid_block, (1, 1, H, W)).to(dtype=x.dtype) if batchwise: # one mask for whole batch, quite a bit faster uniform_noise = torch.rand((1, C, H, W), dtype=x.dtype, device=x.device) else: uniform_noise = torch.rand_like(x) block_mask = ((2 - gamma - valid_block + uniform_noise) >= 1).to(dtype=x.dtype) block_mask = -F.max_pool2d( -block_mask, kernel_size=clipped_block_size, stride=1, padding=clipped_block_size // 2 # block_size, ) if with_noise: normal_noise = torch.randn((1, C, H, W), dtype=x.dtype, device=x.device) if batchwise else torch.randn_like(x) if inplace: x.mul_(block_mask).add_(normal_noise * (1 - block_mask)) else: x = x * block_mask + normal_noise * (1 - block_mask) else: normalize_scale = (block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-7)).to(x.dtype) if inplace: x.mul_(block_mask * normalize_scale) else: x = x * block_mask * normalize_scale return x def drop_block_fast_2d( x: torch.Tensor, drop_prob: float = 0.1, block_size: int = 7, gamma_scale: float = 1.0, with_noise: bool = False, inplace: bool = False, ): """DropBlock. See https://arxiv.org/pdf/1810.12890.pdf DropBlock with an experimental gaussian noise option. Simplied from above without concern for valid block mask at edges. """ B, C, H, W = x.shape total_size = W * H clipped_block_size = min(block_size, min(W, H)) gamma = ( gamma_scale * drop_prob * total_size / clipped_block_size**2 / ((W - block_size + 1) * (H - block_size + 1)) ) block_mask = torch.empty_like(x).bernoulli_(gamma) block_mask = F.max_pool2d( block_mask.to(x.dtype), kernel_size=clipped_block_size, stride=1, padding=clipped_block_size // 2 ) if with_noise: normal_noise = torch.empty_like(x).normal_() if inplace: x.mul_(1.0 - block_mask).add_(normal_noise * block_mask) else: x = x * (1.0 - block_mask) + normal_noise * block_mask else: block_mask = 1 - block_mask normalize_scale = (block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-6)).to(dtype=x.dtype) if inplace: x.mul_(block_mask * normalize_scale) else: x = x * block_mask * normalize_scale return x class DropBlock2d(nn.Module): """DropBlock. See https://arxiv.org/pdf/1810.12890.pdf""" def __init__( self, drop_prob: float = 0.1, block_size: int = 7, gamma_scale: float = 1.0, with_noise: bool = False, inplace: bool = False, batchwise: bool = False, fast: bool = True, ): super().__init__() self.drop_prob = drop_prob self.gamma_scale = gamma_scale self.block_size = block_size self.with_noise = with_noise self.inplace = inplace self.batchwise = batchwise self.fast = fast # FIXME finish comparisons of fast vs not def forward(self, x): if not self.training or not self.drop_prob: return x if self.fast: return drop_block_fast_2d( x, self.drop_prob, self.block_size, self.gamma_scale, self.with_noise, self.inplace ) else: return drop_block_2d( x, self.drop_prob, self.block_size, self.gamma_scale, self.with_noise, self.inplace, self.batchwise ) def drop_path(x, drop_prob: float = 0.0, training: bool = False, scale_by_keep: bool = True): """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks). This is the same as the DropConnect impl I created for EfficientNet, etc networks, however, the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper... See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use 'survival rate' as the argument. """ if drop_prob == 0.0 or not training: return x keep_prob = 1 - drop_prob shape = (x.shape[0],) + (1,) * (x.ndim - 1) # work with diff dim tensors, not just 2D ConvNets random_tensor = x.new_empty(shape).bernoulli_(keep_prob) if keep_prob > 0.0 and scale_by_keep: random_tensor.div_(keep_prob) return x * random_tensor class DropPath(nn.Module): """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).""" def __init__(self, drop_prob: float = 0.0, scale_by_keep: bool = True): super().__init__() self.drop_prob = drop_prob self.scale_by_keep = scale_by_keep def forward(self, x): return drop_path(x, self.drop_prob, self.training, self.scale_by_keep) def extra_repr(self): return f"drop_prob={round(self.drop_prob,3):0.3f}" class InternRMSNorm(nn.Module): def __init__(self, hidden_size, eps=1e-6): super().__init__() self.weight = nn.Parameter(torch.ones(hidden_size)) self.variance_epsilon = eps def forward(self, hidden_states): input_dtype = hidden_states.dtype hidden_states = hidden_states.to(torch.float32) variance = hidden_states.pow(2).mean(-1, keepdim=True) hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) return self.weight * hidden_states.to(input_dtype) try: from apex.normalization import FusedRMSNorm InternRMSNorm = FusedRMSNorm # noqa logger.info("Discovered apex.normalization.FusedRMSNorm - will use it instead of InternRMSNorm") except ImportError: # using the normal InternRMSNorm pass except Exception: logger.warning("discovered apex but it failed to load, falling back to InternRMSNorm") pass class InternVisionEmbeddings(nn.Module): def __init__(self, config: InternVisionConfig): super().__init__() self.config = config self.embed_dim = config.hidden_size self.image_size = config.image_size self.patch_size = config.patch_size self.class_embedding = nn.Parameter( torch.randn(1, 1, self.embed_dim), ) self.patch_embedding = nn.Conv2d( in_channels=3, out_channels=self.embed_dim, kernel_size=self.patch_size, stride=self.patch_size ) self.num_patches = (self.image_size // self.patch_size) ** 2 self.num_positions = self.num_patches + 1 self.position_embedding = nn.Parameter(torch.randn(1, self.num_positions, self.embed_dim)) def forward(self, pixel_values: torch.FloatTensor) -> torch.Tensor: batch_size = pixel_values.shape[0] target_dtype = self.patch_embedding.weight.dtype patch_embeds = self.patch_embedding(pixel_values) # shape = [*, width, grid, grid] patch_embeds = patch_embeds.flatten(2).transpose(1, 2) class_embeds = self.class_embedding.expand(batch_size, 1, -1).to(target_dtype) embeddings = torch.cat([class_embeds, patch_embeds], dim=1) embeddings = embeddings + self.position_embedding.to(target_dtype) return embeddings class InternAttention(nn.Module): """Multi-headed attention from 'Attention Is All You Need' paper""" def __init__(self, config: InternVisionConfig): super().__init__() self.config = config self.embed_dim = config.hidden_size self.num_heads = config.num_attention_heads self.use_flash_attn = config.use_flash_attn and has_flash_attn if config.use_flash_attn and not has_flash_attn: print("Warning: Flash Attention is not available, use_flash_attn is set to False.") self.head_dim = self.embed_dim // self.num_heads if self.head_dim * self.num_heads != self.embed_dim: raise ValueError( f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:" f" {self.num_heads})." ) self.scale = self.head_dim**-0.5 self.qkv = nn.Linear(self.embed_dim, 3 * self.embed_dim, bias=config.qkv_bias) self.attn_drop = nn.Dropout(config.attention_dropout) self.proj_drop = nn.Dropout(config.dropout) self.qk_normalization = config.qk_normalization if self.qk_normalization: self.q_norm = InternRMSNorm(self.embed_dim, eps=config.layer_norm_eps) self.k_norm = InternRMSNorm(self.embed_dim, eps=config.layer_norm_eps) if self.use_flash_attn: self.inner_attn = FlashAttention(attention_dropout=config.attention_dropout) self.proj = nn.Linear(self.embed_dim, self.embed_dim) def _naive_attn(self, x): B, N, C = x.shape qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4) q, k, v = qkv.unbind(0) # make torchscript happy (cannot use tensor as tuple) if self.qk_normalization: B_, H_, N_, D_ = q.shape q = self.q_norm(q.transpose(1, 2).flatten(-2, -1)).view(B_, N_, H_, D_).transpose(1, 2) k = self.k_norm(k.transpose(1, 2).flatten(-2, -1)).view(B_, N_, H_, D_).transpose(1, 2) attn = (q * self.scale) @ k.transpose(-2, -1) attn = attn.softmax(dim=-1) attn = self.attn_drop(attn) x = (attn @ v).transpose(1, 2).reshape(B, N, C) x = self.proj(x) x = self.proj_drop(x) return x def _flash_attn(self, x, key_padding_mask=None, need_weights=False): qkv = self.qkv(x) qkv = rearrange(qkv, "b s (three h d) -> b s three h d", three=3, h=self.num_heads) if self.qk_normalization: q, k, v = qkv.unbind(2) q = self.q_norm(q.flatten(-2, -1)).view(q.shape) k = self.k_norm(k.flatten(-2, -1)).view(k.shape) qkv = torch.stack([q, k, v], dim=2) context, _ = self.inner_attn(qkv, key_padding_mask=key_padding_mask, need_weights=need_weights, causal=False) outs = self.proj(rearrange(context, "b s h d -> b s (h d)")) outs = self.proj_drop(outs) return outs def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: x = self._naive_attn(hidden_states) if not self.use_flash_attn else self._flash_attn(hidden_states) return x class InternMLP(nn.Module): def __init__(self, config: InternVisionConfig): super().__init__() self.config = config self.act = ACT2FN[config.hidden_act] self.fc1 = nn.Linear(config.hidden_size, config.intermediate_size) self.fc2 = nn.Linear(config.intermediate_size, config.hidden_size) def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: hidden_states = self.fc1(hidden_states) hidden_states = self.act(hidden_states) hidden_states = self.fc2(hidden_states) return hidden_states class InternVisionEncoderLayer(nn.Module): def __init__(self, config: InternVisionConfig, drop_path_rate: float): super().__init__() self.embed_dim = config.hidden_size self.intermediate_size = config.intermediate_size self.attn = InternAttention(config) self.mlp = InternMLP(config) self.norm1 = InternRMSNorm(self.embed_dim, eps=config.layer_norm_eps) self.norm2 = InternRMSNorm(self.embed_dim, eps=config.layer_norm_eps) self.ls1 = nn.Parameter(config.initializer_factor * torch.ones(self.embed_dim)) self.ls2 = nn.Parameter(config.initializer_factor * torch.ones(self.embed_dim)) self.drop_path1 = DropPath(drop_path_rate) if drop_path_rate > 0.0 else nn.Identity() self.drop_path2 = DropPath(drop_path_rate) if drop_path_rate > 0.0 else nn.Identity() def forward( self, hidden_states: torch.Tensor, ) -> Tuple[torch.FloatTensor, Optional[torch.FloatTensor], Optional[Tuple[torch.FloatTensor]]]: """ Args: hidden_states (`Tuple[torch.FloatTensor, Optional[torch.FloatTensor]]`): input to the layer of shape `(batch, seq_len, embed_dim)` """ hidden_states = hidden_states + self.drop_path1(self.attn(self.norm1(hidden_states)) * self.ls1) hidden_states = hidden_states + self.drop_path2(self.mlp(self.norm2(hidden_states)) * self.ls2) return hidden_states class InternVisionEncoder(nn.Module): """ Transformer encoder consisting of `config.num_hidden_layers` self attention layers. Each layer is a [`InternEncoderLayer`]. Args: config (`InternConfig`): The corresponding vision configuration for the `InternEncoder`. """ def __init__(self, config: InternVisionConfig): super().__init__() self.config = config # stochastic depth decay rule dpr = [x.item() for x in torch.linspace(0, config.drop_path_rate, config.num_hidden_layers)] self.layers = nn.ModuleList( [InternVisionEncoderLayer(config, dpr[idx]) for idx in range(config.num_hidden_layers)] ) self.gradient_checkpointing = True def forward( self, inputs_embeds, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, ) -> Union[Tuple, BaseModelOutput]: r""" Args: inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): Embedded representation of the inputs. Should be float, not int tokens. output_hidden_states (`bool`, *optional*): Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for more detail. return_dict (`bool`, *optional*): Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple. """ output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict encoder_states = () if output_hidden_states else None hidden_states = inputs_embeds for idx, encoder_layer in enumerate(self.layers): if output_hidden_states: encoder_states = encoder_states + (hidden_states,) if self.gradient_checkpointing and self.training: layer_outputs = torch.utils.checkpoint.checkpoint(encoder_layer, hidden_states) else: layer_outputs = encoder_layer( hidden_states, ) hidden_states = layer_outputs if output_hidden_states: encoder_states = encoder_states + (hidden_states,) if not return_dict: return tuple(v for v in [hidden_states, encoder_states] if v is not None) return BaseModelOutput(last_hidden_state=hidden_states, hidden_states=encoder_states) class InternVisionModel(PreTrainedModel): main_input_name = "pixel_values" config_class = InternVisionConfig _no_split_modules = ["InternVisionEncoderLayer"] def __init__(self, config: InternVisionConfig): super().__init__(config) self.config = config self.embeddings = InternVisionEmbeddings(config) self.encoder = InternVisionEncoder(config) def resize_pos_embeddings(self, old_size, new_size, patch_size): pos_emb = self.embeddings.position_embedding _, num_positions, embed_dim = pos_emb.shape cls_emb = pos_emb[:, :1, :] pos_emb = pos_emb[:, 1:, :].reshape(1, old_size // patch_size, old_size // patch_size, -1).permute(0, 3, 1, 2) pos_emb = F.interpolate(pos_emb.float(), size=new_size // patch_size, mode="bicubic", align_corners=False) pos_emb = pos_emb.to(cls_emb.dtype).reshape(1, embed_dim, -1).permute(0, 2, 1) pos_emb = torch.cat([cls_emb, pos_emb], dim=1) self.embeddings.position_embedding = nn.Parameter(pos_emb) logger.info(f"Resized position embeddings from {old_size} to {new_size}") def get_input_embeddings(self): return self.embeddings def forward( self, pixel_values: Optional[torch.FloatTensor] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, pixel_embeds: Optional[torch.FloatTensor] = None, ) -> Union[Tuple, BaseModelOutputWithPooling]: output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict if pixel_values is None and pixel_embeds is None: raise ValueError("You have to specify pixel_values or pixel_embeds") if pixel_embeds is not None: hidden_states = pixel_embeds else: if len(pixel_values.shape) == 4: hidden_states = self.embeddings(pixel_values) else: raise ValueError(f"wrong pixel_values size: {pixel_values.shape}") encoder_outputs = self.encoder( inputs_embeds=hidden_states, output_hidden_states=output_hidden_states, return_dict=return_dict, ) last_hidden_state = encoder_outputs.last_hidden_state pooled_output = last_hidden_state[:, 0, :] if not return_dict: return (last_hidden_state, pooled_output) + encoder_outputs[1:] return BaseModelOutputWithPooling( last_hidden_state=last_hidden_state, pooler_output=pooled_output, hidden_states=encoder_outputs.hidden_states, attentions=encoder_outputs.attentions, )