# NOTE: non-code banner captured from the hosting web page ("Spaces: Running on A100");
# kept as a comment so the file remains valid Python.
# Copyright (c) 2025 NVIDIA CORPORATION.
# Licensed under the MIT license.
# Adapted from https://github.com/NVlabs/VILA/tree/main under the Apache 2.0 license.
# LICENSE is in incl_licenses directory.
# Copyright 2024 NVIDIA CORPORATION & AFFILIATES
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
import transformers
from transformers.image_transforms import (
    ChannelDimension,
    Iterable,
    Optional,
    Union,
    get_channel_dimension_axis,
    infer_channel_dimension_format,
    np,
    to_channel_dimension_format,
)
def patched_normalize(
    image: np.ndarray,
    mean: Union[float, Iterable[float]],
    std: Union[float, Iterable[float]],
    data_format: Optional[ChannelDimension] = None,
    input_data_format: Optional[Union[str, ChannelDimension]] = None,
) -> np.ndarray:
    """
    Normalizes `image` using the mean and standard deviation specified by `mean` and `std`.

    image = (image - mean) / std

    Unlike the stock ``transformers.image_transforms.normalize``, a single-channel
    image is expanded to three channels (by repetition along the channel axis) when
    an iterable ``mean`` of a different length is supplied, so grayscale inputs can
    be normalized with RGB statistics.

    Args:
        image (`np.ndarray`):
            The image to normalize.
        mean (`float` or `Iterable[float]`):
            The mean to use for normalization.
        std (`float` or `Iterable[float]`):
            The standard deviation to use for normalization.
        data_format (`ChannelDimension`, *optional*):
            The channel dimension format of the output image. If unset, will use the inferred format from the input.
        input_data_format (`ChannelDimension` or `str`, *optional*):
            The channel dimension format of the input image. If unset, it is inferred from the input.

    Returns:
        `np.ndarray`: The normalized image.

    Raises:
        ValueError: If `image` is not a numpy array, or an iterable `mean`/`std`
            does not match the number of channels.
    """
    if not isinstance(image, np.ndarray):
        raise ValueError("image must be a numpy array")

    # Respect an explicitly provided input format; only infer when absent.
    # (The previous version unconditionally re-inferred, silently ignoring the
    # `input_data_format` argument.)
    if input_data_format is None:
        input_data_format = infer_channel_dimension_format(image)
    # Keep the channel axis consistent with the (possibly caller-supplied) format.
    channel_axis = get_channel_dimension_axis(image, input_data_format=input_data_format)
    num_channels = image.shape[channel_axis]

    if isinstance(mean, Iterable):
        if len(mean) != num_channels:
            if num_channels == 1:
                # Grayscale input with multi-channel stats: replicate to match.
                num_channels = 3
                image = np.concatenate([image, image, image], axis=channel_axis)
            else:
                raise ValueError(f"mean must have {num_channels} elements if it is an iterable, got {len(mean)}")
    else:
        mean = [mean] * num_channels
    mean = np.array(mean, dtype=image.dtype)

    if isinstance(std, Iterable):
        if len(std) != num_channels:
            raise ValueError(f"std must have {num_channels} elements if it is an iterable, got {len(std)}")
    else:
        std = [std] * num_channels
    std = np.array(std, dtype=image.dtype)

    if input_data_format == ChannelDimension.LAST:
        # Channels-last: stats broadcast over the trailing axis directly.
        image = (image - mean) / std
    else:
        # Channels-first: transpose so the channel axis is trailing, then undo.
        image = ((image.T - mean) / std).T

    image = to_channel_dimension_format(image, data_format) if data_format is not None else image
    return image
def patch_normalize_preprocess():
    """Monkey-patch ``transformers.image_transforms.normalize`` with ``patched_normalize``.

    Call once at startup, before any image processor runs, so every preprocessing
    pipeline picks up the grayscale-tolerant normalization above.
    """
    setattr(transformers.image_transforms, "normalize", patched_normalize)
import os

