import torch
from safetensors.torch import load_file
from torch import nn, Tensor, einsum, IntTensor, FloatTensor, BoolTensor
from torchcubicspline import natural_cubic_spline_coeffs, NaturalCubicSpline
from torch.nn.utils import remove_weight_norm


def load_ckpt_state_dict(ckpt_path, prefix=None):
    """Load a checkpoint's state dict, optionally keeping only one sub-tree.

    Args:
        ckpt_path: Path to the checkpoint. Paths ending in ``.safetensors``
            are read with ``safetensors``; anything else goes through
            ``torch.load`` and must contain a ``"state_dict"`` entry.
        prefix: If given, only keys starting with ``prefix`` are kept and the
            leading ``prefix`` is stripped from them. If ``None`` the state
            dict is returned unfiltered.

    Returns:
        The (possibly filtered and re-keyed) state dict.
    """
    # str() so that pathlib.Path objects are accepted as well as plain strings.
    if str(ckpt_path).endswith(".safetensors"):
        state_dict = load_file(ckpt_path)
    else:
        # NOTE(security): torch.load unpickles the file; only use it on
        # checkpoints from a trusted source.
        state_dict = torch.load(ckpt_path, map_location="cpu")["state_dict"]

    if prefix is None:
        return state_dict

    # Strip only the *leading* prefix. str.replace would also delete later
    # occurrences of the same text inside the key
    # (e.g. "model.sub.model.w" -> "sub.w").
    return {
        key[len(prefix):]: value
        for key, value in state_dict.items()
        if key.startswith(prefix)
    }


def remove_weight_norm_from_model(model):
    """Remove weight normalization from every sub-module that carries it.

    Modules that expose a ``weight`` attribute but never had weight norm
    applied are skipped instead of aborting the traversal.

    Args:
        model: The module whose ``modules()`` are traversed.

    Returns:
        The same ``model`` object, modified in place.
    """
    for module in model.modules():
        if not hasattr(module, "weight"):
            continue
        try:
            remove_weight_norm(module)
        except ValueError:
            # remove_weight_norm raises ValueError when the module has no
            # weight-norm hook for "weight"; nothing to remove here.
            continue
        print(f"Removing weight norm from {module}")
    return model


# Sampling functions copied from
# https://github.com/facebookresearch/audiocraft/blob/main/audiocraft/utils/utils.py
# under MIT license.
# License can be found in LICENSES/LICENSE_META.txt

def multinomial(input: torch.Tensor, num_samples: int, replacement=False, *, generator=None):
    """torch.multinomial with arbitrary number of dimensions, and number of
    candidates on the last dimension.

    Args:
        input (torch.Tensor): The input tensor containing probabilities.
        num_samples (int): Number of samples to draw.
        replacement (bool): Whether to draw with replacement or not.
    Keywords args:
        generator (torch.Generator): A pseudorandom number generator for sampling.
    Returns:
        torch.Tensor: Last dimension contains num_samples indices
            sampled from the multinomial probability distribution
            located in the last dimension of tensor input.
    """
    if num_samples == 1:
        # Single-sample path: divide by exponential noise and take the argmax
        # instead of calling torch.multinomial.
        q = torch.empty_like(input).exponential_(1, generator=generator)
        return torch.argmax(input / q, dim=-1, keepdim=True).to(torch.int64)

    # torch.multinomial only accepts 1-D/2-D input: flatten the leading
    # dimensions, sample, then restore them.
    input_ = input.reshape(-1, input.shape[-1])
    output_ = torch.multinomial(
        input_, num_samples=num_samples, replacement=replacement, generator=generator
    )
    output = output_.reshape(*list(input.shape[:-1]), -1)
    return output


def sample_top_k(probs: torch.Tensor, k: int) -> torch.Tensor:
    """Sample next token from top K values along the last dimension of the
    input probs tensor.

    The input tensor is left untouched; filtering and renormalization are
    done on a copy.

    Args:
        probs (torch.Tensor): Input probabilities with token candidates on the last dimension.
        k (int): The k in "top-k".
    Returns:
        torch.Tensor: Sampled tokens.
    """
    top_k_value, _ = torch.topk(probs, k, dim=-1)
    min_value_top_k = top_k_value[..., [-1]]
    # Out-of-place so the caller's tensor is not modified; the mask is cast
    # to probs.dtype so the result keeps the input dtype.
    keep = (probs >= min_value_top_k).to(probs.dtype)
    filtered = probs * keep
    filtered = filtered / filtered.sum(dim=-1, keepdim=True)
    return multinomial(filtered, num_samples=1)


def sample_top_p(probs: torch.Tensor, p: float) -> torch.Tensor:
    """Sample next token from top P probabilities along the last dimension of
    the input probs tensor.

    Args:
        probs (torch.Tensor): Input probabilities with token candidates on the last dimension.
        p (float): The p in "top-p".
    Returns:
        torch.Tensor: Sampled tokens.
    """
    probs_sort, probs_idx = torch.sort(probs, dim=-1, descending=True)
    probs_sum = torch.cumsum(probs_sort, dim=-1)
    # Drop a candidate once the cumulative mass *before* it already exceeds p.
    mask = probs_sum - probs_sort > p
    # In-place is safe here: probs_sort is the fresh tensor returned by sort.
    probs_sort *= (~mask).float()
    probs_sort.div_(probs_sort.sum(dim=-1, keepdim=True))
    next_token = multinomial(probs_sort, num_samples=1)
    # Map positions in the sorted order back to the original indices.
    next_token = torch.gather(probs_idx, -1, next_token)
    return next_token


