Spaces:
Running
on
Zero
Running
on
Zero
import os | |
import torch | |
import torch.nn as nn | |
import numpy as np | |
from torch.utils.data import Sampler | |
from transformers import Trainer | |
from transformers.trainer import ( | |
is_sagemaker_mp_enabled, | |
get_parameter_names, | |
has_length, | |
ALL_LAYERNORM_LAYERS, | |
logger, | |
) | |
from typing import List, Optional | |
from transformers.utils import is_torch_xla_available | |
# Optional torch_xla support: the XLA helper modules are only bound when
# torch_xla is installed; IS_XLA_FSDPV2_POST_2_2 is always defined.
if is_torch_xla_available():
    import torch_xla.core.xla_model as xm
    import torch_xla.debug.metrics as met
    from torch_xla import __version__ as XLA_VERSION

    # The version gate below needs both of these names, and nothing at the top
    # of this file imports them. They are taken from the transformers package
    # this file already depends on: `version` is the version-parsing module
    # that transformers.trainer imports for its own identical gate.
    from transformers.trainer import version
    from transformers.utils import XLA_FSDPV2_MIN_VERSION

    IS_XLA_FSDPV2_POST_2_2 = version.parse(XLA_VERSION) >= version.parse(XLA_FSDPV2_MIN_VERSION)
    if IS_XLA_FSDPV2_POST_2_2:
        # SPMD / runtime modules are only imported on new enough torch_xla.
        import torch_xla.distributed.spmd as xs
        import torch_xla.runtime as xr
else:
    IS_XLA_FSDPV2_POST_2_2 = False
def maybe_zero_3(param, ignore_status=False, name=None):
    """Return a detached CPU clone of `param`, gathering it first if DeepSpeed manages it.

    Args:
        param: Tensor or parameter to copy. An object exposing a `ds_id`
            attribute is treated as DeepSpeed-managed and is read inside
            `zero.GatheredParameters`.
        ignore_status: When False, print a notice if the parameter's
            `ds_status` is `ZeroParamStatus.NOT_AVAILABLE` before gathering.
        name: Label printed in that notice; not used otherwise.

    Returns:
        A detached clone of the parameter's data on CPU.
    """
    if not hasattr(param, "ds_id"):
        # Plain tensor: DeepSpeed is not involved, so it need not be installed.
        return param.detach().cpu().clone()

    # Imported lazily so deepspeed is only required for DeepSpeed-managed params.
    from deepspeed import zero
    from deepspeed.runtime.zero.partition_parameters import ZeroParamStatus

    if param.ds_status == ZeroParamStatus.NOT_AVAILABLE and not ignore_status:
        print(name, "no ignore status")
    # Clone while the parameter is gathered; the copy stays valid after exit.
    with zero.GatheredParameters([param]):
        return param.data.detach().cpu().clone()
def get_mm_adapter_state_maybe_zero_3(named_params, keys_to_match):
    """Build a `{name: CPU tensor}` dict for parameters whose name matches a key.

    Args:
        named_params: Iterable of `(name, parameter)` pairs.
        keys_to_match: Substrings; a parameter is kept when any of them occurs
            in its name.

    Returns:
        Dict mapping each selected name to the result of
        `maybe_zero_3(param, ignore_status=True, name=name).cpu()`.
    """
    # Pass 1: select by name (a repeated name keeps its last parameter).
    selected = {}
    for param_name, tensor in named_params:
        if any(fragment in param_name for fragment in keys_to_match):
            selected[param_name] = tensor

    # Pass 2: materialize each selected parameter on CPU.
    gathered = {}
    for param_name, tensor in selected.items():
        gathered[param_name] = maybe_zero_3(tensor, ignore_status=True, name=param_name).cpu()
    return gathered
def split_to_even_chunks(indices, lengths, num_chunks):
    """
    Split a list of indices into `num_chunks` chunks of roughly equal total length.

    When `len(indices)` is not a multiple of `num_chunks`, the indices are
    dealt out round-robin by striding. Otherwise each index, in order, goes to
    the chunk with the smallest accumulated length (first one on ties), and a
    chunk stops receiving indices once it holds `len(indices) // num_chunks`.
    """
    per_chunk, remainder = divmod(len(indices), num_chunks)
    if remainder:
        return [indices[offset::num_chunks] for offset in range(num_chunks)]

    buckets = [[] for _ in range(num_chunks)]
    loads = [0] * num_chunks
    for idx in indices:
        # First bucket with the minimal accumulated length.
        target = min(range(num_chunks), key=loads.__getitem__)
        bucket = buckets[target]
        bucket.append(idx)
        loads[target] += lengths[idx]
        if len(bucket) == per_chunk:
            # A full bucket must never be selected again.
            loads[target] = float("inf")
    return buckets
