trl-4-dnd / tests /slow /test_grpo_slow.py
vishaljoshi24's picture
Initial Commit
a080fe0
# Copyright 2020-2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import os
import tempfile
import unittest
import warnings
import numpy as np
import pytest
import torch
from accelerate.utils.memory import release_memory
from datasets import Dataset, Features, Image, Value, load_dataset
from parameterized import parameterized
from transformers import (
AutoModelForCausalLM,
AutoModelForImageTextToText,
AutoProcessor,
AutoTokenizer,
BitsAndBytesConfig,
)
from transformers.testing_utils import (
backend_empty_cache,
require_bitsandbytes,
require_flash_attn,
require_liger_kernel,
require_peft,
require_torch_accelerator,
torch_device,
)
from transformers.utils import is_peft_available
from trl import GRPOConfig, GRPOTrainer
from trl.trainer.utils import get_kbit_device_map
from ..testing_utils import require_vllm
from .testing_constants import MODELS_TO_TEST
if is_peft_available():
from peft import LoraConfig, PeftModel
@pytest.mark.slow
@require_torch_accelerator
class GRPOTrainerSlowTester(unittest.TestCase):
def setUp(self):
self.train_dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train")
self.eval_dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="test")
self.max_length = 128
def tearDown(self):
gc.collect()
backend_empty_cache(torch_device)
gc.collect()
@parameterized.expand(MODELS_TO_TEST)
@require_liger_kernel
def test_training_with_liger_grpo_loss(self, model_name):
with tempfile.TemporaryDirectory() as tmp_dir:
training_args = GRPOConfig(
output_dir=tmp_dir,
per_device_train_batch_size=3,
num_generations=3,
use_liger_loss=True,
max_completion_length=self.max_length,
report_to="none",
logging_strategy="no",
)
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token if tokenizer.pad_token is None else tokenizer.pad_token
trainer = GRPOTrainer(
model=model,
reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5",
args=training_args,
train_dataset=self.train_dataset,
eval_dataset=self.eval_dataset,
processing_class=tokenizer,
)
from liger_kernel.chunked_loss import LigerFusedLinearGRPOLoss
assert isinstance(trainer.liger_grpo_loss, LigerFusedLinearGRPOLoss)
previous_trainable_params = {n: param.clone() for n, param in model.named_parameters()}
trainer.train()
for n, param in previous_trainable_params.items():
new_param = model.get_parameter(n)
self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")
release_memory(model, trainer)
@parameterized.expand(MODELS_TO_TEST)
@require_liger_kernel
@require_peft
def test_training_with_liger_grpo_loss_and_peft(self, model_name):
from peft import LoraConfig, TaskType
with tempfile.TemporaryDirectory() as tmp_dir:
training_args = GRPOConfig(
output_dir=tmp_dir,
per_device_train_batch_size=3,
num_generations=3,
use_liger_loss=True,
max_completion_length=self.max_length,
report_to="none",
logging_strategy="no",
)
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token if tokenizer.pad_token is None else tokenizer.pad_token
# Configure PEFT with LoRA
peft_config = LoraConfig(
task_type=TaskType.CAUSAL_LM,
inference_mode=False,
r=8,
lora_alpha=32,
lora_dropout=0.1,
target_modules=["q_proj", "v_proj"],
)
trainer = GRPOTrainer(
model=model,
reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5",
args=training_args,
train_dataset=self.train_dataset,
eval_dataset=self.eval_dataset,
processing_class=tokenizer,
peft_config=peft_config,
)
from liger_kernel.chunked_loss import LigerFusedLinearGRPOLoss
assert isinstance(trainer.liger_grpo_loss, LigerFusedLinearGRPOLoss)
# Verify PEFT adapter is properly initialized
from peft import PeftModel
self.assertTrue(isinstance(trainer.model, PeftModel), "Model should be wrapped with PEFT")
# Store adapter weights before training
previous_trainable_params = {
n: param.clone() for n, param in trainer.model.named_parameters() if param.requires_grad
}
self.assertTrue(len(previous_trainable_params) > 0, "No trainable parameters found in PEFT model")
trainer.train()
# Verify adapter weights have changed after training
for n, param in previous_trainable_params.items():
new_param = trainer.model.get_parameter(n)
self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")
release_memory(model, trainer)
@parameterized.expand(MODELS_TO_TEST)
def test_training_with_transformers_paged(self, model_name):
"""Test that training works with transformers paged implementation (requires GPU)."""
with tempfile.TemporaryDirectory() as tmp_dir:
training_args = GRPOConfig(
output_dir=tmp_dir,
learning_rate=0.1, # increase the learning rate to speed up the test
per_device_train_batch_size=3, # reduce the batch size to reduce memory usage
num_generations=3, # reduce the number of generations to reduce memory usage
max_completion_length=8, # reduce the completion length to reduce memory usage
use_transformers_paged=True, # Enable transformers paged implementation
report_to="none",
logging_strategy="no",
)
model = AutoModelForCausalLM.from_pretrained(model_name)
trainer = GRPOTrainer(
model=model,
reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5",
args=training_args,
train_dataset=self.train_dataset,
)
previous_trainable_params = {n: param.clone() for n, param in model.named_parameters()}
trainer.train()
self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"])
# Check that the params have changed
for n, param in previous_trainable_params.items():
new_param = model.get_parameter(n)
self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")
release_memory(model, trainer)
@require_flash_attn
@require_bitsandbytes
@require_peft
@parameterized.expand(
[
("HuggingFaceTB/SmolVLM-Instruct",), # Only test the smaller model to avoid OOM
]
)
def test_vlm_training(self, model_name):
"""
Test VLM training with aggressive memory optimization.
This test uses multiple memory reduction techniques:
- 4-bit quantization with double quantization
- LoRA with very low rank (r=4)
- Minimal batch size (1) with gradient accumulation
- Small images (64x64 instead of 224x224)
- Short sequences (max_completion_length=8)
- Only 4 training samples
- Only 1 training step
- Gradient checkpointing and bfloat16
"""
# Create processor once outside the data generator
processor = AutoProcessor.from_pretrained(model_name, use_fast=True, padding_side="left")
conversation = [
{
"role": "user",
"content": [
{"type": "image"},
{"type": "text", "text": "What is in the image?"},
],
},
]
prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
def data_gen(num_samples):
for _ in range(num_samples):
yield {
"prompt": prompt,
"image": np.random.uniform(low=0.0, high=255.0, size=(64, 64, 3)).astype(
np.uint8
), # Much smaller images
}
dataset = Dataset.from_generator(
data_gen, gen_kwargs={"num_samples": 4}, features=Features(image=Image(), prompt=Value(dtype="string"))
)
# reduce memory requirements as much as possible
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype="bfloat16",
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_storage="bfloat16",
)
model = AutoModelForImageTextToText.from_pretrained(
model_name,
attn_implementation="flash_attention_2",
torch_dtype="bfloat16",
device_map=get_kbit_device_map(),
quantization_config=quantization_config,
)
def reward_func(prompts, completions, **kwargs):
# simple nonsensical reward
return [-((len(c) - 25) ** 2) + 100 for c in completions]
with tempfile.TemporaryDirectory() as tmp_dir:
training_args = GRPOConfig(
output_dir=tmp_dir,
learning_rate=0.1,
per_device_train_batch_size=1, # Minimal batch size
gradient_accumulation_steps=2, # Maintain effective batch size
num_generations=2,
max_completion_length=8, # Much shorter completions
max_prompt_length=None, # Don't limit prompt length for VLM
bf16=True, # Use bfloat16 precision
max_steps=1, # Only do 1 training step to save time and memory
report_to="none",
logging_strategy="no",
)
lora_config = LoraConfig(
task_type="CAUSAL_LM",
r=4, # Much lower rank for minimal memory
lora_alpha=8, # Reduced alpha proportionally
lora_dropout=0.1,
target_modules=["q_proj", "v_proj"], # Minimal target modules
# For VLM models, we typically want to freeze the vision encoder
# and only adapt the language model parameters
modules_to_save=None,
)
try:
trainer = GRPOTrainer(
model=model,
processing_class=processor,
reward_funcs=[reward_func],
args=training_args,
train_dataset=dataset,
peft_config=lora_config,
)
self.assertIsInstance(trainer.model, PeftModel)
previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()}
trainer.train()
self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"])
# Check that LoRA parameters have changed
# For VLM models, we're more permissive about which parameters can change
lora_params_changed = False
for n, param in previous_trainable_params.items():
new_param = trainer.model.get_parameter(n)
if "lora" in n.lower(): # LoRA parameters should change
if not torch.equal(param, new_param):
lora_params_changed = True
# At least some LoRA parameters should have changed during training
self.assertTrue(lora_params_changed, "No LoRA parameters were updated during training.")
except torch.OutOfMemoryError as e:
self.skipTest(f"Skipping VLM training test due to insufficient GPU memory: {e}")
except Exception as e:
# Check for other memory-related errors
if any(keyword in str(e).lower() for keyword in ["memory", "cuda", "out of memory", "insufficient"]):
self.skipTest(f"Skipping VLM training test due to hardware constraints: {e}")
else:
raise
release_memory(model, trainer)
@require_vllm
@require_bitsandbytes
@require_peft
def test_vlm_processor_vllm_colocate_mode(self):
"""
Test that VLM processors work with vLLM in colocate mode.
This test uses multiple memory optimization techniques to ensure it runs on limited hardware:
- LoRA (Low-Rank Adaptation) with minimal rank (r=4)
- 4-bit quantization with BitsAndBytesConfig
- Gradient checkpointing
- bfloat16 precision
- Minimal batch sizes and sequence lengths
- Very low GPU memory utilization (5%)
"""
dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train")
with tempfile.TemporaryDirectory() as tmp_dir:
config = GRPOConfig(
output_dir=tmp_dir,
per_device_train_batch_size=1, # Minimal batch size
gradient_accumulation_steps=2, # Make effective batch size 2, divisible by num_generations
num_generations=2,
max_completion_length=4, # Very short completions to reduce memory
max_prompt_length=32, # Very short prompts to reduce memory
use_vllm=True, # Enable vLLM
vllm_mode="colocate", # Use colocate mode to avoid server dependency
vllm_gpu_memory_utilization=0.05, # Use minimal GPU memory (5%)
gradient_checkpointing=True, # Enable gradient checkpointing to save memory
bf16=True, # Use bfloat16 to reduce memory
report_to="none",
logging_strategy="no",
)
# Create a VLM processor
processor = AutoProcessor.from_pretrained(
"HuggingFaceTB/SmolVLM-Instruct", use_fast=True, padding_side="left"
)
# Verify processor has both required attributes for VLM detection
self.assertTrue(hasattr(processor, "tokenizer"))
self.assertTrue(hasattr(processor, "image_processor"))
def dummy_reward_func(completions, **kwargs):
return [1.0] * len(completions)
# Use LoRA configuration for memory efficiency
lora_config = LoraConfig(
r=4, # Very low rank for minimal memory
lora_alpha=8,
target_modules=["q_proj", "v_proj"], # Minimal target modules
lora_dropout=0.1,
bias="none",
task_type="CAUSAL_LM",
)
# Use 4-bit quantization for further memory reduction
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.bfloat16,
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True,
)
original_env = {}
required_env_vars = {
"RANK": "0",
"LOCAL_RANK": "0",
"WORLD_SIZE": "1",
"LOCAL_WORLD_SIZE": "1",
"MASTER_ADDR": "localhost",
"MASTER_PORT": "12355",
}
for key, value in required_env_vars.items():
original_env[key] = os.environ.get(key)
os.environ[key] = value
try:
# Test VLM processor with vLLM colocate mode
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
try:
# Load model with quantization for memory efficiency
model = AutoModelForCausalLM.from_pretrained(
"trl-internal-testing/tiny-Qwen2ForCausalLM-2.5",
quantization_config=quantization_config,
torch_dtype=torch.bfloat16,
)
trainer = GRPOTrainer(
model=model,
reward_funcs=dummy_reward_func,
args=config,
train_dataset=dataset,
processing_class=processor, # VLM processor
peft_config=lora_config, # Use LoRA for memory efficiency
)
# Should detect VLM processor correctly and allow vLLM
self.assertTrue(trainer.use_vllm, "vLLM should be enabled for VLM processors in colocate mode")
self.assertEqual(trainer.vllm_mode, "colocate", "Should use colocate mode")
# Check if signature columns were set properly
if trainer._signature_columns is not None:
# Should include 'image' in signature columns for VLM processors
self.assertIn(
"image",
trainer._signature_columns,
"Should include 'image' in signature columns for VLM",
)
# Should not emit any warnings about VLM incompatibility
incompatibility_warnings = [
str(w_item.message)
for w_item in w
if "does not support VLMs" in str(w_item.message)
or "not compatible" in str(w_item.message).lower()
]
self.assertEqual(
len(incompatibility_warnings),
0,
f"Should not emit VLM incompatibility warnings, but got: {incompatibility_warnings}",
)
# Test passes if we get this far without exceptions
except Exception as e:
# If vLLM fails to initialize due to hardware constraints or other issues, that's expected
if any(
keyword in str(e).lower()
for keyword in [
"outofmemoryerror",
"cuda",
"memory",
"insufficient",
"no such device",
"free memory",
"gpu memory utilization",
"decrease gpu memory",
]
):
self.skipTest(f"Skipping vLLM colocate test due to hardware constraints: {e}")
elif "KeyError" in str(e) and "RANK" in str(e):
self.skipTest(f"Skipping vLLM colocate test due to environment setup issues: {e}")
elif "ValueError" in str(e) and "memory" in str(e).lower():
self.skipTest(f"Skipping vLLM colocate test due to memory constraints: {e}")
else:
raise
finally:
# Restore original environment variables
for key, original_value in original_env.items():
if original_value is None:
os.environ.pop(key, None)
else:
os.environ[key] = original_value
release_memory(model, trainer)
@require_vllm
def test_training_vllm(self):
"""Test that training works with vLLM for generation."""
dataset = load_dataset("trl-internal-testing/zen", "standard_prompt_only", split="train")
with tempfile.TemporaryDirectory() as tmp_dir:
training_args = GRPOConfig(
output_dir=tmp_dir,
learning_rate=0.1, # increase the learning rate to speed up the test
per_device_train_batch_size=3, # reduce the batch size to reduce memory usage
num_generations=3, # reduce the number of generations to reduce memory usage
max_completion_length=8, # reduce the completion length to reduce memory usage
report_to="none",
logging_strategy="no",
use_vllm=True,
)
try:
trainer = GRPOTrainer(
model="Qwen/Qwen2.5-0.5B-Instruct", # tiny models are too small for vLLM
reward_funcs="trl-internal-testing/tiny-Qwen2ForSequenceClassification-2.5",
args=training_args,
train_dataset=dataset,
)
previous_trainable_params = {n: param.clone() for n, param in trainer.model.named_parameters()}
trainer.train()
self.assertIsNotNone(trainer.state.log_history[-1]["train_loss"])
# Check that the params have changed
for n, param in previous_trainable_params.items():
new_param = trainer.model.get_parameter(n)
self.assertFalse(torch.equal(param, new_param), f"Parameter {n} has not changed.")
except Exception as e:
# If vLLM fails to initialize due to hardware constraints or other issues, that's expected
if any(
keyword in str(e).lower()
for keyword in [
"outofmemoryerror",
"cuda",
"memory",
"insufficient",
"no such device",
"free memory",
"gpu memory utilization",
"decrease gpu memory",
]
):
self.skipTest(f"Skipping vLLM training test due to hardware constraints: {e}")
elif "KeyError" in str(e) and "RANK" in str(e):
self.skipTest(f"Skipping vLLM training test due to environment setup issues: {e}")
elif "ValueError" in str(e) and "memory" in str(e).lower():
self.skipTest(f"Skipping vLLM training test due to memory constraints: {e}")
else:
raise
release_memory(trainer.model, trainer)