import torch
from transformers.trainer_utils import PREFIX_CHECKPOINT_DIR
from transformers.utils import logging
# File name (relative to a checkpoint directory) under which the Trainer
# serializes its state; mirrors `transformers.trainer.TRAINER_STATE_NAME`.
TRAINER_STATE_NAME = "trainer_state.json"
# Module-level logger, following the transformers logging convention.
logger = logging.get_logger(__name__)
def _save_checkpoint(self, model, trial, metrics=None):
    """Save a full training checkpoint (model, optimizer/scheduler, RNG, trainer state).

    The checkpoint is first written to a staging directory (``tmp-checkpoint-N``)
    and renamed to its final location (``checkpoint-N``) only after all ranks
    have finished writing, so a crash mid-save never leaves a half-written
    ``checkpoint-N`` behind.

    Args:
        model: The model being trained (saving goes through ``self.save_model``;
            this parameter is kept for signature compatibility).
        trial: Current hyper-parameter search trial, or ``None``.
        metrics (dict, *optional*): Latest evaluation metrics, used to track the
            best checkpoint when ``args.metric_for_best_model`` is configured.
    """
    # In all cases, including ddp/dp/deepspeed, self.model is always a reference to the model we
    # want to save except FullyShardedDDP.
    # assert unwrap_model(model) is self.model, "internal model should be a reference to self.model"

    # Save model checkpoint
    checkpoint_folder = f"{PREFIX_CHECKPOINT_DIR}-{self.state.global_step}"

    if self.hp_search_backend is None and trial is None:
        self.store_flos()

    run_dir = self._get_output_dir(trial=trial)
    output_dir = os.path.join(run_dir, checkpoint_folder)
    if os.path.exists(output_dir) and len(os.listdir(output_dir)) > 0:
        # Final directory already populated: write in place (no atomic rename possible).
        # Note: a space was added between the two message halves (previously they
        # concatenated as "non-empty.Saving").
        logger.warning(
            f"Checkpoint destination directory {output_dir} already exists and is non-empty. "
            "Saving will proceed but saved results may be invalid."
        )
        staging_output_dir = output_dir
    else:
        staging_output_dir = os.path.join(run_dir, f"tmp-{checkpoint_folder}")
    self.save_model(staging_output_dir, _internal_call=True)

    if not self.args.save_only_model:
        # Save optimizer and scheduler
        self._save_optimizer_and_scheduler(staging_output_dir)
        # Save RNG state
        self._save_rng_state(staging_output_dir)

    # Determine the new best metric / best model checkpoint
    if metrics is not None and self.args.metric_for_best_model is not None:
        metric_to_check = self.args.metric_for_best_model
        if not metric_to_check.startswith("eval_"):
            metric_to_check = f"eval_{metric_to_check}"
        metric_value = metrics[metric_to_check]

        operator = np.greater if self.args.greater_is_better else np.less
        if (
            self.state.best_metric is None
            or self.state.best_model_checkpoint is None
            or operator(metric_value, self.state.best_metric)
        ):
            self.state.best_metric = metric_value
            # Record the FINAL location, not the staging dir: the staging dir is
            # renamed to `output_dir` below, so a staging path would go stale.
            self.state.best_model_checkpoint = output_dir

    # Save the Trainer state
    if self.args.should_save:
        self.state.save_to_json(os.path.join(staging_output_dir, TRAINER_STATE_NAME))

    if self.args.push_to_hub:
        self._push_from_checkpoint(staging_output_dir)

    # All ranks must finish writing before the staging dir is renamed. Guard the
    # barrier so single-process (non-distributed) runs do not crash.
    if torch.distributed.is_available() and torch.distributed.is_initialized():
        torch.distributed.barrier()
    if staging_output_dir != output_dir:
        with self.args.main_process_first(
            desc="Renaming model checkpoint folder to true location", local=self.args.save_on_each_node
        ):
            if os.path.exists(staging_output_dir):
                os.rename(staging_output_dir, output_dir)

    # Maybe delete some older checkpoints.
    if self.args.should_save:
        # Solely rely on numerical checkpoint id for rotation.
        # mtime is not reliable especially on some fuse fs in cloud environments.
        self._rotate_checkpoints(use_mtime=False, output_dir=run_dir)
from typing import Any, Dict, Union