def get_modality_length_grouped_indices(lengths, batch_size, world_size, generator=None):
    """Order sample indices so that megabatches are length-grouped within one modality.

    The sign of each entry in `lengths` selects the group (positive entries
    feed the `mm_*` lists, negative entries the `lang_*` lists); its magnitude
    is used as the sample length. Zero is rejected.

    Args:
        lengths: Signed, non-zero length per sample.
        batch_size: Per-device batch size.
        world_size: Number of chunks per megabatch; a megabatch holds
            `world_size * batch_size` indices.
        generator: Optional `torch.Generator` used for every random
            permutation drawn here, so a seeded generator gives a
            reproducible order.

    Returns:
        A flat list of dataset indices.
    """
    # We need to use torch for the random part as a distributed sampler will set the random seed for torch.
    assert all(length != 0 for length in lengths), "Should not have zero length."
    if all(length > 0 for length in lengths) or all(length < 0 for length in lengths):
        # all samples are in the same modality; group on magnitudes so that an
        # all-negative input is sorted and balanced like an all-positive one.
        magnitudes = [abs(length) for length in lengths]
        return get_length_grouped_indices(magnitudes, batch_size, world_size, generator=generator)

    mm_indices, mm_lengths = zip(*[(i, length) for i, length in enumerate(lengths) if length > 0])
    lang_indices, lang_lengths = zip(*[(i, -length) for i, length in enumerate(lengths) if length < 0])

    # Group each modality separately, then map local positions back to dataset indices.
    mm_order = get_length_grouped_indices(mm_lengths, batch_size, world_size, generator=generator)
    lang_order = get_length_grouped_indices(lang_lengths, batch_size, world_size, generator=generator)
    mm_shuffle = [mm_indices[i] for i in mm_order]
    lang_shuffle = [lang_indices[i] for i in lang_order]

    megabatch_size = world_size * batch_size
    mm_megabatches = [mm_shuffle[i : i + megabatch_size] for i in range(0, len(mm_shuffle), megabatch_size)]
    lang_megabatches = [lang_shuffle[i : i + megabatch_size] for i in range(0, len(lang_shuffle), megabatch_size)]

    # The final megabatch of each modality is held back and merged into one
    # trailing mixed batch; all the others are shuffled among themselves.
    additional_batch = mm_megabatches[-1] + lang_megabatches[-1]
    megabatches = mm_megabatches[:-1] + lang_megabatches[:-1]
    megabatch_indices = torch.randperm(len(megabatches), generator=generator)
    megabatches = [megabatches[i] for i in megabatch_indices]

    if len(additional_batch) > 0:
        megabatches.append(sorted(additional_batch))

    return [i for megabatch in megabatches for i in megabatch]
def get_length_grouped_indices(lengths, batch_size, world_size, generator=None, merge=True):
    """Return a random ordering of indices in which each megabatch is sorted and chunked by length.

    A random permutation of `range(len(lengths))` is cut into megabatches of
    `world_size * batch_size` indices. Each megabatch is sorted by descending
    length and split into `world_size` chunks by `split_to_even_chunks`. The
    chunks are then concatenated in order.

    `merge` is accepted but not used by this function.
    """
    # We need to use torch for the random part as a distributed sampler will set the random seed for torch.
    permutation = torch.randperm(len(lengths), generator=generator)
    span = world_size * batch_size

    windows = []
    for start in range(0, len(lengths), span):
        window = permutation[start : start + span].tolist()
        # Stable descending sort, longest sample first.
        window.sort(key=lambda i: lengths[i], reverse=True)
        windows.append(window)

    ordered = []
    for window in windows:
        for chunk in split_to_even_chunks(window, lengths, world_size):
            ordered.extend(chunk)
    return ordered
