import os
import json

import torch
import torch.nn as nn


class MoAGate(nn.Module):
    """Routes each sentence embedding to its nearest routing vector (hard, top-1 routing)."""

    def __init__(self, num_adaptors, hidden_dim):
        super().__init__()
        # Routing vectors are not trained by backprop here; they are expected to be
        # filled externally, e.g. restored from a checkpoint via MixtureOfAdaptors.load.
        self.routing_vectors = nn.Parameter(
            torch.empty(num_adaptors, hidden_dim, dtype=torch.float32),
            requires_grad=False
        )

    def forward(self, hidden_states):
        # Lazily move the routing vectors to the device of the incoming embeddings.
        # Assigning to .data keeps the attribute an nn.Parameter.
        if self.routing_vectors.device != hidden_states.device:
            self.routing_vectors.data = self.routing_vectors.data.to(hidden_states.device)

        # (batch, hidden) -> (batch, 1, hidden) -> (batch * 1, hidden)
        hidden_states = hidden_states.unsqueeze(1)
        batch_size, seq_len, hidden_dim = hidden_states.shape
        hidden_states = hidden_states.view(-1, hidden_dim)

        # Assign each embedding to the closest routing vector (Euclidean distance).
        distances = torch.cdist(hidden_states, self.routing_vectors)
        _, cluster_indices = torch.min(distances, dim=1)
        topk_indices = cluster_indices.view(-1, 1)
        # Hard top-1 routing: the selected adaptor gets weight 1.0.
        topk_weights = torch.ones_like(topk_indices, dtype=hidden_states.dtype)
        return topk_indices, topk_weights


class LinearLayer(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):
        return self.linear(x)


class MixtureOfAdaptors(nn.Module):
    """Applies one of several linear adaptors to each sentence embedding, chosen by MoAGate."""

    def __init__(self, num_adaptors, hidden_dim):
        super().__init__()
        self.adaptors = nn.ModuleList([
            LinearLayer(input_dim=hidden_dim, output_dim=hidden_dim)
            for _ in range(num_adaptors)
        ])
        self.gate = MoAGate(num_adaptors, hidden_dim)

    def forward(self, inputs):
        # Accept either a raw tensor or a sentence-transformers style feature dict.
        if isinstance(inputs, dict):
            hidden_states = inputs['sentence_embedding']
        else:
            hidden_states = inputs

        original_shape = hidden_states.shape
        topk_indices, topk_weights = self.gate(hidden_states)
        hidden_states = hidden_states.view(-1, hidden_states.shape[-1])
        flat_topk_indices = topk_indices.view(-1)
        output = self.moa_inference(
            hidden_states, flat_topk_indices, topk_weights.view(-1, 1)
        ).view(*original_shape)

        if isinstance(inputs, dict):
            inputs['sentence_embedding'] = output
            return inputs
        return output

    @torch.no_grad()
    def moa_inference(self, x, flat_adaptor_indices, flat_adaptor_weights):
        # Group embeddings by their assigned adaptor, run each adaptor once on its group,
        # then scatter the weighted outputs back to the original positions.
        adaptor_cache = torch.zeros_like(x)
        sorted_indices = flat_adaptor_indices.argsort()
        # Cumulative token counts per adaptor, used to slice the sorted index list.
        tokens_per_adaptor = flat_adaptor_indices.bincount().cpu().numpy().cumsum(0)
        token_indices = sorted_indices
        for i, end_idx in enumerate(tokens_per_adaptor):
            start_idx = 0 if i == 0 else tokens_per_adaptor[i - 1]
            if start_idx == end_idx:
                continue
            adaptor = self.adaptors[i]
            adaptor_token_indices = token_indices[start_idx:end_idx]
            adaptor_tokens = x[adaptor_token_indices]
            adaptor_output = adaptor(adaptor_tokens)
            adaptor_output.mul_(flat_adaptor_weights[sorted_indices[start_idx:end_idx]])
            adaptor_cache.scatter_reduce_(
                0,
                adaptor_token_indices.view(-1, 1).repeat(1, x.shape[-1]),
                adaptor_output,
                reduce='sum'
            )
        return adaptor_cache

    @classmethod
    def load(cls, input_path):
        # Expects config.json (constructor kwargs) and adaptor.pth (state dict) in input_path.
        with open(os.path.join(input_path, "config.json")) as fIn:
            config = json.load(fIn)
        adaptor = cls(**config)
        adaptor.load_state_dict(
            torch.load(
                os.path.join(input_path, "adaptor.pth"),
                weights_only=True
            )
        )
        return adaptor
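

# --- Usage sketch (illustrative only, not part of the module) ----------------
# A minimal example of how this module might be exercised end to end. The
# checkpoint path, the embedding size of 384, the number of adaptors, and the
# random initialisation of the routing vectors below are all hypothetical; a
# real setup would restore trained weights and routing vectors via
# MixtureOfAdaptors.load, with config.json holding the constructor kwargs
# ({"num_adaptors": ..., "hidden_dim": ...}) and adaptor.pth the state dict.
if __name__ == "__main__":
    torch.manual_seed(0)

    # Fresh mixture with randomly initialised adaptors and routing vectors.
    moa = MixtureOfAdaptors(num_adaptors=4, hidden_dim=384)
    moa.gate.routing_vectors.data.normal_()

    # Plain tensor input: a batch of 8 sentence embeddings.
    embeddings = torch.randn(8, 384)
    out = moa(embeddings)
    print(out.shape)  # torch.Size([8, 384])

    # Dict input, as passed between sentence-transformers-style modules.
    features = {'sentence_embedding': embeddings}
    out = moa(features)['sentence_embedding']
    print(out.shape)  # torch.Size([8, 384])

    # Loading a trained mixture from disk would look like (path is hypothetical):
    # moa = MixtureOfAdaptors.load("path/to/adaptor")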