Spaces:
Running
on
Zero
Running
on
Zero
import torch | |
import os | |
from torch import nn, Tensor, einsum, IntTensor, FloatTensor, BoolTensor | |
import random | |
def get_rank(): | |
"""Get rank of current process.""" | |
print(os.environ.keys()) | |
if "SLURM_PROCID" in os.environ: | |
return int(os.environ["SLURM_PROCID"]) | |
if not torch.distributed.is_available() or not torch.distributed.is_initialized(): | |
return 0 | |
return torch.distributed.get_rank() | |
class InverseLR(torch.optim.lr_scheduler._LRScheduler): | |
"""Implements an inverse decay learning rate schedule with an optional exponential | |
warmup. When last_epoch=-1, sets initial lr as lr. | |
inv_gamma is the number of steps/epochs required for the learning rate to decay to | |
(1 / 2)**power of its original value. | |
Args: | |
optimizer (Optimizer): Wrapped optimizer. | |
inv_gamma (float): Inverse multiplicative factor of learning rate decay. Default: 1. | |
power (float): Exponential factor of learning rate decay. Default: 1. | |
warmup (float): Exponential warmup factor (0 <= warmup < 1, 0 to disable) | |
Default: 0. | |
final_lr (float): The final learning rate. Default: 0. | |
last_epoch (int): The index of last epoch. Default: -1. | |
verbose (bool): If ``True``, prints a message to stdout for | |
each update. Default: ``False``. | |
""" | |
def __init__(self, optimizer, inv_gamma=1., power=1., warmup=0., final_lr=0., | |
last_epoch=-1, verbose=False): | |
self.inv_gamma = inv_gamma | |
self.power = power | |
if not 0. <= warmup < 1: | |
raise ValueError('Invalid value for warmup') | |
self.warmup = warmup | |
self.final_lr = final_lr | |
super().__init__(optimizer, last_epoch, verbose) | |
def get_lr(self): | |
if not self._get_lr_called_within_step: | |
import warnings | |
warnings.warn("To get the last learning rate computed by the scheduler, " | |
"please use `get_last_lr()`.") | |
return self._get_closed_form_lr() | |
def _get_closed_form_lr(self): | |
warmup = 1 - self.warmup ** (self.last_epoch + 1) | |
lr_mult = (1 + self.last_epoch / self.inv_gamma) ** -self.power | |
return [warmup * max(self.final_lr, base_lr * lr_mult) | |
for base_lr in self.base_lrs] | |
def copy_state_dict(model, state_dict): | |
"""Load state_dict to model, but only for keys that match exactly. | |
Args: | |
model (nn.Module): model to load state_dict. | |
state_dict (OrderedDict): state_dict to load. | |
""" | |
model_state_dict = model.state_dict() | |
# 创建一个列表存储不匹配的参数 | |
missing_keys = [] | |
unexpected_keys = [] | |
# 手动加载并检查不匹配的参数 | |
for key in state_dict: | |
if key not in model_state_dict: | |
unexpected_keys.append(key) | |
elif state_dict[key].shape != model_state_dict[key].shape: | |
unexpected_keys.append(key) | |
for key in model_state_dict: | |
if key not in state_dict: | |
missing_keys.append(key) | |
# 打印不匹配的参数 | |
print("Missing keys in state_dict:", missing_keys) | |
print("Unexpected keys in state_dict:", unexpected_keys) | |
for key in state_dict: | |
if key in model_state_dict and state_dict[key].shape == model_state_dict[key].shape: | |
if isinstance(state_dict[key], torch.nn.Parameter): | |
# backwards compatibility for serialized parameters | |
state_dict[key] = state_dict[key].data | |
model_state_dict[key] = state_dict[key] | |
model.load_state_dict(model_state_dict, strict=False) | |
def create_optimizer_from_config(optimizer_config, parameters): | |
"""Create optimizer from config. | |
Args: | |
parameters (iterable): parameters to optimize. | |
optimizer_config (dict): optimizer config. | |
Returns: | |
torch.optim.Optimizer: optimizer. | |
""" | |
optimizer_type = optimizer_config["type"] | |
if optimizer_type == "FusedAdam": | |
from deepspeed.ops.adam import FusedAdam | |
optimizer = FusedAdam(parameters, **optimizer_config["config"]) | |
else: | |
optimizer_fn = getattr(torch.optim, optimizer_type) | |
optimizer = optimizer_fn(parameters, **optimizer_config["config"]) | |
return optimizer | |
def create_scheduler_from_config(scheduler_config, optimizer): | |
"""Create scheduler from config. | |
Args: | |
scheduler_config (dict): scheduler config. | |
optimizer (torch.optim.Optimizer): optimizer. | |
Returns: | |
torch.optim.lr_scheduler._LRScheduler: scheduler. | |
""" | |
if scheduler_config["type"] == "InverseLR": | |
scheduler_fn = InverseLR | |
else: | |
scheduler_fn = getattr(torch.optim.lr_scheduler, scheduler_config["type"]) | |
scheduler = scheduler_fn(optimizer, **scheduler_config["config"]) | |
return scheduler | |
# mask construction helpers | |
def mask_from_start_end_indices( | |
seq_len: int, | |
start: Tensor, | |
end: Tensor | |
): | |
assert start.shape == end.shape | |
device = start.device | |
seq = torch.arange(seq_len, device = device, dtype = torch.long) | |
seq = seq.reshape(*((-1,) * start.ndim), seq_len) | |
seq = seq.expand(*start.shape, seq_len) | |
mask = seq >= start[..., None].long() | |
mask &= seq < end[..., None].long() | |
return mask | |
def mask_from_frac_lengths( | |
seq_len: int, | |
frac_lengths: Tensor | |
): | |
device = frac_lengths.device | |
lengths = (frac_lengths * seq_len).long() | |
max_start = seq_len - lengths | |
rand = torch.zeros_like(frac_lengths, device = device).float().uniform_(0, 1) | |
start = (max_start * rand).clamp(min = 0) | |
end = start + lengths | |
return mask_from_start_end_indices(seq_len, start, end) | |
def generate_mask(batch_size, seq_len, frac_lengths, min_span_len): | |
# 计算需要掩盖的起始数量 | |
n_mask = (frac_lengths * seq_len // min_span_len).long() # 每个 span 为 10 | |
# 初始化掩码张量,初始为全 0(未掩盖) | |
mask_tensor = torch.zeros((batch_size, seq_len), device=frac_lengths.device, dtype=torch.bool) | |
for b in range(batch_size): | |
# 随机挑选起始帧 | |
start_frames = random.sample(range(0, seq_len - min_span_len + 1), n_mask[b]) # 0 到 seq_len-10 的范围 | |
for start in start_frames: | |
# 将 span 为 10 的区域标记为 1(掩盖) | |
mask_tensor[b, start:start + 10] = 1.0 | |
return mask_tensor | |
def generate_channel_mask(diffusion_input): | |
# 如果 r_drop 小于 threshold,则对每个样本选择一个随机声道进行完全 mask | |
batchsize, num_channels, dim = diffusion_input.shape | |
for i in range(batchsize): | |
channel_means = torch.mean(torch.abs(diffusion_input[i]), dim=1) # Mean of the absolute values for each channel | |
# Determine if any channel is 'small enough' | |
if torch.all(channel_means > 0.01): | |
# If all channels are not 'small enough', apply the mask | |
channel = torch.randint(num_channels, (1,)).item() | |
diffusion_input[i, channel, :] = 1e-8 # Mask the channel by setting its values | |
else: | |
# Optionally log that at least one channel is 'small enough' and no mask is applied | |
print(f"Sample {i}: At least one channel is 'small enough', skipping masking.") | |
return diffusion_input | |