class LengthGroupedSampler(Sampler):
    r"""
    Sampler yielding dataset indices so that samples of roughly the same length end up together, while keeping
    a bit of randomness.

    With `group_by_modality=True` the ordering comes from `get_modality_length_grouped_indices`, otherwise
    from `get_length_grouped_indices`.
    """

    def __init__(
        self,
        batch_size: int,
        world_size: int,
        lengths: Optional[List[int]] = None,
        generator=None,
        group_by_modality: bool = False,
    ):
        # `lengths` is optional in the signature only; it is required in practice.
        if lengths is None:
            raise ValueError("Lengths must be provided.")

        self.lengths = lengths
        self.batch_size = batch_size
        self.world_size = world_size
        self.group_by_modality = group_by_modality
        self.generator = generator

    def __len__(self):
        """Number of samples, i.e. the number of provided lengths."""
        return len(self.lengths)

    def __iter__(self):
        """Draw a fresh grouped ordering and iterate over it."""
        grouper = get_modality_length_grouped_indices if self.group_by_modality else get_length_grouped_indices
        return iter(grouper(self.lengths, self.batch_size, self.world_size, generator=self.generator))
class blip3oTrainer(Trainer):
    """`Trainer` with an optional modality/length-grouped train sampler and a separate projector learning rate."""

    def _get_train_sampler(self, train_dataset=None) -> Optional[torch.utils.data.Sampler]:
        """Return the sampler for the training dataloader.

        Args:
            train_dataset: Optional dataset to sample from; defaults to
                `self.train_dataset`. Accepted because newer `Trainer`
                versions pass the dataset to this hook, while older ones call
                it without arguments.

        Returns:
            `None` when there is no dataset or it has no length; a
            `LengthGroupedSampler` when `args.group_by_modality_length` is
            set; otherwise whatever the parent `Trainer` returns.
        """
        dataset = self.train_dataset if train_dataset is None else train_dataset
        if dataset is None or not has_length(dataset):
            return None

        if self.args.group_by_modality_length:
            lengths = dataset.modality_lengths
            return LengthGroupedSampler(
                self.args.train_batch_size,
                world_size=self.args.world_size * self.args.gradient_accumulation_steps,
                lengths=lengths,
                group_by_modality=True,
            )

        # Only forward the argument when one was given, so the call also works
        # with parent implementations that take no parameter.
        if train_dataset is None:
            return super()._get_train_sampler()
        return super()._get_train_sampler(train_dataset)

    # Disabled experiment: stop training on loss / grad-norm spikes. Kept commented out for reference.
    # def _maybe_log_save_evaluate(self, tr_loss, grad_norm, model, trial, epoch, ignore_keys_for_eval, start_time):
    # if not hasattr(self, "largest_loss"):
    # self.largest_loss = tr_loss.item()
    # self.largest_grad_norm = grad_norm
    # self.latest_grad_norm = grad_norm
    # else:
    # if tr_loss.item() > 10 * self.largest_loss:
    # print(f"Loss Spiked: {tr_loss.item()} -> {self.largest_loss}")
    # self.control.should_training_stop = True
    # if grad_norm > 10 * self.latest_grad_norm and grad_norm > 3:
    # print(f"Grad Norm Spiked: {grad_norm} -> {self.latest_grad_norm}")
    # self.control.should_training_stop = True
    # self.largest_loss = max(tr_loss.item(), self.largest_loss)
    # self.largest_grad_norm = max(grad_norm, self.largest_grad_norm)
    # self.latest_grad_norm = grad_norm
    # if np.isnan(grad_norm) or grad_norm > 1e6:
    # print(f"NaN grad norm detected in process {self.args.process_index} on {os.uname().nodename}")
    # self.control.should_training_stop = True
    # print(f"Shut Down Training")
    # if self.control.should_log and self.state.global_step > self._globalstep_last_logged:
    # if is_torch_xla_available():
    # xm.mark_step()
    # logs: Dict[str, float] = {}
    # # all_gather + mean() to get average loss over all processes
    # tr_loss_scalar = self._nested_gather(tr_loss).mean().item()
    # # reset tr_loss to zero
    # tr_loss -= tr_loss
    # logs["loss"] = round(tr_loss_scalar / (self.state.global_step - self._globalstep_last_logged), 4)
    # if grad_norm is not None:
    # logs["grad_norm"] = grad_norm.detach().item() if isinstance(grad_norm, torch.Tensor) else grad_norm
    # logs["learning_rate"] = self._get_learning_rate()
    # self._total_loss_scalar += tr_loss_scalar
    # self._globalstep_last_logged = self.state.global_step
    # self.store_flos()
    # self.log(logs, start_time)
    # metrics = None
    # if self.control.should_evaluate:
    # metrics = self._evaluate(trial, ignore_keys_for_eval)
    # is_new_best_metric = self._determine_best_metric(metrics=metrics, trial=trial)
    # if self.args.save_strategy == SaveStrategy.BEST:
    # self.control.should_save = is_new_best_metric
    # if self.control.should_save:
    # self._save_checkpoint(model, trial)
    # self.control = self.callback_handler.on_save(self.args, self.state, self.control)

    def create_optimizer(self):
        """
        Setup the optimizer.

        We provide a reasonable default that works well. If you want to use something else, you can pass a tuple in the
        Trainer's init through `optimizers`, or subclass and override this method in a subclass.

        Trainable parameters are split into weight-decay / no-decay groups (names returned by
        `get_parameter_names(model, ALL_LAYERNORM_LAYERS)` that do not contain "bias" get `args.weight_decay`).
        When `args.mm_projector_lr` is set, parameters whose name contains "mm_projector" go into two further
        groups that use that learning rate. Nothing is rebuilt if `self.optimizer` already exists.

        Returns:
            The optimizer stored on `self.optimizer`.
        """
        if is_sagemaker_mp_enabled():
            return super().create_optimizer()

        opt_model = self.model

        if self.optimizer is None:
            decay_parameters = get_parameter_names(opt_model, ALL_LAYERNORM_LAYERS)
            # Sets keep the per-parameter membership tests below O(1).
            decay_parameters = {name for name in decay_parameters if "bias" not in name}
            # Frozen parameters are never handed to the optimizer.
            trainable = [(n, p) for n, p in opt_model.named_parameters() if p.requires_grad]

            if self.args.mm_projector_lr is not None:
                projector_parameters = {name for name, _ in opt_model.named_parameters() if "mm_projector" in name}
                # Group order: decay, no-decay, projector decay, projector no-decay.
                optimizer_grouped_parameters = [
                    {
                        "params": [p for n, p in trainable if n in decay_parameters and n not in projector_parameters],
                        "weight_decay": self.args.weight_decay,
                    },
                    {
                        "params": [p for n, p in trainable if n not in decay_parameters and n not in projector_parameters],
                        "weight_decay": 0.0,
                    },
                    {
                        "params": [p for n, p in trainable if n in decay_parameters and n in projector_parameters],
                        "weight_decay": self.args.weight_decay,
                        "lr": self.args.mm_projector_lr,
                    },
                    {
                        "params": [p for n, p in trainable if n not in decay_parameters and n in projector_parameters],
                        "weight_decay": 0.0,
                        "lr": self.args.mm_projector_lr,
                    },
                ]
            else:
                optimizer_grouped_parameters = [
                    {
                        "params": [p for n, p in trainable if n in decay_parameters],
                        "weight_decay": self.args.weight_decay,
                    },
                    {
                        "params": [p for n, p in trainable if n not in decay_parameters],
                        "weight_decay": 0.0,
                    },
                ]

            optimizer_cls, optimizer_kwargs = Trainer.get_optimizer_cls_and_kwargs(self.args)

            self.optimizer = optimizer_cls(optimizer_grouped_parameters, **optimizer_kwargs)
            if optimizer_cls.__name__ == "Adam8bit":
                import bitsandbytes

                manager = bitsandbytes.optim.GlobalOptimManager.get_instance()

                # Register a 32-bit optimizer-state override for every embedding
                # weight, and report how many parameters that covers.
                skipped = 0
                for module in opt_model.modules():
                    if isinstance(module, nn.Embedding):
                        # Keyed by data_ptr so parameters sharing storage are counted once per module.
                        skipped += sum({p.data_ptr(): p.numel() for p in module.parameters()}.values())
                        logger.info(f"skipped {module}: {skipped/2**20}M params")
                        manager.register_module_override(module, "weight", {"optim_bits": 32})
                        logger.debug(f"bitsandbytes: will optimize {module} in fp32")
                logger.info(f"skipped: {skipped/2**20}M params")

        return self.optimizer