def next_power_of_two(n):
    """Return the smallest power of two that is >= ``n`` for ``n >= 1``.

    For ``n <= 0`` the value simply follows ``int.bit_length`` of ``n - 1``
    (e.g. ``n == 0`` gives 2).
    """
    return 2 ** (n - 1).bit_length()


def next_multiple_of_64(n):
    """Round ``n`` up to the nearest multiple of 64."""
    return ((n + 63) // 64) * 64


# mask construction helpers

def mask_from_start_end_indices(
    seq_len: int,
    start: Tensor,
    end: Tensor
):
    """Build a boolean mask that is True on ``[start, end)`` along a new last axis.

    Args:
        seq_len: Length of the new trailing dimension.
        start: Start indices (inclusive); converted with ``.long()``.
        end: End indices (exclusive), same shape as ``start``; converted
            with ``.long()``.

    Returns:
        Bool tensor of shape ``(*start.shape, seq_len)``.
    """
    assert start.shape == end.shape
    device = start.device

    seq = torch.arange(seq_len, device=device, dtype=torch.long)
    # One singleton axis per dimension of `start`. Using -1 here breaks for
    # start.ndim > 1 because reshape can infer at most one dimension.
    seq = seq.reshape(*((1,) * start.ndim), seq_len)
    seq = seq.expand(*start.shape, seq_len)

    mask = seq >= start[..., None].long()
    mask &= seq < end[..., None].long()
    return mask


def mask_from_frac_lengths(
    seq_len: int,
    frac_lengths: Tensor
):
    """Build a mask with one randomly placed span per entry of ``frac_lengths``.

    Each span has length ``(frac_lengths * seq_len).long()`` and its start is
    drawn uniformly so that the span fits inside ``seq_len``.

    Args:
        seq_len: Length of the sequence axis.
        frac_lengths: Fractions of ``seq_len`` to mask.

    Returns:
        Bool tensor of shape ``(*frac_lengths.shape, seq_len)``.
    """
    device = frac_lengths.device

    lengths = (frac_lengths * seq_len).long()
    max_start = seq_len - lengths

    rand = torch.zeros_like(frac_lengths, device=device).float().uniform_(0, 1)
    # clamp guards against a negative max_start (length larger than seq_len).
    start = (max_start * rand).clamp(min=0)
    end = start + lengths

    return mask_from_start_end_indices(seq_len, start, end)


def _build_spline(video_feat, video_t, target_t):
    """Core cubic-spline interpolation.

    Fits a natural cubic spline to ``video_feat`` sampled at ``video_t`` and
    evaluates it at ``target_t``. The last two axes of ``video_feat`` are
    swapped before fitting and the last two axes of the result are swapped
    back, so input and output share the same axis order.
    """
    coeffs = natural_cubic_spline_coeffs(video_t, video_feat.permute(0, 2, 1))
    spline = NaturalCubicSpline(coeffs)
    return spline.evaluate(target_t).permute(0, 2, 1)


def resample(video_feat, audio_latent):
    """Resample video features onto the audio latent's time grid (9 s clip).

    Args:
        video_feat: ``[B, Tv, D]`` tensor (e.g. ``[B, 72, D]``).
        audio_latent: Either an int giving the target number of frames, or a
            3-D tensor from which the target length is read: dimension 1 if
            it differs from ``D``, otherwise dimension 2.
            NOTE(review): the original docstring gave the tensor layout as
            ``[B, D', 194]``; with ``D' != D`` the rule above selects ``D'``,
            not 194 -- confirm the intended layout against the callers.

    Returns:
        ``[B, Ta, D]`` tensor of interpolated video features.

    Raises:
        TypeError: If ``audio_latent`` is neither a tensor nor an int.
    """
    _, Tv, D = video_feat.shape

    if isinstance(audio_latent, torch.Tensor):
        # audio_latent is a tensor: pick the axis that is not the feature axis
        if audio_latent.shape[1] != D:
            Ta = audio_latent.shape[1]
        else:
            Ta = audio_latent.shape[2]
    elif isinstance(audio_latent, int):
        # audio_latent is an int: it is the target length itself
        Ta = audio_latent
    else:
        raise TypeError("audio_latent must be either a tensor or an int")

    # Build timestamps: both streams span the same 0..9 interval, so frames
    # are aligned by relative position in the clip.
    video_time = torch.linspace(0, 9, Tv, device=video_feat.device)
    audio_time = torch.linspace(0, 9, Ta, device=video_feat.device)

    # Arrange as (Batch, Feature, Time), the layout _build_spline takes.
    video_feat = video_feat.permute(0, 2, 1)  # [B, D, Tv]

    # Cubic spline interpolation
    aligned_video = _build_spline(video_feat, video_time, audio_time)  # [B, D, Ta]
    return aligned_video.permute(0, 2, 1)  # [B, Ta, D]