from torch import nn
from transformers.training_args import OptimizerNames
from transformers.utils import (
    is_sagemaker_mp_enabled,
    is_torch_mlu_available,
    is_torch_mps_available,
    is_torch_musa_available,
    is_torch_npu_available,
    is_torch_xpu_available,
)
def training_step(
    self, model: nn.Module, inputs: Dict[str, Union[torch.Tensor, Any]], num_items_in_batch=None
) -> torch.Tensor:
    """
    Perform a training step on a batch of inputs.

    Subclass and override to inject custom behavior.

    Args:
        model (`nn.Module`):
            The model to train.
        inputs (`Dict[str, Union[torch.Tensor, Any]]`):
            The inputs and targets of the model.

            The dictionary will be unpacked before being fed to the model. Most models expect the targets under the
            argument `labels`. Check your model's documentation for all accepted arguments.
        num_items_in_batch (*optional*):
            Count of loss-contributing items in this batch; when given, the loss is
            rescaled before backward (see below).

    Return:
        `torch.Tensor`: The tensor with training loss on this batch.
    """
    model.train()
    # Some optimizers expose their own train()/eval() modes; mirror the model's mode.
    if hasattr(self.optimizer, "train") and callable(self.optimizer.train):
        self.optimizer.train()
    inputs = self._prepare_inputs(inputs)
    if is_sagemaker_mp_enabled():
        # SageMaker model parallelism performs forward+backward internally.
        # NOTE(review): `smp_forward_backward` is assumed imported elsewhere in this file.
        loss_mb = smp_forward_backward(model, inputs, self.args.gradient_accumulation_steps)
        return loss_mb.reduce_mean().detach().to(self.args.device)

    with self.compute_loss_context_manager():
        loss = self.compute_loss(model, inputs, num_items_in_batch=num_items_in_batch)

    # Drop the batch reference before backward (presumably to reduce peak memory).
    del inputs
    if (
        self.args.torch_empty_cache_steps is not None
        and self.state.global_step % self.args.torch_empty_cache_steps == 0
    ):
        # Release cached allocator blocks on whichever accelerator backend is active.
        if is_torch_xpu_available():
            torch.xpu.empty_cache()
        elif is_torch_mlu_available():
            torch.mlu.empty_cache()
        elif is_torch_musa_available():
            torch.musa.empty_cache()
        elif is_torch_npu_available():
            torch.npu.empty_cache()
        elif is_torch_mps_available(min_version="2.0"):
            torch.mps.empty_cache()
        else:
            torch.cuda.empty_cache()

    kwargs = {}

    # For LOMO optimizers you need to explicitly use the learning rate
    if self.args.optim in [OptimizerNames.LOMO, OptimizerNames.ADALOMO]:
        kwargs["learning_rate"] = self._get_learning_rate()

    if self.args.n_gpu > 1:
        loss = loss.mean()  # mean() to average on multi-gpu parallel training

    if self.use_apex:
        # NOTE(review): apex `amp` is assumed to be in scope elsewhere in this file.
        with amp.scale_loss(loss, self.optimizer) as scaled_loss:
            scaled_loss.backward()
    else:
        if num_items_in_batch is not None:
            if self.compute_loss_func or self.model_accepts_loss_kwargs:
                # Presumably compensates for the division on return below so the
                # backward pass sees the un-divided loss — confirm against upstream Trainer.
                loss *= self.args.gradient_accumulation_steps
            # Average tokens across devices is orthogonal to gradient accumulation
            loss *= self.args.world_size
        self.accelerator.backward(loss, **kwargs)
    return loss.detach() / self.args.gradient_accumulation_steps
def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
    """
    How the loss is computed by Trainer. By default, all models return the loss in the first element.

    Subclass and override for custom behavior.

    Args:
        model: The model to run; invoked as ``model(**inputs)``.
        inputs (dict): Model inputs; ``labels`` is popped when a label smoother or a
            custom ``compute_loss_func`` is configured, and handled manually below.
        return_outputs (bool): When True, return ``(loss, outputs)`` instead of the loss.
        num_items_in_batch (*optional*): Local (per-process) item count; summed across
            processes below before being handed to the loss function.

    Returns:
        ``loss`` or ``(loss, outputs)`` depending on ``return_outputs``.

    Raises:
        ValueError: If the model returns a dict without a ``loss`` key and no labels
            were provided for manual loss computation.
    """
    # Pop labels only when loss is computed here (smoother / custom fn) rather than in the model.
    if (self.label_smoother is not None or self.compute_loss_func is not None) and "labels" in inputs:
        labels = inputs.pop("labels")
    else:
        labels = None

    if num_items_in_batch is not None:
        # Sum the per-process counts so every rank normalizes by the global item count.
        num_items_in_batch_tensor = torch.tensor(num_items_in_batch, device=self.args.device)
        num_items_in_batch = int(self.accelerator.gather(num_items_in_batch_tensor).sum().cpu())

    if self.model_accepts_loss_kwargs:
        loss_kwargs = {}
        if num_items_in_batch is not None:
            loss_kwargs["num_items_in_batch"] = num_items_in_batch
        inputs = {**inputs, **loss_kwargs}
    outputs = model(**inputs)
    # Save past state if it exists
    # TODO: this needs to be fixed and made cleaner later.
    if self.args.past_index >= 0:
        self._past = outputs[self.args.past_index]

    if labels is not None:
        unwrapped_model = self.accelerator.unwrap_model(model)
        # NOTE(review): `_is_peft_model` and `MODEL_FOR_CAUSAL_LM_MAPPING_NAMES` are
        # assumed to be imported elsewhere in this file (they are not visible here).
        if _is_peft_model(unwrapped_model):
            # PEFT wrappers hide the real class name one level down.
            model_name = unwrapped_model.base_model.model._get_name()
        else:
            model_name = unwrapped_model._get_name()

        # User-defined compute_loss function
        if self.compute_loss_func is not None:
            loss = self.compute_loss_func(outputs, labels, num_items_in_batch=num_items_in_batch)
        elif model_name in MODEL_FOR_CAUSAL_LM_MAPPING_NAMES.values():
            # Causal LMs: targets are the inputs shifted by one position.
            loss = self.label_smoother(outputs, labels, shift_labels=True)
        else:
            loss = self.label_smoother(outputs, labels)
    else:
        if isinstance(outputs, dict) and "loss" not in outputs:
            raise ValueError(
                "The model did not return a loss from the inputs, only the following keys: "
                f"{','.join(outputs.keys())}. For reference, the inputs it received are {','.join(inputs.keys())}."
            )
        # We don't use .loss here since the model may return tuples instead of ModelOutput.
        loss = outputs["loss"] if isinstance(outputs, dict) else outputs[0]

    return (loss, outputs) if return_outputs else loss