# Ops-MoA-Yuan-embedding-1.0 / modeling_adaptor.py
# Last commit: "sbert_integration (#1)" by xzx22 (2e57fd1, verified)
import os
import json
import torch
import torch.nn as nn
import torch.nn.functional as F
class MoAGate(nn.Module):
    """Gate that assigns every hidden vector to an adaptor.

    The module owns ``routing_vectors``, a frozen ``(num_adaptors, hidden_dim)``
    float32 parameter. It is allocated uninitialised (``torch.empty``), so its
    values are only meaningful after a state dict has been loaded.
    """

    def __init__(self, num_adaptors, hidden_dim):
        """Allocate the frozen routing table.

        Args:
            num_adaptors: Number of adaptors (rows of the routing table).
            hidden_dim: Size of each routing vector.
        """
        super().__init__()
        self.routing_vectors = nn.Parameter(
            torch.empty(num_adaptors, hidden_dim, dtype=torch.float32),
            requires_grad=False,
        )

    def forward(self, hidden_states):
        """Return the adaptor index and mixing weight for each hidden vector.

        Args:
            hidden_states: Tensor whose last dimension is the hidden size;
                all leading dimensions are flattened into rows.

        Returns:
            A ``(topk_indices, topk_weights)`` pair of int64 tensors, each of
            shape ``(num_vectors, 1)`` and on the device of ``hidden_states``.
            The indices are all 0 and the weights are all 1.
        """
        # NOTE(review): routing is pinned to adaptor 0. The nearest-routing-
        # vector lookup that used to run here had its result overwritten with
        # zeros before being returned, so it has been dropped as dead work.
        # Existing embeddings depend on the "always adaptor 0" behaviour --
        # confirm intent before enabling distance-based routing.
        #
        # The module must not rebind ``self.routing_vectors`` to the result of
        # ``.to(device)``: that result is a plain Tensor, and assigning it to a
        # registered parameter raises TypeError.
        num_vectors = 1
        for size in hidden_states.shape[:-1]:
            num_vectors *= size

        topk_indices = torch.zeros(
            (num_vectors, 1), dtype=torch.long, device=hidden_states.device
        )
        topk_weights = torch.ones_like(topk_indices)
        return topk_indices, topk_weights
class LinearLayer(nn.Module):
    """A single affine projection, wrapped so it can serve as one adaptor."""

    def __init__(self, input_dim, output_dim):
        """Create the projection from ``input_dim`` to ``output_dim`` features."""
        super().__init__()
        # The attribute name fixes the state-dict keys
        # ("linear.weight", "linear.bias").
        self.linear = nn.Linear(in_features=input_dim, out_features=output_dim)

    def forward(self, x):
        """Apply the projection to ``x`` and return the result."""
        projected = self.linear(x)
        return projected
class MixtureOfAdaptors(nn.Module):
    """Pass each embedding through the linear adaptor selected by a gate.

    The module holds ``num_adaptors`` square ``LinearLayer`` adaptors plus a
    ``MoAGate``. It accepts either a raw tensor or a feature dict carrying a
    ``'sentence_embedding'`` entry, and offers a ``load`` classmethod that
    rebuilds the module from ``config.json`` and ``adaptor.pth``.
    """

    def __init__(self, num_adaptors, hidden_dim):
        """Build the adaptors and the gate.

        Args:
            num_adaptors: Number of adaptors to create.
            hidden_dim: Input and output size of every adaptor.
        """
        super().__init__()
        self.adaptors = nn.ModuleList([
            LinearLayer(input_dim=hidden_dim, output_dim=hidden_dim)
            for _ in range(num_adaptors)
        ])
        self.gate = MoAGate(num_adaptors, hidden_dim)

    def forward(self, inputs):
        """Adapt the embeddings in ``inputs``.

        Args:
            inputs: Either a tensor whose last dimension is the hidden size,
                or a dict holding such a tensor under ``'sentence_embedding'``.

        Returns:
            For tensor input, a new tensor with the same shape. For dict
            input, the *same* dict object with ``'sentence_embedding'``
            replaced by the adapted tensor. The adapted tensor is produced
            under ``torch.no_grad()`` (see ``moa_inference``), so no gradient
            flows through it.
        """
        is_feature_dict = isinstance(inputs, dict)
        hidden_states = inputs['sentence_embedding'] if is_feature_dict else inputs

        original_shape = hidden_states.shape
        topk_indices, topk_weights = self.gate(hidden_states)

        # reshape (not view) so non-contiguous inputs are flattened too.
        flat_states = hidden_states.reshape(-1, original_shape[-1])
        output = self.moa_inference(
            flat_states,
            topk_indices.reshape(-1),
            topk_weights.reshape(-1, 1),
        ).view(*original_shape)

        if is_feature_dict:
            inputs['sentence_embedding'] = output
            return inputs
        return output

    @torch.no_grad()
    def moa_inference(self, x, flat_adaptor_indices, flat_adaptor_weights):
        """Run every row of ``x`` through its assigned adaptor.

        Rows are grouped by adaptor index so each adaptor is called once on
        all of its rows; results are written back to the rows' original
        positions.

        Args:
            x: 2-D tensor, one row per embedding.
            flat_adaptor_indices: 1-D non-negative integer tensor giving the
                adaptor index for each row of ``x``.
            flat_adaptor_weights: Per-row scale factors, indexed by row and
                broadcast against the adaptor output (``forward`` passes a
                ``(rows, 1)`` tensor).

        Returns:
            Tensor shaped like ``x`` holding the weighted adaptor outputs.
        """
        adaptor_cache = torch.zeros_like(x)
        # Row ids ordered by adaptor index; slices of this are per-adaptor groups.
        sorted_indices = flat_adaptor_indices.argsort()
        # Cumulative row counts give the exclusive end of each adaptor's slice.
        group_ends = flat_adaptor_indices.bincount().cumsum(0).tolist()

        start_idx = 0
        for adaptor_id, end_idx in enumerate(group_ends):
            if start_idx == end_idx:
                # No rows were routed to this adaptor.
                continue
            row_ids = sorted_indices[start_idx:end_idx]
            adaptor_output = self.adaptors[adaptor_id](x[row_ids])
            adaptor_output.mul_(flat_adaptor_weights[row_ids])
            # Row-wise accumulate into the output at the original positions.
            adaptor_cache.index_add_(0, row_ids, adaptor_output)
            start_idx = end_idx
        return adaptor_cache

    @classmethod
    def load(cls, input_path):
        """Rebuild the module from a directory.

        Args:
            input_path: Directory containing ``config.json`` (whose keys are
                passed as constructor keyword arguments) and ``adaptor.pth``
                (the state dict).

        Returns:
            The constructed module with the state dict loaded.
        """
        config_path = os.path.join(input_path, "config.json")
        with open(config_path, encoding="utf-8") as fIn:
            config = json.load(fIn)
        adaptor = cls(**config)

        # map_location="cpu" lets checkpoints saved from an accelerator load
        # on hosts without one; load_state_dict copies values into the
        # module's own parameters regardless of where they were deserialised.
        state_dict = torch.load(
            os.path.join(input_path, "adaptor.pth"),
            map_location="cpu",
            weights_only=True,
        )
        adaptor.load_state_dict(state_dict)
        return adaptor