#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Fine-tuning script for DeepSeek-R1-Distill-Qwen-14B-bnb-4bit using unsloth
RESEARCH TRAINING PHASE ONLY - No output generation
WORKS WITH PRE-TOKENIZED DATASET - No re-tokenization
"""

import os
import json
import logging
import argparse
import numpy as np
from dotenv import load_dotenv
import torch
from datasets import load_dataset
import transformers
from transformers import AutoTokenizer, TrainingArguments, Trainer, AutoModelForCausalLM
from transformers.data.data_collator import DataCollatorMixin
from peft import LoraConfig
from unsloth import FastLanguageModel

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("training.log")
    ]
)
logger = logging.getLogger(__name__)

# Default dataset path - use the correct path with username
DEFAULT_DATASET = "George-API/phi4-cognitive-dataset"


def load_config(config_path):
    """Load the transformers config from JSON file"""
    logger.info(f"Loading config from {config_path}")
    with open(config_path, 'r') as f:
        config = json.load(f)
    return config
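
# Illustrative sketch of a transformers_config.json, inferred from the keys this
# script reads below (the values and the model id placeholder are assumptions,
# not an official schema):
#
# {
#   "model_config":    {"model_name_or_path": "<4-bit DeepSeek-R1-Distill-Qwen-14B repo id>"},
#   "training_config": {"output_dir": "fine_tuned_model", "max_seq_length": 2048,
#                       "num_train_epochs": 3, "per_device_train_batch_size": 2,
#                       "gradient_accumulation_steps": 4, "learning_rate": 2e-5},
#   "hardware_config": {"fp16": true, "bf16": false, "gradient_checkpointing": true},
#   "lora_config":     {"r": 16, "lora_alpha": 32, "lora_dropout": 0.05, "bias": "none",
#                       "target_modules": ["q_proj", "k_proj", "v_proj", "o_proj"]},
#   "dataset_config":  {"sort_by_field": "prompt_number", "sort_direction": "ascending",
#                       "training_phase_only": true}
# }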


def load_and_prepare_dataset(dataset_name, config):
    """
    Load and prepare the dataset for fine-tuning.
    Sort entries by prompt_number as required.
    NO TOKENIZATION - DATASET IS ALREADY TOKENIZED
    """
    # Use the default dataset path if no specific path is provided
    if dataset_name == "phi4-cognitive-dataset":
        dataset_name = DEFAULT_DATASET

    logger.info(f"Loading dataset: {dataset_name}")

    try:
        # Load dataset
        dataset = load_dataset(dataset_name)

        # Extract the split we want to use (usually 'train')
        if 'train' in dataset:
            dataset = dataset['train']

        # Get the dataset config
        dataset_config = config.get("dataset_config", {})
        sort_field = dataset_config.get("sort_by_field", "prompt_number")
        sort_direction = dataset_config.get("sort_direction", "ascending")

        # Sort the dataset by prompt_number
        logger.info(f"Sorting dataset by {sort_field} in {sort_direction} order")
        if sort_direction == "ascending":
            dataset = dataset.sort(sort_field)
        else:
            dataset = dataset.sort(sort_field, reverse=True)

        # Add shuffle with fixed seed if specified
        if "shuffle_seed" in dataset_config:
            shuffle_seed = dataset_config.get("shuffle_seed")
            logger.info(f"Shuffling dataset with seed {shuffle_seed}")
            dataset = dataset.shuffle(seed=shuffle_seed)

        logger.info(f"Dataset loaded with {len(dataset)} entries")
        return dataset
    except Exception as e:
        logger.error(f"Error loading dataset: {str(e)}")
        logger.info("Check that the dataset name is correct and that it exists on the Hugging Face Hub")
        # Print a more helpful error message
        print(f"Failed to load dataset: {dataset_name}")
        print("Make sure the dataset exists and is accessible.")
        print("If it's a private dataset, ensure your HF_TOKEN has access to it.")
        raise


# Data collator for pre-tokenized dataset
class PreTokenizedCollator(DataCollatorMixin):
    """
    Data collator for pre-tokenized datasets.
    Expects input_ids and labels already tokenized.
    """
    def __init__(self, pad_token_id=0):
        self.pad_token_id = pad_token_id

    def __call__(self, features):
        # Determine max length in this batch
        batch_max_len = max(len(x["input_ids"]) for x in features)

        # Initialize batch tensors
        batch = {
            "input_ids": torch.ones((len(features), batch_max_len), dtype=torch.long) * self.pad_token_id,
            "attention_mask": torch.zeros((len(features), batch_max_len), dtype=torch.long),
            "labels": torch.ones((len(features), batch_max_len), dtype=torch.long) * -100  # -100 is ignored in loss
        }

        # Fill batch tensors
        for i, feature in enumerate(features):
            input_ids = feature["input_ids"]
            seq_len = len(input_ids)

            # Convert to tensor if it's a list
            if isinstance(input_ids, list):
                input_ids = torch.tensor(input_ids, dtype=torch.long)

            # Copy data to batch tensors
            batch["input_ids"][i, :seq_len] = input_ids
            batch["attention_mask"][i, :seq_len] = 1

            # If there are labels, use them, otherwise use input_ids
            if "labels" in feature:
                labels = feature["labels"]
                if isinstance(labels, list):
                    labels = torch.tensor(labels, dtype=torch.long)
                batch["labels"][i, :len(labels)] = labels
            else:
                batch["labels"][i, :seq_len] = input_ids

        return batch
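
# Illustrative example (not part of the training flow) of how the collator pads
# a batch of two pre-tokenized features of unequal length:
#
#   collator = PreTokenizedCollator(pad_token_id=0)
#   batch = collator([{"input_ids": [11, 12, 13]}, {"input_ids": [21, 22]}])
#   batch["input_ids"]      -> tensor([[11, 12, 13], [21, 22,    0]])
#   batch["attention_mask"] -> tensor([[ 1,  1,  1], [ 1,  1,    0]])
#   batch["labels"]         -> tensor([[11, 12, 13], [21, 22, -100]])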


def create_training_marker(output_dir):
    """Create a marker file to indicate training is active"""
    # Create in current directory for app.py to find
    with open("TRAINING_ACTIVE", "w") as f:
        f.write(f"Training active in {output_dir}")

    # Also create in output directory
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "RESEARCH_TRAINING_ONLY"), "w") as f:
        f.write("This model is for research training only. No interactive outputs.")


def remove_training_marker():
    """Remove the training marker file"""
    if os.path.exists("TRAINING_ACTIVE"):
        os.remove("TRAINING_ACTIVE")
        logger.info("Removed training active marker")


def load_model_safely(model_name, max_seq_length, dtype=None):
    """
    Load the model in a safe way that works with Qwen models
    by trying different loading strategies.
    """
    try:
        logger.info(f"Attempting to load model with unsloth optimizations: {model_name}")
        # First try the standard unsloth loading
        try:
            # Try loading with unsloth but without the problematic parameter
            model, tokenizer = FastLanguageModel.from_pretrained(
                model_name=model_name,
                max_seq_length=max_seq_length,
                dtype=dtype,
                load_in_4bit=True,  # This should work for already quantized models
            )
            logger.info("Model loaded successfully with unsloth with 4-bit quantization")
            return model, tokenizer
        except TypeError as e:
            # If we get a TypeError about unexpected keyword arguments
            if "unexpected keyword argument" in str(e):
                logger.warning(f"Unsloth loading error with 4-bit: {e}")
                logger.info("Trying alternative loading method for Qwen model...")

                # Try loading with different parameters for Qwen model
                model, tokenizer = FastLanguageModel.from_pretrained(
                    model_name=model_name,
                    max_seq_length=max_seq_length,
                    dtype=dtype,
                )
                logger.info("Model loaded successfully with unsloth using alternative method")
                return model, tokenizer
            else:
                # Re-raise if it's a different type error
                raise
    except Exception as e:
        # Fallback to standard loading if unsloth methods fail
        logger.warning(f"Unsloth loading failed: {e}")
        logger.info("Falling back to standard Hugging Face loading...")

        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=dtype or torch.float16,
            load_in_4bit=True,
        )
        logger.info("Model loaded successfully with standard HF loading")
        return model, tokenizer


def train(config_path, dataset_name, output_dir):
    """Main training function - RESEARCH TRAINING PHASE ONLY"""
    # Load environment variables
    load_dotenv()
    config = load_config(config_path)

    # Extract configs
    model_config = config.get("model_config", {})
    training_config = config.get("training_config", {})
    hardware_config = config.get("hardware_config", {})
    lora_config = config.get("lora_config", {})
    dataset_config = config.get("dataset_config", {})

    # Verify this is training phase only
    training_phase_only = dataset_config.get("training_phase_only", True)
    if not training_phase_only:
        logger.warning("This script is meant for the research training phase only")
        logger.warning("Overriding training_phase_only to True")
        training_phase_only = True

    # Verify dataset is pre-tokenized
    logger.info("IMPORTANT: Using pre-tokenized dataset - no tokenization will be performed")

    # Set the output directory
    output_dir = output_dir or training_config.get("output_dir", "fine_tuned_model")
    os.makedirs(output_dir, exist_ok=True)

    # Create training marker
    create_training_marker(output_dir)

    try:
        # Print configuration summary
        logger.info("RESEARCH TRAINING PHASE ACTIVE - No output generation")
        logger.info("Configuration Summary:")
        model_name = model_config.get("model_name_or_path")
        logger.info(f"Model: {model_name}")
        logger.info(f"Dataset: {dataset_name if dataset_name != 'phi4-cognitive-dataset' else DEFAULT_DATASET}")
        logger.info(f"Output directory: {output_dir}")
        logger.info("IMPORTANT: Using already 4-bit quantized model - not re-quantizing")

        # Load and prepare the dataset
        dataset = load_and_prepare_dataset(dataset_name, config)

        # Initialize tokenizer (just for model initialization, not for tokenizing data)
        logger.info("Loading tokenizer (for model initialization only, not for tokenizing data)")
        tokenizer = AutoTokenizer.from_pretrained(
            model_name,
            trust_remote_code=True
        )
        tokenizer.pad_token = tokenizer.eos_token

        # Initialize model with unsloth
        logger.info("Initializing model with unsloth (preserving 4-bit quantization)")
        max_seq_length = training_config.get("max_seq_length", 2048)

        # Create LoRA config
        peft_config = LoraConfig(
            r=lora_config.get("r", 16),
            lora_alpha=lora_config.get("lora_alpha", 32),
            lora_dropout=lora_config.get("lora_dropout", 0.05),
            bias=lora_config.get("bias", "none"),
            target_modules=lora_config.get("target_modules", ["q_proj", "k_proj", "v_proj", "o_proj"])
        )

        # Initialize model with our safe loading function
        logger.info("Loading pre-quantized model safely")
        dtype = torch.float16 if hardware_config.get("fp16", True) else None
        model, tokenizer = load_model_safely(model_name, max_seq_length, dtype)

        # Ensure the tokenizer returned by the loader has a pad token (the collator needs pad_token_id)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token

        # Apply LoRA - unsloth's get_peft_model takes the LoRA hyperparameters directly,
        # not peft_config/tokenizer keyword arguments
        logger.info("Applying LoRA to model")
        model = FastLanguageModel.get_peft_model(
            model,
            r=peft_config.r,
            lora_alpha=peft_config.lora_alpha,
            lora_dropout=peft_config.lora_dropout,
            bias=peft_config.bias,
            target_modules=list(peft_config.target_modules),
            use_gradient_checkpointing=hardware_config.get("gradient_checkpointing", True)
        )

        # No need to format the dataset - it's already pre-tokenized
        logger.info("Using pre-tokenized dataset - skipping tokenization step")
        training_dataset = dataset

        # Configure wandb if API key is available
        reports = ["tensorboard"]
        if os.getenv("WANDB_API_KEY"):
            reports.append("wandb")
            logger.info("Wandb API key found, enabling wandb reporting")
        else:
            logger.info("No Wandb API key found, using tensorboard only")

        # Set up training arguments
        training_args = TrainingArguments(
            output_dir=output_dir,
            num_train_epochs=training_config.get("num_train_epochs", 3),
            per_device_train_batch_size=training_config.get("per_device_train_batch_size", 2),
            gradient_accumulation_steps=training_config.get("gradient_accumulation_steps", 4),
            learning_rate=training_config.get("learning_rate", 2e-5),
            lr_scheduler_type=training_config.get("lr_scheduler_type", "cosine"),
            warmup_ratio=training_config.get("warmup_ratio", 0.03),
            weight_decay=training_config.get("weight_decay", 0.01),
            optim=training_config.get("optim", "adamw_torch"),
            logging_steps=training_config.get("logging_steps", 10),
            save_steps=training_config.get("save_steps", 200),
            save_total_limit=training_config.get("save_total_limit", 3),
            fp16=hardware_config.get("fp16", True),
            bf16=hardware_config.get("bf16", False),
            max_grad_norm=training_config.get("max_grad_norm", 0.3),
            report_to=reports,
            logging_first_step=training_config.get("logging_first_step", True),
            disable_tqdm=training_config.get("disable_tqdm", False)
        )
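
        # Note: with the defaults above, each optimizer step sees an effective batch of
        # per_device_train_batch_size * gradient_accumulation_steps = 2 * 4 = 8
        # sequences per device (multiplied again by the number of devices, if several).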

        # Create trainer with pre-tokenized collator
        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=training_dataset,
            data_collator=PreTokenizedCollator(pad_token_id=tokenizer.pad_token_id),
        )

        # Start training
        logger.info("Starting training - RESEARCH PHASE ONLY")
        trainer.train()

        # Save the model
        logger.info(f"Saving model to {output_dir}")
        trainer.save_model(output_dir)

        # Save LoRA adapter separately for easier deployment
        lora_output_dir = os.path.join(output_dir, "lora_adapter")
        model.save_pretrained(lora_output_dir)
        logger.info(f"Saved LoRA adapter to {lora_output_dir}")

        # Save tokenizer for completeness
        tokenizer_output_dir = os.path.join(output_dir, "tokenizer")
        tokenizer.save_pretrained(tokenizer_output_dir)
        logger.info(f"Saved tokenizer to {tokenizer_output_dir}")

        # Copy config file for reference
        with open(os.path.join(output_dir, "training_config.json"), "w") as f:
            json.dump(config, f, indent=2)

        logger.info("Training complete - RESEARCH PHASE ONLY")
        return output_dir
    finally:
        # Always remove the training marker when done
        remove_training_marker()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Fine-tune Unsloth/DeepSeek-R1-Distill-Qwen-14B-4bit model (RESEARCH ONLY)")
    parser.add_argument("--config", type=str, default="transformers_config.json",
                        help="Path to the transformers config JSON file")
    parser.add_argument("--dataset", type=str, default="phi4-cognitive-dataset",
                        help="Dataset name or path")
    parser.add_argument("--output_dir", type=str, default=None,
                        help="Output directory for the fine-tuned model")
    args = parser.parse_args()

    # Run training - Research phase only
    try:
        output_path = train(args.config, args.dataset, args.output_dir)
        print(f"Research training completed. Model saved to: {output_path}")
    except Exception as e:
        logger.error(f"Training failed: {str(e)}")
        remove_training_marker()  # Clean up marker if training fails
        raise
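
# Example invocation (the filename "run_training.py" is an assumption - use whatever
# name this script has in your repo):
#
#   python run_training.py \
#       --config transformers_config.json \
#       --dataset George-API/phi4-cognitive-dataset \
#       --output_dir fine_tuned_model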